import datetime
import os
import traceback
import uuid
from pathlib import Path
from urllib.parse import unquote
import html as _py_html

import gradio as gr
import pandas as pd
from huggingface_hub import hf_hub_download

# ----------- HF DATASET CONFIG -----------
HF_DATASET_REPO = "akazemian/audio-html"  # dataset repo that stores the HTML reports
INDEX_FILENAME = "index.csv"              # manifest of all reports
# -----------------------------------------
DB_PATH = "library.csv"                   # local metadata cache (CSV)
ALLOWED_EXTS = {".html"}

# Columns persisted in the local DB.
EXTRA_COLS = ["category", "dataset", "hf_path"]
BASE_COLS = ["id", "filename", "path", "tags", "keywords", "notes", "uploaded_at"]
ALL_DB_COLS = BASE_COLS + EXTRA_COLS

# Columns shown in the UI table (hf_path is internal, so it is hidden).
TABLE_COLS = ["id", "filename", "category", "dataset",
              "tags", "keywords", "notes", "uploaded_at"]

# Where index.csv lives *now*: a Space repo, not the dataset repo above.
HF_INDEX_REPO_ID = "akazemian/audio-library"
HF_INDEX_REPO_TYPE = "space"


# ---------- DB helpers ----------
def _load_db() -> pd.DataFrame:
    """Load the local library CSV, guaranteeing every ALL_DB_COLS column
    exists and contains plain strings (NaN -> "")."""
    if not os.path.exists(DB_PATH):
        return pd.DataFrame(columns=ALL_DB_COLS)
    df = pd.read_csv(DB_PATH)
    for c in ALL_DB_COLS:
        if c not in df.columns:
            df[c] = ""
        # normalize so downstream string filters never see NaN/non-str
        df[c] = df[c].fillna("").astype(str)
    return df[ALL_DB_COLS]


def _save_db(df: pd.DataFrame):
    """Persist the library DataFrame back to DB_PATH."""
    df.to_csv(DB_PATH, index=False)


# ---------- Table normalizer ----------
def _df_from_table_value(table_value):
    """Normalize a Gradio Dataframe value (DataFrame | list[dict] | list[list])
    into a DataFrame with exactly TABLE_COLS; missing columns become ""."""
    cols = TABLE_COLS
    if isinstance(table_value, pd.DataFrame):
        df = table_value.copy()  # work on a copy; don't mutate the caller's frame
        for c in cols:
            if c not in df.columns:
                df[c] = ""
        return df[cols]
    if isinstance(table_value, list):
        if not table_value:
            return pd.DataFrame(columns=cols)
        if isinstance(table_value[0], dict):
            df = pd.DataFrame(table_value)
            for c in cols:
                if c not in df.columns:
                    df[c] = ""
            return df[cols]
        # list of row-value lists, assumed to be in TABLE_COLS order
        return pd.DataFrame(table_value, columns=cols)
    return pd.DataFrame(columns=cols)


# ---------- Load HF index ----------
def _load_hf_index() -> pd.DataFrame:
    """Download + read index.csv from the HF repo.

    Expected columns: id, filename, relpath, category, dataset,
    tags, keywords, notes, uploaded_at. Missing columns are added
    empty; everything is normalized to str (NaN -> "").
    """
    local = hf_hub_download(
        repo_id=HF_INDEX_REPO_ID,
        repo_type=HF_INDEX_REPO_TYPE,
        filename=INDEX_FILENAME,
    )
    df = pd.read_csv(local)
    expected = ["id", "filename", "relpath", "category", "dataset",
                "tags", "keywords", "notes", "uploaded_at"]
    for c in expected:
        if c not in df.columns:
            df[c] = ""
        df[c] = df[c].fillna("").astype(str)
    return df


