Spaces:
Runtime error
Runtime error
"""
Audio segment processing for creating meaningful lyric segments for video generation.

This module takes Whisper transcription results and intelligently segments them
at natural pause points so that video scene changes stay synchronized with the lyrics.
"""
| import re | |
| from typing import List, Dict, Any | |
def _normalize_segment(segment: Dict[str, Any]) -> Dict[str, Any]:
    """Return a fresh four-key segment dict that shares no list with *segment*."""
    return {
        'text': (segment.get('text') or '').strip(),
        'start': segment.get('start', 0),
        'end': segment.get('end', 0),
        # Copy the list so later merges never extend the caller's data.
        'words': list(segment.get('words') or []),
    }


def segment_lyrics(transcription_result: Dict[str, Any], min_segment_duration: float = 2.0, max_segment_duration: float = 8.0) -> List[Dict[str, Any]]:
    """
    Segment the transcription into meaningful chunks for video generation.

    The raw segments are processed in two passes. The first pass merges
    segments shorter than ``min_segment_duration`` into their predecessor
    (when the merged span still fits in ``max_segment_duration``) and hands
    segments longer than ``max_segment_duration`` to ``_split_long_segment``.
    The second pass delegates to ``_apply_intelligent_segmentation`` and then
    drops segments whose text is empty.

    The input is never modified: every segment kept by this function is a
    fresh dictionary with its own ``words`` list.

    Args:
        transcription_result: Dictionary from Whisper transcription containing 'segments'
        min_segment_duration: Minimum duration for a segment in seconds
        max_segment_duration: Maximum duration for a segment in seconds

    Returns:
        List of segment dictionaries with keys:
        - 'text': The lyrical text for this segment
        - 'start': Start time in seconds
        - 'end': End time in seconds
        - 'words': List of word-level timestamps (if available)
        An empty list is returned when there is nothing to segment.
    """
    if not transcription_result or 'segments' not in transcription_result:
        return []

    raw_segments = transcription_result['segments']
    if not raw_segments:
        return []

    # First pass: merge very short segments and split very long ones.
    processed_segments: List[Dict[str, Any]] = []
    for segment in raw_segments:
        candidate = _normalize_segment(segment)
        duration = candidate['end'] - candidate['start']
        is_short = duration < min_segment_duration

        if is_short and processed_segments:
            previous = processed_segments[-1]
            merged_end = segment.get('end', previous['end'])
            # Measure the real span of the merged segment (including any
            # silence between the two) rather than the sum of the two
            # durations, otherwise a short segment after a long gap would
            # produce a segment longer than max_segment_duration.
            if merged_end - previous['start'] <= max_segment_duration:
                previous['text'] = ' '.join(
                    part for part in (previous['text'], candidate['text']) if part
                )
                previous['end'] = merged_end
                previous['words'].extend(candidate['words'])
                continue

        if not is_short and duration > max_segment_duration:
            # Normalize whatever the splitter returns so that later merges
            # operate on our own copies, never on the caller's dictionaries.
            processed_segments.extend(
                _normalize_segment(piece)
                for piece in _split_long_segment(segment, max_segment_duration)
            )
            continue

        # Either the duration is already in range, or the segment is short
        # but could not be merged; keep it as its own segment.
        processed_segments.append(candidate)

    # Second pass: apply intelligent segmentation based on content.
    final_segments = _apply_intelligent_segmentation(processed_segments, max_segment_duration)

    # Ensure no empty segments.
    return [seg for seg in final_segments if (seg.get('text') or '').strip()]
