01 Data Integration
Jupyter notebook from the SSO Subsurface Community Ecology — Spatial Structure, Functional Gradients, and Hydrogeological Drivers project.
NB01: Data Integration & Well Geometry¶
Goal: Build the complete analytical foundation — load all ASV datasets, compute inter-well distances, assign hydrogeological zones, construct community matrices, and extract regional geochemistry context from nearby wells.
Inputs: Pre-extracted parquet files from ENIGMA CORAL + Spark queries for additional data
Outputs: data/well_distances.csv, data/community_matrix_sediment_{asv,sample,phylum,genus}.csv, data/community_matrix_gw_{asv,sample,phylum}.csv, data/sediment_sample_zones.csv, data/nearby_geochem.csv, data/sso_isolates.csv
Environment: BERDL JupyterHub (Spark needed for geochemistry extraction); downstream analysis is local.
# Core analysis stack: everything downstream in this notebook is pandas/numpy.
import pandas as pd
import numpy as np
from pathlib import Path
import warnings

# NOTE(review): blanket suppression also hides pandas/matplotlib deprecation
# warnings -- intentional for clean notebook output, but disable when debugging.
warnings.filterwarnings('ignore')

# Output directories, relative to the notebook's working directory.
DATA = Path('../data')
FIG = Path('../figures')
DATA.mkdir(exist_ok=True)
FIG.mkdir(exist_ok=True)
print("Libraries loaded")
1. Load Pre-Extracted ASV Data¶
# Load sediment, groundwater, and sample metadata
# (pre-extracted from ENIGMA CORAL into parquet; see notebook header).
sed = pd.read_parquet(DATA / 'sso_sediment_asv.parquet')    # long-format ASV rows
gw = pd.read_parquet(DATA / 'sso_groundwater_asv.parquet')  # long-format ASV rows
meta = pd.read_parquet(DATA / 'sso_sample_metadata.parquet')

# Sanity report: row counts, distinct samples/wells/ASVs, and date coverage.
print(f"Sediment ASV: {sed.shape[0]:,} rows, {sed['sample_id'].nunique()} samples, "
      f"{sed['location'].nunique()} wells, {sed['asv_id'].nunique():,} ASVs")
print(f"Groundwater ASV: {gw.shape[0]:,} rows, {gw['sample_id'].nunique()} samples, "
      f"{gw['location'].nunique()} wells, {gw['asv_id'].nunique():,} ASVs")
print(f"Sample metadata: {meta.shape[0]} rows")
print(f"\nSediment wells: {sorted(sed['location'].unique())}")
print(f"Groundwater wells: {sorted(gw['location'].unique())}")
print(f"Sediment dates: {sed['date'].min()} to {sed['date'].max()}")
print(f"Groundwater dates: {gw['date'].min()} to {gw['date'].max()}")
# Taxonomy completeness summary: per dataset, report how many ASV rows carry
# a non-null assignment at each taxonomic rank.
for label, frame in [('Sediment', sed), ('Groundwater', gw)]:
    print(f"\n{label} taxonomy completeness:")
    row_count = len(frame)
    for rank in ['phylum', 'class', 'ord', 'family', 'genus', 'species']:
        assigned = frame[rank].notna().sum()
        print(f" {rank:10s}: {assigned:,}/{row_count:,} ({100*assigned/row_count:.1f}%)")
2. SSO Well Grid Geometry¶
The SSO is a 3x3 grid of boreholes on a hillslope at Oak Ridge Area 3:
- Upper row (uphill): U1, U2, U3
- Middle row: M4, M5, M6
- Lower row (downhill): L7, L8, L9
Columns run roughly east-west; rows run roughly north-south (uphill to downhill).
# Well coordinates from metadata: one row per well, taking the first
# recorded lat/lon per location.
wells = meta.groupby('location')[['latitude_degree', 'longitude_degree']].first().reset_index()
wells.columns = ['well', 'lat', 'lon']

