"""Gradio front end for PresentAgent.

Provides three tabs: model configuration, PPT generation from a template plus
a PDF, and conversion of a PPTX into a narrated video.
"""
import asyncio
import functools
import hashlib
import importlib
import json
import os
import shutil
import tempfile
import sys
import traceback
import uuid
import subprocess
import threading
from contextlib import asynccontextmanager
from copy import deepcopy
from datetime import datetime
from typing import Optional, Tuple
import time
from concurrent.futures import ThreadPoolExecutor

import gradio as gr
from pdf2image import convert_from_path
from pptx import Presentation as PptxPresentation

sys.path.append("./")

import pptagent.induct as induct
import pptagent.pptgen as pptgen
from pptagent.document import Document
from pptagent.model_utils import ModelManager, parse_pdf
from pptagent.multimodal import ImageLabler
from pptagent.presentation import Presentation
from pptagent.utils import Config, get_logger, package_join, pjoin, ppt_to_images_async


async def run_blocking(func, *args, **kw):
    """Run a blocking callable in the default executor and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kw))


async def run_cmd(cmd: list[str]):
    """Run an external command and return its stdout bytes.

    Raises:
        RuntimeError: if the process exits with a non-zero return code; the
            message contains the command line and the captured stderr.
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        # errors="replace": tool output is not guaranteed to be valid UTF-8 and a
        # decode failure here would mask the real error.
        raise RuntimeError(f"{' '.join(cmd)}\n{stderr.decode(errors='replace')}")
    return stdout


def _run_coroutine(coro):
    """Run ``coro`` to completion on this thread's event loop.

    ``asyncio.get_event_loop()`` raises ``RuntimeError`` when the current
    thread has no loop; in that case a new loop is created and installed.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)


def _write_json(path: str, obj, **dump_kwargs) -> None:
    """Serialize ``obj`` to ``path`` as UTF-8 JSON, closing the file afterwards."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, **dump_kwargs)