# ---------- Sync by model (prefix inside HF dataset) ----------
def sync_model(model_name: str):
    """Sync the local DB with the HF index for one model prefix.

    Accepts either the raw model name or its URL-encoded form
    (e.g. "...sr=16000" vs "...sr%3D16000"). Backfills hf_path on
    existing rows (matched by filename), appends rows for files seen
    only in the HF index, saves the DB, then returns the refreshed
    view plus the active model:
    (table, tag_update, category_update, dataset_update, count_md, current_model)
    """
    raw = (model_name or "").strip()
    if not raw:
        return gr.Info("Please enter a model name."), None, None, None, "", ""

    # 1) read index from HF and filter to this model prefix (raw or URL-decoded)
    try:
        idx = _load_hf_index()
    except Exception as e:
        traceback.print_exc()
        return gr.Info(f"Failed to load index from HF: {e}"), None, None, None, "", ""

    decoded = unquote(raw)
    rel = idx["relpath"].astype(str)
    sub = idx[rel.str.startswith(f"{raw}/") | rel.str.startswith(f"{decoded}/")]
    if sub.empty:
        return gr.Info(
            f"No HTML files found for model '{raw}'. "
            "Tip: if you copied from the URL, use '=' instead of '%3D'."
        ), None, None, None, "", ""

    # 2) backfill hf_path for existing rows of this model (matched by filename)
    db = _load_db()
    if not db.empty:
        rel_by_fname = dict(zip(sub["filename"].astype(str), sub["relpath"].astype(str)))
        mask_model_rows = db["filename"].astype(str).isin(rel_by_fname.keys())
        if mask_model_rows.any():
            db.loc[mask_model_rows, "hf_path"] = db.loc[mask_model_rows, "filename"].map(
                lambda fn: f"hf://{HF_DATASET_REPO}/{rel_by_fname.get(str(fn), str(fn))}"
            )

    # 3) add any rows present in the HF index but missing locally
    now = datetime.datetime.now().isoformat(timespec="seconds")
    existing_hf = set(db["hf_path"].astype(str))
    new_rows = []
    for _, r in sub.iterrows():
        rp = str(r["relpath"])
        hf_uri = f"hf://{HF_DATASET_REPO}/{rp}"
        if hf_uri in existing_hf:
            continue
        # same filename already present -> its hf_path was updated above; skip duplicate
        if not db[db["filename"].astype(str) == str(r["filename"])].empty:
            continue
        new_rows.append({
            "id": (str(r["id"]) if str(r.get("id", "")) else uuid.uuid4().hex[:8]),
            "filename": str(r["filename"]),
            "path": "",          # local path unknown in the HF flow
            "hf_path": hf_uri,
            "tags": str(r.get("tags", "")),
            "keywords": str(r.get("keywords", "")),
            "notes": str(r.get("notes", "")),
            "uploaded_at": (str(r.get("uploaded_at", "")) or now),
            "category": str(r.get("category", "")),
            "dataset": str(r.get("dataset", "")),
        })
    if new_rows:
        db = pd.concat([db, pd.DataFrame(new_rows)], ignore_index=True)
        _save_db(db)

    # use the decoded model name for downstream filtering
    current_model = decoded
    return refresh_view("", [], "", "", current_model) + (current_model,)
# ---------- Search / filters ----------
def refresh_view(query, tag_filters, category_filter, dataset_filter, current_model):
    """Filter the library and rebuild the view.

    Returns (view_df, tag_choices_update, category_update, dataset_update,
    count_markdown). Filter vocabularies are computed from the model-scoped
    frame *before* the text/tag/dropdown filters are applied.
    """
    df = _load_db()

    # scope to the active model's HF prefix, if one is set
    if current_model:
        prefix = f"hf://{HF_DATASET_REPO}/{current_model}/"
        df = df[df["path"].astype(str).str.startswith(prefix)]

    # vocabularies for the filter widgets
    all_tags = sorted({t.strip()
                       for s in df["tags"].dropna().astype(str).tolist()
                       for t in s.split(",") if t.strip()})
    all_cats = sorted([c for c in df["category"].dropna().astype(str).unique() if c])
    all_sets = sorted([c for c in df["dataset"].dropna().astype(str).unique() if c])

    # free-text query across filename/tags/keywords/notes/category/dataset
    if query:
        q = query.lower()
        mask = (
            df["filename"].str.lower().str.contains(q, na=False)
            | df["tags"].str.lower().str.contains(q, na=False)
            | df["keywords"].str.lower().str.contains(q, na=False)
            | df["notes"].str.lower().str.contains(q, na=False)
            | df["category"].str.lower().str.contains(q, na=False)
            | df["dataset"].str.lower().str.contains(q, na=False)
        )
        df = df[mask]

    # tag filters (AND semantics); bind t as a default to avoid late binding
    for t in (tag_filters or []):
        df = df[df["tags"].astype(str).apply(
            lambda s, t=t: t in [x.strip() for x in s.split(",") if x.strip()])]

    # dropdown filters (exact match)
    if category_filter:
        df = df[df["category"] == category_filter]
    if dataset_filter:
        df = df[df["dataset"] == dataset_filter]

    df = df.sort_values("uploaded_at", ascending=False).reset_index(drop=True)
    view = df[TABLE_COLS].copy()
    count_text = f"**Showing {len(view)} file(s)**"
    return (
        view,
        gr.update(choices=all_tags),
        gr.update(choices=[""] + all_cats, value=category_filter or ""),
        gr.update(choices=[""] + all_sets, value=dataset_filter or ""),
        count_text,
    )


