03 Soil Baseline
Jupyter notebook from the Self-Sufficiency, Anaerobic Toolkit, and Cultivation Bias in Clay-Confined Cultured Bacterial Genomes project.
NB03 — Soil/Sediment Baseline¶
Goal: Build a phylum-stratified random sample of soil/sediment-isolated genomes from kbase_ke_pangenome.ncbi_env, then extract the same marker + GapMind features as NB02. Used as the reference distribution for H1, H2, H3 statistical tests in NB04–06.
Strategy: For each phylum represented in the anchor cohort, sample target ≈ 4× the anchor cohort size per phylum (capped) from the broader soil/sediment pool. This controls for phylogenetic confounding — the literature flags this as a strong concern (Mitzscherling 2023; Beller 2012 Pelosinus).
Output: data/baseline_features.parquet — same schema as genome_features.parquet.
import re
from pathlib import Path
import pandas as pd
from pyspark.sql import functions as F
try:
spark
except NameError:
from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()
DATA_DIR = Path('../data')
cohort = pd.read_csv(DATA_DIR / 'cohort_assignments.tsv', sep='\t')
anchor = cohort[cohort['cohort_class'].isin(['anchor_deep','anchor_shallow'])].copy()
anchor_phyla = anchor['tax_phylum'].dropna().unique().tolist()
print(f'anchor cohort: {len(anchor)}; phyla: {anchor_phyla}')
# Target sample size per phylum: 4x the anchor count in that phylum, capped at 50
phy_targets = (
anchor.groupby('tax_phylum').size()
.apply(lambda n: min(50, max(20, 4 * n)))
.to_dict()
)
print(f'per-phylum baseline targets: {phy_targets}')
anchor cohort: 39; phyla: ['p__Pseudomonadota', 'p__Bacillota', 'p__Actinomycetota', 'p__Bacteroidota', 'p__Bacillota_B']
per-phylum baseline targets: {'p__Actinomycetota': 20, 'p__Bacillota': 40, 'p__Bacillota_B': 20, 'p__Bacteroidota': 20, 'p__Pseudomonadota': 50}
2. Identify candidate soil/sediment biosamples and link to pangenome¶
Match ncbi_env.content for soil/sediment keywords (excluding clay-specific terms used in the anchor cohort). Cap at a manageable random subset before joining.
# Pull soil/sediment biosamples that do NOT mention clay (avoid overlap with anchor)
soil_acc_df = spark.sql("""
SELECT DISTINCT accession
FROM kbase_ke_pangenome.ncbi_env
WHERE attribute_name IN ('isolation_source', 'env_feature', 'env_medium', 'env_local_scale', 'env_biome')
AND (LOWER(content) LIKE '%soil%' OR LOWER(content) LIKE '%sediment%')
AND LOWER(content) NOT LIKE '%clay%'
""")
soil_acc_df.cache()
n_soil_acc = soil_acc_df.count()
print(f'soil/sediment biosamples (clay excluded): {n_soil_acc}')
# Join to pangenome.genome and pull phylum
soil_acc_df.createOrReplaceTempView('soil_accs')
soil_genomes_df = spark.sql("""
SELECT g.genome_id, g.gtdb_species_clade_id, t.phylum AS tax_phylum, t.class AS tax_class,
t.order AS tax_order, t.family AS tax_family, t.genus AS tax_genus, t.species AS tax_species,
m.checkm_completeness, m.checkm_contamination, m.gc_percentage, m.genome_size,
m.ncbi_assembly_level AS assembly_level
FROM soil_accs s
JOIN kbase_ke_pangenome.genome g ON s.accession = g.ncbi_biosample_id
LEFT JOIN kbase_ke_pangenome.gtdb_taxonomy_r214v1 t ON g.genome_id = t.genome_id
LEFT JOIN kbase_ke_pangenome.gtdb_metadata m ON g.genome_id = m.accession
""")
soil_genomes_pdf = soil_genomes_df.toPandas()
print(f'soil/sediment genomes joined to pangenome: {len(soil_genomes_pdf)}')
print('phylum distribution (top 10):')
print(soil_genomes_pdf['tax_phylum'].value_counts().head(10))
soil/sediment biosamples (clay excluded): 14327
soil/sediment genomes joined to pangenome: 14338 phylum distribution (top 10): tax_phylum p__Pseudomonadota 4572 p__Bacillota 2233 p__Actinomycetota 2121 p__Bacteroidota 688 p__Patescibacteria 547 p__Acidobacteriota 435 p__Thermoproteota 327 p__Desulfobacterota 309 p__Chloroflexota 282 p__Thermoplasmatota 270 Name: count, dtype: int64
3. Phylum-stratified random sample¶
import numpy as np
rng = np.random.default_rng(20260430)
samples = []
for phylum in anchor_phyla:
target = phy_targets.get(phylum, 20)
pool = soil_genomes_pdf[soil_genomes_pdf['tax_phylum'] == phylum]
n_take = min(target, len(pool))
if n_take == 0:
print(f' {phylum}: 0 baseline candidates — skipping')
continue
sampled = pool.sample(n=n_take, random_state=int(rng.integers(0, 2**31 - 1)))
samples.append(sampled)
print(f' {phylum}: target={target}, pool={len(pool)}, took={n_take}')
baseline = pd.concat(samples, ignore_index=True)
baseline['cohort_class'] = 'soil_baseline'
baseline['sub_cohort'] = 'soil_random'
baseline['compartment'] = 'soil'
baseline['depth_class'] = 'surface'
print(f'\nbaseline cohort: {len(baseline)}')
p__Pseudomonadota: target=50, pool=4572, took=50 p__Bacillota: target=40, pool=2233, took=40 p__Actinomycetota: target=20, pool=2121, took=20 p__Bacteroidota: target=20, pool=688, took=20 p__Bacillota_B: target=20, pool=30, took=20 baseline cohort: 150
4. Pull cluster mappings for baseline genomes (same pipeline as NB02)¶
baseline_genomes = baseline['genome_id'].dropna().unique().tolist()
genome_in = ','.join([f"'{g}'" for g in baseline_genomes])
gene_cluster_df = spark.sql(f"""
SELECT g.genome_id, j.gene_cluster_id
FROM kbase_ke_pangenome.gene g
JOIN kbase_ke_pangenome.gene_genecluster_junction j
ON g.gene_id = j.gene_id
WHERE g.genome_id IN ({genome_in})
""")
gene_cluster_df.cache()
n_pairs = gene_cluster_df.count()
n_genomes_with_clusters = gene_cluster_df.select('genome_id').distinct().count()
n_unique_clusters = gene_cluster_df.select('gene_cluster_id').distinct().count()
print(f'gene-cluster pairs: {n_pairs}; genomes with clusters: {n_genomes_with_clusters}/{len(baseline_genomes)}; unique clusters: {n_unique_clusters}')
gene-cluster pairs: 637314; genomes with clusters: 150/150; unique clusters: 573003
gene_cluster_df.select('gene_cluster_id').distinct().createOrReplaceTempView('baseline_clusters')
ann_df = spark.sql("""
SELECT a.query_name AS gene_cluster_id,
a.KEGG_ko, a.COG_category, a.PFAMs, a.EC, a.Preferred_name
FROM kbase_ke_pangenome.eggnog_mapper_annotations a
JOIN baseline_clusters c
ON a.query_name = c.gene_cluster_id
""")
ann_df.cache()
n_ann = ann_df.count()
print(f'eggnog annotations for baseline clusters: {n_ann}')
eggnog annotations for baseline clusters: 503405
5. Marker calling (same definitions as NB02)¶
MARKERS = {
'WL_acsA': 'K00198', 'WL_acsB': 'K14138',
'WL_acsC': 'K00197', 'WL_acsD': 'K15022', 'WL_acsE': 'K15023',
'NiFe_hyaA': 'K06281', 'NiFe_hyaB': 'K06282',
'SR_dsrA': 'K11180', 'SR_dsrB': 'K11181',
'SR_aprA': 'K00394', 'SR_aprB': 'K00395', 'SR_sat': 'K00958',
'IR_omcS': 'K07811', 'IR_mtrC': 'K17324', 'IR_mtrA': 'K17323',
'NifH': 'K02588', 'NifD': 'K02586', 'NifK': 'K02591',
}
ann_pdf = ann_df.toPandas()
ann_pdf['KEGG_ko'] = ann_pdf['KEGG_ko'].fillna('')
for marker, ko in MARKERS.items():
ann_pdf[f'has_{marker}'] = ann_pdf['KEGG_ko'].str.contains(ko, regex=False)
marker_cols = [f'has_{m}' for m in MARKERS]
cluster_markers = ann_pdf.groupby('gene_cluster_id')[marker_cols].any().reset_index()
gc_pdf = gene_cluster_df.toPandas().drop_duplicates()
merged = gc_pdf.merge(cluster_markers, on='gene_cluster_id', how='left')
for c in marker_cols:
merged[c] = merged[c].fillna(False)
genome_markers = merged.groupby('genome_id')[marker_cols].any().reset_index()
WL_cols = [f'has_WL_{x}' for x in ['acsA','acsB','acsC','acsD','acsE']]
NiFe_cols = [f'has_NiFe_{x}' for x in ['hyaA','hyaB']]
SR_cols = [f'has_SR_{x}' for x in ['dsrA','dsrB','aprA','aprB','sat']]
IR_cols = [f'has_IR_{x}' for x in ['omcS','mtrC','mtrA']]
Nif_cols = ['has_NifH','has_NifD','has_NifK']
for name, cols in [('WL',WL_cols),('NiFe',NiFe_cols),('SR',SR_cols),('IR',IR_cols),('Nif',Nif_cols)]:
genome_markers[f'{name}_count'] = genome_markers[cols].sum(axis=1)
genome_markers['WL_complete'] = genome_markers['WL_count'] >= 3
genome_markers['NiFe_complete'] = genome_markers['NiFe_count'] >= 1
genome_markers['SR_complete'] = genome_markers['SR_count'] >= 3
genome_markers['IR_complete'] = genome_markers['IR_count'] >= 1
genome_markers['Nif_complete'] = genome_markers['Nif_count'] >= 2
genome_markers['toolkit_score'] = (
genome_markers['WL_complete'].astype(int)
+ genome_markers['NiFe_complete'].astype(int)
+ genome_markers['SR_complete'].astype(int)
)
print(f'baseline genome_markers shape: {genome_markers.shape}')
print('module completeness rates:')
for c in ['WL_complete','NiFe_complete','SR_complete','IR_complete','Nif_complete']:
print(f' {c}: {genome_markers[c].mean():.3f}')
print(f'mean toolkit_score: {genome_markers["toolkit_score"].mean():.3f}')
baseline genome_markers shape: (150, 30) module completeness rates: WL_complete: 0.100 NiFe_complete: 0.247 SR_complete: 0.040 IR_complete: 0.200 Nif_complete: 0.127 mean toolkit_score: 0.387
6. GapMind aa-pathway completeness for baseline (per pitfalls.md)¶
def strip_prefix(g):
return re.sub(r'^(GB_|RS_)', '', g)
gapmind_in = ','.join([f"'{strip_prefix(g)}'" for g in baseline_genomes])
gapmind_df = spark.sql(f"""
WITH best_per_pathway AS (
SELECT genome_id, pathway, MAX(score_simplified) AS best_score
FROM kbase_ke_pangenome.gapmind_pathways
WHERE genome_id IN ({gapmind_in}) AND metabolic_category = 'aa'
GROUP BY genome_id, pathway
)
SELECT genome_id AS gapmind_genome_id,
COUNT(*) AS aa_pathway_total,
SUM(CASE WHEN best_score >= 1 THEN 1 ELSE 0 END) AS aa_pathway_complete
FROM best_per_pathway
GROUP BY genome_id
""")
gapmind_pdf = gapmind_df.toPandas()
prefix_map = {strip_prefix(g): g for g in baseline_genomes}
gapmind_pdf['genome_id'] = gapmind_pdf['gapmind_genome_id'].map(prefix_map)
print(f'baseline genomes with gapmind data: {len(gapmind_pdf)}')
print(gapmind_pdf['aa_pathway_complete'].describe())
baseline genomes with gapmind data: 150 count 150.000000 mean 16.660000 std 2.519268 min 6.000000 25% 17.000000 50% 18.000000 75% 18.000000 max 18.000000 Name: aa_pathway_complete, dtype: float64
7. Combine baseline features and write parquet¶
features = baseline.merge(genome_markers, on='genome_id', how='left').merge(gapmind_pdf, on='genome_id', how='left')
for c in marker_cols + ['WL_complete','NiFe_complete','SR_complete','IR_complete','Nif_complete']:
features[c] = features[c].fillna(False)
for c in ['WL_count','NiFe_count','SR_count','IR_count','Nif_count','toolkit_score','aa_pathway_total','aa_pathway_complete']:
features[c] = features[c].fillna(0)
out_path = DATA_DIR / 'baseline_features.parquet'
features.to_parquet(out_path, index=False)
print(f'wrote {len(features)} rows × {len(features.columns)} cols to {out_path}')
summary = features.groupby('tax_phylum').agg(
n=('genome_id', 'count'),
WL=('WL_complete', 'mean'),
NiFe=('NiFe_complete', 'mean'),
SR=('SR_complete', 'mean'),
IR=('IR_complete', 'mean'),
Nif=('Nif_complete', 'mean'),
toolkit=('toolkit_score', 'mean'),
aa_complete=('aa_pathway_complete', 'mean'),
).round(3)
summary
wrote 150 rows × 49 cols to ../data/baseline_features.parquet
| n | WL | NiFe | SR | IR | Nif | toolkit | aa_complete | |
|---|---|---|---|---|---|---|---|---|
| tax_phylum | ||||||||
| p__Actinomycetota | 20 | 0.00 | 0.300 | 0.00 | 0.05 | 0.00 | 0.300 | 16.100 |
| p__Bacillota | 40 | 0.00 | 0.025 | 0.00 | 0.00 | 0.00 | 0.025 | 17.675 |
| p__Bacillota_B | 20 | 0.75 | 0.700 | 0.20 | 0.00 | 0.55 | 1.650 | 16.600 |
| p__Bacteroidota | 20 | 0.00 | 0.150 | 0.00 | 0.00 | 0.05 | 0.150 | 14.150 |
| p__Pseudomonadota | 50 | 0.00 | 0.260 | 0.04 | 0.58 | 0.14 | 0.300 | 17.100 |