# Assign grid positions as (row, col): row 0 = downhill (Lower), row 2 =
# uphill (Upper), so sorting row descending prints the grid uphill-first.
grid_map = {
    'SSO-U1': (2, 0), 'SSO-U2': (2, 1), 'SSO-U3': (2, 2),  # Upper = top row
    'SSO-M4': (1, 0), 'SSO-M5': (1, 1), 'SSO-M6': (1, 2),  # Middle
    'SSO-L7': (0, 0), 'SSO-L8': (0, 1), 'SSO-L9': (0, 2),  # Lower = bottom row
}
wells['row'] = wells['well'].map(lambda w: grid_map[w][0])
wells['col'] = wells['well'].map(lambda w: grid_map[w][1])
wells['row_label'] = wells['well'].str[4]  # U, M, L (5th char of 'SSO-Xn')
print("SSO Well Coordinates and Grid Positions:")
print(wells.sort_values(['row', 'col'], ascending=[False, True]).to_string(index=False))
wells
from scipy.spatial.distance import pdist, squareform
def haversine_meters(lat1, lon1, lat2, lon2):
    """Great-circle (haversine) distance in meters between two lat/lon points.

    Accepts scalars or numpy arrays (all trig is done via numpy ufuncs).
    """
    earth_radius_m = 6_371_000  # mean Earth radius
    lat1_rad = np.radians(lat1)
    lat2_rad = np.radians(lat2)
    half_dlat = np.radians(lat2 - lat1) / 2.0
    half_dlon = np.radians(lon2 - lon1) / 2.0
    # Haversine formula: a is the squared half-chord length.
    a = np.sin(half_dlat) ** 2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(half_dlon) ** 2
    return 2.0 * earth_radius_m * np.arctan2(np.sqrt(a), np.sqrt(1.0 - a))
# Compute pairwise geographic distances between all wells, ordered
# uphill-first / west-to-east to match the printed grid layout.
n = len(wells)
well_names = wells.sort_values(['row', 'col'], ascending=[False, True])['well'].values
# Fix: build a coordinate lookup once instead of filtering the DataFrame
# inside the nested loop (the original did an O(n) boolean scan per pair),
# and drop the unused `well_idx` mapping.
coords = wells.set_index('well')[['lat', 'lon']].to_dict('index')
geo_dist = np.zeros((n, n))
for i, w1 in enumerate(well_names):
    c1 = coords[w1]
    for j, w2 in enumerate(well_names):
        c2 = coords[w2]
        geo_dist[i, j] = haversine_meters(c1['lat'], c1['lon'], c2['lat'], c2['lon'])

# Persist the labelled distance matrix for downstream notebooks.
geo_dist_df = pd.DataFrame(geo_dist, index=well_names, columns=well_names).round(2)
geo_dist_df.to_csv(DATA / 'well_distances.csv')
print("Inter-well geographic distances (meters):")
print(geo_dist_df.to_string())
print(f"\nMin non-zero distance: {geo_dist[geo_dist > 0].min():.2f} m")
print(f"Max distance: {geo_dist.max():.2f} m")
print(f"Grid span: ~{geo_dist.max():.1f} m")
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

# Plot the 3x3 well grid in (col, row) space, colored by hillslope row,
# with haversine distances annotated on grid-adjacent well pairs.
fig, ax = plt.subplots(1, 1, figsize=(6, 5))
colors = {'U': '#2196F3', 'M': '#4CAF50', 'L': '#F44336'}  # Upper / Middle / Lower
for _, w in wells.iterrows():
    c = colors[w['row_label']]
    ax.scatter(w['col'], w['row'], s=300, c=c, edgecolors='k', zorder=5)
    # Label each marker with the short well name (e.g. 'U1').
    ax.annotate(w['well'].replace('SSO-', ''), (w['col'], w['row']),
                ha='center', va='center', fontsize=9, fontweight='bold', zorder=6)

