audio-library / app.py
akazemian's picture
Upload folder using huggingface_hub
4fe3afe verified
raw
history blame
24.6 kB
import os, uuid, datetime, traceback
from pathlib import Path
import html as _py_html
import pandas as pd
import gradio as gr
from huggingface_hub import hf_hub_download
from urllib.parse import unquote # add at top
# ----------- Hugging Face locations -----------
# Repo id used to build the "hf://<repo>/<relpath>" URIs stored in the DB.
HF_DATASET_REPO = "akazemian/audio-html"  # change if the reports move
# Repo that hosts the index CSV; it is a Space, not a dataset.
HF_INDEX_REPO_ID = "akazemian/audio-library"
HF_INDEX_REPO_TYPE = "space"
# Name of the index file inside HF_INDEX_REPO_ID (defined once).
INDEX_FILENAME = "index.csv"
# ----------------------------------------------

# Local CSV that acts as the library database.
DB_PATH = "library.csv"
ALLOWED_EXTS = {".html"}

# Columns persisted in the DB: the base record plus the extra metadata columns.
EXTRA_COLS = ["category", "dataset", "hf_path"]
BASE_COLS = ["id", "filename", "path", "tags", "keywords", "notes", "uploaded_at"]
ALL_DB_COLS = BASE_COLS + EXTRA_COLS

# Columns shown in the UI table (path and hf_path are deliberately hidden).
TABLE_COLS = [
    "id", "filename", "category", "dataset",
    "tags", "keywords", "notes", "uploaded_at",
]
from huggingface_hub import hf_hub_download
# ---------- DB helpers ----------
def _load_db() -> pd.DataFrame:
    """Load the library CSV at ``DB_PATH`` as an all-string DataFrame.

    Returns:
        A frame with exactly the ``ALL_DB_COLS`` columns, in that order.
        Columns missing from the file are added as empty strings. If the
        file does not exist (or is completely empty) an empty frame with
        those columns is returned.
    """
    if not os.path.exists(DB_PATH):
        return pd.DataFrame(columns=ALL_DB_COLS)

    try:
        # Read every cell as text. With type inference an id such as
        # "01234567" loses its leading zero and "1e234567" becomes a float,
        # after which lookups by id no longer match. keep_default_na=False
        # keeps literal text like "NA" or "null" instead of blanking it.
        df = pd.read_csv(DB_PATH, dtype=str, keep_default_na=False)
    except pd.errors.EmptyDataError:
        # Zero-byte file: treat it like a missing DB instead of crashing.
        return pd.DataFrame(columns=ALL_DB_COLS)

    for col in ALL_DB_COLS:
        if col not in df.columns:
            df[col] = ""
        # Defensive normalisation; a no-op for cells already read as text.
        df[col] = df[col].fillna("").astype(str)
    return df[ALL_DB_COLS]
def _save_db(df: pd.DataFrame):
    """Write *df* to the CSV at ``DB_PATH`` without the index column."""
    df.to_csv(path_or_buf=DB_PATH, index=False)
# ---------- Table normalizer ----------
def _df_from_table_value(table_value):
    """Normalise a table payload into a DataFrame with ``TABLE_COLS`` columns.

    Accepted inputs:
        * a ``pandas.DataFrame``;
        * a list of dicts (one dict per row);
        * a list of row sequences, assumed to be in ``TABLE_COLS`` order;
        * anything else (including an empty list), which yields an empty frame.

    Columns missing from a DataFrame or list-of-dicts input are added as
    empty strings. The caller's object is never modified.
    """
    cols = TABLE_COLS

    if isinstance(table_value, pd.DataFrame):
        # reindex returns a new frame, so the caller's DataFrame is not
        # mutated when columns have to be added.
        return table_value.reindex(columns=cols, fill_value="")

    if isinstance(table_value, list) and table_value:
        if isinstance(table_value[0], dict):
            return pd.DataFrame(table_value).reindex(columns=cols, fill_value="")
        # Positional rows: pandas raises if the row width does not match.
        return pd.DataFrame(table_value, columns=cols)

    return pd.DataFrame(columns=cols)