def _read_json(path: str):
    """Load a UTF-8 JSON file, closing it afterwards."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# Constants
DEBUG = len(sys.argv) == 1
RUNS_DIR = package_join("runs")
STAGES = ["PPT Parsing", "PDF Parsing", "PPT Analysis", "PPT Generation", "Success!"]

# Create a temp directory for Gradio outputs
GRADIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "gradio_ppt_agent")
os.makedirs(GRADIO_TEMP_DIR, exist_ok=True)

# Global variables
ppt_video_progress_store: dict[str, dict] = {}
progress_store: dict[str, dict] = {}
models = None  # Initialize as None, will be set by init_models()
logger = get_logger(__name__)
executor = ThreadPoolExecutor(max_workers=2)

# Default template configuration (paths are relative to this file's directory)
DEFAULT_TEMPLATES = [
    {
        "name": "Template1",
        "path": "templates/Template1.pptx",
        "preview": "templates/previews/Template1.jpg",
    },
    {
        "name": "Template2",
        "path": "templates/Template2.pptx",
        "preview": "templates/previews/Template2.jpg",
    },
    {
        "name": "Template3",
        "path": "templates/Template3.pptx",
        "preview": "templates/previews/Template3.jpg",
    },
]


def get_default_templates():
    """Return the default templates whose PPTX and preview image both exist.

    Returns:
        A list of dicts with keys ``name``, ``path`` and ``preview``; the two
        paths are joined onto this file's directory.
    """
    available_templates = []
    base_dir = os.path.dirname(__file__)
    for template in DEFAULT_TEMPLATES:
        template_path = os.path.join(base_dir, template["path"])
        preview_path = os.path.join(base_dir, template["preview"])
        if os.path.exists(template_path) and os.path.exists(preview_path):
            available_templates.append(
                {
                    "name": template["name"],
                    "path": template_path,
                    "preview": preview_path,
                }
            )
    return available_templates


def select_template(selected_template_name):
    """Radio callback: toggle the upload widget vs. the selected-template display.

    Returns:
        A 3-tuple ``(custom_upload update, selected_template_display update,
        template path or None)``. Unknown names fall back to the upload view.
    """
    if selected_template_name != "Upload Custom":
        for template in get_default_templates():
            if template["name"] == selected_template_name:
                return (
                    gr.update(visible=False),
                    # Populate the textbox; previously it was made visible but left empty.
                    gr.update(visible=True, value=template["name"]),
                    template["path"],
                )
    return gr.update(visible=True), gr.update(visible=False), None


def create_template_selection():
    """Build the template selection widgets inside the current Blocks context.

    Returns:
        ``(template_radio, template_preview, custom_upload,
        selected_template_display)``.
    """
    templates = get_default_templates()

    with gr.Row():
        with gr.Column():
            gr.Markdown("### Choose Template")

            # Template choice: every available default plus the custom upload option
            template_choices = [template["name"] for template in templates] + ["Upload Custom"]
            template_radio = gr.Radio(
                choices=template_choices,
                value="Upload Custom",
                label="Select Template Type",
            )

            # Previews of the default templates
            template_preview = gr.Gallery(
                value=[[template["preview"], template["name"]] for template in templates],
                label="Template Previews",
                columns=2,
                rows=2,
                height="auto",
                visible=True,
            )

            # Custom upload area
            custom_upload = gr.File(
                label="Upload Custom PPT Template",
                file_types=[".pptx"],
                type="filepath",
                visible=True,
            )

            # Shows which default template is selected
            selected_template_display = gr.Textbox(
                label="Selected Template",
                interactive=False,
                visible=False,
            )

    return template_radio, template_preview, custom_upload, selected_template_display


def copy_to_gradio_safe_path(source_path: str, filename: str = None) -> str:
    """
    Copy file to a Gradio-safe location (temp directory)

    Args:
        source_path: Path to source file
        filename: Optional custom filename, defaults to original filename

    Returns:
        Path to copied file in temp directory

    Raises:
        FileNotFoundError: if ``source_path`` does not exist
    """
    if not os.path.exists(source_path):
        raise FileNotFoundError(f"Source file not found: {source_path}")

    if filename is None:
        filename = os.path.basename(source_path)

    # A second-resolution timestamp alone collides when two requests finish in
    # the same second, so a short random suffix is appended as well.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    name, ext = os.path.splitext(filename)
    safe_filename = f"{name}_{timestamp}_{uuid.uuid4().hex[:8]}{ext}"
    safe_path = os.path.join(GRADIO_TEMP_DIR, safe_filename)

    shutil.copy2(source_path, safe_path)
    return safe_path


def init_models(api_key: str = None, api_base: str = None, language_model: str = None,
                vision_model: str = None, text_model: str = None):
    """Initialize models with custom configuration.

    Non-empty arguments are exported to the matching environment variables,
    a ``ModelManager`` is built and its connection test is run. The global
    ``models`` is replaced only when the test succeeds, so a failed attempt
    never leaves an unverified manager in place.

    Returns:
        A human-readable status string (success or failure); never raises.
    """
    global models
    try:
        # Gradio textboxes submit "" when left blank; normalise to None.
        # NOTE(review): assumes ModelManager treats None as "use default" — confirm.
        api_key = api_key or None
        api_base = api_base or None
        language_model = language_model or None
        vision_model = vision_model or None
        text_model = text_model or None

        # Set environment variables if provided
        env_overrides = {
            "OPENAI_API_KEY": api_key,
            "API_BASE": api_base,
            "LANGUAGE_MODEL": language_model,
            "VISION_MODEL": vision_model,
            "TEXT_MODEL": text_model,
        }
        for env_name, env_value in env_overrides.items():
            if env_value:
                os.environ[env_name] = env_value

        candidate = ModelManager(
            api_base=api_base,
            api_key=api_key,
            language_model_name=language_model,
            vision_model_name=vision_model,
            text_model_name=text_model,
        )

        # Explicit raise instead of assert: asserts are stripped under ``python -O``.
        if not _run_coroutine(candidate.test_connections()):
            raise RuntimeError("Model connection test failed")

        models = candidate
        logger.info("Models initialized successfully")
        return "✅ Models initialized successfully"
    except Exception as e:
        error_msg = f"❌ Model initialization failed: {e}"
        logger.error(error_msg)
        return error_msg


class GradioProgressManager:
    """Tracks stage-based progress for one task and forwards it to a callback.

    ``progress_callback`` (optional) is called as ``callback(percent, status)``
    with ``percent`` in the range 0-100.
    """

    def __init__(self, task_id: str, stages: list[str], progress_callback=None):
        self.task_id = task_id
        self.stages = stages
        self.progress_callback = progress_callback
        self.failed = False
        self.current_stage = 0
        self.total_stages = len(stages)

    async def report_progress(self):
        """Advance to the next stage and report it.

        Returns:
            ``(progress_percent, status_text)``.
        """
        # Clamp so an extra call cannot index past the last stage.
        self.current_stage = min(self.current_stage + 1, self.total_stages)
        progress = int((self.current_stage / self.total_stages) * 100)
        status = f"Stage: {self.stages[self.current_stage - 1]}"
        if self.progress_callback:
            self.progress_callback(progress, status)
        return progress, status

    async def fail_stage(self, error_message: str):
        """Mark the task as failed at the current stage and report the error.

        Returns:
            The status text that was reported.
        """
        # After the final report_progress(), current_stage == total_stages;
        # clamp to the last valid index instead of raising IndexError.
        stage_idx = min(self.current_stage, self.total_stages - 1)
        error_status = f"{self.stages[stage_idx]} Error: {error_message}"
        if self.progress_callback:
            self.progress_callback(100, error_status)
        self.failed = True
        logger.error(f"{self.task_id}: {error_status}")
        return error_status


def _store_upload(src_path: str, kind: str, filename: str) -> str:
    """Store an uploaded file under ``RUNS_DIR/<kind>/<md5>/<filename>``.

    The MD5 is used purely as a content key. The file is (re)written whenever
    the target is missing, so a directory left behind by an interrupted write
    is repaired on the next upload.

    Returns:
        The hex MD5 digest of the file content.
    """
    with open(src_path, "rb") as src:
        blob = src.read()
    digest = hashlib.md5(blob).hexdigest()
    target_dir = pjoin(RUNS_DIR, kind, digest)
    target_path = pjoin(target_dir, filename)
    os.makedirs(target_dir, exist_ok=True)
    if not os.path.exists(target_path):
        with open(target_path, "wb") as dst:
            dst.write(blob)
    return digest


def generate_ppt(pptx_file, pdf_file, num_pages, progress=gr.Progress()):
    """Generate PPT from template and PDF.

    Args:
        pptx_file: Path to the template PPTX, or None to use the
            ``"default_template"`` key.
        pdf_file: Path to the source PDF (required).
        num_pages: Requested number of slides.
        progress: Gradio progress tracker.

    Returns:
        ``(path to generated PPTX or None, status message)``; never raises.
    """
    try:
        # Make sure models are initialized
        if models is None:
            return None, "❌ Please initialize models first in the Configuration tab"

        # Validate input before creating any directories or files
        if pdf_file is None:
            return None, "❌ Please provide a PDF file"

        # Create task ID
        task_id = datetime.now().strftime("%Y-%m-%d") + "/" + str(uuid.uuid4())
        logger.info(f"PPT generation task created: {task_id}")

        # Create directories
        os.makedirs(pjoin(RUNS_DIR, task_id), exist_ok=True)

        task = {
            "numberOfPages": num_pages,
            "pptx": "default_template",
        }

        # Handle PPT template
        if pptx_file is not None:
            task["pptx"] = _store_upload(pptx_file, "pptx", "source.pptx")

        # Handle PDF
        task["pdf"] = _store_upload(pdf_file, "pdf", "source.pdf")

        progress_store[task_id] = task

        # Progress callback: the pipeline reports 0-100, Gradio expects 0-1
        def update_progress(prog, status):
            progress(prog / 100, desc=status)

        final_ppt_path = _run_coroutine(ppt_gen_async(task_id, update_progress))

        if final_ppt_path and os.path.exists(final_ppt_path):
            # Copy to Gradio-safe location
            safe_path = copy_to_gradio_safe_path(final_ppt_path, "generated_presentation.pptx")
            return safe_path, f"✅ PPT generated successfully! Task ID: {task_id}"
        return None, "❌ PPT generation failed"

    except Exception as e:
        logger.error(f"PPT generation error: {str(e)}")
        traceback.print_exc()
        return None, f"❌ Error: {str(e)}"


def generate_ppt_with_template_selection(selected_template_name, custom_pptx_file, pdf_file, num_pages,
                                         progress=gr.Progress()):
    """Generate PPT with template selection.

    Resolves the template (a named default or the uploaded file) and delegates
    to ``generate_ppt``. Returns the same ``(path or None, status)`` pair.
    """
    try:
        # Make sure models are initialized
        if models is None:
            return None, "❌ Please initialize models first in the Configuration tab"

        # Decide which template file to use
        pptx_file = None
        if selected_template_name and selected_template_name != "Upload Custom":
            # Use one of the default templates
            for template in get_default_templates():
                if template["name"] == selected_template_name:
                    pptx_file = template["path"]
                    break
        else:
            # Use the uploaded custom template
            pptx_file = custom_pptx_file

        return generate_ppt(pptx_file, pdf_file, num_pages, progress)

    except Exception as e:
        logger.error(f"PPT generation with template selection error: {str(e)}")
        return None, f"❌ Error: {str(e)}"


async def ppt_gen_async(task_id: str, progress_callback=None):
    """Async PPT generation function.

    Reads the task from ``progress_store``, parses the template and the PDF
    (reusing cached artifacts on disk when present), runs slide induction and
    generation, and saves ``final.pptx`` in the task's run directory.

    Args:
        task_id: Key into ``progress_store``.
        progress_callback: Optional ``callback(percent_0_to_100, status)``.

    Returns:
        Path to the generated PPTX, or None if any step raised.
    """
    try:
        if DEBUG:
            importlib.reload(induct)
            importlib.reload(pptgen)

        task = progress_store[task_id]
        pptx_md5 = task["pptx"]
        pdf_md5 = task["pdf"]

        generation_config = Config(pjoin(RUNS_DIR, task_id))
        pptx_config = Config(pjoin(RUNS_DIR, "pptx", pptx_md5))
        _write_json(pjoin(generation_config.RUN_DIR, "task.json"), task)

        progress_manager = GradioProgressManager(task_id, STAGES, progress_callback)
        parsedpdf_dir = pjoin(RUNS_DIR, "pdf", pdf_md5)
        ppt_image_folder = pjoin(pptx_config.RUN_DIR, "slide_images")

        if progress_callback:
            progress_callback(10, "Task initialized successfully")

        # PPT parsing
        presentation = Presentation.from_file(
            pjoin(pptx_config.RUN_DIR, "source.pptx"), pptx_config
        )

        if not os.path.exists(ppt_image_folder) or len(os.listdir(ppt_image_folder)) != len(presentation):
            await ppt_to_images_async(
                pjoin(pptx_config.RUN_DIR, "source.pptx"), ppt_image_folder
            )

            # The cleanup below applies only to freshly rendered images: once
            # the files have been renumbered, an error index may name a valid slide.

            # Handle error slides
            for err_idx, _ in presentation.error_history:
                error_file = pjoin(ppt_image_folder, f"slide_{err_idx:04d}.jpg")
                if os.path.exists(error_file):
                    os.remove(error_file)

            # Rename slides
            for i, slide in enumerate(presentation.slides, 1):
                slide.slide_idx = i
                old_path = pjoin(ppt_image_folder, f"slide_{slide.real_idx:04d}.jpg")
                new_path = pjoin(ppt_image_folder, f"slide_{slide.slide_idx:04d}.jpg")
                if os.path.exists(old_path):
                    os.rename(old_path, new_path)

        # Image labeling
        labler = ImageLabler(presentation, pptx_config)
        stats_file = pjoin(pptx_config.RUN_DIR, "image_stats.json")

        if os.path.exists(stats_file):
            labler.apply_stats(_read_json(stats_file))
        else:
            await labler.caption_images_async(models.vision_model)
            _write_json(stats_file, labler.image_stats, ensure_ascii=False, indent=4)

        await progress_manager.report_progress()

        # PDF parsing
        source_md_path = pjoin(parsedpdf_dir, "source.md")
        if not os.path.exists(source_md_path):
            # Check if we have a PDF file
            pdf_file_path = pjoin(RUNS_DIR, "pdf", pdf_md5, "source.pdf")
            if os.path.exists(pdf_file_path):
                text_content = parse_pdf(
                    pdf_file_path,
                    parsedpdf_dir,
                    models.marker_model,
                )
            else:
                raise ValueError("No PDF file found")
        else:
            with open(source_md_path, encoding="utf-8") as md_file:
                text_content = md_file.read()

        await progress_manager.report_progress()

        # Document refine
        refined_doc_path = pjoin(parsedpdf_dir, "refined_doc.json")
        if not os.path.exists(refined_doc_path):
            source_doc = await Document.from_markdown_async(
                text_content,
                models.language_model,
                models.vision_model,
                parsedpdf_dir,
            )
            # Written as UTF-8: with ensure_ascii=False a locale-default encoding
            # can fail on non-ASCII document text.
            _write_json(refined_doc_path, source_doc.to_dict(), ensure_ascii=False, indent=4)
        else:
            source_doc = Document.from_dict(_read_json(refined_doc_path), parsedpdf_dir)

        await progress_manager.report_progress()

        # Slide Induction
        slide_induction_path = pjoin(pptx_config.RUN_DIR, "slide_induction.json")
        if not os.path.exists(slide_induction_path):
            deepcopy(presentation).save(
                pjoin(pptx_config.RUN_DIR, "template.pptx"), layout_only=True
            )
            await ppt_to_images_async(
                pjoin(pptx_config.RUN_DIR, "template.pptx"),
                pjoin(pptx_config.RUN_DIR, "template_images"),
            )

            slide_inducter = induct.SlideInducterAsync(
                presentation,
                ppt_image_folder,
                pjoin(pptx_config.RUN_DIR, "template_images"),
                pptx_config,
                models.image_model,
                models.language_model,
                models.vision_model,
            )
            layout_induction = await slide_inducter.layout_induct()
            slide_induction = await slide_inducter.content_induct(layout_induction)
            _write_json(slide_induction_path, slide_induction, ensure_ascii=False, indent=4)
        else:
            slide_induction = _read_json(slide_induction_path)

        await progress_manager.report_progress()

        # PPT Generation
        ppt_agent = pptgen.PPTAgentAsync(
            models.text_model,
            models.language_model,
            models.vision_model,
            error_exit=False,
            retry_times=5,
        )
        ppt_agent.set_reference(
            config=generation_config,
            slide_induction=slide_induction,
            presentation=presentation,
        )

        prs, _ = await ppt_agent.generate_pres(
            source_doc=source_doc,
            num_slides=task["numberOfPages"],
        )

        final_path = pjoin(generation_config.RUN_DIR, "final.pptx")
        prs.save(final_path)
        logger.info(f"{task_id}: generation finished")

        await progress_manager.report_progress()
        return final_path

    except Exception as e:
        logger.error(f"PPT generation failed: {str(e)}")
        traceback.print_exc()
        return None


def ppt_to_video(ppt_file, progress=gr.Progress()):
    """Convert PPT to video presentation.

    Args:
        ppt_file: Path to the uploaded PPTX.
        progress: Gradio progress tracker.

    Returns:
        ``(path to generated MP4 or None, status message)``; never raises.
    """
    try:
        # Without this guard a missing upload surfaces as an opaque TypeError message.
        if ppt_file is None:
            return None, "❌ Please provide a PowerPoint file"

        task_id = str(uuid.uuid4())
        logger.info(f"PPT2Video task created: {task_id}")

        task_dir = pjoin(RUNS_DIR, "ppt_video", task_id)
        os.makedirs(task_dir, exist_ok=True)

        # Copy PPT file
        ppt_path = pjoin(task_dir, "source.pptx")
        shutil.copyfile(ppt_file, ppt_path)

        # Initialize progress
        ppt_video_progress_store[task_id] = {
            "status": "processing",
            "current_step": 1,
            "current_slide": 0,
            "total_slides": 0,
            "progress_percentage": 0,
            "task_dir": task_dir,
            "ppt_path": ppt_path,
        }

        # Progress callback: this pipeline already reports fractions in 0-1
        def update_progress(prog, status):
            progress(prog, desc=status)

        video_path = _run_coroutine(process_ppt_to_video_async(task_id, update_progress))

        if video_path and os.path.exists(video_path):
            # Copy to Gradio-safe location
            safe_path = copy_to_gradio_safe_path(video_path, "generated_video.mp4")
            return safe_path, f"✅ Video generated successfully! Task ID: {task_id}"
        return None, "❌ Video generation failed"

    except Exception as e:
        logger.error(f"PPT to video error: {str(e)}")
        return None, f"❌ Error: {str(e)}"


async def process_ppt_to_video_async(task_id: str, progress_callback):
    """Process PPT to video asynchronously.

    Converts the PPTX to PDF with LibreOffice, rasterises the pages, creates
    one narrated segment per slide (speaker notes, or a placeholder sentence
    when the notes are blank) and merges the segments into ``output.mp4``.

    Args:
        task_id: Key into ``ppt_video_progress_store``.
        progress_callback: ``callback(fraction_0_to_1, status)``.

    Returns:
        Path to the merged video, or None if any step raised.
    """
    try:
        task_dir = ppt_video_progress_store[task_id]["task_dir"]
        ppt_path = ppt_video_progress_store[task_id]["ppt_path"]

        progress_callback(0.1, "Converting PPT to PDF...")

        # Convert PPT to PDF
        pdf_path = pjoin(task_dir, "source.pdf")
        await run_cmd([
            "libreoffice",
            "--headless",
            "--convert-to",
            "pdf",
            ppt_path,
            "--outdir",
            task_dir,
        ])

        # Convert PDF to images
        images_from_path = await run_blocking(convert_from_path, pdf_path)
        prs = await run_blocking(PptxPresentation, ppt_path)

        if len(images_from_path) != len(prs.slides):
            raise Exception("PPT页数与生成的图片数量不匹配")

        progress_callback(0.2, "Extracting slides...")

        # Generate video segments
        video_segments = []
        with tempfile.TemporaryDirectory() as temp_path:
            total_slides = len(prs.slides)

            for i, (slide, image) in enumerate(zip(prs.slides, images_from_path)):
                slide_progress = 0.3 + (i / total_slides) * 0.4
                progress_callback(slide_progress, f"Processing slide {i + 1}/{total_slides}")

                # Get notes
                notes = ""
                if slide.has_notes_slide:
                    notes = slide.notes_slide.notes_text_frame.text

                if not notes.strip():
                    notes = f"This is slide {i + 1}"

                # Save image
                image_path = pjoin(temp_path, f"frame_{i}.jpg")
                image.save(image_path)

                # Generate audio
                audio_path = pjoin(temp_path, f"frame_{i}.wav")
                await generate_tts_audio(notes, audio_path)

                # Create video segment
                video_segment_path = await create_video_segment(
                    image_path, audio_path, temp_path, i
                )
                video_segments.append(video_segment_path)

            progress_callback(0.8, "Merging video segments...")

            # Merge while the temporary directory (and the segments) still exist
            output_video_path = pjoin(task_dir, "output.mp4")
            await merge_video_segments(video_segments, output_video_path)

        progress_callback(1.0, "Video generation completed!")

        ppt_video_progress_store[task_id]["status"] = "completed"
        return output_video_path

    except Exception as e:
        logger.error(f"PPT2Video processing failed {task_id}: {e}")
        # setdefault: the failure handler must not raise KeyError itself when
        # the task entry is missing.
        ppt_video_progress_store.setdefault(task_id, {})["status"] = "failed"
        return None


async def generate_tts_audio(text: str, output_path: str):
    """Generate TTS audio.

    Tries the MegaTTS3 checkout located next to this file; on any failure a
    3-second silent mono 16-bit WAV is written instead so the pipeline can
    continue.

    Args:
        text: Text to synthesise.
        output_path: Destination WAV path.
    """
    try:
        # Try to use MegaTTS3 if available
        tts_root = pjoin(os.path.dirname(__file__), "MegaTTS3")
        # Only extend sys.path once; this function is called once per slide.
        if tts_root not in sys.path:
            sys.path.append(tts_root)
        from tts.infer_cli import MegaTTS3DiTInfer
        from tts.utils.audio_utils.io import save_wav

        # NOTE(review): the inference object is rebuilt on every call; caching it
        # would be faster, but reuse across threads is unverified.
        infer = MegaTTS3DiTInfer(ckpt_root=pjoin(tts_root, "checkpoints"))

        prompt_audio_path = pjoin(tts_root, "assets", "English_prompt.wav")

        with open(prompt_audio_path, 'rb') as f:
            audio_bytes = f.read()

        # Use the latent file stored next to the prompt audio when it exists
        latent_file = None
        potential_npy = os.path.splitext(prompt_audio_path)[0] + '.npy'
        if os.path.isfile(potential_npy):
            latent_file = potential_npy

        resource_context = infer.preprocess(audio_bytes, latent_file)
        wav_bytes = infer.forward(
            resource_context,
            text,
            time_step=32,
            p_w=1.6,
            t_w=2.5
        )

        save_wav(wav_bytes, output_path)

    except Exception as e:
        logger.error(f"TTS failed: {str(e)}")
        # Fallback: create silent audio
        import wave

        sample_rate = 22050
        duration = 3.0
        # 16-bit mono silence: two zero bytes per sample (no numpy needed)
        silence = bytes(2 * int(sample_rate * duration))

        with wave.open(output_path, 'w') as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)
            wav_file.setframerate(sample_rate)
            wav_file.writeframes(silence)


async def create_video_segment(image_path: str, audio_path: str, temp_path: str, index: int):
    """Create video segment from image and audio.

    Runs ffmpeg to loop the still image over the audio track, scaled to
    1920x1080, and writes ``segment_<index>.mp4`` into ``temp_path``.

    Returns:
        Path to the created segment.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    output_path = pjoin(temp_path, f"segment_{index}.mp4")

    await run_cmd([
        "ffmpeg",
        "-y",
        "-loop", "1",
        "-i", image_path,
        "-i", audio_path,
        "-vf", "scale=1920:1080",
        "-c:v", "libx264",
        "-tune", "stillimage",
        "-c:a", "aac",
        "-b:a", "192k",
        "-pix_fmt", "yuv420p",
        "-shortest",
        output_path,
    ])

    return output_path


