01 Cohort Assembly
Jupyter notebook from the Self-Sufficiency, Anaerobic Toolkit, and Cultivation Bias in Clay-Confined Cultured Bacterial Genomes project.
NB01 — Cohort Assembly¶
Goal: Identify clay-confined / shallow-clay anchor + contrast cohorts and link them to BERDL pangenome genome_id. Tag each genome with compartment, depth_class, and sub_cohort based on biosample isolation_source / env_* keywords.
Inputs: kbase_ke_pangenome.ncbi_env (biosample env metadata, 4.1M rows), kbase_ke_pangenome.genome (293K), kbase_ke_pangenome.gtdb_metadata (293K), kbase_ke_pangenome.gtdb_taxonomy_r214v1 (293K).
Output: data/cohort_assignments.tsv — one row per anchor / shallow-contrast genome with full taxonomy + classification. Soil baseline is built separately in NB03.
Expected scale (from lit-review survey): 56 genomes mention 'clay' in ncbi_env; deep-confined sub-cohort (Opalinus + bentonite + Callovo + kaolin) ≈ 12–15; shallow-clay sub-cohort (Coalvale + Cerrado + agricultural) ≈ 12–15; remainder unclassified.
1. Spark session + imports¶
import re
import os
from pathlib import Path
import pandas as pd
from pyspark.sql import functions as F
try:
spark
except NameError:
from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()
DATA_DIR = Path('../data')
DATA_DIR.mkdir(parents=True, exist_ok=True)
print(f'spark version: {spark.version}')
print(f'data dir: {DATA_DIR.resolve()}')
spark version: 4.0.1 data dir: /home/dauglyon/BERIL-research-observatory/projects/clay_confined_subsurface/data
2. Pull clay-mentioning biosample accessions¶
We want any biosample whose isolation_source, env_feature, env_medium, env_local_scale, or env_biome field contains a clay-related token. We collect the raw content per accession so we can apply our classifier in step 3.
ENV_ATTRS = ('isolation_source', 'isolation-source', 'env_feature', 'env_medium', 'env_local_scale', 'env_biome', 'sample_type')
clay_env_df = spark.sql(f"""
SELECT accession, attribute_name, content
FROM kbase_ke_pangenome.ncbi_env
WHERE attribute_name IN {ENV_ATTRS}
AND LOWER(content) LIKE '%clay%'
""")
clay_env_df.cache()
n_rows = clay_env_df.count()
n_acc = clay_env_df.select('accession').distinct().count()
print(f'clay-mentioning ncbi_env rows: {n_rows}')
print(f'distinct accessions: {n_acc}')
clay_env_df.show(10, truncate=80)
clay-mentioning ncbi_env rows: 71 distinct accessions: 61
+------------+----------------+---------------------------------------------------------+ | accession| attribute_name| content| +------------+----------------+---------------------------------------------------------+ |SAMN10868585| sample_type| Grey/brownish clay| |SAMN26307981|isolation_source| clay| |SAMN16244548|isolation_source| clay soil| |SAMN02953213| env_feature| Opalinus Clay rock| |SAMN02953213|isolation_source| Opalinus Clay rock BIC-A1 borehole| |SAMN02953437| env_feature| Opalinus Clay rock| |SAMN02953437|isolation_source| Opalinus Clay rock BIC-A1 borehole| |SAMN02953445| env_feature| Opalinus Clay rock| |SAMN02953445|isolation_source| Opalinus Clay rock BIC-A1 borehole| |SAMN10721045|isolation_source|clay soil with high temperature around 45 degrees Celsius| +------------+----------------+---------------------------------------------------------+ only showing top 10 rows
3. Per-accession classification¶
For each biosample accession, aggregate all clay-mentioning attribute content into a single string, then apply keyword-based classifiers for:
sub_cohort—opalinus/bentonite/callovo_oxfordian/kaolin/coalvale/cerrado/agricultural_clay/geothermal_clay/marine_clay/lab_clay/other_claycompartment—porewater_borehole/rock_attached/bentonite_buffer/clay_sediment/clay_soil_surface/unknowndepth_class—surface/shallow/deep/deep_rock/unknowncohort_class—anchor_deep/anchor_shallow/excluded(engineered/lab) /unclassified
Classifier is keyword-driven; ambiguous cases default to unknown / unclassified. Source strings are kept in raw_isolation_source so downstream notebooks can sanity-check by hand.
# Aggregate clay-mentioning content per accession into a single string
agg_df = (
clay_env_df
.groupBy('accession')
.agg(F.concat_ws(' || ', F.collect_set('content')).alias('raw_isolation_source'))
)
agg_pdf = agg_df.toPandas()
print(f'distinct accessions to classify: {len(agg_pdf)}')
agg_pdf.head()
distinct accessions to classify: 61
| accession | raw_isolation_source | |
|---|---|---|
| 0 | SAMN07627093 | red clay soil |
| 1 | SAMN16249741 | clay soil |
| 2 | SAMN10994883 | Clayey mud of a drained pond |
| 3 | SAMN07626386 | red clay soil |
| 4 | SAMN07765385 | Clay soil |
def classify(text: str) -> dict:
"""Keyword classifier for clay-isolated biosamples. Lowered text in."""
t = (text or '').lower()
sub_cohort = 'other_clay'
compartment = 'unknown'
depth_class = 'unknown'
cohort_class = 'unclassified'
# ---- sub_cohort ----
if 'opalinus' in t or 'mont terri' in t or 'mont-terri' in t:
sub_cohort = 'opalinus'
elif 'bentonite' in t or 'mx-80' in t or 'mx80' in t:
sub_cohort = 'bentonite'
elif 'callovo' in t or 'oxfordian' in t:
sub_cohort = 'callovo_oxfordian'
elif 'kaolin' in t:
sub_cohort = 'kaolin'
elif 'coalvale' in t:
sub_cohort = 'coalvale'
elif 'cerrado' in t:
sub_cohort = 'cerrado'
elif 'kerosene' in t or 'lab enrichment' in t or 'laboratory' in t:
sub_cohort = 'lab_clay'
elif 'geothermal' in t or 'thermal spring' in t or 'high temperature' in t:
sub_cohort = 'geothermal_clay'
elif 'marine' in t or 'sea bottom' in t or 'seawater' in t:
sub_cohort = 'marine_clay'
elif any(k in t for k in ['rhizosphere', 'agricult', 'cauliflower', 'grass', 'garden', 'pasture', 'plant', 'crop', 'field']):
sub_cohort = 'agricultural_clay'
elif 'clay soil' in t or 'silty clay' in t or 'sandy clay' in t:
sub_cohort = 'agricultural_clay' # soil-typed clay default
# ---- compartment ----
if 'porewater' in t or 'pore water' in t or 'borehole' in t or 'brc-' in t or 'bic-' in t:
compartment = 'porewater_borehole'
elif 'rock' in t and 'opalinus' in t:
compartment = 'rock_attached' # rare; usually rock-drilled is still porewater-cultured
elif 'crushed rock' in t or 'compacted' in t or 'bentonite' in t or 'buffer' in t:
compartment = 'bentonite_buffer'
elif 'sediment' in t or 'mud' in t or 'lake' in t or 'pond' in t:
compartment = 'clay_sediment'
elif 'clay soil' in t or 'soil' in t or 'rhizosphere' in t or 'agricult' in t:
compartment = 'clay_soil_surface'
# ---- depth_class ----
if sub_cohort in ('opalinus', 'callovo_oxfordian'):
depth_class = 'deep_rock'
elif sub_cohort in ('bentonite', 'kaolin'):
# bentonite/kaolin in BERDL biosamples are repository-relevant by default; lit treats all as deep-confined (Pedersen, Engel, Beaver)
depth_class = 'deep'
elif sub_cohort in ('coalvale', 'cerrado', 'agricultural_clay'):
depth_class = 'surface'
if compartment == 'unknown':
compartment = 'clay_soil_surface'
elif sub_cohort == 'marine_clay':
depth_class = 'shallow' # sea-bottom; not deep terrestrial
elif sub_cohort in ('geothermal_clay',):
depth_class = 'shallow'
# ---- cohort_class ----
if depth_class in ('deep', 'deep_rock'):
cohort_class = 'anchor_deep'
elif depth_class == 'surface':
cohort_class = 'anchor_shallow'
elif sub_cohort in ('lab_clay',):
cohort_class = 'excluded'
# else stays 'unclassified'
return dict(sub_cohort=sub_cohort, compartment=compartment,
depth_class=depth_class, cohort_class=cohort_class)
classified = agg_pdf['raw_isolation_source'].apply(classify).apply(pd.Series)
agg_pdf = pd.concat([agg_pdf, classified], axis=1)
print('cohort_class distribution:')
print(agg_pdf['cohort_class'].value_counts())
print('\nsub_cohort distribution:')
print(agg_pdf['sub_cohort'].value_counts())
print('\ncompartment distribution:')
print(agg_pdf['compartment'].value_counts())
print('\ndepth_class distribution:')
print(agg_pdf['depth_class'].value_counts())
cohort_class distribution: cohort_class anchor_shallow 30 unclassified 20 anchor_deep 9 excluded 2 Name: count, dtype: int64 sub_cohort distribution: sub_cohort agricultural_clay 21 other_clay 16 opalinus 8 coalvale 8 marine_clay 2 lab_clay 2 geothermal_clay 2 bentonite 1 cerrado 1 Name: count, dtype: int64 compartment distribution: compartment clay_soil_surface 32 unknown 15 porewater_borehole 8 clay_sediment 5 bentonite_buffer 1 Name: count, dtype: int64 depth_class distribution: depth_class surface 30 unknown 18 deep_rock 8 shallow 4 deep 1 Name: count, dtype: int64
4. Spot-check the classifier¶
for grp, sub in agg_pdf.groupby('cohort_class'):
print(f'\n=== cohort_class = {grp} (n={len(sub)}) ===')
for _, row in sub.head(15).iterrows():
src = (row['raw_isolation_source'] or '')[:90]
print(f" {row['accession']:14} | {row['sub_cohort']:18} | {row['compartment']:20} | {row['depth_class']:10} | {src}")
=== cohort_class = anchor_deep (n=9) === SAMN03400149 | opalinus | porewater_borehole | deep_rock | Opalinus Clay rock || Opalinus Clay rock porewater BRC-3 borehole SAMN03400155 | opalinus | porewater_borehole | deep_rock | Opalinus Clay rock || Opalinus Clay rock porewater BRC-3 borehole SAMN04126960 | opalinus | porewater_borehole | deep_rock | Opalinus Clay rock || Opalinus Clay rock porewater BRC-3 borehole SAMN05804953 | bentonite | bentonite_buffer | deep | bentonite clay formations SAMN02953213 | opalinus | porewater_borehole | deep_rock | Opalinus Clay rock || Opalinus Clay rock BIC-A1 borehole SAMN02953437 | opalinus | porewater_borehole | deep_rock | Opalinus Clay rock || Opalinus Clay rock BIC-A1 borehole SAMN03400145 | opalinus | porewater_borehole | deep_rock | Opalinus Clay rock || Opalinus Clay rock porewater BRC-3 borehole SAMN02953445 | opalinus | porewater_borehole | deep_rock | Opalinus Clay rock || Opalinus Clay rock BIC-A1 borehole SAMN03400144 | opalinus | porewater_borehole | deep_rock | Opalinus Clay rock || Opalinus Clay rock porewater BRC-3 borehole === cohort_class = anchor_shallow (n=30) === SAMN07627093 | agricultural_clay | clay_soil_surface | surface | red clay soil SAMN16249741 | agricultural_clay | clay_soil_surface | surface | clay soil SAMN07626386 | agricultural_clay | clay_soil_surface | surface | red clay soil SAMN07765385 | agricultural_clay | clay_soil_surface | surface | Clay soil SAMN11093716 | coalvale | clay_soil_surface | surface | Coalvale silty clay SAMN07627101 | agricultural_clay | clay_soil_surface | surface | red clay soil SAMN11093723 | coalvale | clay_soil_surface | surface | Coalvale silty clay SAMN11093714 | coalvale | clay_soil_surface | surface | Coalvale silty clay SAMN07627173 | agricultural_clay | clay_soil_surface | surface | red clay soil SAMN07627055 | agricultural_clay | clay_soil_surface | surface | red clay soil SAMN11093717 | coalvale | clay_soil_surface | surface | Coalvale silty clay SAMN16244548 | agricultural_clay | clay_soil_surface | surface | clay soil SAMN11093719 | coalvale | clay_soil_surface | surface | Coalvale silty clay SAMN16249740 | agricultural_clay | clay_soil_surface | surface | clay soil SAMN07627098 | agricultural_clay | clay_soil_surface | surface | red clay soil === cohort_class = excluded (n=2) === SAMD00018760 | lab_clay | unknown | unknown | clay || Maas River clay in kerosene SAMN12697585 | lab_clay | unknown | unknown | Clay suspended in kerosene for three weeks === cohort_class = unclassified (n=20) === SAMN10994883 | other_clay | clay_sediment | unknown | Clayey mud of a drained pond SAMN10868585 | other_clay | unknown | unknown | Grey/brownish clay SAMN06451076 | other_clay | unknown | unknown | clay SAMEA85840168 | other_clay | clay_sediment | unknown | The genome from this organism was enriched from a creosote-contaminated wood preservation SAMN06641968 | other_clay | clay_sediment | unknown | Clayey mud of a drained pond SAMN26307981 | other_clay | unknown | unknown | clay SAMN23829263 | other_clay | unknown | unknown | Clay SAMN19594129 | marine_clay | unknown | shallow | dilbit-coated clay beads in seawater SAMN12638699 | other_clay | unknown | unknown | clay mineral SAMN19594130 | marine_clay | unknown | shallow | dilbit-coated clay beads in seawater SAMN20284516 | other_clay | unknown | unknown | loamy clay SAMN06545309 | other_clay | clay_sediment | unknown | Clay sediments SAMEA85840918 | other_clay | clay_sediment | unknown | The genome from this organism was enriched from a creosote-contaminated wood preservation SAMN06451077 | other_clay | unknown | unknown | clay SAMN06455912 | other_clay | unknown | unknown | clay
5. Join to pangenome genome and gtdb_metadata / gtdb_taxonomy_r214v1¶
# Push the classified pdf into Spark as a temp view so we can join
spark_classified = spark.createDataFrame(agg_pdf)
spark_classified.createOrReplaceTempView('classified_clay')
joined = spark.sql("""
SELECT
c.accession AS ncbi_biosample_id,
c.raw_isolation_source,
c.sub_cohort,
c.compartment,
c.depth_class,
c.cohort_class,
g.genome_id,
g.gtdb_species_clade_id,
g.gtdb_taxonomy_id,
m.checkm_completeness,
m.checkm_contamination,
m.gc_percentage,
m.genome_size,
m.ncbi_assembly_level AS assembly_level,
t.domain AS tax_domain,
t.phylum AS tax_phylum,
t.class AS tax_class,
t.order AS tax_order,
t.family AS tax_family,
t.genus AS tax_genus,
t.species AS tax_species
FROM classified_clay c
LEFT JOIN kbase_ke_pangenome.genome g
ON c.accession = g.ncbi_biosample_id
LEFT JOIN kbase_ke_pangenome.gtdb_metadata m
ON g.genome_id = m.accession
LEFT JOIN kbase_ke_pangenome.gtdb_taxonomy_r214v1 t
ON g.genome_id = t.genome_id
""")
joined.cache()
joined_pdf = joined.toPandas()
print(f'joined rows (one per accession-genome match, may include unmatched): {len(joined_pdf)}')
print(f'rows with genome_id (in pangenome): {joined_pdf["genome_id"].notna().sum()}')
print(f'rows without genome_id (not in pangenome): {joined_pdf["genome_id"].isna().sum()}')
joined rows (one per accession-genome match, may include unmatched): 61 rows with genome_id (in pangenome): 61 rows without genome_id (not in pangenome): 0
6. Filter to genomes-with-pangenome-data and report cohort breakdown¶
cohort = joined_pdf[joined_pdf['genome_id'].notna()].copy()
cohort['cohort_class'] = cohort['cohort_class'].fillna('unclassified')
print('=== ANCHOR + CONTRAST COHORT BREAKDOWN ===\n')
print('cohort_class × sub_cohort:')
ct = pd.crosstab(cohort['cohort_class'], cohort['sub_cohort'])
print(ct)
print('\ncohort_class × depth_class:')
print(pd.crosstab(cohort['cohort_class'], cohort['depth_class']))
print('\nphyla represented in anchor_deep:')
print(cohort.loc[cohort['cohort_class'] == 'anchor_deep', 'tax_phylum'].value_counts())
print('\nphyla represented in anchor_shallow:')
print(cohort.loc[cohort['cohort_class'] == 'anchor_shallow', 'tax_phylum'].value_counts())
print('\ngenera represented in anchor_deep:')
print(cohort.loc[cohort['cohort_class'] == 'anchor_deep', 'tax_genus'].value_counts().head(15))
=== ANCHOR + CONTRAST COHORT BREAKDOWN === cohort_class × sub_cohort: sub_cohort agricultural_clay bentonite cerrado coalvale \ cohort_class anchor_deep 0 1 0 0 anchor_shallow 21 0 1 8 excluded 0 0 0 0 unclassified 0 0 0 0 sub_cohort geothermal_clay lab_clay marine_clay opalinus other_clay cohort_class anchor_deep 0 0 0 8 0 anchor_shallow 0 0 0 0 0 excluded 0 2 0 0 0 unclassified 2 0 2 0 16 cohort_class × depth_class: depth_class deep deep_rock shallow surface unknown cohort_class anchor_deep 1 8 0 0 0 anchor_shallow 0 0 0 30 0 excluded 0 0 0 0 2 unclassified 0 0 4 0 16 phyla represented in anchor_deep: tax_phylum p__Bacillota_B 5 p__Pseudomonadota 2 p__Bacteroidota 2 Name: count, dtype: int64 phyla represented in anchor_shallow: tax_phylum p__Pseudomonadota 16 p__Bacillota 10 p__Actinomycetota 4 Name: count, dtype: int64 genera represented in anchor_deep: tax_genus g__BRH-c8a 2 g__Desulfosporosinus 2 g__Roseovarius 1 g__BRH-c54 1 g__Lutibacter 1 g__Stenotrophomonas 1 g__BRHc4a 1 Name: count, dtype: int64
7. Save cohort_assignments.tsv¶
out_cols = [
'genome_id', 'gtdb_species_clade_id', 'ncbi_biosample_id',
'cohort_class', 'sub_cohort', 'compartment', 'depth_class',
'tax_domain', 'tax_phylum', 'tax_class', 'tax_order',
'tax_family', 'tax_genus', 'tax_species',
'checkm_completeness', 'checkm_contamination',
'gc_percentage', 'genome_size', 'assembly_level',
'raw_isolation_source',
]
out_path = DATA_DIR / 'cohort_assignments.tsv'
cohort[out_cols].to_csv(out_path, sep='\t', index=False)
print(f'wrote {len(cohort)} rows to {out_path}')
# Also write a compact summary for downstream notebooks
summary = (
cohort.groupby(['cohort_class', 'sub_cohort', 'compartment', 'depth_class'])
.size().reset_index(name='n_genomes')
.sort_values(['cohort_class', 'n_genomes'], ascending=[True, False])
)
summary_path = DATA_DIR / 'cohort_summary.tsv'
summary.to_csv(summary_path, sep='\t', index=False)
print(f'wrote {len(summary)} summary rows to {summary_path}')
summary
wrote 61 rows to ../data/cohort_assignments.tsv wrote 10 summary rows to ../data/cohort_summary.tsv
| cohort_class | sub_cohort | compartment | depth_class | n_genomes | |
|---|---|---|---|---|---|
| 1 | anchor_deep | opalinus | porewater_borehole | deep_rock | 8 |
| 0 | anchor_deep | bentonite | bentonite_buffer | deep | 1 |
| 2 | anchor_shallow | agricultural_clay | clay_soil_surface | surface | 21 |
| 4 | anchor_shallow | coalvale | clay_soil_surface | surface | 8 |
| 3 | anchor_shallow | cerrado | clay_soil_surface | surface | 1 |
| 5 | excluded | lab_clay | unknown | unknown | 2 |
| 9 | unclassified | other_clay | unknown | unknown | 11 |
| 8 | unclassified | other_clay | clay_sediment | unknown | 5 |
| 6 | unclassified | geothermal_clay | clay_soil_surface | shallow | 2 |
| 7 | unclassified | marine_clay | unknown | shallow | 2 |