# ---------- Load HF index ----------
def _load_hf_index() -> pd.DataFrame:
    """Download and read the index CSV from the Hugging Face Hub.

    The file ``INDEX_FILENAME`` is fetched from ``HF_INDEX_REPO_ID`` using
    repo type ``HF_INDEX_REPO_TYPE``.

    Returns:
        A DataFrame in which the columns id, filename, relpath, category,
        dataset, tags, keywords, notes and uploaded_at are guaranteed to
        exist and to hold strings (missing columns are added as "").

    Raises:
        Whatever ``hf_hub_download`` or ``pandas.read_csv`` raise; the
        caller is expected to handle download/parse failures.
    """
    local = hf_hub_download(
        repo_id=HF_INDEX_REPO_ID,
        repo_type=HF_INDEX_REPO_TYPE,
        filename=INDEX_FILENAME,
    )
    # Read as text so ids are not reinterpreted as numbers (dropped leading
    # zeros, scientific notation) and literal "NA"-like text is preserved.
    df = pd.read_csv(local, dtype=str, keep_default_na=False)

    required = [
        "id", "filename", "relpath", "category", "dataset",
        "tags", "keywords", "notes", "uploaded_at",
    ]
    for col in required:
        if col not in df.columns:
            df[col] = ""
        df[col] = df[col].fillna("").astype(str)
    return df
# ---------- Sync by model (prefix inside HF dataset) ----------
from urllib.parse import unquote # ensure this import exists at top
def sync_model(model_name: str):
    """Import the HF index rows of one model into the local DB and show them.

    Args:
        model_name: Model prefix as it appears at the start of ``relpath``
            in the index. Both the text as typed and its URL-decoded form
            are accepted (e.g. "%3D" pasted from a URL matches "=").

    Returns:
        A 6-tuple matching the outputs wired to the sync button:
        (table, tag_filter, category_filter, dataset_filter, count_md,
        current_model). On the early-exit paths the first slot is whatever
        ``gr.Info(...)`` returns and the remaining slots are
        ``None, None, None, "", ""``.
    """
    raw = (model_name or "").strip()
    if not raw:
        return gr.Info("Please enter a model name."), None, None, None, "", ""

    # 1) Read the index from HF and keep rows under this model prefix.
    try:
        idx = _load_hf_index()
    except Exception as e:
        traceback.print_exc()
        return gr.Info(f"Failed to load index from HF: {e}"), None, None, None, "", ""

    decoded = unquote(raw)
    rel = idx["relpath"].astype(str)
    sub = idx[rel.str.startswith(f"{raw}/") | rel.str.startswith(f"{decoded}/")]
    if sub.empty:
        return gr.Info(
            f"No HTML files found for model '{raw}'. "
            "Tip: if you copied from the URL, use '=' instead of '%3D'."
        ), None, None, None, "", ""

    # 2) Backfill hf_path on existing DB rows, matched by filename.
    # NOTE(review): matching is by filename only, so a row that belongs to a
    # different model but shares a filename is re-pointed as well - confirm
    # that filenames are unique across models.
    db = _load_db()
    if not db.empty:
        rel_by_fname = dict(zip(sub["filename"].astype(str), sub["relpath"].astype(str)))
        mask_model_rows = db["filename"].astype(str).isin(rel_by_fname.keys())
        if mask_model_rows.any():
            db.loc[mask_model_rows, "hf_path"] = db.loc[mask_model_rows, "filename"].map(
                lambda fn: f"hf://{HF_DATASET_REPO}/{rel_by_fname.get(str(fn), str(fn))}"
            )

    # 3) Append index rows that are not in the DB yet.
    now = datetime.datetime.now().isoformat(timespec="seconds")
    existing_hf = set(db["hf_path"].astype(str))
    # Built once: the loop below used to rescan the whole filename column
    # for every index row.
    existing_filenames = set(db["filename"].astype(str))
    new_rows = []
    for _, r in sub.iterrows():
        filename = str(r["filename"])
        hf_uri = f"hf://{HF_DATASET_REPO}/{r['relpath']}"
        # Skip rows already present by URI, or by filename (those had their
        # hf_path refreshed in step 2).
        if hf_uri in existing_hf or filename in existing_filenames:
            continue
        new_rows.append({
            "id": (str(r["id"]) if str(r.get("id", "")) else uuid.uuid4().hex[:8]),
            "filename": filename,
            "path": "",  # local path unknown in HF flow
            "hf_path": hf_uri,
            "tags": str(r.get("tags", "")),
            "keywords": str(r.get("keywords", "")),
            "notes": str(r.get("notes", "")),
            "uploaded_at": (str(r.get("uploaded_at", "")) or now),
            "category": str(r.get("category", "")),
            "dataset": str(r.get("dataset", "")),
        })
    if new_rows:
        db = pd.concat([db, pd.DataFrame(new_rows)], ignore_index=True)
    _save_db(db)

    # The decoded prefix is what downstream filtering uses.
    current_model = decoded
    # outputs: [table, tag_filter, category_filter, dataset_filter, count_md, current_model]
    return refresh_view("", [], "", "", current_model) + (current_model,)