# Draw all pairwise distances as light lines
# (i < j on the row index visits each unordered pair once).
for i, w1 in wells.iterrows():
    for j, w2 in wells.iterrows():
        if i < j:
            d = haversine_meters(w1['lat'], w1['lon'], w2['lat'], w2['lon'])
            # Only draw adjacent pairs (Manhattan distance 1 on the grid).
            if abs(w1['row'] - w2['row']) + abs(w1['col'] - w2['col']) == 1:
                ax.plot([w1['col'], w2['col']], [w1['row'], w2['row']],
                        'k-', alpha=0.3, linewidth=1)
                # Annotate the distance at the segment midpoint.
                mx = (w1['col'] + w2['col'])/2
                my = (w1['row'] + w2['row'])/2
                ax.annotate(f'{d:.1f}m', (mx, my), fontsize=7, ha='center',
                            va='bottom' if w1['row'] == w2['row'] else 'center',
                            color='gray')
ax.set_xlabel('Column (W → E)')
ax.set_ylabel('Row (downhill → uphill)')
ax.set_title('SSO Well Grid with Inter-Well Distances')
ax.set_xlim(-0.5, 2.5)
ax.set_ylim(-0.5, 2.5)
ax.set_xticks([0, 1, 2])
ax.set_xticklabels(['Col 1', 'Col 2', 'Col 3'])
ax.set_yticks([0, 1, 2])
ax.set_yticklabels(['Lower', 'Middle', 'Upper'])
handles = [mpatches.Patch(color=c, label=l) for l, c in
           [('Upper', '#2196F3'), ('Middle', '#4CAF50'), ('Lower', '#F44336')]]
ax.legend(handles=handles, loc='upper left', fontsize=8)
plt.tight_layout()
plt.savefig(FIG / 'well_grid_geometry.png', dpi=150, bbox_inches='tight')
plt.show()
print("Saved: figures/well_grid_geometry.png")
3. Hydrogeological Zone Assignment¶
Each sediment sample comes from a specific depth with a lithological description. We parse the description to assign hydrogeological zones:
- VZ = Vadose Zone (unsaturated, above water table)
- VSZ = Variably Saturated Zone (transition)
- SZ1 = Saturated Zone 1 (upper saturated)
- SZ2 = Saturated Zone 2 (deeper saturated)
# Parse hydrogeological zone from sediment sample descriptions.
# Restrict to sediment material -- zones are assigned from core descriptions.
sed_meta = meta[meta['material'] == 'sediment'].copy()
def parse_zone(row):
    """Extract zone from sample description + depth.

    Priority: explicit zone label > depth + wetness heuristic.
    Many samples lack explicit zone labels but have descriptions like
    'dry gravel' or 'wet clay' plus a depth measurement.

    Parameters: row -- mapping with 'description' (str or NaN) and
    'depth_meter' (float, may be NaN) keys.
    Returns one of 'VZ', 'VSZ', 'SZ1', 'SZ2' ('SZ3' via explicit label only),
    or 'unknown' when neither a label nor a usable depth is available.
    """
    desc = row['description'] if pd.notna(row['description']) else ''
    depth = row['depth_meter']
    desc_upper = desc.upper()
    # 1. Explicit zone labels (highest confidence). 'VSZ' must be checked
    #    before 'VZ' because 'VZ' is a substring of 'VSZ'.
    for zone in ['VSZ', 'SZ2', 'SZ1', 'SZ3', 'VZ']:
        if zone in desc_upper:
            return zone
    # Fix: a missing depth previously fell through every `<` comparison
    # (NaN compares False) and was silently assigned the deepest zone 'SZ2'.
    # Return 'unknown' instead; the depth-profile palette already handles it.
    if pd.isna(depth):
        return 'unknown'
    # 2. Depth + description heuristic
    #    VZ: shallow, typically dry. VSZ: transition. SZ: saturated, wet.
    is_dry = any(kw in desc_upper for kw in ['DRY', 'PERCHED'])
    is_wet = any(kw in desc_upper for kw in ['WET', 'WATER', 'SATURATED'])
    if depth < 2.0:
        # Shallow: default VZ; wetness (without dryness cues) bumps to VSZ.
        return 'VZ' if is_dry else ('VSZ' if is_wet else 'VZ')
    elif depth < 4.0:
        # Transition depth: wetness pushes into the upper saturated zone.
        return 'VSZ' if not is_wet else 'SZ1'
    elif depth < 6.0:
        return 'SZ1'
    else:
        return 'SZ2'