| def _split_long_segment(segment: Dict[str, Any], max_duration: float) -> List[Dict[str, Any]]: | |
| """ | |
| Split a long segment into smaller ones at natural break points. | |
| """ | |
| text = segment.get('text', '').strip() | |
| words = segment.get('words', []) | |
| start_time = segment.get('start', 0) | |
| end_time = segment.get('end', 0) | |
| duration = end_time - start_time | |
| if not words or duration <= max_duration: | |
| return [segment] | |
| # Try to split at punctuation marks or word boundaries | |
| split_points = [] | |
| # Find punctuation-based split points | |
| for i, word in enumerate(words): | |
| word_text = word.get('word', '').strip() | |
| if re.search(r'[.!?;,:]', word_text): | |
| split_points.append(i) | |
| # If no punctuation, split at word boundaries roughly evenly | |
| if not split_points: | |
| target_splits = int(duration / max_duration) | |
| words_per_split = len(words) // (target_splits + 1) | |
| split_points = [i * words_per_split for i in range(1, target_splits + 1) if i * words_per_split < len(words)] | |
| if not split_points: | |
| return [segment] | |
| # Create segments from split points | |
| segments = [] | |
| last_idx = 0 | |
| for split_idx in split_points: | |
| if split_idx >= len(words): | |
| continue | |
| segment_words = words[last_idx:split_idx + 1] | |
| if segment_words: | |
| segments.append({ | |
| 'text': ' '.join([w.get('word', '') for w in segment_words]).strip(), | |
| 'start': segment_words[0].get('start', start_time), | |
| 'end': segment_words[-1].get('end', end_time), | |
| 'words': segment_words | |
| }) | |
| last_idx = split_idx + 1 | |
| # Add remaining words as final segment | |
| if last_idx < len(words): | |
| segment_words = words[last_idx:] | |
| segments.append({ | |
| 'text': ' '.join([w.get('word', '') for w in segment_words]).strip(), | |
| 'start': segment_words[0].get('start', start_time), | |
| 'end': segment_words[-1].get('end', end_time), | |
| 'words': segment_words | |
| }) | |
| return segments | |
| def _apply_intelligent_segmentation(segments: List[Dict[str, Any]], max_duration: float) -> List[Dict[str, Any]]: | |
| """ | |
| Apply intelligent segmentation rules based on lyrical content and timing. | |
| """ | |
| if not segments: | |
| return [] | |
| final_segments = [] | |
| current_segment = None | |
| for segment in segments: | |
| text = segment['text'].strip() | |
| # Skip empty segments | |
| if not text: | |
| continue | |
| # If no current segment, start a new one | |
| if current_segment is None: | |
| current_segment = segment.copy() | |
| continue | |
| # Check if we should merge with current segment | |
| should_merge = _should_merge_segments(current_segment, segment, max_duration) | |
| if should_merge: | |
| # Merge segments | |
| current_segment['text'] += ' ' + segment['text'] | |
| current_segment['end'] = segment['end'] | |
| if 'words' in segment and 'words' in current_segment: | |
| current_segment['words'].extend(segment['words']) | |
| else: | |
| # Finalize current segment and start new one | |
| final_segments.append(current_segment) | |
| current_segment = segment.copy() | |
| # Add the last segment | |
| if current_segment is not None: | |
| final_segments.append(current_segment) | |
| return final_segments | |
| def _should_merge_segments(current: Dict[str, Any], next_seg: Dict[str, Any], max_duration: float) -> bool: | |
| """ | |
| Determine if two segments should be merged based on content and timing. | |
| """ | |
| # Check duration constraint | |
| merged_duration = next_seg['end'] - current['start'] | |
| if merged_duration > max_duration: | |
| return False | |
| current_text = current['text'].strip() | |
| next_text = next_seg['text'].strip() | |
| # Don't merge if current segment ends with strong punctuation | |
| if re.search(r'[.!?]$', current_text): | |
| return False | |
| # Merge if current segment is very short (likely incomplete phrase) | |
| if len(current_text.split()) < 3: | |
| return True | |
| # Merge if next segment starts with a lowercase word (continuation) | |
| if next_text and next_text[0].islower(): | |
| return True | |
| # Merge if there's a short gap between segments (< 0.5 seconds) | |
| gap = next_seg['start'] - current['end'] | |
| if gap < 0.5: | |
| return True | |
| # Don't merge by default | |
| return False | |
def get_segment_info(segments: List[Dict[str, Any]], preview_count: int = 5, preview_length: int = 50) -> Dict[str, Any]:
    """
    Get summary information about the segments.

    Args:
        segments: List of segment dictionaries with 'text', 'start' and 'end'
        preview_count: Number of leading segments to include in the preview
        preview_length: Maximum number of characters of text shown per
            preview entry; longer text is cut and marked with '...'

    Returns:
        Dictionary with segment statistics:
        - 'total_segments': number of segments
        - 'total_duration': time from the earliest start to the latest end
        - 'average_duration', 'shortest_duration', 'longest_duration':
          statistics over the individual segment durations
        - 'segments_preview': list of {'text', 'duration'} entries for the
          first ``preview_count`` segments
        For empty input every statistic is 0 and the preview is empty.
    """
    if not segments:
        return {
            'total_segments': 0,
            'total_duration': 0,
            'average_duration': 0,
            'shortest_duration': 0,
            'longest_duration': 0,
            # Same keys as the non-empty result so callers can index blindly.
            'segments_preview': [],
        }

    durations = [seg['end'] - seg['start'] for seg in segments]

    def _preview(text: str) -> str:
        # Only mark the text as truncated when something was actually cut.
        if len(text) <= preview_length:
            return text
        return text[:preview_length] + '...'

    # Use min/max rather than first/last so the span is right even when the
    # segments are not in chronological order.
    total_duration = max(seg['end'] for seg in segments) - min(seg['start'] for seg in segments)

    return {
        'total_segments': len(segments),
        'total_duration': total_duration,
        'average_duration': sum(durations) / len(durations),
        'shortest_duration': min(durations),
        'longest_duration': max(durations),
        'segments_preview': [
            {'text': _preview(seg['text']), 'duration': duration}
            for seg, duration in zip(segments[:max(preview_count, 0)], durations)
        ],
    }