# allow user to paste either "wavcoch_audio-preds-sr=16000" or the URL-encoded "%3D" form
# def sync_model(model_name: str):
# """
# Load index.csv from HF, add rows for the selected model (by relpath prefix),
# store HF URIs in DB, and show only that model’s files.
# """
# model_name = (model_name or "").strip()
# if not model_name:
# return gr.Info("Please enter a model name."), None, None, None, ""
# try:
# idx = _load_hf_index()
# except Exception as e:
# traceback.print_exc()
# return gr.Info(f"Failed to load index from HF: {e}"), None, None, None, ""
# # rows like "{model_name}/.../file.html"
# subset = idx[idx["relpath"].str.startswith(model_name + "/")]
# if subset.empty:
# return gr.Info(f"No HTML files found for model '{model_name}' on {HF_DATASET_REPO}"), None, None, None, ""
# df = _load_db()
# now = datetime.datetime.now().isoformat(timespec="seconds")
# new_rows = []
# for _, r in subset.iterrows():
# relpath = r["relpath"]
# hub_uri = f"hf://{HF_DATASET_REPO}/{relpath}"
# if (df["path"] == hub_uri).any():
# continue
# new_rows.append({
# "id": r["id"] if r["id"] else uuid.uuid4().hex[:8],
# "filename": r["filename"],
# "path": hub_uri, # store HF URI
# "tags": r["tags"],
# "keywords": r["keywords"],
# "notes": r["notes"],
# "uploaded_at": r["uploaded_at"] or now,
# "category": r["category"],
# "dataset": r["dataset"]
# })
# if new_rows:
# df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
# _save_db(df)
# current_model = model_name # remember which model prefix is active
# return refresh_view("", [], "", "", current_model) + (current_model,)
# ---------- Search / filters ----------
def refresh_view(query, tag_filters, category_filter, dataset_filter, current_model):
    """Build the filtered table view plus refreshed filter choices.

    Args:
        query: Free-text search, matched case-insensitively as a literal
            substring of filename, tags, keywords, notes, category or dataset.
        tag_filters: Tags that must all be present in a row's
            comma-separated ``tags`` value (AND semantics).
        category_filter: Exact category to keep; empty means no filter.
        dataset_filter: Exact dataset to keep; empty means no filter.
        current_model: Model prefix inside ``HF_DATASET_REPO``; when set,
            only rows whose HF URI lies under that prefix are shown.

    Returns:
        A 5-tuple: (view DataFrame restricted to ``TABLE_COLS``, update for
        the tag choices, update for the category dropdown, update for the
        dataset dropdown, markdown count text).
    """
    df = _load_db()

    # Scope to the active model. The sync step stores the HF URI in
    # ``hf_path`` and leaves ``path`` empty, so filtering on ``path`` alone
    # hid every synced row; ``path`` is still checked for rows that keep
    # the URI there.
    if current_model:
        prefix = f"hf://{HF_DATASET_REPO}/{current_model}/"
        in_scope = (
            df["hf_path"].astype(str).str.startswith(prefix)
            | df["path"].astype(str).str.startswith(prefix)
        )
        df = df[in_scope]

    # Filter vocabularies are computed from the scoped rows, before the
    # query/tag/dropdown filters narrow them further.
    all_tags = sorted({
        t.strip()
        for s in df["tags"].dropna().astype(str).tolist()
        for t in s.split(",") if t.strip()
    })
    all_cats = sorted([c for c in df["category"].dropna().astype(str).unique() if c])
    all_sets = sorted([c for c in df["dataset"].dropna().astype(str).unique() if c])

    # Free-text query across filename/tags/keywords/notes/category/dataset.
    if query:
        needle = query.lower()
        matches = pd.Series(False, index=df.index)
        for col in ("filename", "tags", "keywords", "notes", "category", "dataset"):
            # regex=False: the query is user text, so characters such as
            # "(" or "+" must be matched literally instead of raising.
            matches = matches | df[col].str.lower().str.contains(needle, na=False, regex=False)
        df = df[matches]

    # Tag filters (AND semantics): each selected tag narrows the set.
    for t in (tag_filters or []):
        df = df[df["tags"].astype(str).apply(
            lambda s: t in [x.strip() for x in s.split(",") if x.strip()])]

    # Dropdown filters (exact match).
    if category_filter:
        df = df[df["category"] == category_filter]
    if dataset_filter:
        df = df[df["dataset"] == dataset_filter]

    df = df.sort_values("uploaded_at", ascending=False).reset_index(drop=True)
    view = df[TABLE_COLS].copy()
    count_text = f"**Showing {len(view)} file(s)**"
    return (
        view,
        gr.update(choices=all_tags),
        gr.update(choices=[""] + all_cats, value=category_filter or ""),
        gr.update(choices=[""] + all_sets, value=dataset_filter or ""),
        count_text
    )