# Apply the zone parser row-wise and summarize assignments per well.
sed_meta['zone'] = sed_meta.apply(parse_zone, axis=1)
print("Sediment samples by well and hydrogeological zone:")
zone_counts = sed_meta.groupby(['location', 'zone']).size().unstack(fill_value=0)
print(zone_counts.to_string())
print(f"\nZone distribution: {sed_meta['zone'].value_counts().to_dict()}")

# Show zone assignments with depth for validation
print("\nZone assignment validation (well, depth, zone, description):")
check = sed_meta.sort_values(['location', 'depth_meter'])[
    ['location', 'depth_meter', 'zone', 'description']].head(30)
print(check.to_string(index=False))

# Merge zone info into sediment ASV data
# (left join keeps ASV rows even when the sample has no zone assignment).
sed_zone = sed_meta[['sample_id', 'location', 'depth_meter', 'zone', 'description']].copy()
sed = sed.merge(sed_zone[['sample_id', 'zone']], on='sample_id', how='left')
print(f"\nSediment ASVs with zone assignment: {sed['zone'].notna().sum():,}/{len(sed):,}")
# Depth profile of zones across wells: each sediment sample is a short
# horizontal bar centered on its depth, colored by assigned zone.
fig, ax = plt.subplots(figsize=(10, 5))
zone_colors = {'VZ': '#FFC107', 'VSZ': '#FF9800', 'SZ1': '#2196F3', 'SZ2': '#1565C0', 'unknown': '#9E9E9E'}
for i, well in enumerate(sorted(sed_meta['location'].unique())):
    wdata = sed_meta[sed_meta['location'] == well].sort_values('depth_meter')
    for _, s in wdata.iterrows():
        # Fall back to gray for any zone value not in the palette.
        c = zone_colors.get(s['zone'], '#9E9E9E')
        # Bar spans depth +/- 0.4 m so neighboring samples read as a column.
        ax.barh(i, 0.8, left=s['depth_meter'] - 0.4, height=0.6, color=c, edgecolor='white', linewidth=0.5)
ax.set_yticks(range(len(sorted(sed_meta['location'].unique()))))
ax.set_yticklabels(sorted(sed_meta['location'].unique()))
ax.set_xlabel('Depth (m)')
ax.set_title('Sediment Sample Depths by Well and Hydrogeological Zone')
ax.invert_xaxis()  # reverse the depth axis direction
# Legend omits the 'unknown' fallback color.
handles = [mpatches.Patch(color=c, label=z) for z, c in zone_colors.items() if z != 'unknown']
ax.legend(handles=handles, loc='lower left', fontsize=8)
plt.tight_layout()
plt.savefig(FIG / 'depth_zone_profile.png', dpi=150, bbox_inches='tight')
plt.show()
print("Saved: figures/depth_zone_profile.png")
4. Build Community Matrices¶
Construct sample × taxon abundance matrices for downstream beta-diversity analysis.
We build matrices at two resolutions:
- Well-aggregated: Sum abundances across all depths per well (for spatial analysis)
- Sample-level: Each core segment is a separate sample (for depth analysis)
# --- Sediment: Well-aggregated community matrix (ASV level) ---
# Sum abundances across all depths within each well
sed_well_asv = sed.groupby(['location', 'asv_id'])['abundance'].sum().reset_index()
sed_well_matrix = sed_well_asv.pivot(index='location', columns='asv_id', values='abundance').fillna(0)
print(f"Sediment well-aggregated matrix: {sed_well_matrix.shape[0]} wells x {sed_well_matrix.shape[1]:,} ASVs")
print(f"Total reads per well: {sed_well_matrix.sum(axis=1).astype(int).to_dict()}")