async def merge_video_segments(video_segments: list[str], output_path: str):
    """Merge video segments into ``output_path`` with ffmpeg's concat demuxer.

    Args:
        video_segments: Segment paths, in playback order.
        output_path: Destination video path.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    # splitext touches only the final extension; str.replace would also rewrite
    # ".mp4" elsewhere in the path and would alias the list file to the output
    # itself when the path has no ".mp4".
    list_file_path = os.path.splitext(output_path)[0] + "_list.txt"

    try:
        with open(list_file_path, "w", encoding="utf-8") as f:
            for seg in video_segments:
                # Absolute paths: the concat demuxer resolves relative entries
                # against the list file's directory, not the working directory.
                # A literal single quote is written as '\'' in the list syntax.
                escaped = os.path.abspath(seg).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")

        await run_cmd([
            "ffmpeg",
            "-y",
            "-f", "concat",
            "-safe", "0",
            "-i", list_file_path,
            "-c", "copy",
            output_path,
        ])
    finally:
        # Remove the list file even when ffmpeg fails
        if os.path.exists(list_file_path):
            os.remove(list_file_path)


def cleanup_temp_files():
    """Clean up old temporary files.

    Removes regular files in ``GRADIO_TEMP_DIR`` whose ctime is more than one
    hour old. Failures are logged per file and never raised.
    """
    try:
        import glob

        # Remove files older than 1 hour
        cutoff_time = time.time() - 3600

        for file_path in glob.glob(os.path.join(GRADIO_TEMP_DIR, "*")):
            try:
                # The stat call sits inside the per-file try so a file that
                # vanishes mid-scan does not abort the whole sweep.
                if not os.path.isfile(file_path):
                    continue
                if os.path.getctime(file_path) < cutoff_time:
                    os.remove(file_path)
                    logger.info(f"Cleaned up old temp file: {file_path}")
            except Exception as e:
                logger.warning(f"Failed to remove temp file {file_path}: {e}")
    except Exception as e:
        logger.warning(f"Cleanup failed: {e}")


# Gradio interface
def create_gradio_interface():
    """Create Gradio interface.

    Returns:
        The ``gr.Blocks`` app with Configuration, PPT Generation and
        PPT-to-video tabs wired to their callbacks.
    """
    with gr.Blocks(title="PresentAgent", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# PresentAgent - PowerPoint Generation and Presentation Creation")

        with gr.Tabs():
            # Model Configuration Tab
            with gr.TabItem("🔧 Configuration"):
                gr.Markdown("## Model Configuration")
                gr.Markdown(
                    "Configure your API settings and model parameters before using the PPT generation features.")

                with gr.Row():
                    with gr.Column():
                        api_key_input = gr.Textbox(
                            label="API Key",
                            type="password",
                            placeholder="Enter your OpenAI API key",
                            value=""
                        )
                        api_base_input = gr.Textbox(
                            label="API Base URL",
                            placeholder="https://api.openai.com/v1",
                            value=""
                        )

                        with gr.Row():
                            language_model_input = gr.Textbox(
                                label="Language Model",
                                placeholder="Model for text generation",
                                value="gpt-4o"
                            )
                            vision_model_input = gr.Textbox(
                                label="Vision Model",
                                placeholder="Model for image processing",
                                value="gpt-4o"
                            )
                            text_model_input = gr.Textbox(
                                label="Text Embedding Model",
                                placeholder="Model for text embeddings",
                                value="text-embedding-3-small"
                            )

                        init_btn = gr.Button("Initialize Models", variant="primary", size="lg")

                    with gr.Column():
                        init_status = gr.Textbox(
                            label="Initialization Status",
                            interactive=False,
                            lines=3
                        )

                        gr.Markdown("""
                        ### Instructions:
                        1. Enter your API key and base URL
                        2. Configure model names (defaults are recommended)
                        3. Click "Initialize Models" to test the connection
                        4. Once initialized, you can use the PPT generation features
                        """)

                init_btn.click(
                    fn=init_models,
                    inputs=[api_key_input, api_base_input, language_model_input, vision_model_input,
                            text_model_input],
                    outputs=[init_status]
                )

            with gr.TabItem("📊 PPT Generation"):
                gr.Markdown("## Generate PowerPoint from Template and PDF")

                with gr.Row():
                    with gr.Column():
                        # Template selection area
                        template_radio, template_preview, custom_upload, selected_template_display = create_template_selection()

                        # PDF input
                        pdf_input = gr.File(
                            label="PDF Document",
                            file_types=[".pdf"],
                            type="filepath"
                        )

                        # Number of slides
                        num_pages_input = gr.Slider(
                            minimum=1,
                            maximum=50,
                            value=10,
                            step=1,
                            label="Number of Slides"
                        )

                        generate_btn = gr.Button("Generate PPT", variant="primary", size="lg")

                    with gr.Column():
                        ppt_output = gr.File(label="Generated PPT")
                        ppt_status = gr.Textbox(label="Status", interactive=False, lines=3)

                # Bind the template selection event; the third output (template
                # path) goes to a State that no other callback reads.
                template_radio.change(
                    fn=select_template,
                    inputs=[template_radio],
                    outputs=[custom_upload, selected_template_display, gr.State()]
                )

                # Bind the generate button
                generate_btn.click(
                    fn=generate_ppt_with_template_selection,
                    inputs=[template_radio, custom_upload, pdf_input, num_pages_input],
                    outputs=[ppt_output, ppt_status]
                )

            # PPT to Video Tab
            with gr.TabItem("🎬 PPT to Presentation"):
                gr.Markdown("## Convert PowerPoint to Video Presentation")

                with gr.Row():
                    with gr.Column():
                        ppt_video_input = gr.File(
                            label="PowerPoint File",
                            file_types=[".pptx"],
                            type="filepath"
                        )

                        video_btn = gr.Button("Convert to Video", variant="primary", size="lg")

                    with gr.Column():
                        video_output = gr.File(label="Generated Video")
                        video_status = gr.Textbox(label="Status", interactive=False, lines=3)

                video_btn.click(
                    fn=ppt_to_video,
                    inputs=[ppt_video_input],
                    outputs=[video_output, video_status]
                )

    return demo


def setup_template_directories():
    """Create the ``templates`` and ``templates/previews`` directories next to this file."""
    base_dir = os.path.dirname(__file__)
    template_dir = os.path.join(base_dir, "templates")
    preview_dir = os.path.join(template_dir, "previews")

    os.makedirs(template_dir, exist_ok=True)
    os.makedirs(preview_dir, exist_ok=True)

    logger.info(f"Template directories created at: {template_dir}")
    logger.info("Please place your default template files in the templates directory")
    logger.info("Please place corresponding preview images in the templates/previews directory")


# Main function
if __name__ == "__main__":
    # Create runs directory
    os.makedirs(RUNS_DIR, exist_ok=True)
    os.makedirs(pjoin(RUNS_DIR, "feedback"), exist_ok=True)
    setup_template_directories()

    # Clean up old temp files
    cleanup_temp_files()

    # Create and launch Gradio interface
    demo = create_gradio_interface()

    # Launch with allowed paths
    # NOTE(review): server_name="0.0.0.0" plus share=True exposes the app (and
    # the API key form) beyond localhost — confirm this is intended.
    demo.queue().launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
        allowed_paths=[RUNS_DIR]
    )