02 Og Presence
Jupyter notebook from the Subsurface Bacillota_B Specialization project.
NB02 — Genome × eggNOG OG Presence Matrix¶
Goal: For each cohort genome (10 anchor + 62 baseline = 72), pull its gene clusters via gene + gene_genecluster_junction, map each cluster to its eggNOG_OGs annotation, parse the hierarchical OG string to extract the Firmicutes-level OG (eggNOG tax_id 1239 — the cross-species orthology surrogate appropriate for within-Bacillota_B comparisons; bacteria-level is too coarse, family-level varies in resolution per organism). Aggregate to a sparse genome × og_id boolean matrix.
Inputs: data/cohort_assignments.tsv, kbase_ke_pangenome.{gene, gene_genecluster_junction, eggnog_mapper_annotations}.
Output: data/cohort_og_presence.parquet — long format (genome_id, og_id) pairs, plus a per-genome metadata join.
from pathlib import Path
import re
import pandas as pd
from pyspark.sql import functions as F
try:
spark
except NameError:
from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()
DATA_DIR = Path('../data')
cohort = pd.read_csv(DATA_DIR / 'cohort_assignments.tsv', sep='\t')
cohort_genomes = cohort['genome_id'].dropna().unique().tolist()
print(f'cohort: {len(cohort_genomes)} genomes')
cohort: 72 genomes
1. genome → cluster (BROADCAST temp view of cohort genomes)¶
The performance pattern from docs/pitfalls.md (cofitness_coinheritance lesson): create a small temp view of cohort genome IDs and rely on Spark's broadcast for the join into the 1B-row gene + junction tables.
spark.createDataFrame([(g,) for g in cohort_genomes], ['genome_id']).createOrReplaceTempView('cohort_genomes_v')
gc_df = spark.sql("""
SELECT g.genome_id, j.gene_cluster_id
FROM kbase_ke_pangenome.gene g
JOIN kbase_ke_pangenome.gene_genecluster_junction j ON g.gene_id = j.gene_id
JOIN cohort_genomes_v c ON c.genome_id = g.genome_id
""")
gc_df.cache()
n_pairs = gc_df.count()
n_genomes_with = gc_df.select('genome_id').distinct().count()
n_clusters = gc_df.select('gene_cluster_id').distinct().count()
print(f'gene-cluster pairs: {n_pairs}; genomes covered: {n_genomes_with}/{len(cohort_genomes)}; unique clusters: {n_clusters}')
gene-cluster pairs: 224422; genomes covered: 72/72; unique clusters: 112526
2. cluster → eggNOG_OGs string¶
gc_df.select('gene_cluster_id').distinct().createOrReplaceTempView('cohort_clusters_v')
ann_df = spark.sql("""
SELECT a.query_name AS gene_cluster_id, a.eggNOG_OGs
FROM kbase_ke_pangenome.eggnog_mapper_annotations a
JOIN cohort_clusters_v c ON a.query_name = c.gene_cluster_id
WHERE a.eggNOG_OGs IS NOT NULL
""")
ann_pdf = ann_df.toPandas()
print(f'clusters with eggNOG_OGs: {len(ann_pdf)} / {n_clusters}')
clusters with eggNOG_OGs: 88245 / 112526
3. Parse hierarchical eggNOG_OGs → Firmicutes-level OG¶
Each eggNOG_OGs value is a comma-separated list of OG_id@tax_id|tax_name tokens, e.g.:
COG1249@1|root,COG1249@2|Bacteria,1TP1W@1239|Firmicutes,4HB3K@91061|Bacilli,...
Strategy: pick the most specific OG that is at-or-above the Firmicutes (1239) level. If a Firmicutes-level OG exists, use it; else fall back to Bacteria-level (@2|Bacteria); else root (@1|root). This gives a within-phylum-comparable orthology unit while remaining defined for outliers (e.g., archaeal members).
PREFERRED_TAXIDS = ['1239', '2', '1'] # Firmicutes, Bacteria, root
def pick_og(ogs_str):
if not ogs_str:
return None
tokens = ogs_str.split(',')
by_tax = {}
for tok in tokens:
m = re.match(r'^([^@]+)@(\d+)\|', tok)
if m:
by_tax[m.group(2)] = m.group(1)
for t in PREFERRED_TAXIDS:
if t in by_tax:
return by_tax[t]
return None
ann_pdf['og_id'] = ann_pdf['eggNOG_OGs'].apply(pick_og)
n_with_og = ann_pdf['og_id'].notna().sum()
print(f'clusters with parseable OG: {n_with_og} / {len(ann_pdf)}')
print(f'unique OG IDs in cohort: {ann_pdf["og_id"].nunique()}')
print(ann_pdf[['gene_cluster_id','og_id']].head(5))
clusters with parseable OG: 88192 / 88245
unique OG IDs in cohort: 14109
gene_cluster_id og_id
0 CAKMJW010000012.1_18 1V3YY
1 JAMXHS010000015.1_1059 1TQEG
2 JAMXHS010000015.1_629 1UFH3
3 JAMXHS010000012.1_67 1V3JW
4 JAMXHS010000011.1_277 1V3JW
4. genome × OG presence matrix¶
gc_pdf = gc_df.toPandas().drop_duplicates()
merged = gc_pdf.merge(ann_pdf[['gene_cluster_id','og_id']], on='gene_cluster_id', how='left')
merged = merged.dropna(subset=['og_id'])
presence = merged[['genome_id','og_id']].drop_duplicates().reset_index(drop=True)
presence.to_parquet(DATA_DIR / 'cohort_og_presence.parquet', index=False)
print(f'genome × OG pairs: {len(presence)}')
print(f'genomes covered: {presence["genome_id"].nunique()}; unique OGs: {presence["og_id"].nunique()}')
per_genome_og_count = presence.groupby('genome_id').size().describe()
print('\n=== OG count per genome ===')
print(per_genome_og_count)
# Quick gut check: cohort_class × mean OG count
cohort_summary = (presence.groupby('genome_id').size().reset_index(name='n_ogs')
.merge(cohort[['genome_id','cohort_class']], on='genome_id'))
print('\n=== mean OGs per genome by cohort ===')
print(cohort_summary.groupby('cohort_class')['n_ogs'].agg(['mean','median','count']).round(1))
genome × OG pairs: 156850
genomes covered: 72; unique OGs: 14109
=== OG count per genome ===
count 72.000000
mean 2178.472222
std 439.922560
min 1261.000000
25% 1995.250000
50% 2114.500000
75% 2302.250000
max 3475.000000
dtype: float64
=== mean OGs per genome by cohort ===
mean median count
cohort_class
anchor_deep_clay 2630.1 2747.5 10
soil_baseline 2105.6 2090.5 62