#!/usr/bin/env python3
# sync_library_and_hf.py
import argparse, datetime, uuid, posixpath, sys, traceback
from pathlib import Path
from typing import List, Tuple

import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, CommitOperationAdd
from huggingface_hub.utils import HfHubHTTPError

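# Example invocation (illustrative only -- the repo id and paths below are placeholders,
# not values defined anywhere in this script):
#
#   python sync_library_and_hf.py \
#       --reports-root ./reports \
#       --db-path ./library.csv \
#       --repo-id USER/audio-html \
#       --dry-run
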
REQUIRED_DB_COLS = [
    "id", "filename", "path", "tags", "keywords", "notes", "uploaded_at", "category", "dataset", "hf_path",
]
INDEX_COLS = ["id", "filename", "relpath", "category", "dataset", "tags", "keywords", "notes", "uploaded_at"]

def now_iso() -> str:
    return datetime.datetime.now().isoformat(timespec="seconds")

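# Note: now_iso() above yields a naive local-time stamp such as "2024-05-01T12:34:56"
# (illustrative value); it feeds the "uploaded_at" fields of both library.csv and index.csv.
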
def ensure_cols(df: pd.DataFrame, cols: list) -> pd.DataFrame:
    for c in cols:
        if c not in df.columns:
            df[c] = ""
    for c in cols:
        df[c] = df[c].fillna("").astype(str)
    return df[cols]

def load_db(db_path: Path) -> pd.DataFrame:
    if db_path.exists():
        df = pd.read_csv(db_path)
    else:
        df = pd.DataFrame(columns=REQUIRED_DB_COLS)
    return ensure_cols(df, REQUIRED_DB_COLS)

def save_db(df: pd.DataFrame, db_path: Path):
    db_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(db_path, index=False)

def load_hf_index(repo_id: str, index_filename: str) -> Tuple[pd.DataFrame, bool]:
    try:
        p = hf_hub_download(repo_id=repo_id, repo_type="dataset", filename=index_filename)
        df = pd.read_csv(p)
        return ensure_cols(df, INDEX_COLS), True
    except HfHubHTTPError as e:
        if e.response is not None and e.response.status_code == 404:
            return ensure_cols(pd.DataFrame(columns=INDEX_COLS), INDEX_COLS), False
        raise

def relpath_posix(local_path: Path, root: Path) -> str:
    rel = local_path.resolve().relative_to(root.resolve())
    return posixpath.join(*rel.parts)

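# Example (hypothetical paths): relpath_posix(Path("/data/reports/whisper/run1/report.html"),
# Path("/data/reports")) -> "whisper/run1/report.html". If the file lies outside the root,
# Path.relative_to raises ValueError, which callers below catch and treat as "skip".
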
def discover_new_local_htmls(reports_root: Path, df_db: pd.DataFrame) -> List[Path]:
    all_htmls = list(reports_root.rglob("*.html"))
    existing_paths = set(df_db["path"].astype(str))
    return sorted([p for p in all_htmls if str(p) not in existing_paths])

def rows_from_files(files: List[Path]) -> pd.DataFrame:
    ts = now_iso()
    rows = [{
        "id": uuid.uuid4().hex[:8],
        "filename": p.name,
        "path": str(p),
        "tags": "",
        "keywords": "",
        "notes": "",
        "uploaded_at": ts,
        "category": "",
        "dataset": "",
        "hf_path": "",
    } for p in files]
    return pd.DataFrame(rows, columns=REQUIRED_DB_COLS) if rows else pd.DataFrame(columns=REQUIRED_DB_COLS)

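# A freshly discovered file therefore enters library.csv with only id/filename/path/uploaded_at
# populated (e.g. id="a1b2c3d4" -- placeholder value); tags, keywords, notes, category, dataset
# and hf_path stay empty until edited elsewhere or backfilled below.
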
def backfill_hf_paths_by_relpath(df_db: pd.DataFrame, reports_root: Path, hf_repo: str, idx: pd.DataFrame) -> int:
    """Set hf_path using exact relpath matches."""
    rel_set = set(idx["relpath"].astype(str))
    updated = 0
    # iterate by label so the .at[] lookups stay correct even if the index is not 0..n-1
    for i in df_db.index:
        p = str(df_db.at[i, "path"])
        if not p:
            continue
        try:
            rp = relpath_posix(Path(p), reports_root)
        except Exception:
            continue
        if (not df_db.at[i, "hf_path"]) and rp in rel_set:
            df_db.at[i, "hf_path"] = f"hf://{hf_repo}/{rp}"
            updated += 1
    return updated

def backfill_hf_paths_by_filename(df_db: pd.DataFrame, hf_repo: str, idx: pd.DataFrame) -> int:
    """Set hf_path by filename match (fallback)."""
    updated = 0
    rel_by_fname = dict(zip(idx["filename"].astype(str), idx["relpath"].astype(str)))
    mask = df_db["hf_path"].astype(str) == ""
    for i in df_db.index[mask]:
        fn = str(df_db.at[i, "filename"])
        rp = rel_by_fname.get(fn)
        if rp:
            df_db.at[i, "hf_path"] = f"hf://{hf_repo}/{rp}"
            updated += 1
    return updated

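# Caveat: the filename fallback keeps only one relpath per filename (the last one seen in the
# index), so if the same filename exists under several model folders the backfilled hf_path may
# point at a different copy than the local one. The relpath pass above is the reliable one.
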
def append_to_remote_index(remote_index: pd.DataFrame, new_rows: List[dict]) -> pd.DataFrame:
    if not new_rows:
        return remote_index
    add_df = pd.DataFrame(new_rows, columns=INDEX_COLS)
    merged = pd.concat([remote_index, add_df], ignore_index=True)
    merged = merged.drop_duplicates(subset=["relpath"], keep="first")
    return merged[INDEX_COLS]

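# Because remote_index rows come first in the concat and keep="first" is used, an existing
# remote row wins over a newly prepared row with the same relpath; re-running the sync
# therefore does not overwrite metadata already recorded in index.csv.
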
def commit_ops_in_batches(api: HfApi, repo_id: str, ops: List[CommitOperationAdd], batch_size: int, msg_prefix: str):
    if not ops:
        return
    for start in range(0, len(ops), batch_size):
        batch = ops[start:start + batch_size]
        api.create_commit(
            repo_id=repo_id,
            repo_type="dataset",
            operations=batch,
            commit_message=f"{msg_prefix} (n={len(batch)})",
        )

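# Each batch becomes one dataset commit. With the default --batch-size of 1000, uploading
# e.g. 2500 new files (hypothetical count) produces three commits of 1000, 1000 and 500 files.
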
def main():
    ap = argparse.ArgumentParser(description="Sync local library.csv with HF dataset: add new local files, upload missing to HF, update index.csv, backfill hf_path.")
    ap.add_argument("--reports-root", required=True, type=Path, help="Root containing {model}/.../*.html")
    ap.add_argument("--db-path", required=True, type=Path, help="Path to local library.csv")
    ap.add_argument("--repo-id", required=True, help="HF dataset repo id, e.g. USER/audio-html")
    ap.add_argument("--index-filename", default="index.csv", help="Index filename in the HF dataset (default: index.csv)")
    ap.add_argument("--batch-size", type=int, default=1000, help="Files per commit when uploading to HF")
    ap.add_argument("--dry-run", action="store_true", help="Print actions; do not write or push")
    ap.add_argument("--commit-message", default="Sync: add new HTMLs + update index.csv", help="Commit message prefix")
    args = ap.parse_args()

    reports_root: Path = args.reports_root
    db_path: Path = args.db_path
    hf_repo: str = args.repo_id
    index_filename: str = args.index_filename
    bs: int = args.batch_size
    dry: bool = args.dry_run

    print(f"[config] reports_root={reports_root}")
    print(f"[config] db_path={db_path}")
    print(f"[config] repo_id={hf_repo}, index={index_filename}")
    print(f"[config] batch_size={bs}, dry_run={dry}")
    # 1) Load DB
    df_db = load_db(db_path)

    # 2) Append new local *.html files to DB
    new_local_files = discover_new_local_htmls(reports_root, df_db)
    print(f"[scan] new local HTML files: {len(new_local_files)}")
    if new_local_files:
        df_new = rows_from_files(new_local_files)
        df_db = pd.concat([df_db, df_new], ignore_index=True)

    # 3) Load remote index (or create a new empty one)
    remote_index, existed = load_hf_index(hf_repo, index_filename)
    print(f"[index] remote exists={existed}, rows={len(remote_index)}")
    # 4) Backfill hf_path from remote index (relpath first, then filename)
    n1 = backfill_hf_paths_by_relpath(df_db, reports_root, hf_repo, remote_index)
    n2 = backfill_hf_paths_by_filename(df_db, hf_repo, remote_index)
    print(f"[hf] backfilled hf_path: relpath={n1}, filename={n2}")

    # 5) Determine which rows still need uploading to HF
    need_upload_idx = df_db[(df_db["hf_path"] == "") & (df_db["path"] != "")]
    print(f"[hf] rows needing upload: {len(need_upload_idx)}")
    ops: List[CommitOperationAdd] = []
    new_index_rows: List[dict] = []
    for _, r in need_upload_idx.iterrows():
        local = Path(r["path"])
        if not local.exists():
            continue  # skip missing local file
        try:
            rp = relpath_posix(local, reports_root)
        except Exception:
            continue  # path outside root; skip
        # prepare upload op
        ops.append(CommitOperationAdd(path_in_repo=rp, path_or_fileobj=str(local)))
        # prepare new index row for this file
        new_index_rows.append({
            "id": r["id"] or uuid.uuid4().hex[:8],
            "filename": r["filename"],
            "relpath": rp,
            "category": r["category"],
            "dataset": r["dataset"],
            "tags": r["tags"],
            "keywords": r["keywords"],
            "notes": r["notes"],
            "uploaded_at": r["uploaded_at"] or now_iso(),
        })
    api = HfApi()

    # 6) Upload missing files in batches
    if ops and not dry:
        print(f"[hf] uploading {len(ops)} files in batches of {bs}...")
        commit_ops_in_batches(api, hf_repo, ops, bs, args.commit_message)
    elif ops and dry:
        print(f"[dry-run] would upload {len(ops)} files")
    # 7) Merge/commit updated index.csv
    if new_index_rows:
        merged_index = append_to_remote_index(remote_index, new_index_rows)
        if not dry:
            tmp = Path("index.updated.csv")
            merged_index.to_csv(tmp, index=False)
            api.create_commit(
                repo_id=hf_repo,
                repo_type="dataset",
                operations=[CommitOperationAdd(path_in_repo=index_filename, path_or_fileobj=str(tmp))],
                commit_message=f"{args.commit_message} (update {index_filename}, rows={len(merged_index)})",
            )
            tmp.unlink(missing_ok=True)
        else:
            print(f"[dry-run] would update {index_filename} with +{len(new_index_rows)} rows")
    # 8) Update hf_path locally for the rows we just uploaded
    for _, r in need_upload_idx.iterrows():
        local = Path(r["path"])
        if not local.exists():
            continue  # was skipped during upload; leave hf_path empty
        try:
            rp = relpath_posix(local, reports_root)
        except Exception:
            continue
        df_db.loc[df_db["path"] == str(local), "hf_path"] = f"hf://{hf_repo}/{rp}"
    # 9) Save DB
    if dry:
        print("[dry-run] not writing library.csv")
    else:
        save_db(df_db, db_path)
        print(f"[done] wrote {len(df_db)} rows to {db_path}")

if __name__ == "__main__":
    try:
        main()
    except Exception:
        traceback.print_exc()
        sys.exit(1)
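
# Consumer-side sketch (not part of this script): turning an "hf://{repo_id}/{relpath}" value
# from library.csv back into a local file. The split assumes two-segment repo ids of the form
# "user/name", which matches how hf_path is written above; resolve_hf_path is a hypothetical
# helper name.
#
#   from huggingface_hub import hf_hub_download
#
#   def resolve_hf_path(hf_path: str) -> str:
#       assert hf_path.startswith("hf://")
#       user, name, relpath = hf_path[len("hf://"):].split("/", 2)
#       return hf_hub_download(repo_id=f"{user}/{name}", repo_type="dataset", filename=relpath)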