audio-library / sync_library_and_hf.py
akazemian's picture
Upload folder using huggingface_hub
4fe3afe verified
raw
history blame
9.59 kB
#!/usr/bin/env python3
# sync_library_and_hf.py
import argparse, datetime, uuid, posixpath, sys, traceback
from pathlib import Path
from typing import List, Tuple
from urllib.parse import unquote # add at top
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, CommitOperationAdd
from huggingface_hub.utils import HfHubHTTPError
REQUIRED_DB_COLS = [
"id","filename","path","tags","keywords","notes","uploaded_at","category","dataset","hf_path"
]
INDEX_COLS = ["id","filename","relpath","category","dataset","tags","keywords","notes","uploaded_at"]
def now_iso() -> str:
return datetime.datetime.now().isoformat(timespec="seconds")
def ensure_cols(df: pd.DataFrame, cols: list) -> pd.DataFrame:
for c in cols:
if c not in df.columns:
df[c] = ""
for c in cols:
df[c] = df[c].fillna("").astype(str)
return df[cols]
def load_db(db_path: Path) -> pd.DataFrame:
if db_path.exists():
df = pd.read_csv(db_path)
else:
df = pd.DataFrame(columns=REQUIRED_DB_COLS)
return ensure_cols(df, REQUIRED_DB_COLS)
def save_db(df: pd.DataFrame, db_path: Path):
db_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(db_path, index=False)
def load_hf_index(repo_id: str, index_filename: str) -> Tuple[pd.DataFrame, bool]:
try:
p = hf_hub_download(repo_id=repo_id, repo_type="dataset", filename=index_filename)
df = pd.read_csv(p)
return ensure_cols(df, INDEX_COLS), True
except HfHubHTTPError as e:
if e.response is not None and e.response.status_code == 404:
return ensure_cols(pd.DataFrame(columns=INDEX_COLS), INDEX_COLS), False
raise
def relpath_posix(local_path: Path, root: Path) -> str:
rel = local_path.resolve().relative_to(root.resolve())
return posixpath.join(*rel.parts)
def discover_new_local_htmls(reports_root: Path, df_db: pd.DataFrame) -> List[Path]:
all_htmls = list(reports_root.rglob("*.html"))
existing_paths = set(df_db["path"].astype(str))
return sorted([p for p in all_htmls if str(p) not in existing_paths])
def rows_from_files(files: List[Path]) -> pd.DataFrame:
ts = now_iso()
rows = [{
"id": uuid.uuid4().hex[:8],
"filename": p.name,
"path": str(p),
"tags": "",
"keywords": "",
"notes": "",
"uploaded_at": ts,
"category": "",
"dataset": "",
"hf_path": "",
} for p in files]
return pd.DataFrame(rows, columns=REQUIRED_DB_COLS) if rows else pd.DataFrame(columns=REQUIRED_DB_COLS)
def backfill_hf_paths_by_relpath(df_db: pd.DataFrame, reports_root: Path, hf_repo: str, idx: pd.DataFrame) -> int:
"""Set hf_path using exact relpath matches."""
rel_set = set(idx["relpath"].astype(str))
updated = 0
for i, p in enumerate(df_db["path"].astype(str).tolist()):
if not p:
continue
try:
rp = relpath_posix(Path(p), reports_root)
except Exception:
continue
if (not df_db.at[i, "hf_path"]) and rp in rel_set:
df_db.at[i, "hf_path"] = f"hf://{hf_repo}/{rp}"
updated += 1
return updated
def backfill_hf_paths_by_filename(df_db: pd.DataFrame, hf_repo: str, idx: pd.DataFrame) -> int:
"""Set hf_path by filename match (fallback)."""
updated = 0
rel_by_fname = dict(zip(idx["filename"].astype(str), idx["relpath"].astype(str)))
mask = df_db["hf_path"].astype(str) == ""
for i in df_db.index[mask]:
fn = str(df_db.at[i, "filename"])
rp = rel_by_fname.get(fn)
if rp:
df_db.at[i, "hf_path"] = f"hf://{hf_repo}/{rp}"
updated += 1
return updated
def append_to_remote_index(remote_index: pd.DataFrame, new_rows: List[dict]) -> pd.DataFrame:
if not new_rows:
return remote_index
add_df = pd.DataFrame(new_rows, columns=INDEX_COLS)
merged = pd.concat([remote_index, add_df], ignore_index=True)
merged = merged.drop_duplicates(subset=["relpath"], keep="first")
return merged[INDEX_COLS]
def commit_ops_in_batches(api: HfApi, repo_id: str, ops: List[CommitOperationAdd], batch_size: int, msg_prefix: str):
if not ops:
return
for start in range(0, len(ops), batch_size):
batch = ops[start:start+batch_size]
api.create_commit(
repo_id=repo_id,
repo_type="dataset",
operations=batch,
commit_message=f"{msg_prefix} (n={len(batch)})"
)
def main():
ap = argparse.ArgumentParser(description="Sync local library.csv with HF dataset: add new local files, upload missing to HF, update index.csv, backfill hf_path.")
ap.add_argument("--reports-root", required=True, type=Path, help="Root containing {model}/.../*.html")
ap.add_argument("--db-path", required=True, type=Path, help="Path to local library.csv")
ap.add_argument("--repo-id", required=True, help="HF dataset repo id, e.g. USER/audio-html")
ap.add_argument("--index-filename", default="index.csv", help="Index filename in the HF dataset (default: index.csv)")
ap.add_argument("--batch-size", type=int, default=1000, help="Files per commit when uploading to HF")
ap.add_argument("--dry-run", action="store_true", help="Print actions; do not write or push")
ap.add_argument("--commit-message", default="Sync: add new HTMLs + update index.csv", help="Commit message prefix")
args = ap.parse_args()
reports_root: Path = args.reports_root
db_path: Path = args.db_path
hf_repo: str = args.repo_id
index_filename: str = args.index_filename
bs: int = args.batch_size
dry: bool = args.dry_run
print(f"[config] reports_root={reports_root}")
print(f"[config] db_path={db_path}")
print(f"[config] repo_id={hf_repo}, index={index_filename}")
print(f"[config] batch_size={bs}, dry_run={dry}")
# 1) Load DB
df_db = load_db(db_path)
# 2) Append new local *.html files to DB
new_local_files = discover_new_local_htmls(reports_root, df_db)
print(f"[scan] new local HTML files: {len(new_local_files)}")
if new_local_files:
df_new = rows_from_files(new_local_files)
df_db = pd.concat([df_db, df_new], ignore_index=True)
# 3) Load remote index (or create a new empty one)
remote_index, existed = load_hf_index(hf_repo, index_filename)
print(f"[index] remote exists={existed}, rows={len(remote_index)}")
# 4) Backfill hf_path from remote index (relpath first, then filename)
n1 = backfill_hf_paths_by_relpath(df_db, reports_root, hf_repo, remote_index)
n2 = backfill_hf_paths_by_filename(df_db, hf_repo, remote_index)
print(f"[hf] backfilled hf_path: relpath={n1}, filename={n2}")
# 5) Determine which rows still need uploading to HF
need_upload_idx = df_db[(df_db["hf_path"] == "") & (df_db["path"] != "")]
print(f"[hf] rows needing upload: {len(need_upload_idx)}")
ops: List[CommitOperationAdd] = []
new_index_rows: List[dict] = []
for _, r in need_upload_idx.iterrows():
local = Path(r["path"])
if not local.exists():
continue # skip missing local file
try:
rp = relpath_posix(local, reports_root)
except Exception:
continue # path outside root; skip
# prepare upload op
ops.append(CommitOperationAdd(path_in_repo=rp, path_or_fileobj=str(local)))
# prepare new index row for this file
new_index_rows.append({
"id": r["id"] or uuid.uuid4().hex[:8],
"filename": r["filename"],
"relpath": rp,
"category": r["category"],
"dataset": r["dataset"],
"tags": r["tags"],
"keywords": r["keywords"],
"notes": r["notes"],
"uploaded_at": r["uploaded_at"] or now_iso(),
})
api = HfApi()
# 6) Upload missing files in batches
if ops and not dry:
print(f"[hf] uploading {len(ops)} files in batches of {bs}...")
commit_ops_in_batches(api, hf_repo, ops, bs, args.commit_message)
elif ops and dry:
print(f"[dry-run] would upload {len(ops)} files")
# 7) Merge/commit updated index.csv
if new_index_rows:
merged_index = append_to_remote_index(remote_index, new_index_rows)
if not dry:
tmp = Path("index.updated.csv")
merged_index.to_csv(tmp, index=False)
api.create_commit(
repo_id=hf_repo,
repo_type="dataset",
operations=[CommitOperationAdd(path_in_repo=index_filename, path_or_fileobj=str(tmp))],
commit_message=f"{args.commit_message} (update {index_filename}, rows={len(merged_index)})"
)
tmp.unlink(missing_ok=True)
else:
print(f"[dry-run] would update {index_filename} with +{len(new_index_rows)} rows")
# 8) Update hf_path locally for those rows we just uploaded
for _, r in need_upload_idx.iterrows():
local = Path(r["path"])
try:
rp = relpath_posix(local, reports_root)
except Exception:
continue
df_db.loc[df_db["path"] == str(local), "hf_path"] = f"hf://{hf_repo}/{rp}"
# 9) Save DB
if dry:
print("[dry-run] not writing library.csv")
else:
save_db(df_db, db_path)
print(f"[done] wrote {len(df_db)} rows to {db_path}")
if __name__ == "__main__":
try:
main()
except Exception as e:
traceback.print_exc()
sys.exit(1)