import os
import subprocess
import json


def stitch_and_caption(
    segment_videos,
    audio_path,
    transcription_segments,
    template_name,
    work_dir=".",
    crossfade_duration=0.25,
):
    """
    Stitch video segments with crossfade transitions, add original audio, and overlay kinetic captions.

    Args:
        segment_videos (list): List of file paths for the video segments.
        audio_path (str): Path to the original audio file.
        transcription_segments (list): The list of segment dictionaries from segment.py, including text and word timestamps.
        template_name (str): The name of the PyCaps template to use.
        work_dir (str): The working directory for temporary and final files.
        crossfade_duration (float): Duration of crossfade transitions in seconds (0 for hard cuts).

    Returns:
        str: The path to the final subtitled video.
    """
    if not segment_videos:
        raise RuntimeError("No video segments to stitch.")

    stitched_path = os.path.join(work_dir, "stitched.mp4")
    final_path = os.path.join(work_dir, "final_video.mp4")

    # 1. Stitch video segments together with crossfades using ffmpeg
    print("Stitching video segments with crossfades...")
    try:
        # Get accurate durations for each video segment using ffprobe
        durations = [_get_video_duration(seg_file) for seg_file in segment_videos]
        cross_dur = crossfade_duration  # Crossfade duration in seconds

        # Handle the case where crossfade is disabled (hard cuts)
        if cross_dur <= 0:
            # Use concat demuxer for hard cuts (more reliable for exact segment timing)
            concat_file = os.path.join(work_dir, "concat_list.txt")
            with open(concat_file, "w") as f:
                for seg_file in segment_videos:
                    f.write(f"file '{os.path.abspath(seg_file)}'\n")
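            # Illustrative only: with two hypothetical segments, concat_list.txt
            # ends up containing one concat-demuxer directive per file, e.g.:
            #   file '/abs/path/seg_000.mp4'
            #   file '/abs/path/seg_001.mp4'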
            # Run ffmpeg with concat demuxer
            cmd = [
                "ffmpeg", "-y",
                "-f", "concat",
                "-safe", "0",
                "-i", concat_file,
                "-i", audio_path,
                "-c:v", "copy",  # Copy video stream without re-encoding for speed
                "-c:a", "aac",
                "-b:a", "192k",
                "-map", "0:v",
                "-map", "1:a",
                "-shortest",
                stitched_path,
            ]
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        else:
            # Build the complex filter string for ffmpeg with crossfades.
            # Note: xfade expects all inputs to share the same resolution,
            # frame rate, and pixel format.
            inputs = []
            filter_complex_parts = []
            stream_labels = []

            # Prepare inputs and initial stream labels
            for i, seg_file in enumerate(segment_videos):
                inputs.extend(["-i", seg_file])
                stream_labels.append(f"[{i}:v]")

            if len(segment_videos) == 1:
                # Only one video: no stitching needed, just normalize the pixel format
                filter_complex_str = "[0:v]format=yuv420p[video]"
            else:
                # Sequentially chain xfade filters. Each transition starts
                # cross_dur seconds before the end of the accumulated video,
                # so the running offset is the sum of the durations so far
                # minus one crossfade per transition.
                last_stream_label = stream_labels[0]
                current_offset = 0.0
                for i in range(len(segment_videos) - 1):
                    current_offset += durations[i] - cross_dur
                    next_stream_label = f"v{i + 1}"
                    filter_complex_parts.append(
                        f"{last_stream_label}{stream_labels[i + 1]}"
                        f"xfade=transition=fade:duration={cross_dur}:offset={current_offset}"
                        f"[{next_stream_label}]"
                    )
                    last_stream_label = f"[{next_stream_label}]"
                filter_complex_str = ";".join(filter_complex_parts)
                filter_complex_str += f";{last_stream_label}format=yuv420p[video]"
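            # Illustrative only: for three hypothetical segments of 3.0 s, 4.0 s,
            # and 5.0 s with a 0.25 s crossfade, the offsets come out to 2.75 and
            # 6.5 (3.0 - 0.25, then + 4.0 - 0.25), and the generated filter is:
            #   [0:v][1:v]xfade=transition=fade:duration=0.25:offset=2.75[v1];
            #   [v1][2:v]xfade=transition=fade:duration=0.25:offset=6.5[v2];
            #   [v2]format=yuv420p[video]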
            # Construct the full ffmpeg command
            cmd = ["ffmpeg", "-y"]
            cmd.extend(inputs)
            cmd.extend(["-i", audio_path])  # Add original audio as the last input
            cmd.extend([
                "-filter_complex", filter_complex_str,
                "-map", "[video]",  # Map the final video stream
                "-map", f"{len(segment_videos)}:a",  # Map the audio stream
                "-c:v", "libx264",
                "-crf", "18",
                "-preset", "fast",
                "-c:a", "aac",
                "-b:a", "192k",
                "-shortest",  # Finish encoding when the shortest stream ends
                stitched_path,
            ])
            subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        print("Error during ffmpeg stitching:")
        print("FFMPEG stdout:", e.stdout)
        print("FFMPEG stderr:", e.stderr)
        raise RuntimeError("FFMPEG stitching failed.") from e

    # 2. Use PyCaps to render captions on the stitched video
    print("Overlaying kinetic subtitles...")

    # Save the real transcription data to a JSON file for PyCaps
    transcription_json_path = os.path.join(work_dir, "transcription_for_pycaps.json")
    _save_whisper_json(transcription_segments, transcription_json_path)

    # Run pycaps render command
    try:
        pycaps_cmd = [
            "pycaps", "render",
            "--input", stitched_path,
            "--template", os.path.join("templates", template_name),
            "--whisper-json", transcription_json_path,
            "--output", final_path,
        ]
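        # Illustrative only ("karaoke" is a hypothetical template name); the
        # resulting invocation looks like:
        #   pycaps render --input ./stitched.mp4 --template templates/karaoke \
        #     --whisper-json ./transcription_for_pycaps.json --output ./final_video.mp4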
        subprocess.run(pycaps_cmd, check=True, capture_output=True, text=True)
    except FileNotFoundError:
        raise RuntimeError(
            "`pycaps` command not found. Make sure pycaps is installed correctly "
            "(e.g., `pip install git+https://github.com/francozanardi/pycaps.git`)."
        )
    except subprocess.CalledProcessError as e:
        print("Error during PyCaps subtitle rendering:")
        print("PyCaps stdout:", e.stdout)
        print("PyCaps stderr:", e.stderr)
        raise RuntimeError("PyCaps rendering failed.") from e

    return final_path


def _get_video_duration(file_path):
    """Get video duration in seconds using ffprobe."""
    try:
        # Query the container duration; -select_streams is unnecessary here
        # because format=duration is read from the container, not a stream.
        cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            file_path,
        ]
        output = subprocess.check_output(cmd, text=True).strip()
        return float(output)
    except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e:
        # A 0.0 fallback keeps the pipeline alive, but it will skew the xfade
        # offsets, so treat this warning seriously when crossfades are enabled.
        print(f"Warning: Could not get duration for {file_path}. Error: {e}. Falling back to 0.0.")
        return 0.0


def _save_whisper_json(transcription_segments, json_path):
    """
    Save the transcription segments into a Whisper-formatted JSON file for PyCaps.

    Args:
        transcription_segments (list): A list of segment dictionaries, each containing
            'start', 'end', 'text', and 'words' keys.
        json_path (str): The file path to save the JSON data.
    """
    print(f"Saving transcription to {json_path} for subtitling...")
    # The structure pycaps expects is a dictionary with a "segments" key,
    # which contains the list of segment dictionaries.
    output_data = {
        "text": " ".join(seg.get("text", "") for seg in transcription_segments),
        "segments": transcription_segments,
        "language": "en",
    }
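    # Illustrative only: with hypothetical timing values, the written JSON has
    # the shape:
    #   {"text": "hello world", "language": "en",
    #    "segments": [{"start": 0.0, "end": 1.2, "text": "hello world",
    #                  "words": [{"word": "hello", "start": 0.0, "end": 0.5}, ...]}]}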
    try:
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(output_data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        raise RuntimeError(f"Failed to write transcription JSON file at {json_path}") from e
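

# Minimal usage sketch of the module's entry point. All paths and the template
# name below are hypothetical placeholders, not part of the pipeline itself.
if __name__ == "__main__":
    example_segments = ["work/seg_000.mp4", "work/seg_001.mp4"]  # hypothetical paths
    example_transcription = [
        {
            "start": 0.0,
            "end": 1.2,
            "text": "hello world",
            "words": [
                {"word": "hello", "start": 0.0, "end": 0.5},
                {"word": "world", "start": 0.5, "end": 1.2},
            ],
        }
    ]
    output = stitch_and_caption(
        example_segments,
        "work/audio.wav",  # hypothetical audio path
        example_transcription,
        "default",  # hypothetical PyCaps template name
        work_dir="work",
    )
    print(f"Final video written to {output}")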