# ---------- Preview ----------
def _iframe_from_html_string(raw_html: str, height_px: int = 720) -> str:
srcdoc = raw_html.replace("&", "&amp;").replace('"', "&quot;")
return f'<iframe style="width:100%;height:{height_px}px;border:1px solid #ddd;border-radius:8px;" srcdoc="{srcdoc}"></iframe>'
def _split_hf_uri(uri: str):
    """Split ``hf://<repo_id>/<relpath>`` into ``(repo_id, relpath)``.

    Repo ids are usually namespaced ("owner/name"), so cutting at the first
    "/" would hand only the owner to the Hub. URIs under ``HF_DATASET_REPO``
    are matched exactly; otherwise the first two segments are taken as the
    repo id when a third segment exists, and a two-segment URI is read as
    "<repo>/<file>". Raises ``ValueError`` if no "/" follows the scheme.
    """
    rest = uri.split("hf://", 1)[1]
    known_prefix = f"{HF_DATASET_REPO}/"
    if rest.startswith(known_prefix):
        return HF_DATASET_REPO, rest[len(known_prefix):]
    parts = rest.split("/", 2)
    if len(parts) == 3:
        return f"{parts[0]}/{parts[1]}", parts[2]
    repo_id, relpath = rest.split("/", 1)
    return repo_id, relpath


