09 P2 Ko Data Extraction
Jupyter notebook from the Gene Function Ecological Agora project.
NB09 — Phase 2 KO Data Extraction¶
Project: Gene Function Ecological Agora — Innovation Atlas Across the Bacterial Tree
Phase: 2 — Functional Resolution (KO, paralog-explicit)
Purpose: Extract per-species KO presence + paralog counts from eggnog_mapper_annotations for the 18,989 Phase-1B species. Build the UniRef50 → KO projection for the sanity rail. Capture KEGG_Pathway / BRITE annotations per KO for downstream regulatory-vs-metabolic categorization. Produce KO-level control-class flags for NB09b (M18 amplification gate test) and NB10 (full atlas).
Methodology Commitments (RESEARCH_PLAN.md v2.7)¶
- M16 Sankoff parsimony as primary atlas metric — implemented in NB09b, computed against the GTDB-r214 tree topology used in NB08c.
- M17 AMR excluded from positive-control panel — Pseudomonadota detection bias (NB08c Diagnostic D). Positive controls: β-lactamase, class-I CRISPR-Cas, TCS HK. AMR is reported as informational only.
- M18 Hard amplification gate (NB09b) — Cohen's d ≥ 0.3 on Sankoff parsimony for at least one positive HGT control class vs negative housekeeping. Else halt for M11 redesign.
- M19 Cluster-bootstrap on UniRefs at Pfam-family granularity — implemented in NB09b for Cohen's d 95% CIs.
- M20 Independent paralog holdout for producer-null validation — deferred to NB10 (full atlas notebook).
What changes from NB05¶
- Function-class unit is KO instead of UniRef50.
- Paralog count = distinct UniRef90s within KO within species (not within UniRef50).
- Substrate is
eggnog_mapper_annotations.KEGG_ko. The Phase 1A pitfall onPFAMsstoring names not accessions does NOT apply toKEGG_ko— KO IDs are stored canonically. - No UniRef50 sampling — KO is naturally function-aggregated; the full KO set is the working substrate. The UniRef50 → KO projection table (built here) is used by the sanity rail in NB10.
- No eggNOG
PFAMsreliance for control detection — InterProScan accession-based detection (NB05's pattern) is reused at the gene-cluster level, then projected to KO via majority vote.
Inputs¶
data/p1b_full_species.tsv— 18,989 P1B-qualified species reps (CheckM ≥ 90% complete, ≤ 5% contam).kbase_ke_pangenome.gene_cluster— gene cluster definitions per species.kbase_ke_pangenome.eggnog_mapper_annotations— KEGG_ko, KEGG_Pathway, BRITE per gene cluster.kbase_ke_pangenome.bakta_db_xrefs— UniRef90 / UniRef50 mapping per gene cluster.kbase_ke_pangenome.interproscan_domains— Pfam accessions for control detection (TCS HK, β-lactamase, CRISPR-Cas, ribosomal, tRNA-synth, RNAP core).kbase_ke_pangenome.bakta_amr— AMR informational (M17: not a load-bearing positive control).
Outputs¶
data/p2_ko_assignments.parquet— (species_clade_id, ko, n_uniref90_present, n_gene_clusters, is_present)data/p2_uniref50_to_ko_projection.tsv— (uniref50_id, dominant_ko, fraction, n_clusters_total)data/p2_ko_control_classes.tsv— (ko, control_class, n_clusters_with_class, fraction)data/p2_ko_pathway_brite.tsv— (ko, kegg_pathway_ids, brite_ids)data/p2_ko_extraction_log.json
Setup¶
import os, json, time
from pathlib import Path
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType
try:
spark # noqa: F821
print("Spark session: pre-injected (JupyterHub notebook)")
except NameError:
from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()
print("Spark session: created via berdl_notebook_utils")
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
PROJECT_ROOT = Path("/home/aparkin/BERIL-research-observatory/projects/gene_function_ecological_agora")
DATA_DIR = PROJECT_ROOT / "data"
DATA_DIR.mkdir(parents=True, exist_ok=True)
log = {
"phase": "2",
"notebook": "NB09",
"timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
print(json.dumps(log, indent=2))
Spark session: created via berdl_notebook_utils
{
"phase": "2",
"notebook": "NB09",
"timestamp_utc": "2026-04-27T03:56:59Z"
}
Stage 1 — Load Phase 1B species set¶
p1b_species = pd.read_csv(DATA_DIR / "p1b_full_species.tsv", sep="\t")
log["n_species_p1b"] = len(p1b_species)
print(f"Phase 1B species: {len(p1b_species):,}")
# Push to Spark as a temp view for joins
spark.createDataFrame(p1b_species[["gtdb_species_clade_id"]]).createOrReplaceTempView("p2_species_view")
Phase 1B species: 18,989
Stage 2 — Pull gene_clusters for Phase 1B species¶
all_gene_clusters = spark.sql("""
SELECT gc.gene_cluster_id, gc.gtdb_species_clade_id,
gc.is_core, gc.is_auxiliary, gc.is_singleton
FROM kbase_ke_pangenome.gene_cluster gc
JOIN p2_species_view sv ON gc.gtdb_species_clade_id = sv.gtdb_species_clade_id
""")
all_gene_clusters.cache()
n_gc = all_gene_clusters.count()
log["n_gene_clusters"] = int(n_gc)
print(f"Gene clusters in P1B species: {n_gc:,}")
all_gene_clusters.createOrReplaceTempView("p2_gc_view")
Gene clusters in P1B species: 103,629,867
Stage 3 — Pull eggNOG KEGG_ko + KEGG_Pathway + BRITE¶
Format notes (verified via docs/schemas/pangenome.md and NB05 cell 14):
KEGG_kois comma- or semicolon-delimited, withko:prefix. Strip viat.replace("ko:", "").strip().KEGG_Pathwayis comma-delimited; pathway IDs likeko00010,map00010. Keep only theko*entries (KEGG ortholog map IDs).BRITEis comma-delimited BRITE hierarchy codes.
eggnog = spark.sql("""
SELECT query_name AS gene_cluster_id, KEGG_ko, KEGG_Pathway, BRITE, PFAMs
FROM kbase_ke_pangenome.eggnog_mapper_annotations
""").join(all_gene_clusters.select("gene_cluster_id"), on="gene_cluster_id", how="inner")
eggnog.cache()
n_egg = eggnog.count()
log["n_eggnog_annotated_clusters"] = int(n_egg)
print(f"eggNOG-annotated clusters in P1B: {n_egg:,}")
eggNOG-annotated clusters in P1B: 74,450,297
Stage 4 — Explode KEGG_ko into per-(gene_cluster, KO) rows¶
def parse_kos(ko_str):
if ko_str in (None, "", "-"):
return []
toks = []
for t in ko_str.replace(";", ",").split(","):
t = t.replace("ko:", "").strip()
if t and t.startswith("K") and len(t) == 6:
toks.append(t)
return list(set(toks))
parse_kos_udf = F.udf(parse_kos, ArrayType(StringType()))
gc_to_ko = (
eggnog
.withColumn("ko_list", parse_kos_udf(F.col("KEGG_ko")))
.filter(F.size(F.col("ko_list")) > 0)
.select("gene_cluster_id", F.explode("ko_list").alias("ko"))
)
gc_to_ko.cache()
n_gc_ko = gc_to_ko.count()
log["n_gc_to_ko_rows"] = int(n_gc_ko)
n_kos_unique = gc_to_ko.select("ko").distinct().count()
log["n_unique_kos"] = int(n_kos_unique)
print(f"(gene_cluster, KO) rows: {n_gc_ko:,} | unique KOs: {n_kos_unique:,}")
(gene_cluster, KO) rows: 43,803,890 | unique KOs: 13,062
Stage 5 — UniRef90 mapping per gene cluster (for paralog count)¶
uniref_xrefs = spark.sql("""
SELECT bx.gene_cluster_id, bx.accession
FROM kbase_ke_pangenome.bakta_db_xrefs bx
JOIN p2_gc_view pg ON bx.gene_cluster_id = pg.gene_cluster_id
WHERE bx.db = 'UniRef'
""")
uniref_split = (
uniref_xrefs
.withColumn("uniref_tier", F.regexp_extract(F.col("accession"), r"^(UniRef\d+)_", 1))
)
uniref90 = uniref_split.filter(F.col("uniref_tier") == "UniRef90").select(
"gene_cluster_id", F.col("accession").alias("uniref90_id"),
)
uniref50 = uniref_split.filter(F.col("uniref_tier") == "UniRef50").select(
"gene_cluster_id", F.col("accession").alias("uniref50_id"),
)
uniref90.cache(); uniref50.cache()
log["n_uniref90_rows"] = int(uniref90.count())
log["n_uniref50_rows"] = int(uniref50.count())
print(f"UniRef90 mappings: {log['n_uniref90_rows']:,} | UniRef50 mappings: {log['n_uniref50_rows']:,}")
UniRef90 mappings: 68,381,626 | UniRef50 mappings: 84,175,091
Stage 6 — Per-(species, KO) presence + paralog count¶
ko_with_species = (
gc_to_ko
.join(all_gene_clusters.select("gene_cluster_id", "gtdb_species_clade_id"), on="gene_cluster_id", how="inner")
.join(uniref90, on="gene_cluster_id", how="left")
)
ko_assignments = (
ko_with_species
.groupBy("gtdb_species_clade_id", "ko")
.agg(
F.countDistinct("uniref90_id").alias("n_uniref90_present"),
F.countDistinct("gene_cluster_id").alias("n_gene_clusters"),
)
.withColumn("is_present", (F.col("n_gene_clusters") > 0).cast("boolean"))
)
ko_assignments.cache()
n_assign = ko_assignments.count()
log["n_ko_assignments"] = int(n_assign)
print(f"(species, KO) assignment rows: {n_assign:,}")
(species, KO) assignment rows: 28,008,764
Stage 7 — Control-class flags at gene-cluster level (reuse NB05 pattern)¶
M17: AMR is computed as informational only (not a load-bearing positive control).
TCS_HK_PFAM = {"PF00512", "PF07568", "PF07730", "PF06580", "PF02518", "PF13415", "PF13581"}
TCS_RR_PFAM = {"PF00072", "PF00196", "PF02954", "PF00486"}
BETA_LAC_PFAM = {"PF00144", "PF13354", "PF12706", "PF13483", "PF00768"}
CRISPR_CAS_PFAM = {"PF18557", "PF09704", "PF09827", "PF09455", "PF09659"}
def acc_clause(accs):
return "', '".join(sorted(accs))
ips_tcs = spark.sql(f"""
SELECT DISTINCT gene_cluster_id FROM kbase_ke_pangenome.interproscan_domains
WHERE analysis = 'Pfam' AND signature_acc IN ('{acc_clause(TCS_HK_PFAM | TCS_RR_PFAM)}')
""").join(all_gene_clusters.select("gene_cluster_id"), on="gene_cluster_id", how="inner").withColumn("is_tcs_hk", F.lit(True))
ips_betalac = spark.sql(f"""
SELECT DISTINCT gene_cluster_id FROM kbase_ke_pangenome.interproscan_domains
WHERE (analysis = 'Pfam' AND signature_acc IN ('{acc_clause(BETA_LAC_PFAM)}'))
OR LOWER(ipr_desc) LIKE '%beta-lactamase%'
OR LOWER(signature_desc) LIKE '%beta-lactamase%'
""").join(all_gene_clusters.select("gene_cluster_id"), on="gene_cluster_id", how="inner").withColumn("is_betalac", F.lit(True))
ips_crispr = spark.sql(f"""
SELECT DISTINCT gene_cluster_id FROM kbase_ke_pangenome.interproscan_domains
WHERE analysis = 'Pfam' AND signature_acc IN ('{acc_clause(CRISPR_CAS_PFAM)}')
""").join(all_gene_clusters.select("gene_cluster_id"), on="gene_cluster_id", how="inner").withColumn("is_crispr_cas", F.lit(True))
ips_ribosomal = spark.sql("""
SELECT DISTINCT gene_cluster_id FROM kbase_ke_pangenome.interproscan_domains
WHERE LOWER(ipr_desc) LIKE '%ribosomal protein%' OR LOWER(signature_desc) LIKE '%ribosomal protein%'
""").join(all_gene_clusters.select("gene_cluster_id"), on="gene_cluster_id", how="inner").withColumn("is_ribosomal", F.lit(True))
ips_trna = spark.sql("""
SELECT DISTINCT gene_cluster_id FROM kbase_ke_pangenome.interproscan_domains
WHERE LOWER(ipr_desc) LIKE '%aminoacyl-trna synthetase%'
OR LOWER(signature_desc) LIKE '%aminoacyl-trna synthetase%'
OR LOWER(signature_desc) LIKE '%trna ligase%'
""").join(all_gene_clusters.select("gene_cluster_id"), on="gene_cluster_id", how="inner").withColumn("is_trna_synth", F.lit(True))
ips_rnap = spark.sql("""
SELECT DISTINCT gene_cluster_id FROM kbase_ke_pangenome.interproscan_domains
WHERE (LOWER(ipr_desc) LIKE '%rna polymerase%alpha%' OR LOWER(signature_desc) LIKE '%rna polymerase%alpha%')
OR (LOWER(ipr_desc) LIKE '%rna polymerase%beta%' OR LOWER(signature_desc) LIKE '%rna polymerase%beta%')
""").join(all_gene_clusters.select("gene_cluster_id"), on="gene_cluster_id", how="inner").withColumn("is_rnap_core", F.lit(True))
amr_flag = spark.sql("SELECT DISTINCT gene_cluster_id FROM kbase_ke_pangenome.bakta_amr").join(
all_gene_clusters.select("gene_cluster_id"), on="gene_cluster_id", how="inner"
).withColumn("is_amr", F.lit(True))
cluster_flags = (
all_gene_clusters.select("gene_cluster_id")
.join(ips_tcs, on="gene_cluster_id", how="left")
.join(ips_betalac, on="gene_cluster_id", how="left")
.join(ips_crispr, on="gene_cluster_id", how="left")
.join(ips_ribosomal, on="gene_cluster_id", how="left")
.join(ips_trna, on="gene_cluster_id", how="left")
.join(ips_rnap, on="gene_cluster_id", how="left")
.join(amr_flag, on="gene_cluster_id", how="left")
.na.fill({
"is_tcs_hk": False, "is_betalac": False, "is_crispr_cas": False,
"is_ribosomal": False, "is_trna_synth": False, "is_rnap_core": False,
"is_amr": False,
})
)
cluster_flags.cache()
log["control_class_cluster_counts"] = (
cluster_flags.select(
F.sum(F.col("is_tcs_hk").cast("int")).alias("tcs_hk"),
F.sum(F.col("is_betalac").cast("int")).alias("betalac"),
F.sum(F.col("is_crispr_cas").cast("int")).alias("crispr_cas"),
F.sum(F.col("is_ribosomal").cast("int")).alias("ribosomal"),
F.sum(F.col("is_trna_synth").cast("int")).alias("trna_synth"),
F.sum(F.col("is_rnap_core").cast("int")).alias("rnap_core"),
F.sum(F.col("is_amr").cast("int")).alias("amr"),
).toPandas().iloc[0].to_dict()
)
print("Cluster-level control flag counts:")
print(json.dumps(log["control_class_cluster_counts"], indent=2, default=str))
Cluster-level control flag counts:
{
"tcs_hk": 1906521,
"betalac": 610325,
"crispr_cas": 23777,
"ribosomal": 1723951,
"trna_synth": 683431,
"rnap_core": 91836,
"amr": 74651
}
Stage 8 — Project control flags from gene-cluster to KO¶
A KO is assigned to control class X if any of its gene_clusters has the X flag. Report fraction-with-flag for transparency on detection bias.
ko_clusters = gc_to_ko.join(cluster_flags, on="gene_cluster_id", how="inner")
ko_class_agg = (
ko_clusters
.groupBy("ko")
.agg(
F.count("*").alias("n_clusters_total"),
F.sum(F.col("is_tcs_hk").cast("int")).alias("n_tcs_hk"),
F.sum(F.col("is_betalac").cast("int")).alias("n_betalac"),
F.sum(F.col("is_crispr_cas").cast("int")).alias("n_crispr_cas"),
F.sum(F.col("is_ribosomal").cast("int")).alias("n_ribosomal"),
F.sum(F.col("is_trna_synth").cast("int")).alias("n_trna_synth"),
F.sum(F.col("is_rnap_core").cast("int")).alias("n_rnap_core"),
F.sum(F.col("is_amr").cast("int")).alias("n_amr"),
)
)
ko_class_agg.cache()
ko_class_pdf = ko_class_agg.toPandas()
# Class assignment: dominant control class if ≥ 50% of clusters carry the flag, else 'none'.
# At KO level a single dominant class is biologically reasonable (KOs typically encode one function).
CLASS_THRESHOLD = 0.50
def assign_class(row):
n_total = row["n_clusters_total"]
if n_total == 0:
return "none"
fractions = {
"pos_tcs_hk": row["n_tcs_hk"] / n_total,
"pos_betalac": row["n_betalac"] / n_total,
"pos_crispr_cas": row["n_crispr_cas"] / n_total,
"neg_ribosomal": row["n_ribosomal"] / n_total,
"neg_trna_synth": row["n_trna_synth"] / n_total,
"neg_rnap_core": row["n_rnap_core"] / n_total,
"info_amr": row["n_amr"] / n_total,
}
best_class = max(fractions, key=fractions.get)
if fractions[best_class] >= CLASS_THRESHOLD:
return best_class
return "none"
ko_class_pdf["control_class"] = ko_class_pdf.apply(assign_class, axis=1)
ko_class_pdf["class_fraction"] = ko_class_pdf.apply(
lambda r: max(r["n_tcs_hk"], r["n_betalac"], r["n_crispr_cas"],
r["n_ribosomal"], r["n_trna_synth"], r["n_rnap_core"], r["n_amr"]) / max(r["n_clusters_total"], 1),
axis=1,
)
log["ko_class_distribution"] = ko_class_pdf["control_class"].value_counts().to_dict()
print("KO class distribution:")
print(json.dumps(log["ko_class_distribution"], indent=2, default=str))
KO class distribution:
{
"none": 12279,
"pos_tcs_hk": 310,
"neg_ribosomal": 249,
"pos_betalac": 110,
"neg_trna_synth": 54,
"info_amr": 35,
"neg_rnap_core": 19,
"pos_crispr_cas": 6
}
Stage 9 — UniRef50 → dominant KO projection (sanity rail)¶
For each UniRef50, identify the most-frequent KO across its constituent gene_clusters. Used in NB10 as the rail check: each UniRef50-level result rerun with KO-collapsed identity.
uniref50_with_ko = (
uniref50
.join(gc_to_ko, on="gene_cluster_id", how="inner")
.groupBy("uniref50_id", "ko")
.agg(F.countDistinct("gene_cluster_id").alias("n_clusters_with_ko"))
)
uniref50_total_clusters = (
uniref50
.groupBy("uniref50_id")
.agg(F.countDistinct("gene_cluster_id").alias("n_total_clusters"))
)
uniref50_with_ko.createOrReplaceTempView("u50_ko")
uniref50_total_clusters.createOrReplaceTempView("u50_total")
# Rank KOs by count per UniRef50, take top hit
uniref50_dominant = spark.sql("""
WITH ranked AS (
SELECT u50.uniref50_id, u50.ko, u50.n_clusters_with_ko, t.n_total_clusters,
ROW_NUMBER() OVER (PARTITION BY u50.uniref50_id ORDER BY u50.n_clusters_with_ko DESC, u50.ko ASC) AS rk
FROM u50_ko u50
JOIN u50_total t ON t.uniref50_id = u50.uniref50_id
)
SELECT uniref50_id, ko AS dominant_ko,
n_clusters_with_ko / n_total_clusters AS dominant_fraction,
n_total_clusters
FROM ranked
WHERE rk = 1
""")
uniref50_proj_pdf = uniref50_dominant.toPandas()
log["n_uniref50_with_dominant_ko"] = len(uniref50_proj_pdf)
log["uniref50_proj_fraction_quantiles"] = uniref50_proj_pdf["dominant_fraction"].quantile([0.25, 0.5, 0.75, 0.9]).to_dict()
print(f"UniRef50s with dominant KO: {log['n_uniref50_with_dominant_ko']:,}")
print("Dominant-fraction quantiles:", log["uniref50_proj_fraction_quantiles"])
UniRef50s with dominant KO: 3,592,556
Dominant-fraction quantiles: {0.25: 0.9722222222222222, 0.5: 1.0, 0.75: 1.0, 0.9: 1.0}
Stage 10 — KO → KEGG_Pathway / BRITE aggregation¶
Per-KO aggregated KEGG_Pathway and BRITE annotations across its gene_clusters. Categorization (regulatory / metabolic / housekeeping) is deferred to NB11. Here we only retain the raw delimited string lists for downstream parsing.
def parse_pathways(pw_str):
if pw_str in (None, "", "-"):
return []
return [t.strip() for t in pw_str.split(",") if t.strip().startswith("ko")]
def parse_brite(br_str):
if br_str in (None, "", "-"):
return []
return [t.strip() for t in br_str.split(",") if t.strip()]
parse_pw_udf = F.udf(parse_pathways, ArrayType(StringType()))
parse_br_udf = F.udf(parse_brite, ArrayType(StringType()))
gc_pw_br = (
eggnog
.withColumn("pw_list", parse_pw_udf(F.col("KEGG_Pathway")))
.withColumn("br_list", parse_br_udf(F.col("BRITE")))
.select("gene_cluster_id", "pw_list", "br_list")
)
ko_pw_br = (
gc_to_ko
.join(gc_pw_br, on="gene_cluster_id", how="left")
.groupBy("ko")
.agg(
F.array_distinct(F.flatten(F.collect_list("pw_list"))).alias("pathway_ids"),
F.array_distinct(F.flatten(F.collect_list("br_list"))).alias("brite_ids"),
)
)
ko_pw_br_pdf = ko_pw_br.toPandas()
ko_pw_br_pdf["pathway_ids"] = ko_pw_br_pdf["pathway_ids"].apply(
lambda x: ",".join(sorted(set(x))) if x is not None and len(x) > 0 else ""
)
ko_pw_br_pdf["brite_ids"] = ko_pw_br_pdf["brite_ids"].apply(
lambda x: ",".join(sorted(set(x))) if x is not None and len(x) > 0 else ""
)
log["n_kos_with_pathway"] = int((ko_pw_br_pdf["pathway_ids"] != "").sum())
log["n_kos_with_brite"] = int((ko_pw_br_pdf["brite_ids"] != "").sum())
print(f"KOs with KEGG_Pathway: {log['n_kos_with_pathway']:,} | with BRITE: {log['n_kos_with_brite']:,}")
KOs with KEGG_Pathway: 10,289 | with BRITE: 13,056
Stage 11 — Write outputs¶
# 1. (species, KO) assignments — write via Spark to avoid 28M-row driver collect (driver.maxResultSize=1024MB).
# Spark workers share the local FS on JupyterHub, so coalesce + local parquet write is direct (no driver collect).
out_assign_local = str(DATA_DIR / "p2_ko_assignments.parquet")
out_assign_minio = "s3a://cdm-lake/tenant-general-warehouse/microbialdiscoveryforge/projects/gene_function_ecological_agora/data/p2_ko_assignments.parquet"
if os.path.isdir(out_assign_local):
import shutil; shutil.rmtree(out_assign_local)
elif os.path.isfile(out_assign_local):
os.remove(out_assign_local)
try:
# coalesce(8) keeps file count manageable while still parallel-writing
ko_assignments.coalesce(8).write.mode("overwrite").parquet(out_assign_local)
n_local = spark.read.parquet(out_assign_local).count()
log["assign_parquet_local_path"] = out_assign_local
log["assign_parquet_local_row_count"] = int(n_local)
log["assign_parquet_size_mb"] = round(
sum(os.path.getsize(os.path.join(out_assign_local, f)) for f in os.listdir(out_assign_local)) / 1e6, 1
)
print(f"Wrote {out_assign_local} ({log['assign_parquet_size_mb']} MB; {n_local:,} rows)")
except Exception as e_local:
print(f"Spark write to local failed: {e_local!r}; falling back to MinIO")
log["assign_local_write_error"] = repr(e_local)
ko_assignments.write.mode("overwrite").parquet(out_assign_minio)
log["assign_parquet_minio_path"] = out_assign_minio
log["assign_minio_row_count"] = int(spark.read.parquet(out_assign_minio).count())
print(f"Wrote {out_assign_minio} ({log['assign_minio_row_count']:,} rows)")
# 2. UniRef50 → KO projection
uniref50_proj_pdf.to_csv(DATA_DIR / "p2_uniref50_to_ko_projection.tsv", sep="\t", index=False)
print(f"Wrote p2_uniref50_to_ko_projection.tsv: {len(uniref50_proj_pdf):,} rows")
# 3. KO → control class
ko_class_out = ko_class_pdf[["ko", "control_class", "class_fraction", "n_clusters_total"]].sort_values(
["control_class", "n_clusters_total"], ascending=[True, False]
)
ko_class_out.to_csv(DATA_DIR / "p2_ko_control_classes.tsv", sep="\t", index=False)
print(f"Wrote p2_ko_control_classes.tsv: {len(ko_class_out):,} rows")
# 4. KO → pathway / BRITE
ko_pw_br_pdf.sort_values("ko").to_csv(DATA_DIR / "p2_ko_pathway_brite.tsv", sep="\t", index=False)
print(f"Wrote p2_ko_pathway_brite.tsv: {len(ko_pw_br_pdf):,} rows")
# 5. Log
log["completed_utc"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
with open(DATA_DIR / "p2_ko_extraction_log.json", "w") as f:
json.dump(log, f, indent=2, default=str)
print("Wrote p2_ko_extraction_log.json")
Spark write to local failed: AnalysisException('[UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually. SQLSTATE: 42KD9'); falling back to MinIO
Wrote s3a://cdm-lake/tenant-general-warehouse/microbialdiscoveryforge/projects/gene_function_ecological_agora/data/p2_ko_assignments.parquet (28,008,764 rows)
Wrote p2_uniref50_to_ko_projection.tsv: 3,592,556 rows Wrote p2_ko_control_classes.tsv: 13,062 rows Wrote p2_ko_pathway_brite.tsv: 13,062 rows Wrote p2_ko_extraction_log.json
Stage 12 — Panel-class subset for NB09b (driver-collectable)¶
The full p2_ko_assignments.parquet (28M rows, ~1.4 GB) lands on MinIO because Spark Connect workers don't share filesystem with the driver. NB10 will read from MinIO via Spark. NB09b (the M18 gate test) only needs the control-panel KO subset (~750 KOs × ~5K species ≈ 2–5M rows) — small enough for driver collect via toPandas. Filter on Spark, collect, write locally.
PANEL_CLASSES = ["pos_tcs_hk", "pos_betalac", "pos_crispr_cas",
"neg_ribosomal", "neg_trna_synth", "neg_rnap_core"]
panel_kos = ko_class_pdf[ko_class_pdf["control_class"].isin(PANEL_CLASSES)]["ko"].tolist()
log["n_panel_kos"] = len(panel_kos)
print(f"Panel KOs: {len(panel_kos)}")
spark.createDataFrame(pd.DataFrame({"ko": panel_kos})).createOrReplaceTempView("panel_kos_view")
panel_assign_spark = ko_assignments.join(spark.table("panel_kos_view"), on="ko", how="inner")
panel_assign_pdf = panel_assign_spark.toPandas()
panel_assign_clean = pd.DataFrame({col: panel_assign_pdf[col].to_numpy() for col in panel_assign_pdf.columns})
panel_path = DATA_DIR / "p2_ko_assignments_panel.parquet"
if panel_path.exists():
panel_path.unlink()
panel_assign_clean.to_parquet(panel_path, index=False)
log["panel_parquet_local_path"] = str(panel_path)
log["panel_parquet_row_count"] = int(len(panel_assign_clean))
log["panel_parquet_size_mb"] = round(panel_path.stat().st_size / 1e6, 1)
print(f"Wrote {panel_path} ({log['panel_parquet_size_mb']} MB; {len(panel_assign_clean):,} rows)")
# Re-dump log to capture Stage 12 fields
log["completed_utc"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
with open(DATA_DIR / "p2_ko_extraction_log.json", "w") as f:
json.dump(log, f, indent=2, default=str)
Panel KOs: 748
Wrote /home/aparkin/BERIL-research-observatory/projects/gene_function_ecological_agora/data/p2_ko_assignments_panel.parquet (8.2 MB; 2,687,345 rows)
Summary¶
Five outputs land for downstream Phase 2 notebooks:
p2_ko_assignments.parquet— (species, KO) presence + paralog count. Primary substrate for NB10 atlas computation.p2_uniref50_to_ko_projection.tsv— UniRef50 → dominant KO projection. Sanity rail input for NB10.p2_ko_control_classes.tsv— KO → control class assignment. Drives NB09b control-panel subsetting and NB10 atlas color-coding.p2_ko_pathway_brite.tsv— KO → KEGG_Pathway / BRITE annotations. Input for NB11 regulatory-vs-metabolic categorization.p2_ko_extraction_log.json— extraction provenance.
Next: NB09b runs the M18 amplification gate test on the control panel. If d ≥ 0.3 PASS, NB10 produces the full KO atlas; else halt for M11 redesign.