02 Extract Features
Jupyter notebook from the Harvard Forest Long-Term Warming — DNA vs RNA Functional Response project.
NB02 — Extract per-sample feature tables (Spark)¶
Project: Harvard Forest long-term warming, DNA vs RNA functional response
Goal¶
Build per-sample feature tables for downstream analysis:
- KEGG KO counts per sample — DNA (metagenome) and RNA (metatranscriptome)
- Pfam domain counts per sample — DNA only (RNA Pfam not in pipeline outputs)
- Kraken2 read-based taxonomy abundances per sample (phylum + genus)
- GTDB-Tk MAG taxonomy per sample
- ChEBI metabolite identifications per sample
All filtered through the workflow_runs.tsv produced by NB01.
In [1]:
import os
import pandas as pd
from pyspark.sql import functions as F
spark = get_spark_session()
STUDY_ID = 'nmdc:sty-11-8ws97026'
DATA_DIR = os.path.abspath('../data')
FIG_DIR = os.path.abspath('../figures')
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(FIG_DIR, exist_ok=True)
# Load NB01 outputs
wf = pd.read_csv(os.path.join(DATA_DIR, 'workflow_runs.tsv'), sep='\t')
design = pd.read_csv(os.path.join(DATA_DIR, 'sample_design.tsv'), sep='\t')
print(f'Workflow runs: {len(wf)}')
print(f'Samples: {len(design)}')
# Register a Spark view for the workflow run table (used as filter in joins)
wf_spark = spark.createDataFrame(wf)
wf_spark.createOrReplaceTempView('hf_wf')
Workflow runs: 477 Samples: 42
1. KEGG KO counts per sample, by source (DNA / RNA)¶
We aggregate annotation_kegg_orthology rows by (biosample_id, source, annotation_id) and count gene-level hits as a proxy for KO abundance.
Source = DNA when the workflow type is MetagenomeAnnotation, RNA when MetatranscriptomeAnnotation.
In [2]:
ko_counts = spark.sql("""
SELECT bw.biosample_id,
CASE
WHEN bw.workflow_type = 'nmdc:MetagenomeAnnotation' THEN 'DNA'
WHEN bw.workflow_type = 'nmdc:MetatranscriptomeAnnotation' THEN 'RNA'
END AS source,
ko.annotation_id AS ko,
COUNT(*) AS gene_count
FROM nmdc_results.annotation_kegg_orthology ko
JOIN hf_wf bw ON ko.workflow_run_id = bw.workflow_run_id
WHERE bw.workflow_type IN ('nmdc:MetagenomeAnnotation', 'nmdc:MetatranscriptomeAnnotation')
AND ko.annotation_id IS NOT NULL
GROUP BY bw.biosample_id, bw.workflow_type, ko.annotation_id
""").toPandas()
# Strip 'KO:' prefix for cleaner downstream use
ko_counts['ko'] = ko_counts['ko'].str.replace('^KO:', '', regex=True)
print(f'KO rows: {len(ko_counts):,}')
print('Per-source row counts:')
print(ko_counts.groupby('source').agg(rows=('gene_count', 'count'),
distinct_kos=('ko', 'nunique'),
distinct_samples=('biosample_id', 'nunique')))
ko_counts.head()
KO rows: 561,330
Per-source row counts:
rows distinct_kos distinct_samples
source
DNA 254048 12863 28
RNA 307282 14302 39
Out[2]:
| biosample_id | source | ko | gene_count | |
|---|---|---|---|---|
| 0 | nmdc:bsm-11-nq2tfm26 | RNA | K04487 | 671 |
| 1 | nmdc:bsm-11-nq2tfm26 | RNA | K03969 | 127 |
| 2 | nmdc:bsm-11-nq2tfm26 | RNA | K01426 | 564 |
| 3 | nmdc:bsm-11-nq2tfm26 | RNA | K10440 | 181 |
| 4 | nmdc:bsm-11-nq2tfm26 | RNA | K04485 | 87 |
In [3]:
ko_path = os.path.join(DATA_DIR, 'ko_counts_by_sample.tsv.gz')
ko_counts.to_csv(ko_path, sep='\t', index=False, compression='gzip')
print(f'Wrote {ko_path}: {len(ko_counts):,} rows')
Wrote /home/cmungall/BERIL-research-observatory/projects/harvard_forest_warming/data/ko_counts_by_sample.tsv.gz: 561,330 rows
2. Pfam domain counts per sample (DNA only)¶
pfam_annotation_gff only links to metagenome workflows. We aggregate by (biosample_id, pfam_accession) and count hits.
In [4]:
pfam_counts = spark.sql("""
SELECT bw.biosample_id,
pf.pfam_accession AS pfam,
COUNT(*) AS hit_count
FROM nmdc_results.pfam_annotation_gff pf
JOIN hf_wf bw ON pf.workflow_run_id = bw.workflow_run_id
WHERE bw.workflow_type = 'nmdc:MetagenomeAnnotation'
AND pf.pfam_accession IS NOT NULL
GROUP BY bw.biosample_id, pf.pfam_accession
""").toPandas()
print(f'Pfam rows: {len(pfam_counts):,}')
print(f'Distinct Pfams: {pfam_counts["pfam"].nunique():,}')
print(f'Distinct samples: {pfam_counts["biosample_id"].nunique()}')
pfam_counts.head()
Pfam rows: 257,988 Distinct Pfams: 12,637 Distinct samples: 28
Out[4]:
| biosample_id | pfam | hit_count | |
|---|---|---|---|
| 0 | nmdc:bsm-11-ha179016 | PF10397 | 633 |
| 1 | nmdc:bsm-11-ha179016 | PF00586 | 2423 |
| 2 | nmdc:bsm-11-ha179016 | PF13442 | 5274 |
| 3 | nmdc:bsm-11-ha179016 | PF00815 | 1398 |
| 4 | nmdc:bsm-11-ha179016 | PF06874 | 24 |
In [5]:
pfam_path = os.path.join(DATA_DIR, 'pfam_counts_by_sample.tsv.gz')
pfam_counts.to_csv(pfam_path, sep='\t', index=False, compression='gzip')
print(f'Wrote {pfam_path}: {len(pfam_counts):,} rows')
Wrote /home/cmungall/BERIL-research-observatory/projects/harvard_forest_warming/data/pfam_counts_by_sample.tsv.gz: 257,988 rows
3. Kraken2 read-based taxonomy per sample¶
Pull phylum and genus level abundances from kraken2_classification_report.
In [6]:
# First check what rank values exist
ranks = spark.sql("""
SELECT k.rank, COUNT(*) as n
FROM nmdc_results.kraken2_classification_report k
JOIN hf_wf bw ON k.workflow_run_id = bw.workflow_run_id
WHERE bw.workflow_type = 'nmdc:ReadBasedTaxonomyAnalysis'
GROUP BY k.rank
ORDER BY n DESC
""").toPandas()
print('Kraken2 rank distribution:')
print(ranks)
Kraken2 rank distribution: rank n 0 S 130838 1 S1 67430 2 G 38357 3 F 11645 4 S2 8730 5 O 5041 6 G1 2898 7 F1 2590 8 C 2268 9 S3 1609 10 P 1204 11 O1 873 12 P1 504 13 F2 480 14 D1 454 15 O2 392 16 D2 367 17 C1 336 18 G2 308 19 C2 196 20 P2 168 21 D 112 22 D3 107 23 F3 84 24 R1 56 25 D4 33 26 K 28 27 K3 28 28 P7 28 29 C3 28 30 C4 28 31 P8 28 32 U 28 33 P3 28 34 P4 28 35 O3 28 36 R 28 37 P5 28 38 K1 28 39 O4 28 40 K2 28 41 P6 28 42 P9 28
In [7]:
kraken_taxa = spark.sql("""
SELECT bw.biosample_id,
k.rank,
k.taxid,
k.name,
k.pct_clade,
k.clade_reads,
k.direct_reads
FROM nmdc_results.kraken2_classification_report k
JOIN hf_wf bw ON k.workflow_run_id = bw.workflow_run_id
WHERE bw.workflow_type = 'nmdc:ReadBasedTaxonomyAnalysis'
AND k.rank IN ('D', 'P', 'C', 'O', 'F', 'G', 'S')
""").toPandas()
print(f'Rows: {len(kraken_taxa):,}')
print(f'Samples: {kraken_taxa["biosample_id"].nunique()}')
print(kraken_taxa["rank"].value_counts())
kraken_taxa.head()
Rows: 189,465 Samples: 28 rank S 130838 G 38357 F 11645 O 5041 C 2268 P 1204 D 112 Name: count, dtype: int64
Out[7]:
| biosample_id | rank | taxid | name | pct_clade | clade_reads | direct_reads | |
|---|---|---|---|---|---|---|---|
| 0 | nmdc:bsm-11-g61cvw79 | S | 66219 | Lachnoclostridium phytoferme... | 0.0 | 26 | 0 |
| 1 | nmdc:bsm-11-g61cvw79 | S | 712991 | Lachnospiraceae bacterium or... | 0.0 | 383 | 383 |
| 2 | nmdc:bsm-11-g61cvw79 | S | 2109691 | Lachnospiraceae bacterium GAM79 | 0.0 | 333 | 333 |
| 3 | nmdc:bsm-11-g61cvw79 | S | 39491 | [Eubacterium] rectale | 0.0 | 147 | 0 |
| 4 | nmdc:bsm-11-g61cvw79 | S | 2109690 | Lachnospiraceae bacterium Ch... | 0.0 | 107 | 107 |
In [8]:
kraken_path = os.path.join(DATA_DIR, 'kraken2_taxa_by_sample.tsv.gz')
kraken_taxa.to_csv(kraken_path, sep='\t', index=False, compression='gzip')
print(f'Wrote {kraken_path}: {len(kraken_taxa):,} rows')
Wrote /home/cmungall/BERIL-research-observatory/projects/harvard_forest_warming/data/kraken2_taxa_by_sample.tsv.gz: 189,465 rows
4. MAG GTDB-Tk taxonomy per sample¶
In [9]:
mags = spark.sql("""
SELECT bw.biosample_id,
g.name AS mag_name,
g.classification,
g.fastani_reference,
g.fastani_ani,
g.fastani_af,
g.classification_method
FROM nmdc_results.gtdbtk_bacterial_summary g
JOIN hf_wf bw ON g.workflow_run_id = bw.workflow_run_id
WHERE bw.workflow_type = 'nmdc:MagsAnalysis'
""").toPandas()
print(f'MAGs: {len(mags):,}')
print(f'Samples: {mags["biosample_id"].nunique()}')
mags.head()
MAGs: 298 Samples: 28
Out[9]:
| biosample_id | mag_name | classification | fastani_reference | fastani_ani | fastani_af | classification_method | |
|---|---|---|---|---|---|---|---|
| 0 | nmdc:bsm-11-gr59rt72 | bins.13 | d__Bacteria;p__Acidobacteriota;c__Acidobacteri... | GCA_003225465.1 | 95.1 | 0.7 | taxonomic classification defined by topology a... |
| 1 | nmdc:bsm-11-gr59rt72 | bins.17 | d__Bacteria;p__Patescibacteria;c__Microgenomat... | NaN | NaN | NaN | taxonomic classification defined by topology a... |
| 2 | nmdc:bsm-11-gr59rt72 | bins.20 | d__Bacteria;p__Proteobacteria;c__Gammaproteoba... | NaN | NaN | NaN | taxonomic classification defined by topology a... |
| 3 | nmdc:bsm-11-gr59rt72 | bins.21 | d__Bacteria;p__Acidobacteriota;c__Acidobacteri... | NaN | NaN | NaN | taxonomic classification defined by topology a... |
| 4 | nmdc:bsm-11-gr59rt72 | bins.9 | d__Bacteria;p__Bacteroidota;c__Bacteroidia;o__... | NaN | NaN | NaN | ANI |
In [10]:
mags_path = os.path.join(DATA_DIR, 'mags_by_sample.tsv.gz')
mags.to_csv(mags_path, sep='\t', index=False, compression='gzip')
print(f'Wrote {mags_path}: {len(mags):,} rows')
Wrote /home/cmungall/BERIL-research-observatory/projects/harvard_forest_warming/data/mags_by_sample.tsv.gz: 298 rows
5. ChEBI metabolite identifications per sample¶
In [11]:
metabolites = spark.sql("""
SELECT bw.biosample_id,
m.metabolite_identified AS chebi_id,
m.highest_similarity_score AS sim_score
FROM nmdc_metadata.workflow_execution_set_has_metabolite_identifications m
JOIN hf_wf bw ON m.parent_id = bw.workflow_run_id
WHERE bw.workflow_type = 'nmdc:MetabolomicsAnalysis'
""").toPandas()
print(f'Metabolite IDs: {len(metabolites):,}')
print(f'Distinct ChEBI: {metabolites["chebi_id"].nunique():,}')
print(f'Samples with metabolites: {metabolites["biosample_id"].nunique()}')
metabolites.head()
Metabolite IDs: 4,367 Distinct ChEBI: 319 Samples with metabolites: 22
Out[11]:
| biosample_id | chebi_id | sim_score | |
|---|---|---|---|
| 0 | nmdc:bsm-11-042nd237 | chebi:58251 | 0.386905 |
| 1 | nmdc:bsm-11-042nd237 | chebi:32816 | 0.417403 |
| 2 | nmdc:bsm-11-042nd237 | chebi:57972 | 0.811952 |
| 3 | nmdc:bsm-11-042nd237 | chebi:13705 | 0.509040 |
| 4 | nmdc:bsm-11-042nd237 | chebi:16995 | 0.644147 |
In [12]:
metab_path = os.path.join(DATA_DIR, 'metabolite_ids_by_sample.tsv.gz')
metabolites.to_csv(metab_path, sep='\t', index=False, compression='gzip')
print(f'Wrote {metab_path}: {len(metabolites):,} rows')
Wrote /home/cmungall/BERIL-research-observatory/projects/harvard_forest_warming/data/metabolite_ids_by_sample.tsv.gz: 4,367 rows
6. Summary of extracted feature tables¶
In [13]:
summary = pd.DataFrame([
{'table': 'ko_counts_by_sample.tsv.gz',
'rows': len(ko_counts),
'samples': ko_counts['biosample_id'].nunique(),
'features': ko_counts['ko'].nunique()},
{'table': 'pfam_counts_by_sample.tsv.gz',
'rows': len(pfam_counts),
'samples': pfam_counts['biosample_id'].nunique(),
'features': pfam_counts['pfam'].nunique()},
{'table': 'kraken2_taxa_by_sample.tsv.gz',
'rows': len(kraken_taxa),
'samples': kraken_taxa['biosample_id'].nunique(),
'features': kraken_taxa['name'].nunique()},
{'table': 'mags_by_sample.tsv.gz',
'rows': len(mags),
'samples': mags['biosample_id'].nunique(),
'features': mags['classification'].nunique()},
{'table': 'metabolite_ids_by_sample.tsv.gz',
'rows': len(metabolites),
'samples': metabolites['biosample_id'].nunique(),
'features': metabolites['chebi_id'].nunique()},
])
print(summary.to_string(index=False))
table rows samples features
ko_counts_by_sample.tsv.gz 561330 42 15193
pfam_counts_by_sample.tsv.gz 257988 28 12637
kraken2_taxa_by_sample.tsv.gz 189465 28 8603
mags_by_sample.tsv.gz 298 28 122
metabolite_ids_by_sample.tsv.gz 4367 22 319