def select_row(evt: gr.SelectData, table_value, source_mode):
    """Render a preview for the table row the user clicked.

    Args:
        evt: Selection event; ``evt.index`` (int or sequence starting with
            the row int) is preferred, ``evt.value`` is a fallback.
        table_value: Current table contents (any shape accepted by
            ``_df_from_table_value``).
        source_mode: "HF" (case-insensitive) previews from ``hf_path``;
            anything else previews from ``path``.

    Returns:
        ``(preview_html, label)``. Every failure is reported as HTML in the
        first element rather than raised; unexpected exceptions are printed
        to the terminal.
    """
    try:
        view = _df_from_table_value(table_value)
        if view.empty:
            return "<em>No rows.</em>", ""

        # --- resolve the row position ---
        row_idx = None
        # 1) Preferred: evt.index (int or [int, ...]).
        ix = getattr(evt, "index", None)
        if isinstance(ix, int):
            row_idx = ix
        elif isinstance(ix, (list, tuple)) and ix and isinstance(ix[0], int):
            row_idx = ix[0]
        # 2) Fallback: evt.value as a dict with an id, or a list of row
        #    values whose first entry is assumed to be the id.
        if row_idx is None:
            val = getattr(evt, "value", None)
            if isinstance(val, dict) and "id" in val:
                hits = view.index[view["id"] == val["id"]].tolist()
                if hits:
                    row_idx = hits[0]
            elif isinstance(val, list) and len(val) >= 1:
                hits = view.index[view["id"] == val[0]].tolist()
                if hits:
                    row_idx = hits[0]
        # 3) Last resort: first row.
        if row_idx is None:
            row_idx = 0
        if not (0 <= row_idx < len(view)):
            return "<em>Invalid selection.</em>", ""

        row = view.iloc[row_idx]
        sel_id = row["id"]

        # --- look up the full record; DB ids are strings ---
        db = _load_db()
        rec = db[db["id"] == str(sel_id)]
        if rec.empty:
            return "<em>Could not find file for this row.</em>", ""

        label = f"📄 {row['filename']}"

        # --- choose source: HF vs Local ---
        use_hf = (str(source_mode).upper() == "HF")
        path_str = rec["hf_path"].values[0] if use_hf else rec["path"].values[0]
        path_str = str(path_str or "")
        if not path_str:
            return "<em>No path available for this source.</em>", label

        # HF URI: download on demand, then embed the raw HTML.
        if path_str.startswith("hf://"):
            repo_id, relpath = _split_hf_uri(path_str)
            local_path = hf_hub_download(repo_id=repo_id, repo_type="dataset", filename=relpath)
            raw_html = Path(local_path).read_text(encoding="utf-8")
            return _iframe_from_html_string(raw_html, height_px=720), label

        # Direct HTTP URL: let the iframe load it itself.
        if path_str.startswith("http"):
            iframe = f'<iframe style="width:100%;height:720px;border:1px solid #ddd;border-radius:8px;" src="{_py_html.escape(path_str)}"></iframe>'
            return iframe, label

        # Local file fallback.
        p = Path(path_str)
        if not p.exists():
            return f"<em>File not found:</em> <code>{_py_html.escape(str(p))}</code>", label
        raw_html = p.read_text(encoding="utf-8")
        return _iframe_from_html_string(raw_html, height_px=720), label
    except Exception as e:
        traceback.print_exc()
        return f"<pre>Failed to render (see terminal):\n{_py_html.escape(str(e))}</pre>", ""