# --- Sediment: Sample-level community matrix ---
# pivot_table with aggfunc='sum' tolerates duplicate (sample_id, asv_id)
# rows, which plain .pivot() would reject.
sed_sample_matrix = sed.pivot_table(
    index='sample_id', columns='asv_id', values='abundance', fill_value=0, aggfunc='sum'
)
print(f"\nSediment sample-level matrix: {sed_sample_matrix.shape[0]} samples x {sed_sample_matrix.shape[1]:,} ASVs")

# --- Groundwater: Well-aggregated community matrix ---
gw_well_asv = gw.groupby(['location', 'asv_id'])['abundance'].sum().reset_index()
gw_well_matrix = gw_well_asv.pivot(index='location', columns='asv_id', values='abundance').fillna(0)
print(f"Groundwater well-aggregated matrix: {gw_well_matrix.shape[0]} wells x {gw_well_matrix.shape[1]:,} ASVs")
print(f"Total reads per well: {gw_well_matrix.sum(axis=1).astype(int).to_dict()}")

# --- Groundwater: Sample-level community matrix ---
gw_sample_matrix = gw.pivot_table(
    index='sample_id', columns='asv_id', values='abundance', fill_value=0, aggfunc='sum'
)
print(f"\nGroundwater sample-level matrix: {gw_sample_matrix.shape[0]} samples x {gw_sample_matrix.shape[1]:,} ASVs")
# Also build phylum-level matrices for robust analyses
def build_taxon_matrix(df, taxon_col, group_col):
    """Build taxon abundance matrix at specified taxonomic level.

    Rows with a null value in `taxon_col` are dropped; abundances are summed
    per (group, taxon) and missing combinations are filled with 0.
    """
    classified = df.dropna(subset=[taxon_col])
    totals = classified.groupby([group_col, taxon_col])['abundance'].sum()
    # Unstacking the taxon level of the MultiIndex yields a group x taxon
    # matrix, equivalent to reset_index().pivot(...) on the same data.
    return totals.unstack(taxon_col).fillna(0)
# Coarser taxonomic matrices (phylum/genus) for robustness checks.
sed_well_phylum = build_taxon_matrix(sed, 'phylum', 'location')
sed_well_genus = build_taxon_matrix(sed, 'genus', 'location')
gw_well_phylum = build_taxon_matrix(gw, 'phylum', 'location')
print(f"Sediment phylum matrix: {sed_well_phylum.shape}")
print(f"Sediment genus matrix: {sed_well_genus.shape}")
print(f"Groundwater phylum matrix: {gw_well_phylum.shape}")

# Save community matrices
sed_well_matrix.to_csv(DATA / 'community_matrix_sediment_asv.csv')
sed_sample_matrix.to_csv(DATA / 'community_matrix_sediment_sample.csv')
sed_well_phylum.to_csv(DATA / 'community_matrix_sediment_phylum.csv')
sed_well_genus.to_csv(DATA / 'community_matrix_sediment_genus.csv')
gw_well_matrix.to_csv(DATA / 'community_matrix_gw_asv.csv')
gw_sample_matrix.to_csv(DATA / 'community_matrix_gw_sample.csv')
gw_well_phylum.to_csv(DATA / 'community_matrix_gw_phylum.csv')

