01 Cohort Assembly
Jupyter notebook from the Subsurface Bacillota_B Specialization project.
NB01 — Bacillota_B Universe + Cohort Assembly + IR PFAM Availability Gate¶
Goals:
- Pull all 334 Bacillota_B genomes from BERDL pangenome with CheckM QC + isolation_source.
- Identify deep-clay anchor (Mont Terri Opalinus + Coalvale + BacDive clay-strain expansion).
- Identify phylum-matched soil-baseline (Bacillota_B genomes in soil/sediment biosamples, clay excluded).
- Confirm PF02085 (Cytochrom_CIII) and PF22678 (Cytochrom_c_NrfB-like) are present in
bakta_pfam_domainsfor Bacillota_B specifically — gate for NB06.
Inputs: kbase_ke_pangenome.{gtdb_taxonomy_r214v1, genome, gtdb_metadata, ncbi_env, bakta_pfam_domains}, kescience_bacdive.{isolation, taxonomy}.
Outputs: data/bacillota_b_universe.tsv (~334 genomes), data/cohort_assignments.tsv (anchor + baseline tagged), data/ir_pfam_availability.tsv (within-Bacillota_B PFAM count gate).
In [1]:
from pathlib import Path
import re
import pandas as pd
from pyspark.sql import functions as F
try:
spark
except NameError:
from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()
DATA_DIR = Path('../data')
DATA_DIR.mkdir(parents=True, exist_ok=True)
print(f'spark version: {spark.version}')
spark version: 4.0.1
1. Pull Bacillota_B universe with CheckM + biosample¶
In [2]:
universe = spark.sql("""
SELECT
g.genome_id, g.gtdb_species_clade_id, g.ncbi_biosample_id,
t.domain AS tax_domain,
t.phylum AS tax_phylum,
t.class AS tax_class,
t.order AS tax_order,
t.family AS tax_family,
t.genus AS tax_genus,
t.species AS tax_species,
m.checkm_completeness,
m.checkm_contamination,
m.gc_percentage,
m.genome_size,
m.ncbi_assembly_level AS assembly_level
FROM kbase_ke_pangenome.gtdb_taxonomy_r214v1 t
JOIN kbase_ke_pangenome.genome g ON t.genome_id = g.genome_id
LEFT JOIN kbase_ke_pangenome.gtdb_metadata m ON g.genome_id = m.accession
WHERE t.phylum = 'p__Bacillota_B'
""").toPandas()
universe['checkm_completeness'] = pd.to_numeric(universe['checkm_completeness'], errors='coerce')
universe['checkm_contamination'] = pd.to_numeric(universe['checkm_contamination'], errors='coerce')
universe['genome_size'] = pd.to_numeric(universe['genome_size'], errors='coerce')
universe['gc_percentage'] = pd.to_numeric(universe['gc_percentage'], errors='coerce')
print(f'Bacillota_B universe: {len(universe)} genomes')
print(f'Top orders:'); print(universe['tax_order'].value_counts().head(8))
print(f'\nTop genera:'); print(universe['tax_genus'].value_counts().head(10))
print(f'\nCheckM-clean (>=80, <=5): {((universe["checkm_completeness"]>=80)&(universe["checkm_contamination"]<=5)).sum()}')
Bacillota_B universe: 334 genomes Top orders: tax_order o__Desulfotomaculales 71 o__Syntrophomonadales 55 o__Desulfitobacteriales 53 o__Peptococcales 38 o__Moorellales 25 o__Ammonifexales 19 o__Thermacetogeniales 18 o__UBA4068 12 Name: count, dtype: int64 Top genera: tax_genus g__Moorella 21 g__Desulfosporosinus 21 g__UBA4844 19 g__Desulfitobacterium 17 g__UBA7185 16 g__UMGS1590 15 g__Dehalobacter 12 g__Desulfotomaculum 12 g__46-80 10 g__SCADC1-2-3 9 Name: count, dtype: int64 CheckM-clean (>=80, <=5): 267
2. Pull isolation_source per genome (via ncbi_env)¶
In [3]:
biosample_ids = universe['ncbi_biosample_id'].dropna().unique().tolist()
biosample_in = ','.join([f"'{b}'" for b in biosample_ids])
env_df = spark.sql(f"""
SELECT accession, attribute_name, content
FROM kbase_ke_pangenome.ncbi_env
WHERE accession IN ({biosample_in})
AND attribute_name IN ('isolation_source', 'isolation-source', 'env_feature', 'env_medium',
'env_local_scale', 'env_biome', 'sample_type', 'geo_loc_name',
'biomaterial_provider', 'Isolation Site')
""").toPandas()
agg = (env_df.groupby('accession')['content']
.apply(lambda s: ' || '.join(sorted(set(s.dropna()))))
.reset_index()
.rename(columns={'accession':'ncbi_biosample_id', 'content':'raw_isolation_source'}))
universe = universe.merge(agg, on='ncbi_biosample_id', how='left')
universe['raw_isolation_source'] = universe['raw_isolation_source'].fillna('')
print(f'genomes with any env_string: {(universe["raw_isolation_source"]!="").sum()}/{len(universe)}')
genomes with any env_string: 300/334
3. Cohort classification¶
- anchor_deep_clay: Mont Terri Opalinus + Coalvale + bentonite + Callovo-Oxfordian + kaolin keywords
- anchor_other_clay: any other clay mention (ambiguous depth)
- soil_baseline: soil/sediment without clay mention
- unclassified: everything else
In [4]:
def classify(text):
t = (text or '').lower()
deep_clay_kw = ['opalinus', 'mont terri', 'mont-terri', 'bentonite', 'mx-80', 'mx80',
'callovo', 'oxfordian', 'kaolin', 'coalvale', 'cerrado',
'borehole', 'porewater', 'pore water', 'subsurface clay']
other_clay_kw = ['clay'] # any clay mention not caught by deep
soil_kw = ['soil', 'sediment', 'mud', 'rhizosphere', 'agricultural']
if any(k in t for k in deep_clay_kw):
return 'anchor_deep_clay'
if any(k in t for k in other_clay_kw):
return 'anchor_other_clay'
if any(k in t for k in soil_kw):
return 'soil_baseline'
return 'unclassified'
universe['cohort_class'] = universe['raw_isolation_source'].apply(classify)
qc_clean = universe[(universe['checkm_completeness']>=80) & (universe['checkm_contamination']<=5)].copy()
print('=== cohort distribution (CheckM-clean) ===')
print(qc_clean['cohort_class'].value_counts())
print('\n=== anchor_deep_clay genomes ===')
print(qc_clean[qc_clean['cohort_class']=='anchor_deep_clay'][['genome_id','tax_genus','raw_isolation_source']].to_string(index=False))
=== cohort distribution (CheckM-clean) ===
cohort_class
unclassified 195
soil_baseline 62
anchor_deep_clay 10
Name: count, dtype: int64
=== anchor_deep_clay genomes ===
genome_id tax_genus raw_isolation_source
GB_GCA_020725505.1 g__Desulforudis Russia: Borehole near Beyelii Yar in Tomsk || groundwater || groundwater filtered on 0.22 m filters || groundwater|terrestrial subsurface
GB_GCA_000961805.1 g__Desulfosporosinus Opalinus Clay rock || Opalinus Clay rock porewater BRC-3 borehole || Switzerland: Mt Terri URL, St-Ursanne || terrestrial
GB_GCA_003512645.1 g__Desulfosporosinus metagenomic assembly || not applicable || rock porewater
GB_GCA_000802115.1 g__BRH-c8a Opalinus Clay rock || Opalinus Clay rock BIC-A1 borehole || Switzerland: Mt Terri URL, St-Ursanne || terrestrial
GB_GCA_000961575.1 g__BRH-c8a Opalinus Clay rock || Opalinus Clay rock porewater BRC-3 borehole || Switzerland: Mt Terri URL, St-Ursanne || terrestrial
GB_GCA_003512665.1 g__BRH-c8a metagenomic assembly || not applicable || rock porewater
GB_GCA_003513495.1 g__Desulfosporosinus metagenomic assembly || not applicable || rock porewater
GB_GCA_020725365.1 g__Ch130 Russia: Borehole near Beyelii Yar in Tomsk || groundwater || groundwater filtered on 0.22 m filters || groundwater|terrestrial subsurface
GB_GCA_000802095.1 g__BRHc4a Opalinus Clay rock || Opalinus Clay rock BIC-A1 borehole || Switzerland: Mt Terri URL, St-Ursanne || terrestrial
GB_GCA_003514655.1 g__BRHc4a metagenomic assembly || not applicable || rock porewater
4. Optional BacDive expansion: clay-isolated Bacillota_B genera not yet captured¶
In [5]:
# Identify Bacillota_B genera in our universe — used as the BacDive lookup target.
bacillota_b_genera = sorted({g.replace('g__','') for g in universe['tax_genus'].dropna().unique() if g})
print(f'Bacillota_B genera in BERDL: {len(bacillota_b_genera)}')
print(f'examples: {bacillota_b_genera[:15]}')
# BacDive: clay sample_type rows whose taxonomy intersects with our genera.
# Pull all clay BacDive rows; cross-check genus offline.
bd = spark.sql("""
SELECT bacdive_id, sample_type, country, cat1, cat2, cat3
FROM kescience_bacdive.isolation
WHERE LOWER(sample_type) LIKE '%clay%'
""").toPandas()
print(f'\nBacDive clay rows total: {len(bd)}')
# Pull BacDive taxonomy for those bacdive_ids
bd_ids = bd['bacdive_id'].dropna().unique().tolist()
if bd_ids:
in_clause = ','.join([f"'{b}'" for b in bd_ids])
bd_tax = spark.sql(f"""
SELECT bacdive_id, genus, species
FROM kescience_bacdive.taxonomy
WHERE bacdive_id IN ({in_clause})
""").toPandas()
bd_clay_bacillota_b = bd_tax[bd_tax['genus'].isin(bacillota_b_genera)]
print(f'BacDive clay rows with Bacillota_B genus: {len(bd_clay_bacillota_b)}')
print(bd_clay_bacillota_b['genus'].value_counts())
else:
bd_clay_bacillota_b = pd.DataFrame()
Bacillota_B genera in BERDL: 67 examples: ['41-269', '46-80', '50-218', 'Avidehalobacter', 'BRH-c8a', 'BRHc4a', 'Carboxydocella', 'Carboxydothermus', 'Ch130', 'Cryptoclostridium', 'DTU018', 'DTU019', 'DTU052', 'DTU068', 'DTU073']
BacDive clay rows total: 65
BacDive clay rows with Bacillota_B genus: 2 genus Desulfitobacterium 1 Desulfosporosinus 1 Name: count, dtype: int64
5. IR PFAM availability gate (within-Bacillota_B specific)¶
PF14537 was confirmed silently absent at the table level during plan-writing. Here we confirm PF02085 (Cytochrom_CIII) and PF22678 (Cytochrom_c_NrfB-like) are also present within Bacillota_B — not just globally — before NB06 commits to the corrected IR detector.
In [6]:
univ_genome_ids = qc_clean['genome_id'].dropna().tolist()
spark.createDataFrame([(g,) for g in univ_genome_ids], ['genome_id']).createOrReplaceTempView('bb_qc_genomes')
ir_avail = spark.sql("""
WITH bb_clusters AS (
SELECT DISTINCT j.gene_cluster_id
FROM kbase_ke_pangenome.gene g
JOIN kbase_ke_pangenome.gene_genecluster_junction j ON g.gene_id = j.gene_id
JOIN bb_qc_genomes b ON b.genome_id = g.genome_id
)
SELECT p.pfam_id, p.pfam_name, COUNT(*) AS n_hits, COUNT(DISTINCT p.gene_cluster_id) AS n_clusters
FROM kbase_ke_pangenome.bakta_pfam_domains p
JOIN bb_clusters c ON p.gene_cluster_id = c.gene_cluster_id
WHERE p.pfam_id LIKE 'PF02085%' OR p.pfam_id LIKE 'PF22678%' OR p.pfam_id LIKE 'PF14537%'
OR p.pfam_id LIKE 'PF00034%' OR p.pfam_id LIKE 'PF13442%'
GROUP BY p.pfam_id, p.pfam_name
ORDER BY n_hits DESC
""").toPandas()
print('=== IR PFAM availability within Bacillota_B ===')
print(ir_avail.to_string(index=False))
ir_avail.to_csv(DATA_DIR / 'ir_pfam_availability.tsv', sep='\t', index=False)
# Gate decision
pf02085 = (ir_avail['pfam_id'].str.startswith('PF02085')).any()
pf22678 = (ir_avail['pfam_id'].str.startswith('PF22678')).any()
print(f'\nGATE: PF02085 present in Bacillota_B = {pf02085}; PF22678 present = {pf22678}')
if not (pf02085 or pf22678):
print('!!! Both alternative IR PFAMs absent within Bacillota_B — NB06 must rely on CXXCH motif counting alone.')
=== IR PFAM availability within Bacillota_B === pfam_id pfam_name n_hits n_clusters PF02085.21 Cytochrom_CIII 4 4 PF00034.26 Cytochrom_C 1 1 PF13442.11 Cytochrome_CBB3 1 1 PF22678.1 Cytochrom_c_NrfB-like 1 1 GATE: PF02085 present in Bacillota_B = True; PF22678 present = True
6. Write outputs¶
In [7]:
out_cols = ['genome_id', 'gtdb_species_clade_id', 'ncbi_biosample_id', 'cohort_class',
'tax_domain','tax_phylum','tax_class','tax_order','tax_family','tax_genus','tax_species',
'checkm_completeness','checkm_contamination','gc_percentage','genome_size','assembly_level',
'raw_isolation_source']
qc_clean[out_cols].to_csv(DATA_DIR / 'bacillota_b_universe.tsv', sep='\t', index=False)
print(f'wrote bacillota_b_universe.tsv ({len(qc_clean)} rows)')
cohort = qc_clean[qc_clean['cohort_class'].isin(['anchor_deep_clay','anchor_other_clay','soil_baseline'])].copy()
cohort[out_cols].to_csv(DATA_DIR / 'cohort_assignments.tsv', sep='\t', index=False)
print(f'wrote cohort_assignments.tsv ({len(cohort)} rows)')
summary = (cohort.groupby(['cohort_class','tax_order'])
.agg(n_genomes=('genome_id','count'),
n_genera=('tax_genus', pd.Series.nunique))
.reset_index().sort_values(['cohort_class','n_genomes'], ascending=[True, False]))
summary.to_csv(DATA_DIR / 'cohort_summary.tsv', sep='\t', index=False)
print('\n=== cohort × tax_order ===')
summary
wrote bacillota_b_universe.tsv (267 rows) wrote cohort_assignments.tsv (72 rows) === cohort × tax_order ===
Out[7]:
| cohort_class | tax_order | n_genomes | n_genera | |
|---|---|---|---|---|
| 2 | anchor_deep_clay | o__Desulfotomaculales | 5 | 2 |
| 1 | anchor_deep_clay | o__Desulfitobacteriales | 3 | 1 |
| 0 | anchor_deep_clay | o__Ammonifexales | 1 | 1 |
| 3 | anchor_deep_clay | o__Thermacetogeniales | 1 | 1 |
| 10 | soil_baseline | o__Syntrophomonadales | 29 | 5 |
| 6 | soil_baseline | o__Desulfitobacteriales | 16 | 2 |
| 9 | soil_baseline | o__Moorellales | 6 | 2 |
| 11 | soil_baseline | o__Thermacetogeniales | 4 | 2 |
| 4 | soil_baseline | o__Ammonifexales | 2 | 2 |
| 5 | soil_baseline | o__Carboxydocellales | 2 | 1 |
| 7 | soil_baseline | o__Desulfotomaculales | 1 | 1 |
| 8 | soil_baseline | o__Heliobacteriales | 1 | 1 |
| 12 | soil_baseline | o__Thermincolales | 1 | 1 |