# def select_row(evt: gr.SelectData, table_value):
# try:
# view = _df_from_table_value(table_value)
# if view.empty:
# return "<em>No rows.</em>", ""
# # resolve row
# row_idx = None
# ix = getattr(evt, "index", None)
# if isinstance(ix, int):
# row_idx = ix
# elif isinstance(ix, (list, tuple)) and ix and isinstance(ix[0], int):
# row_idx = ix[0]
# if row_idx is None:
# val = getattr(evt, "value", None)
# if isinstance(val, dict) and "id" in val:
# hits = view.index[view["id"] == val["id"]].tolist()
# if hits: row_idx = hits[0]
# elif isinstance(val, list) and len(val) >= 1:
# hits = view.index[view["id"] == val[0]].tolist()
# if hits: row_idx = hits[0]
# if row_idx is None or not (0 <= row_idx < len(view)):
# return "<em>Invalid selection.</em>", ""
# row = view.iloc[row_idx]
# sel_id = row["id"]
# db = _load_db()
# rec = db[db["id"] == sel_id]
# if rec.empty:
# return "<em>Could not find file for this row.</em>", ""
# path_str = rec["path"].values[0]
# # Hub-backed path → lazy download
# if str(path_str).startswith("hf://"):
# _, rest = path_str.split("hf://", 1)
# repo_id, relpath = rest.split("/", 1)
# local_path = hf_hub_download(repo_id=repo_id, repo_type="dataset", filename=relpath)
# raw_html = Path(local_path).read_text(encoding="utf-8")
# elif str(path_str).startswith("http"):
# # if you ever swap to CDN URLs, iframe the URL directly
# iframe = f'<iframe style="width:100%;height:720px;border:1px solid #ddd;border-radius:8px;" src="{_py_html.escape(path_str)}"></iframe>'
# return iframe, f"📄 {row['filename']}"
# else:
# # local file fallback (not used for HF flow, kept for compatibility)
# p = Path(path_str)
# if not p.exists():
# return f"<em>File not found:</em> <code>{_py_html.escape(str(p))}</code>", f"📄 {row['filename']}"
# raw_html = p.read_text(encoding="utf-8")
# iframe = _iframe_from_html_string(raw_html, height_px=720)
# return iframe, f"📄 {row['filename']}"
# except Exception as e:
# traceback.print_exc()
# return f"<pre>Failed to render (see terminal):\n{_py_html.escape(str(e))}</pre>", ""
# ---------- Save edits ----------
def save_edits(edited_table, current_model):
    """Persist edits made in the table back to the DB and return a fresh view.

    Only category, dataset, tags, keywords and notes are written; rows are
    matched to DB records by ``id`` and rows with an unknown id are ignored.

    Args:
        edited_table: Table contents as a DataFrame, or a list payload
            accepted by ``_df_from_table_value``.
        current_model: Active model prefix, forwarded to ``refresh_view`` so
            the returned table keeps the same scope.

    Returns:
        The refreshed table DataFrame, or the result of
        ``gr.Info("Nothing to save.")`` when there is nothing to save.
    """
    if edited_table is None or not len(edited_table):
        return gr.Info("Nothing to save.")

    # List payloads are normalised the same way the row-selection handler
    # does it; DataFrames are used as-is.
    if isinstance(edited_table, pd.DataFrame):
        edits = edited_table
    else:
        edits = _df_from_table_value(edited_table)

    df_db = _load_db()
    editable_cols = ["category", "dataset", "tags", "keywords", "notes"]

    # Work on derived frames so the caller's table is not modified.
    edit_ids = edits["id"].astype(str)
    edit_values = edits[editable_cols].fillna("").astype(str)

    # id -> first matching DB index label, built once instead of scanning
    # the id column for every edited row.
    first_index_by_id = {}
    for label, rec_id in zip(df_db.index, df_db["id"].astype(str)):
        first_index_by_id.setdefault(rec_id, label)

    for rec_id, (_, row) in zip(edit_ids, edit_values.iterrows()):
        target = first_index_by_id.get(rec_id)
        if target is None:
            continue
        for c in editable_cols:
            df_db.at[target, c] = row[c]

    _save_db(df_db)
    # Return the refreshed table only (respecting the current_model scope).
    return refresh_view("", [], "", "", current_model)[0]