# Save sample-zone mapping for NB03
sed_zone.to_csv(DATA / 'sediment_sample_zones.csv', index=False)
print("Saved community matrices and zone data to data/")
5. Regional Geochemistry Context (Spark)¶
SSO-specific geochemistry is not in BERDL, but nearby wells from the 100 Well Survey and 27 Well Survey have metals data. The EU/ED wells are ~90-120 m northeast of the SSO grid. We extract these to establish what the regional geochemical landscape looks like.
Requires BERDL JupyterHub Spark session.
# --- Spark: Extract nearby well geochemistry ---
# This cell requires a BERDL JupyterHub Spark session
spark = get_spark_session()
# First, understand the 100WS metals brick structure
brick10_schema = spark.sql("DESCRIBE enigma_coral.ddt_brick0000010").toPandas()
print("100WS metals brick schema:")
print(brick10_schema.to_string())
# Sample a few rows to understand the data
brick10_sample = spark.sql("SELECT * FROM enigma_coral.ddt_brick0000010 LIMIT 5").toPandas()
print("\nSample rows:")
print(brick10_sample.to_string())
# Get well locations from CORAL to find wells near SSO
sso_center_lat = 35.977156 # approximate SSO grid center
sso_center_lon = -84.273297
# Get all CORAL locations with coordinates
all_locs = spark.sql("""
SELECT location_id, latitude_degree, longitude_degree, region, feature_type
FROM enigma_coral.sdt_location
WHERE latitude_degree IS NOT NULL
AND longitude_degree IS NOT NULL
""").toPandas()
# Compute distance from SSO center
all_locs['dist_from_sso_m'] = all_locs.apply(
lambda r: haversine_meters(sso_center_lat, sso_center_lon,
r['latitude_degree'], r['longitude_degree']), axis=1)
# Wells within 500m of SSO
nearby = all_locs[all_locs['dist_from_sso_m'] < 500].sort_values('dist_from_sso_m')
print(f"Wells within 500m of SSO center: {len(nearby)}")
print(nearby[['location_id', 'latitude_degree', 'longitude_degree',
'dist_from_sso_m', 'feature_type']].to_string(index=False))
# Extract metals data from 100WS brick for nearby wells
# The brick structure may vary -- adapt column filters after inspecting schema above
nearby_well_ids = nearby[~nearby['location_id'].str.startswith('SSO')]['location_id'].tolist()
if nearby_well_ids:
    # NOTE(review): well IDs are interpolated directly into the SQL string.
    # They come from CORAL's own location table (not user input), but a
    # parameterized query would be safer if that ever changes.
    well_filter = "', '".join(nearby_well_ids)
    # Try 100WS brick first
    try:
        geochem_100ws = spark.sql(f"""
            SELECT * FROM enigma_coral.ddt_brick0000010
            WHERE well IN ('{well_filter}')
               OR location IN ('{well_filter}')
               OR sample_id LIKE '%EU%' OR sample_id LIKE '%ED%'
        """).toPandas()
        print(f"100WS metals for nearby wells: {len(geochem_100ws)} rows")
        if len(geochem_100ws) > 0:
            print(geochem_100ws.head())
    except Exception as e:
        # Broad catch is deliberate exploration fallback: the brick's column
        # names are unknown until the schema has been inspected.
        print(f"100WS query needs adjustment: {e}")
        # Fall back to sampling to understand column names
        geochem_100ws = spark.sql("SELECT * FROM enigma_coral.ddt_brick0000010 LIMIT 20").toPandas()
        print("Column names:", list(geochem_100ws.columns))
        print("Sample well values:", geochem_100ws.iloc[:, :5].to_string())
else:
    print("No non-SSO wells found nearby")
# After inspecting the brick schema above, adapt this cell to extract and save
# the relevant metals data for nearby wells.
#
# Expected output: a DataFrame with columns like:
# well, date, analyte, value, unit, physiochemical_state
#
# Save to data/nearby_geochem.csv for use in NB06 synthesis
# Placeholder -- fill in after running the schema exploration cells above
# geochem_nearby.to_csv(DATA / 'nearby_geochem.csv', index=False)
# print(f"Saved {len(geochem_nearby)} geochemistry rows for {geochem_nearby['well'].nunique()} nearby wells")
6. SSO Isolate Genomes¶
144 isolates and 18 genomes were sequenced from SSO-M6-C2. These provide ground-truth functional annotation for at least one well, which we can compare against taxonomy-inferred functional profiles.
# Extract SSO isolates and genomes from CORAL
# (requires the Spark session created in section 5).
sso_strains = spark.sql("""
    SELECT * FROM enigma_coral.sdt_strain
    WHERE location LIKE '%SSO%'
""").toPandas()
sso_genomes = spark.sql("""
    SELECT * FROM enigma_coral.sdt_genome
    WHERE location LIKE '%SSO%'
""").toPandas()
print(f"SSO isolates: {len(sso_strains)}")
print(f"SSO genomes: {len(sso_genomes)}")