# ---------- Preview ----------
def _iframe_from_html_string(raw_html: str, height_px: int = 720) -> str:
    """Wrap raw HTML in an <iframe srcdoc="..."> snippet.

    The document is placed inside a double-quoted HTML attribute, so '&'
    must be escaped first, then '"', or the markup breaks.
    """
    srcdoc = raw_html.replace("&", "&amp;").replace('"', "&quot;")
    return (
        f'<iframe srcdoc="{srcdoc}" '
        f'style="width:100%;height:{height_px}px;border:0;"></iframe>'
    )


def select_row(evt: gr.SelectData, table_value, source_mode):
    """Row-click handler: resolve the clicked row, look up its DB record,
    and return (preview_html, preview_label).

    source_mode selects which stored location to preview: "HF" uses the
    hf_path column (lazy hf:// download), anything else uses the local path.
    """
    try:
        view = _df_from_table_value(table_value)
        if view.empty:
            return "No rows.", ""

        # --- resolve row_idx robustly across Gradio event shapes ---
        row_idx = None

        # 1) preferred: evt.index (int or [int, ...])
        ix = getattr(evt, "index", None)
        if isinstance(ix, int):
            row_idx = ix
        elif isinstance(ix, (list, tuple)) and ix and isinstance(ix[0], int):
            row_idx = ix[0]

        # 2) fallbacks: evt.value may be a dict with id, or a list of row values
        if row_idx is None:
            val = getattr(evt, "value", None)
            if isinstance(val, dict) and "id" in val:
                hits = view.index[view["id"] == val["id"]].tolist()
                if hits:
                    row_idx = hits[0]
            elif isinstance(val, list) and len(val) >= 1:
                # assume first column is id
                hits = view.index[view["id"] == val[0]].tolist()
                if hits:
                    row_idx = hits[0]

        # 3) last resort: default to first row
        if row_idx is None:
            row_idx = 0

        # bounds check
        if not (0 <= row_idx < len(view)):
            return "Invalid selection.", ""

        row = view.iloc[row_idx]
        sel_id = row["id"]

        # --- look up the full record from the DB ---
        db = _load_db()
        rec = db[db["id"] == sel_id]
        if rec.empty:
            return "Could not find file for this row.", ""

        # --- choose source: HF vs Local ---
        use_hf = (str(source_mode).upper() == "HF")
        path_str = rec["hf_path"].values[0] if use_hf else rec["path"].values[0]
        path_str = str(path_str or "")
        if not path_str:
            return "No path available for this source.", f"📄 {row['filename']}"

        # hf://repo/relpath -> lazy download, then inline the raw HTML
        if path_str.startswith("hf://"):
            _, rest = path_str.split("hf://", 1)
            repo_id, relpath = rest.split("/", 1)
            local_path = hf_hub_download(repo_id=repo_id, repo_type="dataset",
                                         filename=relpath)
            raw_html = Path(local_path).read_text(encoding="utf-8")
            return _iframe_from_html_string(raw_html, height_px=720), f"📄 {row['filename']}"

        # direct HTTP(S) URL (CDN) -> iframe by src
        if path_str.startswith("http"):
            iframe = (f'<iframe src="{path_str}" '
                      f'style="width:100%;height:720px;border:0;"></iframe>')
            return iframe, f"📄 {row['filename']}"

        # local file fallback
        p = Path(path_str)
        if not p.exists():
            return f"File not found: {_py_html.escape(str(p))}", f"📄 {row['filename']}"
        raw_html = p.read_text(encoding="utf-8")
        return _iframe_from_html_string(raw_html, height_px=720), f"📄 {row['filename']}"
    except Exception as e:
        traceback.print_exc()
        return (f"<pre>Failed to render (see terminal):\n"
                f"{_py_html.escape(str(e))}</pre>"), ""
# ---------- Save edits ----------
def save_edits(edited_table, current_model):
    """Write user-edited metadata back to the local DB.

    Only category/dataset/tags/keywords/notes are writable; rows are
    matched by id. Returns the refreshed table (scoped to current_model).
    """
    if edited_table is None or not len(edited_table):
        return gr.Info("Nothing to save.")
    df_db = _load_db()
    editable_cols = ["category", "dataset", "tags", "keywords", "notes"]
    for c in editable_cols:
        edited_table[c] = edited_table[c].fillna("").astype(str)
    for _, row in edited_table.iterrows():
        i = df_db.index[df_db["id"] == row["id"]]
        if len(i):
            for c in editable_cols:
                df_db.at[i[0], c] = row[c]
    _save_db(df_db)
    # return refreshed table only (respect current_model scope)
    return refresh_view("", [], "", "", current_model)[0]