# -------------------- UI --------------------
# Stylesheet passed to gr.Blocks(css=...). Every selector is scoped to the
# elem_ids "sync-btn", "refresh-btn" and "save-btn", so only those three
# buttons are recoloured. Several selector variants (button, .gr-button,
# [role="button"]) are listed for each id.
custom_css = """
/* scope styles to only these 3 components */
#sync-btn button,
#refresh-btn button,
#save-btn button,
#sync-btn .gr-button,
#refresh-btn .gr-button,
#save-btn .gr-button,
#sync-btn [role="button"],
#refresh-btn [role="button"],
#save-btn [role="button"] {
background: #f97316 !important; /* orange-500 */
border-color: #f97316 !important;
color: #fff !important;
}
/* hover/active */
#sync-btn button:hover,
#refresh-btn button:hover,
#save-btn button:hover,
#sync-btn .gr-button:hover,
#refresh-btn .gr-button:hover,
#save-btn .gr-button:hover,
#sync-btn [role="button"]:hover,
#refresh-btn [role="button"]:hover,
#save-btn [role="button"]:hover {
background: #ea580c !important; /* orange-600 */
border-color: #ea580c !important;
}
/* (optional) also set CSS vars in case theme uses them */
#sync-btn, #refresh-btn, #save-btn {
--button-primary-background-fill: #f97316;
--button-primary-background-fill-hover: #ea580c;
--button-text-color: #fff;
}
"""
# with gr.Blocks(title="Audio HTML Library", css=custom_css) as demo:
# gr.Markdown("## 🎧 Audio Reconstruction Reports — sync • search • view")
# current_model = gr.State("") # remembers active model prefix inside HF repo
# source_mode = gr.State("HF") # default
# with gr.Row():
# with gr.Column(scale=1):
# # Choose model & sync
# gr.Markdown(f"**Model prefix on HF dataset:** `{HF_DATASET_REPO}/<model_name>/...`")
# model_in = gr.Textbox(label="Model name", placeholder="e.g., WavCochV8192")
# sync_btn = gr.Button("Sync this model", elem_id="sync-btn")
# # Search & filters
# gr.Markdown("---\n**Search & filter**")
# query = gr.Textbox(label="Keyword search (filename/tags/notes/category/dataset)", placeholder="type to search…")
# tag_filter = gr.CheckboxGroup(choices=[], label="Filter by tags (AND)")
# category_filter = gr.Dropdown(choices=[], label="Category")
# dataset_filter = gr.Dropdown(choices=[], label="Dataset")
# refresh_btn = gr.Button("Refresh", elem_id="refresh-btn")
# with gr.Column(scale=2):
# # Count of current view
# count_md = gr.Markdown("**Showing 0 file(s)**")
# gr.Markdown("**Library** (click a row to preview; edit cells and Save)")
# table = gr.Dataframe(
# headers=TABLE_COLS,
# datatype=["str"] * len(TABLE_COLS),
# interactive=True,
# wrap=True,
# row_count=(0, "dynamic"),
# col_count=(len(TABLE_COLS), "fixed")
# )
# with gr.Row():
# save_btn = gr.Button("Save Edits", elem_id="save-btn")
# preview_label = gr.Markdown("")
# preview_html = gr.HTML("")
# # wiring: sync (also sets current_model)
# sync_btn.click(
# sync_model,
# [model_in],
# [table, tag_filter, category_filter, dataset_filter, count_md, current_model]
# )
# # wiring: refresh + live filters (respect current_model)
# refresh_btn.click(
# refresh_view,
# [query, tag_filter, category_filter, dataset_filter, current_model],
# [table, tag_filter, category_filter, dataset_filter, count_md]
# )
# for comp in (query, tag_filter, category_filter, dataset_filter):
# comp.change(
# refresh_view,
# [query, tag_filter, category_filter, dataset_filter, current_model],
# [table, tag_filter, category_filter, dataset_filter, count_md]
# )
# table.select(select_row, [table], [preview_html, preview_label])
# save_btn.click(save_edits, [table, current_model], [table])
# # initial load (no model yet)
# demo.load(
# refresh_view,
# [query, tag_filter, category_filter, dataset_filter, current_model],
# [table, tag_filter, category_filter, dataset_filter, count_md]
# )
# if __name__ == "__main__":
# demo.launch(share=True) # auth optional
# Builds the Gradio UI and wires the callbacks defined above.
# NOTE(review): the nesting below is reconstructed from the statement order
# (left column = sync + filters, right column = table + preview). Confirm
# where the "Save Edits" row and the preview widgets are meant to sit.
with gr.Blocks(title="Audio HTML Library", css=custom_css) as demo:
    gr.Markdown("## 🎧 Audio Reconstruction Reports — sync • search • view")
    current_model = gr.State("") # remembers active model prefix inside HF repo
    source_mode = gr.State("HF") # mirrors the "Source" radio; starts as "HF"
    with gr.Row():
        with gr.Column(scale=1):
            # Choose model & sync
            gr.Markdown(f"**Model prefix on HF dataset:** `{HF_DATASET_REPO}/<model_name>/...`")
            model_in = gr.Textbox(label="Model name", placeholder="e.g., WavCochV8192")
            sync_btn = gr.Button("Sync this model", elem_id="sync-btn")
            # Search & filters
            gr.Markdown("---\n**Search & filter**")
            query = gr.Textbox(label="Keyword search (filename/tags/notes/category/dataset)", placeholder="type to search…")
            tag_filter = gr.CheckboxGroup(choices=[], label="Filter by tags (AND)")
            category_filter = gr.Dropdown(choices=[], label="Category")
            dataset_filter = gr.Dropdown(choices=[], label="Dataset")
            # Source toggle: selects whether previews use hf_path or path
            mode_radio = gr.Radio(
                choices=["HF", "Local"],
                value="HF",
                label="Source",
                info="Preview from HF dataset or local disk"
            )
            refresh_btn = gr.Button("Refresh", elem_id="refresh-btn")
        with gr.Column(scale=2):
            # Count of current view
            count_md = gr.Markdown("**Showing 0 file(s)**")
            gr.Markdown("**Library** (click a row to preview; edit cells and Save)")
            # Editable table; columns are fixed to TABLE_COLS, rows are dynamic
            table = gr.Dataframe(
                headers=TABLE_COLS,
                datatype=["str"] * len(TABLE_COLS),
                interactive=True,
                wrap=True,
                row_count=(0, "dynamic"),
                col_count=(len(TABLE_COLS), "fixed")
            )
            with gr.Row():
                save_btn = gr.Button("Save Edits", elem_id="save-btn")
            preview_label = gr.Markdown("")
            preview_html = gr.HTML("")
    # wiring: sync (its sixth output also sets current_model)
    sync_btn.click(
        sync_model,
        [model_in],
        [table, tag_filter, category_filter, dataset_filter, count_md, current_model]
    )
    # wiring: refresh + live filters (respect current_model)
    refresh_btn.click(
        refresh_view,
        [query, tag_filter, category_filter, dataset_filter, current_model],
        [table, tag_filter, category_filter, dataset_filter, count_md]
    )
    # Trigger refresh when any filter OR source mode changes.
    # Note: refresh_view receives no source-mode input, so a change of
    # mode_radio re-runs it with the same arguments as before.
    for comp in (query, tag_filter, category_filter, dataset_filter, mode_radio):
        comp.change(
            refresh_view,
            [query, tag_filter, category_filter, dataset_filter, current_model],
            [table, tag_filter, category_filter, dataset_filter, count_md]
        )
    # Keep source_mode state in sync with the radio
    mode_radio.change(lambda x: x, [mode_radio], [source_mode])
    # Pass source_mode into select_row so it can choose hf_path vs path
    table.select(select_row, [table, source_mode], [preview_html, preview_label])
    # Saving writes the edited cells to the DB and reloads the table
    save_btn.click(save_edits, [table, current_model], [table])
    # initial load (no model yet)
    demo.load(
        refresh_view,
        [query, tag_filter, category_filter, dataset_filter, current_model],
        [table, tag_filter, category_filter, dataset_filter, count_md]
    )
# Launch only when executed as a script (not when imported).
if __name__ == "__main__":
    demo.launch(share=True) # auth optional