if len(sso_strains) > 0:
    print(f"\nIsolate source wells: {sso_strains['location'].unique()}")
    print(f"Isolate taxonomic assignments:")
    # Check column existence first -- the strain table is not guaranteed
    # to carry every taxonomic rank.
    for col in ['phylum', 'class', 'order', 'family', 'genus', 'species']:
        if col in sso_strains.columns:
            vals = sso_strains[col].dropna().unique()
            if len(vals) > 0:
                print(f" {col}: {vals[:10]}")

if len(sso_genomes) > 0:
    print(f"\nGenome IDs:")
    # Prefer the genome_id column when present; otherwise dump a preview.
    print(sso_genomes[['genome_id', 'location']].to_string(index=False) if 'genome_id' in sso_genomes.columns
          else sso_genomes.head().to_string())

# Save
sso_strains.to_csv(DATA / 'sso_isolates.csv', index=False)
if len(sso_genomes) > 0:
    sso_genomes.to_csv(DATA / 'sso_genomes.csv', index=False)
print(f"\nSaved isolate and genome data to data/")
7. Data Summary & Completeness Check¶
# Summary of all outputs from this notebook: matrix shapes, known data
# constraints, and the CSV files written to data/.
print("=" * 60)
print("NB01 DATA INTEGRATION SUMMARY")
print("=" * 60)
print(f"\n{'Dataset':<45} {'Shape':>15}")
print("-" * 60)
print(f"{'Sediment well×ASV matrix':<45} {str(sed_well_matrix.shape):>15}")
print(f"{'Sediment sample×ASV matrix':<45} {str(sed_sample_matrix.shape):>15}")
print(f"{'Sediment well×phylum matrix':<45} {str(sed_well_phylum.shape):>15}")
print(f"{'Sediment well×genus matrix':<45} {str(sed_well_genus.shape):>15}")
print(f"{'Groundwater well×ASV matrix':<45} {str(gw_well_matrix.shape):>15}")
print(f"{'Groundwater sample×ASV matrix':<45} {str(gw_sample_matrix.shape):>15}")
print(f"{'Groundwater well×phylum matrix':<45} {str(gw_well_phylum.shape):>15}")
print(f"{'Well distances':<45} {str(geo_dist_df.shape):>15}")
print(f"\n{'Key constraints:'}")
print(f" - Groundwater covers only {gw['location'].nunique()}/9 wells")
print(f" - Species-level taxonomy: ~0% (genus is ceiling)")
print(f" - Sediment and groundwater from different years (2023 vs 2024)")
print(f" - SSO geochemistry values not in BERDL (only sample registrations)")
print(f"\nFiles saved to data/:")
# Fix: use pathlib's stat() for file sizes instead of os.path.getsize --
# DATA is already a Path, which drops the mid-notebook `import os`.
for f in sorted(DATA.glob('*.csv')):
    size_kb = f.stat().st_size / 1024
    print(f" {f.name:<50} {size_kb:>8.1f} KB")
Next Steps¶
- NB02: Spatial analysis of sediment communities (Bray-Curtis, Mantel, MDS, Procrustes)
- NB03: Depth stratification analysis (PERMANOVA: zone vs well)
- NB04: Functional inference from taxonomy
- NB05: Groundwater vs sediment comparison
- NB06: Synthesis — integrate all evidence for environmental gradient inference