01 Global Distribution
Jupyter notebook from the Global Biogeography of Environmental Bacterial Metal Resistance and Spatial Sampling Gaps project.
In [1]:
# Spark Session Setup
import sys
import pandas as pd
from pyspark.sql import functions as F
# Hardcoded Spark environment – requires internal cluster access
try:
spark
except NameError:
sys.path.append('/opt/conda/lib/python3.13/site-packages')
from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()
print("Spark session ready.")
Spark session ready.
In [2]:
# Extract High-Quality MAGs and Metal Resistance Genes (Including sample_accession)
print("Extracting strain-level MAG metal diversity from MGnify...")
# 1. Load high-quality genomes (Completeness > 90%, Contamination < 5%)
genomes = spark.table("kescience_mgnify.genome").filter(
(F.col("completeness") >= 90.0) &
(F.col("contamination") <= 5.0)
)
# 2. Extract METAL resistance genes from the AMR table
metal_amr = spark.table("kescience_mgnify.gene_amr").filter(
F.col("element_subtype") == "METAL"
)
# 3. Calculate Metal Diversity (distinct metal classes per MAG)
mag_metal_diversity = metal_amr.groupBy("genome_id").agg(
F.countDistinct("amr_class").alias("n_metal_types"),
F.count("gene_id").alias("total_metal_genes")
)
# 4. Join Genomes with Metal Diversity and Biome information
mag_master = genomes.join(
mag_metal_diversity, on="genome_id", how="left"
).fillna({"n_metal_types": 0, "total_metal_genes": 0})
biomes = spark.table("kescience_mgnify.biome").select("biome_id", "biome_name", "biome_lineage")
mag_master = mag_master.join(biomes, on="biome_id", how="inner")
# 5. Select final columns – INCLUDING sample_accession
final_mag_df = mag_master.select(
"genome_id",
"sample_accession", # <--- ADDED
"lineage",
"biome_name",
"biome_lineage",
"length",
"gc_content",
"n_metal_types",
"total_metal_genes"
)
# Convert to Pandas
df_mags_local = final_mag_df.toPandas()
print(f"Successfully extracted {len(df_mags_local)} high-quality MAGs.")
Extracting strain-level MAG metal diversity from MGnify... Successfully extracted 260652 high-quality MAGs.
In [4]:
# Split GTDB lineage string into separate taxonomic columns
tax_levels = df_mags_local['lineage'].str.split(';', expand=True)
tax_levels.columns = ['domain', 'phylum', 'class', 'order', 'family', 'genus', 'species']
# Clean prefixes (e.g., 'p__Firmicutes' -> 'Firmicutes')
for col in tax_levels.columns:
tax_levels[col] = tax_levels[col].str.replace(r'^[a-z]__', '', regex=True)
# Join back to main dataframe
df_mags_local = pd.concat([df_mags_local, tax_levels], axis=1)
mags = pd.read_csv('data/mgnify_mag_metal_traits.csv')
# Filter out host‑associated MAGs
env_mags = mags[~mags['biome_lineage'].str.contains('Host-associated', na=False, case=False)].copy()
print(f"Environmental MAGs remaining: {len(env_mags)}")
Environmental MAGs remaining: 30497
In [5]:
# Fetch Sample Coordinates from ENA API
import time
import requests
from tqdm.auto import tqdm
print("Preparing unique accessions...")
unique_accessions = env_mags['sample_accession'].dropna().unique().tolist()
n_samples = len(unique_accessions)
ena_spatial_data = []
batch_size = 500
with tqdm(total=n_samples, desc="Retrieving ENA Metadata") as pbar:
for i in range(0, n_samples, batch_size):
batch = unique_accessions[i:i+batch_size]
query_str = " OR ".join([f'accession="{acc}"' for acc in batch])
payload = {
"result": "sample",
"query": query_str,
"fields": "accession,location,lat,lon,country,environment_biome",
"format": "json",
"limit": 0
}
try:
response = requests.post("https://www.ebi.ac.uk/ena/portal/api/search", data=payload)
if response.status_code == 200 and response.text.strip():
for rec in response.json():
ena_spatial_data.append({
"sample_accession": rec.get("accession"),
"lat_raw": rec.get("lat"),
"lon_raw": rec.get("lon"),
"location": rec.get("location"),
"country": rec.get("country"),
"biome": rec.get("environment_biome")
})
pbar.update(len(batch))
time.sleep(0.1)
except Exception as e:
pbar.write(f"Batch error at index {i}: {e}")
continue
df_ena_geo = pd.DataFrame(ena_spatial_data)
df_ena_geo_cleaned = df_ena_geo.dropna(subset=['lat_raw', 'lon_raw', 'location'], how='all')
print(f"Total records: {len(df_ena_geo)}; with spatial info: {len(df_ena_geo_cleaned)}")
Preparing unique accessions...
Retrieving ENA Metadata: 0%| | 0/25104 [00:00<?, ?it/s]
Total records: 24511; with spatial info: 24511
In [6]:
# Clean Coordinates
import re
def clean_coordinate(coord):
if pd.isna(coord) or coord == "":
return None
coord = str(coord).upper().strip()
multiplier = -1 if ('S' in coord or 'W' in coord) else 1
match = re.search(r"([-+]?\d*\.\d+|\d+)", coord)
if match:
try:
return float(match.group(1)) * multiplier
except ValueError:
return None
return None
df_ena_geo['lat'] = df_ena_geo['lat_raw'].apply(clean_coordinate)
df_ena_geo['lon'] = df_ena_geo['lon_raw'].apply(clean_coordinate)
df_final_geo = df_ena_geo[['sample_accession', 'lat', 'lon']].dropna()
df_final_geo.to_csv('data/ena_sample_coordinates.csv', index=False)
print(f"Saved {len(df_final_geo)} coordinate pairs.")
# Merge Traits with Coordinates
coords = pd.read_csv('data/ena_sample_coordinates.csv')
# env_mags already contains sample_accession
mags_geo = pd.merge(env_mags, coords, on='sample_accession', how='inner')
print(f"Merged dataset: {len(mags_geo)} MAGs with coordinates.")
Saved 16964 coordinate pairs. Merged dataset: 22356 MAGs with coordinates.
In [1]:
# Global Map Emphasizing High‑Diversity MAGs
import geopandas as gpd
from shapely.geometry import Point
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
# Convert to GeoDataFrame
geometry = [Point(xy) for xy in zip(mags_geo['lon'], mags_geo['lat'])]
gdf = gpd.GeoDataFrame(mags_geo, geometry=geometry, crs="EPSG:4326")
# Load world base map
world_url = "https://raw.githubusercontent.com/nvkelso/natural-earth-vector/master/geojson/ne_110m_admin_0_countries.geojson"
world = gpd.read_file(world_url)
# Split into zero‑resistance and armored MAGs
gdf_0 = gdf[gdf['n_metal_types'] == 0]
gdf_plus = gdf[gdf['n_metal_types'] > 0]
fig, ax = plt.subplots(figsize=(18, 12))
world.plot(ax=ax, color='#f8f9fa', edgecolor='#dee2e6', zorder=0)
# Zero resistance: subtle gray
gdf_0.plot(ax=ax, color='#adb5bd', markersize=3, alpha=0.2, zorder=1)
# Armored MAGs: scaled by diversity, with distinct colormap
sc = ax.scatter(gdf_plus['lon'], gdf_plus['lat'],
c=gdf_plus['n_metal_types'], s=gdf_plus['n_metal_types'] * 40,
cmap='plasma', alpha=0.8, edgecolor='white', linewidth=0.3,
zorder=2)
plt.title("Global Distribution of Heavy Metal Resistance (0 vs 1+ Types)", fontsize=20)
grey_patch = mpatches.Patch(color='#adb5bd', label='0 Metal Resistance Types')
cb = plt.colorbar(sc, ax=ax, orientation='horizontal', pad=0.05, shrink=0.4)
cb.set_label("Number of Metal Resistance Types", fontsize=12)
plt.legend(handles=[grey_patch], loc='lower left')
ax.set_ylim(-60, 85)
plt.axis('off')
plt.tight_layout()
plt.show()
print(f"Stats: {len(gdf_0)} MAGs with 0 types, {len(gdf_plus)} with ≥1 type.")
--------------------------------------------------------------------------- NameError Traceback (most recent call last) Cell In[1], line 9 5 import matplotlib.pyplot as plt 6 import matplotlib.patches as mpatches 7 8 # Convert to GeoDataFrame ----> 9 geometry = [Point(xy) for xy in zip(mags_geo['lon'], mags_geo['lat'])] 10 gdf = gpd.GeoDataFrame(mags_geo, geometry=geometry, crs="EPSG:4326") 11 12 # Load world base map NameError: name 'mags_geo' is not defined
In [ ]: