Miyu Horiuchi Claude Opus 4.7 (1M context) committed on
Commit
3d34be9
·
1 Parent(s): 0fbea89

Phase E scaffolding: MediaDive integration + strain↔medium links

Browse files

Building toward predicting cultivation media composition from genome (the
actual research deliverable, not just summary stats like T_opt).

src/microbe_model/data/mediadive.py:
- parse_strain_media_links: extracts {medium_id, medium_name, growth} entries
from a cached BacDive record (BacDive's `Culture and growth conditions →
culture medium[]` embeds inline links to MediaDive, so no new API call is needed)
- iter_bacdive_strain_media: walks the BacDive cache (100K records on disk)
- MediaDiveClient: polite REST client (0.3s rate limit) for /rest/medium/{id}
- normalize_recipe: flattens a /medium/{id} payload into per-compound rows

scripts/08_extract_strain_media.py: one-shot extraction from local cache.
Result: 38,649 links across 28,704 strains and 1,581 unique media (zero
network calls). All BacDive entries are growth=yes, so we have positive
labels only — negative cultivation results aren't recorded in this field.

scripts/09_fetch_media_recipes.py: fetches each unique medium recipe via
MediaDive REST. Currently running in background, ~14 min ETA. Caches each
medium's JSON to data/mediadive/{id}.json for resumability.

After this lands, we'll have:
- data/strain_media.parquet — strain↔medium adjacency
- data/media_recipes.parquet — per-(medium, compound) rows with amounts
- data/media_metadata.parquet — medium-level (name, pH range, source)

Top-used media are unsurprising: DSMZ Medium 65 (Streptomyces, 3789 strains),
Medium 92 (TSB, 3053), Medium 9 (Myxococcus, 2850). The long tail of niche media
provides the prediction signal.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

scripts/08_extract_strain_media.py ADDED
@@ -0,0 +1,37 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Build data/strain_media.parquet by walking the BacDive cache.
2
+
3
+ No network calls — pure local extraction from cached records.
4
+ Output: one row per (bacdive_id, medium_id) link.
5
+ """
6
+ from __future__ import annotations
7
+
8
+ import pandas as pd
9
+ from tqdm import tqdm
10
+
11
+ from microbe_model import config
12
+ from microbe_model.data.mediadive import iter_bacdive_strain_media
13
+
14
+
15
def main() -> None:
    """Extract every strain↔medium link from the BacDive cache into parquet.

    Writes ``strain_media.parquet`` under ``config.DATA`` with one row per
    (bacdive_id, medium_id) link, then prints a short summary.  An empty cache
    produces an empty file with the expected columns instead of a ``KeyError``.
    """
    n_files = sum(1 for _ in config.BACDIVE_DIR.glob("*.json"))
    print(f"Walking {n_files:,} BacDive cached records...")

    # The iterator yields one item per *link*, not per file, so the file count
    # is not a valid ``total`` for the progress bar; show a running count only.
    rows = list(tqdm(iter_bacdive_strain_media(), unit="link"))

    # Explicit columns keep the schema stable even when no links were found.
    columns = ["bacdive_id", "medium_id", "medium_name", "growth"]
    df = pd.DataFrame(rows, columns=columns)
    out = config.DATA / "strain_media.parquet"
    df.to_parquet(out, index=False)

    print(f"\nWrote {len(df):,} strain↔medium links to {out}")
    print(f"  unique strains: {df['bacdive_id'].nunique():,}")
    print(f"  unique media:   {df['medium_id'].nunique():,}")
    for label in ("yes", "no", "weak"):
        count = int((df["growth"] == label).sum())
        print(f"  {'growth=' + label + ':':<16}{count:,}")

    if df.empty:
        return

    print("\nTop 10 most-used media:")
    # dropna=False keeps media whose name is missing; the default would
    # silently drop them from the ranking.
    usage = df.groupby(["medium_id", "medium_name"], dropna=False).size()
    print(usage.sort_values(ascending=False).head(10))
34
+
35
+
36
+ if __name__ == "__main__":
37
+ main()
scripts/09_fetch_media_recipes.py ADDED
@@ -0,0 +1,81 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Fetch full recipes for every medium referenced in data/strain_media.parquet.
2
+
3
+ Sequential, polite — MediaDive is a small public API. ~25 min for ~1,500 media at
4
+ 0.3s/call. Outputs:
5
+ - data/media_metadata.parquet — one row per medium (name, pH range, source, etc.)
6
+ - data/media_recipes.parquet — one row per (medium_id, compound_id) recipe entry
7
+ """
8
+ from __future__ import annotations
9
+
10
+ import json
11
+
12
+ import pandas as pd
13
+ from tqdm import tqdm
14
+
15
+ from microbe_model import config
16
+ from microbe_model.data.mediadive import MediaDiveClient, normalize_recipe
17
+
18
+
19
def main() -> None:
    """Fetch and flatten the recipe of every medium referenced by a strain.

    Reads ``strain_media.parquet``, loads each unique medium from the on-disk
    JSON cache (fetching from MediaDive on a miss), and writes
    ``media_metadata.parquet`` and ``media_recipes.parquet`` under
    ``config.DATA``.  Media that cannot be fetched are counted and reported,
    not cached, so a later run retries them.

    Raises:
        SystemExit: if ``strain_media.parquet`` has not been built yet.
    """
    links_path = config.DATA / "strain_media.parquet"
    if not links_path.exists():
        raise SystemExit(f"Missing {links_path}. Run scripts/08_extract_strain_media.py first.")
    links = pd.read_parquet(links_path)

    medium_ids = sorted(links["medium_id"].dropna().unique().tolist())
    print(f"Fetching {len(medium_ids):,} unique medium recipes from MediaDive")

    cache_dir = config.DATA / "mediadive"
    cache_dir.mkdir(parents=True, exist_ok=True)

    client = MediaDiveClient()
    metadata_rows = []
    recipe_rows = []
    failed: list[str] = []

    for mid in tqdm(medium_ids, desc="MediaDive", unit="medium"):
        payload = _load_or_fetch_medium(client, cache_dir / f"{mid}.json", mid)
        if payload is None:
            failed.append(mid)
            continue

        medium = payload.get("medium") or {}
        metadata_rows.append({
            "medium_id": str(mid),
            "name": medium.get("name"),
            "complex_medium": medium.get("complex_medium"),
            "min_pH": medium.get("min_pH"),
            "max_pH": medium.get("max_pH"),
            "source": medium.get("source"),
            "link": medium.get("link"),
            "n_solutions": len(payload.get("solutions") or []),
        })
        recipe_rows.extend(normalize_recipe(payload))

    md = pd.DataFrame(metadata_rows)
    rc = pd.DataFrame(recipe_rows)
    md.to_parquet(config.DATA / "media_metadata.parquet", index=False)
    rc.to_parquet(config.DATA / "media_recipes.parquet", index=False)

    print(f"\nWrote {len(md):,} media to media_metadata.parquet")
    print(f"Wrote {len(rc):,} compound-rows to media_recipes.parquet")
    print(f"Failed to fetch: {len(failed)}")
    if failed:
        print(f"  failed ids: {', '.join(map(str, failed))}")
    if rc.empty:
        return

    print(f"\nUnique compounds: {rc['compound'].nunique():,}")
    print("Top 15 most-used compounds across all recipes:")
    top = (rc.groupby("compound")
             .agg(n_media=("medium_id", "nunique"), median_g_l=("g_l", "median"))
             .sort_values("n_media", ascending=False)
             .head(15))
    print(top.to_string())


def _load_or_fetch_medium(client: MediaDiveClient, cache_path, mid: str) -> dict | None:
    """Return the payload for medium *mid*, preferring the JSON cache file.

    ``cache_path`` is the ``pathlib.Path`` of the per-medium cache file.  A
    cache file that is unreadable or is not a JSON object (e.g. truncated by an
    interrupted earlier run) is treated as a miss and re-fetched.  Returns
    ``None`` when the client reports the medium as unavailable.
    """
    if cache_path.exists():
        try:
            cached = json.loads(cache_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            cached = None
        if isinstance(cached, dict):
            return cached

    payload = client.fetch_medium(mid)
    if payload is None:
        return None

    # Write to a sibling temp file and rename, so an interruption mid-write
    # never leaves a half-written cache entry behind.
    tmp_path = cache_path.with_name(cache_path.name + ".tmp")
    tmp_path.write_text(json.dumps(payload), encoding="utf-8")
    tmp_path.replace(cache_path)
    return payload
78
+
79
+
80
+ if __name__ == "__main__":
81
+ main()
src/microbe_model/data/mediadive.py ADDED
@@ -0,0 +1,148 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """MediaDive (DSMZ) integration — strain↔medium links and full recipes.
2
+
3
+ The BacDive v2 records we already cached include inline medium links of the form
4
+ ``https://mediadive.dsmz.de/medium/{id}`` plus a `growth: yes/no` flag. So extracting
5
+ strain↔medium pairs needs no new API calls. The medium *recipes* (compound list
6
+ with amounts) do need network access via MediaDive's REST API.
7
+
8
+ API documentation observed live on 2026-04-27:
9
+ - /rest/medium/{id} → full recipe with solutions[].recipe[] (compound + amount + unit + g_l)
10
+ - /rest/media → paginated list of all media (limit + offset)
11
+ - /rest/medium-strains/{id} → strains linked to a medium (with bacdive_id)
12
+ """
13
+ from __future__ import annotations
14
+
15
+ import json
16
+ import re
17
+ import time
18
+ from collections.abc import Iterator
19
+ from pathlib import Path
20
+ from typing import Any
21
+
22
+ import requests
23
+
24
+ from microbe_model import config
25
+
26
+ BASE_URL = "https://mediadive.dsmz.de/rest"
27
+ RATE_LIMIT_S = 0.3 # be polite to a small public API
28
+
29
+
30
+ def _extract_medium_id(link: str | None) -> str | None:
31
+ if not link:
32
+ return None
33
+ m = re.search(r"/medium/([A-Za-z0-9]+)", link)
34
+ return m.group(1) if m else None
35
+
36
+
37
+ def parse_strain_media_links(record: dict[str, Any]) -> list[dict[str, Any]]:
38
+ """Return a list of {medium_id, medium_name, growth} for each medium in a BacDive record."""
39
+ culture = record.get("Culture and growth conditions") or {}
40
+ raw = culture.get("culture medium") or []
41
+ if isinstance(raw, dict):
42
+ raw = [raw]
43
+
44
+ out: list[dict[str, Any]] = []
45
+ for m in raw:
46
+ if not isinstance(m, dict):
47
+ continue
48
+ medium_id = _extract_medium_id(m.get("link"))
49
+ if not medium_id:
50
+ continue
51
+ growth = (m.get("growth") or "").strip().lower()
52
+ out.append({
53
+ "medium_id": str(medium_id),
54
+ "medium_name": m.get("name"),
55
+ "growth": growth, # "yes", "no", "weak", or ""
56
+ })
57
+ return out
58
+
59
+
60
+ def iter_bacdive_strain_media(cache_dir: Path | None = None) -> Iterator[dict[str, Any]]:
61
+ """Walk the BacDive cache and yield {bacdive_id, medium_id, medium_name, growth} rows."""
62
+ cache_dir = cache_dir or config.BACDIVE_DIR
63
+ for path in cache_dir.glob("*.json"):
64
+ try:
65
+ record = json.loads(path.read_text())
66
+ except json.JSONDecodeError:
67
+ continue
68
+ try:
69
+ bid = int(path.stem)
70
+ except ValueError:
71
+ continue
72
+ for link in parse_strain_media_links(record):
73
+ yield {
74
+ "bacdive_id": bid,
75
+ "medium_id": link["medium_id"],
76
+ "medium_name": link["medium_name"],
77
+ "growth": link["growth"],
78
+ }
79
+
80
+
81
+ class MediaDiveClient:
82
+ """Polite REST client for MediaDive — 0.3s sleep between calls by default."""
83
+
84
+ def __init__(self, *, rate_limit_s: float = RATE_LIMIT_S) -> None:
85
+ self.session = requests.Session()
86
+ self.rate_limit_s = rate_limit_s
87
+
88
+ def _get(self, path: str, params: dict | None = None) -> dict[str, Any]:
89
+ time.sleep(self.rate_limit_s)
90
+ url = f"{BASE_URL}{path}"
91
+ for attempt in range(3):
92
+ try:
93
+ resp = self.session.get(url, params=params, timeout=30)
94
+ if resp.status_code in (429, 502, 503):
95
+ time.sleep(2 ** attempt)
96
+ continue
97
+ resp.raise_for_status()
98
+ return resp.json()
99
+ except requests.RequestException:
100
+ if attempt == 2:
101
+ raise
102
+ time.sleep(2 ** attempt)
103
+ return {}
104
+
105
+ def fetch_medium(self, medium_id: str) -> dict[str, Any] | None:
106
+ """Return the full medium record, or None if not found / malformed."""
107
+ try:
108
+ body = self._get(f"/medium/{medium_id}")
109
+ except requests.HTTPError:
110
+ return None
111
+ if body.get("status") != 200:
112
+ return None
113
+ return body.get("data") or None
114
+
115
+ def list_media(self, *, limit: int = 200, offset: int = 0) -> list[dict[str, Any]]:
116
+ body = self._get("/media", params={"limit": limit, "offset": offset})
117
+ return body.get("data") or []
118
+
119
+
120
+ def normalize_recipe(medium_payload: dict[str, Any]) -> list[dict[str, Any]]:
121
+ """Flatten a /medium/{id} payload into per-compound rows.
122
+
123
+ Each row: {medium_id, solution_name, compound_id, compound, amount, unit, g_l, optional}.
124
+ Skips compounds with no g_l / amount.
125
+ """
126
+ medium = medium_payload.get("medium") or {}
127
+ medium_id = str(medium.get("id", ""))
128
+ rows: list[dict[str, Any]] = []
129
+ for solution in medium_payload.get("solutions") or []:
130
+ sol_name = solution.get("name", "")
131
+ for r in solution.get("recipe") or []:
132
+ if not isinstance(r, dict):
133
+ continue
134
+ compound = r.get("compound")
135
+ if not compound:
136
+ continue
137
+ rows.append({
138
+ "medium_id": medium_id,
139
+ "solution_name": sol_name,
140
+ "compound_id": r.get("compound_id"),
141
+ "compound": compound,
142
+ "amount": r.get("amount"),
143
+ "unit": r.get("unit"),
144
+ "g_l": r.get("g_l"),
145
+ "optional": int(r.get("optional", 0) or 0),
146
+ "condition": r.get("condition"),
147
+ })
148
+ return rows