# -------------------- UI --------------------
# CSS that targets only the three buttons via elem_id
custom_css = """
/* scope styles to only these 3 components */
#sync-btn button, #refresh-btn button, #save-btn button,
#sync-btn .gr-button, #refresh-btn .gr-button, #save-btn .gr-button,
#sync-btn [role="button"], #refresh-btn [role="button"], #save-btn [role="button"] {
  background: #f97316 !important; /* orange-500 */
  border-color: #f97316 !important;
  color: #fff !important;
}
/* hover/active */
#sync-btn button:hover, #refresh-btn button:hover, #save-btn button:hover,
#sync-btn .gr-button:hover, #refresh-btn .gr-button:hover, #save-btn .gr-button:hover,
#sync-btn [role="button"]:hover, #refresh-btn [role="button"]:hover, #save-btn [role="button"]:hover {
  background: #ea580c !important; /* orange-600 */
  border-color: #ea580c !important;
}
/* (optional) also set CSS vars in case theme uses them */
#sync-btn, #refresh-btn, #save-btn {
  --button-primary-background-fill: #f97316;
  --button-primary-background-fill-hover: #ea580c;
  --button-text-color: #fff;
}
"""

with gr.Blocks(title="Audio HTML Library", css=custom_css) as demo:
    gr.Markdown("## 🎧 Audio Reconstruction Reports — sync • search • view")

    current_model = gr.State("")  # remembers active model prefix inside HF repo
    source_mode = gr.State("HF")  # preview source: "HF" or "Local"

    with gr.Row():
        with gr.Column(scale=1):
            # Choose model & sync
            gr.Markdown(f"**Model prefix on HF dataset:** `{HF_DATASET_REPO}//...`")
            model_in = gr.Textbox(label="Model name", placeholder="e.g., WavCochV8192")
            sync_btn = gr.Button("Sync this model", elem_id="sync-btn")

            # Search & filters
            gr.Markdown("---\n**Search & filter**")
            query = gr.Textbox(
                label="Keyword search (filename/tags/notes/category/dataset)",
                placeholder="type to search…",
            )
            tag_filter = gr.CheckboxGroup(choices=[], label="Filter by tags (AND)")
            category_filter = gr.Dropdown(choices=[], label="Category")
            dataset_filter = gr.Dropdown(choices=[], label="Dataset")

            # Source toggle (HF vs Local)
            mode_radio = gr.Radio(
                choices=["HF", "Local"],
                value="HF",
                label="Source",
                info="Preview from HF dataset or local disk",
            )
            refresh_btn = gr.Button("Refresh", elem_id="refresh-btn")

        with gr.Column(scale=2):
            # Count of current view
            count_md = gr.Markdown("**Showing 0 file(s)**")
            gr.Markdown("**Library** (click a row to preview; edit cells and Save)")
            table = gr.Dataframe(
                headers=TABLE_COLS,
                datatype=["str"] * len(TABLE_COLS),
                interactive=True,
                wrap=True,
                row_count=(0, "dynamic"),
                col_count=(len(TABLE_COLS), "fixed"),
            )
            with gr.Row():
                save_btn = gr.Button("Save Edits", elem_id="save-btn")
            preview_label = gr.Markdown("")
            preview_html = gr.HTML("")

    # wiring: sync (also sets current_model)
    sync_btn.click(
        sync_model,
        [model_in],
        [table, tag_filter, category_filter, dataset_filter, count_md, current_model],
    )

    # wiring: refresh + live filters (respect current_model)
    refresh_btn.click(
        refresh_view,
        [query, tag_filter, category_filter, dataset_filter, current_model],
        [table, tag_filter, category_filter, dataset_filter, count_md],
    )
    # any filter change OR source-mode change re-renders the view
    for comp in (query, tag_filter, category_filter, dataset_filter, mode_radio):
        comp.change(
            refresh_view,
            [query, tag_filter, category_filter, dataset_filter, current_model],
            [table, tag_filter, category_filter, dataset_filter, count_md],
        )

    # keep source_mode state in sync with the radio widget
    mode_radio.change(lambda x: x, [mode_radio], [source_mode])

    # pass source_mode into select_row so it can choose hf_path vs path
    table.select(select_row, [table, source_mode], [preview_html, preview_label])
    save_btn.click(save_edits, [table, current_model], [table])

    # initial load (no model selected yet)
    demo.load(
        refresh_view,
        [query, tag_filter, category_filter, dataset_filter, current_model],
        [table, tag_filter, category_filter, dataset_filter, count_md],
    )

if __name__ == "__main__":
    demo.launch(share=True)  # auth optional