02 Genome Features
Jupyter notebook from the Self-Sufficiency, Anaerobic Toolkit, and Cultivation Bias in Clay-Confined Cultured Bacterial Genomes project.
NB02 — Genome Feature Extraction¶
Goal: For each cohort genome, extract per-genome marker presence (Wood–Ljungdahl, group 1 [NiFe]-hydrogenase, dissimilatory sulfate reduction, iron reduction, N-fixation) from eggnog_mapper_annotations, plus GapMind amino-acid pathway completeness from gapmind_pathways.
Inputs: data/cohort_assignments.tsv (from NB01), kbase_ke_pangenome.gene, gene_genecluster_junction, eggnog_mapper_annotations, gapmind_pathways.
Output: data/genome_features.parquet with one row per cohort genome × marker booleans + amino-acid pathway-complete count.
Marker definitions (KEGG K-numbers from RESEARCH_PLAN.md):
| Module | KEGG KOs | Threshold for 'complete' |
|---|---|---|
| Wood–Ljungdahl | K00198 cooS/acsA, K14138 acsB, K00197 acsC, K15022 acsD, K15023 acsE | ≥3/5 |
| Group 1 [NiFe]-hydrogenase | K06281 hyaA, K06282 hyaB | ≥1/2 |
| Dissimilatory sulfate reduction | K11180 dsrA, K11181 dsrB, K00394 aprA, K00395 aprB, K00958 sat | ≥3/5 |
| Iron reduction (Mtr/Omc) | K07811 omcS, K17324 mtrC, K17323 mtrA | ≥1/3 |
| N-fixation | K02588 nifH, K02586 nifD, K02591 nifK | ≥2/3 |
1. Setup¶
import os
from pathlib import Path
import pandas as pd
from pyspark.sql import functions as F
try:
spark
except NameError:
from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()
DATA_DIR = Path('../data')
cohort = pd.read_csv(DATA_DIR / 'cohort_assignments.tsv', sep='\t')
print(f'cohort rows: {len(cohort)}')
print(cohort['cohort_class'].value_counts())
# Use all cohort genomes (anchor + shallow) for now; baseline soil sample built in NB03
cohort_genomes = cohort.loc[cohort['cohort_class'].isin(['anchor_deep', 'anchor_shallow', 'unclassified']), 'genome_id'].dropna().unique().tolist()
print(f'cohort genomes for feature extraction: {len(cohort_genomes)}')
cohort rows: 61 cohort_class anchor_shallow 30 unclassified 20 anchor_deep 9 excluded 2 Name: count, dtype: int64 cohort genomes for feature extraction: 59
2. Pull cohort gene → cluster mapping¶
Filter the 1B-row gene table by genome_id IN (cohort), then join gene_genecluster_junction to get cluster IDs.
# Push cohort genomes into a temp view (cleaner than IN clause for ~50 IDs, but IN works fine here)
genome_in = ','.join([f"'{g}'" for g in cohort_genomes])
gene_cluster_df = spark.sql(f"""
SELECT g.genome_id, j.gene_cluster_id
FROM kbase_ke_pangenome.gene g
JOIN kbase_ke_pangenome.gene_genecluster_junction j
ON g.gene_id = j.gene_id
WHERE g.genome_id IN ({genome_in})
""")
gene_cluster_df.cache()
n_pairs = gene_cluster_df.count()
n_genomes_with_clusters = gene_cluster_df.select('genome_id').distinct().count()
n_unique_clusters = gene_cluster_df.select('gene_cluster_id').distinct().count()
print(f'gene-cluster pairs: {n_pairs}')
print(f'genomes with cluster data: {n_genomes_with_clusters} of {len(cohort_genomes)}')
print(f'unique cluster IDs: {n_unique_clusters}')
gene-cluster pairs: 298093 genomes with cluster data: 59 of 59 unique cluster IDs: 222266
3. Pull eggnog annotations for cohort clusters¶
Filter eggnog_mapper_annotations to only cohort clusters via temp view + join.
gene_cluster_df.select('gene_cluster_id').distinct().createOrReplaceTempView('cohort_clusters')
ann_df = spark.sql("""
SELECT a.query_name AS gene_cluster_id,
a.KEGG_ko, a.COG_category, a.PFAMs, a.EC, a.Preferred_name
FROM kbase_ke_pangenome.eggnog_mapper_annotations a
JOIN cohort_clusters c
ON a.query_name = c.gene_cluster_id
""")
ann_df.cache()
n_ann = ann_df.count()
print(f'eggnog annotations for cohort clusters: {n_ann}')
eggnog annotations for cohort clusters: 195677
4. Define KEGG marker sets and compute per-genome marker presence¶
# KEGG ko strings in eggnog are like 'ko:K00198' or comma-separated 'ko:K00198,ko:K00197'
MARKERS = {
'WL_acsA': 'K00198', # CO dehydrogenase / acetyl-CoA synthase alpha
'WL_acsB': 'K14138', # acetyl-CoA synthase beta
'WL_acsC': 'K00197', # acetyl-CoA synthase gamma
'WL_acsD': 'K15022', # acetyl-CoA synthase delta
'WL_acsE': 'K15023', # methyltransferase
'NiFe_hyaA': 'K06281', # group 1 [NiFe]-hydrogenase small subunit
'NiFe_hyaB': 'K06282', # group 1 [NiFe]-hydrogenase large subunit
'SR_dsrA': 'K11180',
'SR_dsrB': 'K11181',
'SR_aprA': 'K00394',
'SR_aprB': 'K00395',
'SR_sat': 'K00958',
'IR_omcS': 'K07811',
'IR_mtrC': 'K17324',
'IR_mtrA': 'K17323',
'NifH': 'K02588',
'NifD': 'K02586',
'NifK': 'K02591',
}
# Build a (cluster_id × marker) presence matrix from eggnog annotations
# Search KEGG_ko string for each KO; any match → marker present at cluster level
ann_pdf = ann_df.toPandas()
ann_pdf['KEGG_ko'] = ann_pdf['KEGG_ko'].fillna('')
for marker, ko in MARKERS.items():
ann_pdf[f'has_{marker}'] = ann_pdf['KEGG_ko'].str.contains(ko, regex=False)
marker_cols = [f'has_{m}' for m in MARKERS]
# Aggregate to per-cluster (any annotation row hitting the marker)
cluster_markers = ann_pdf.groupby('gene_cluster_id')[marker_cols].any().reset_index()
print(f'cluster_markers shape: {cluster_markers.shape}')
print('\nmarker counts at cluster level:')
print(cluster_markers[marker_cols].sum())
cluster_markers shape: (195677, 19) marker counts at cluster level: has_WL_acsA 9 has_WL_acsB 3 has_WL_acsC 4 has_WL_acsD 2 has_WL_acsE 3 has_NiFe_hyaA 12 has_NiFe_hyaB 11 has_SR_dsrA 4 has_SR_dsrB 4 has_SR_aprA 9 has_SR_aprB 5 has_SR_sat 18 has_IR_omcS 0 has_IR_mtrC 12 has_IR_mtrA 13 has_NifH 11 has_NifD 10 has_NifK 10 dtype: int64
5. Aggregate to per-genome marker booleans¶
# Join genome-cluster pairs to cluster_markers to get per-genome presence
gc_pdf = gene_cluster_df.toPandas().drop_duplicates()
merged = gc_pdf.merge(cluster_markers, on='gene_cluster_id', how='left')
for c in marker_cols:
merged[c] = merged[c].fillna(False)
genome_markers = merged.groupby('genome_id')[marker_cols].any().reset_index()
# Module-completeness: count of sub-markers present
WL_cols = [f'has_WL_{x}' for x in ['acsA','acsB','acsC','acsD','acsE']]
NiFe_cols = [f'has_NiFe_{x}' for x in ['hyaA','hyaB']]
SR_cols = [f'has_SR_{x}' for x in ['dsrA','dsrB','aprA','aprB','sat']]
IR_cols = [f'has_IR_{x}' for x in ['omcS','mtrC','mtrA']]
Nif_cols = ['has_NifH','has_NifD','has_NifK']
genome_markers['WL_count'] = genome_markers[WL_cols].sum(axis=1)
genome_markers['NiFe_count'] = genome_markers[NiFe_cols].sum(axis=1)
genome_markers['SR_count'] = genome_markers[SR_cols].sum(axis=1)
genome_markers['IR_count'] = genome_markers[IR_cols].sum(axis=1)
genome_markers['Nif_count'] = genome_markers[Nif_cols].sum(axis=1)
# Module-complete booleans (≥3/5 WL, ≥1/2 NiFe, ≥3/5 SR, ≥1/3 IR, ≥2/3 Nif per plan)
genome_markers['WL_complete'] = genome_markers['WL_count'] >= 3
genome_markers['NiFe_complete'] = genome_markers['NiFe_count'] >= 1
genome_markers['SR_complete'] = genome_markers['SR_count'] >= 3
genome_markers['IR_complete'] = genome_markers['IR_count'] >= 1
genome_markers['Nif_complete'] = genome_markers['Nif_count'] >= 2
# Join the toolkit score (H2 metric): how many of WL+NiFe+SR are complete?
genome_markers['toolkit_score'] = (
genome_markers['WL_complete'].astype(int)
+ genome_markers['NiFe_complete'].astype(int)
+ genome_markers['SR_complete'].astype(int)
)
print(f'genome_markers shape: {genome_markers.shape}')
print('\nmodule completeness counts (across cohort):')
for c in ['WL_complete','NiFe_complete','SR_complete','IR_complete','Nif_complete']:
print(f' {c}: {genome_markers[c].sum()}/{len(genome_markers)}')
print('\ntoolkit score distribution:')
print(genome_markers['toolkit_score'].value_counts().sort_index())
genome_markers shape: (59, 30) module completeness counts (across cohort): WL_complete: 5/59 NiFe_complete: 8/59 SR_complete: 5/59 IR_complete: 21/59 Nif_complete: 16/59 toolkit score distribution: toolkit_score 0 51 1 3 3 5 Name: count, dtype: int64
6. GapMind amino-acid pathway completeness (H1 self-sufficiency metric)¶
Two-stage aggregation per docs/performance.md (pangenome_pathway_geography pattern). Pull only metabolic_category = 'amino_acid' and our cohort genomes.
# GapMind genome_ids drop the GB_/RS_ prefix used in pangenome.genome.
# (BERDL pitfall — documented in docs/pitfalls.md.)
# metabolic_category is 'aa' (not 'amino_acid').
import re
def strip_prefix(g):
return re.sub(r'^(GB_|RS_)', '', g)
gapmind_genome_ids = [strip_prefix(g) for g in cohort_genomes]
gapmind_in = ','.join([f"'{g}'" for g in gapmind_genome_ids])
gapmind_df = spark.sql(f"""
WITH best_per_pathway AS (
SELECT genome_id, pathway,
MAX(score_simplified) AS best_score
FROM kbase_ke_pangenome.gapmind_pathways
WHERE genome_id IN ({gapmind_in})
AND metabolic_category = 'aa'
GROUP BY genome_id, pathway
)
SELECT genome_id AS gapmind_genome_id,
COUNT(*) AS aa_pathway_total,
SUM(CASE WHEN best_score >= 1 THEN 1 ELSE 0 END) AS aa_pathway_complete
FROM best_per_pathway
GROUP BY genome_id
""")
gapmind_pdf = gapmind_df.toPandas()
# Re-attach the GB_/RS_ prefix for the join: we look up which prefix each gapmind_genome_id had in cohort
prefix_map = {strip_prefix(g): g for g in cohort_genomes}
gapmind_pdf['genome_id'] = gapmind_pdf['gapmind_genome_id'].map(prefix_map)
print(f'genomes with gapmind data: {len(gapmind_pdf)}')
print(gapmind_pdf[['genome_id', 'aa_pathway_total', 'aa_pathway_complete']].describe())
genomes with gapmind data: 59
aa_pathway_total aa_pathway_complete
count 59.0 59.000000
mean 18.0 17.118644
std 0.0 1.912615
min 18.0 9.000000
25% 18.0 17.000000
50% 18.0 18.000000
75% 18.0 18.000000
max 18.0 18.000000
7. Combine into final per-genome features and write parquet¶
# Merge all features back to cohort
features = (
cohort.merge(genome_markers, on='genome_id', how='left')
.merge(gapmind_pdf, on='genome_id', how='left')
)
# Fill NaN booleans/counts for genomes with no annotation/pathway data
for c in marker_cols + ['WL_complete','NiFe_complete','SR_complete','IR_complete','Nif_complete']:
features[c] = features[c].fillna(False)
for c in ['WL_count','NiFe_count','SR_count','IR_count','Nif_count','toolkit_score','aa_pathway_total','aa_pathway_complete']:
features[c] = features[c].fillna(0)
out_path = DATA_DIR / 'genome_features.parquet'
features.to_parquet(out_path, index=False)
print(f'wrote {len(features)} rows × {len(features.columns)} cols to {out_path}')
# Per-cohort summary of toolkit and self-sufficiency
summary = (
features.groupby('cohort_class')
.agg(
n_genomes=('genome_id', 'count'),
WL_complete_rate=('WL_complete', 'mean'),
NiFe_complete_rate=('NiFe_complete', 'mean'),
SR_complete_rate=('SR_complete', 'mean'),
IR_complete_rate=('IR_complete', 'mean'),
Nif_complete_rate=('Nif_complete', 'mean'),
mean_toolkit_score=('toolkit_score', 'mean'),
mean_aa_pathway_complete=('aa_pathway_complete', 'mean'),
)
.reset_index()
)
summary
wrote 61 rows × 52 cols to ../data/genome_features.parquet
| cohort_class | n_genomes | WL_complete_rate | NiFe_complete_rate | SR_complete_rate | IR_complete_rate | Nif_complete_rate | mean_toolkit_score | mean_aa_pathway_complete | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | anchor_deep | 9 | 0.555556 | 0.777778 | 0.555556 | 0.111111 | 0.444444 | 1.888889 | 16.222222 |
| 1 | anchor_shallow | 30 | 0.0 | 0.033333 | 0.0 | 0.5 | 0.366667 | 0.033333 | 17.866667 |
| 2 | excluded | 2 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.000000 | 0.000000 |
| 3 | unclassified | 20 | 0.0 | 0.0 | 0.0 | 0.25 | 0.05 | 0.000000 | 16.400000 |