Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import os | |
| import json | |
| from numbers import Number | |
| from tqdm import tqdm | |
| from ._cmd_utils import ffmpeg_has_loudnorm # get_ffmpeg_exe | |
| from ._media_file import MediaFile | |
| from ._errors import FFmpegNormalizeError | |
| from ._logger import setup_custom_logger | |
# Module-level logger used by the normalization code below.
logger = setup_custom_logger("ffmpeg_normalize")

# Accepted values for the ``normalization_type`` option.
NORMALIZATION_TYPES = [
    "ebu",
    "rms",
    "peak",
]

# Values of ``output_format`` that are rejected when the audio codec is PCM
# (or unset).
PCM_INCOMPATIBLE_FORMATS = [
    "mp4",
    "mp3",
    "ogg",
    "webm",
]

# Output file extensions that are rejected when the audio codec is PCM
# (or unset).
PCM_INCOMPATIBLE_EXTS = [
    "mp4",
    "m4a",
    "mp3",
    "ogg",
    "webm",
]
def check_range(number, min_r, max_r, name=""):
    """
    Check if a number is within a given range

    Arguments:
        number -- Value to validate; anything accepted by ``float()``
        min_r -- Inclusive lower bound
        max_r -- Inclusive upper bound
        name {str} -- Parameter name used in the error message

    Returns:
        float -- ``number`` converted to a float

    Raises:
        FFmpegNormalizeError -- if the value is outside [min_r, max_r] or NaN
        ValueError, TypeError -- if ``number`` cannot be converted to a float
    """
    number = float(number)
    # Expressed as a positive containment test on purpose: every comparison
    # involving NaN is False, so NaN fails this test and is rejected instead
    # of being accepted as "not below min and not above max".
    if not min_r <= number <= max_r:
        raise FFmpegNormalizeError(f"{name} must be within [{min_r},{max_r}]")
    return number
class FFmpegNormalize:
    """
    ffmpeg-normalize class.

    Holds the settings shared by a batch of media files, collects the files
    through add_media_file() and processes them with run_normalization().
    """

    def __init__(
        self,
        normalization_type="ebu",
        target_level=-23.0,
        print_stats=False,
        loudness_range_target=7.0,
        true_peak=-2.0,
        offset=0.0,
        dual_mono=False,
        audio_codec="pcm_s16le",
        audio_bitrate=None,
        sample_rate=None,
        keep_original_audio=False,
        pre_filter=None,
        post_filter=None,
        video_codec="copy",
        video_disable=False,
        subtitle_disable=False,
        metadata_disable=False,
        chapters_disable=False,
        extra_input_options=None,
        extra_output_options=None,
        output_format=None,
        dry_run=False,
        debug=False,
        progress=False,
        ffmpeg_exe=None,
    ):
        """
        Validate and store the normalization settings

        Only the arguments that are validated or coerced are listed; all
        others are stored on the instance unchanged.

        Arguments:
            normalization_type {str} -- One of NORMALIZATION_TYPES
            target_level -- Within [-70, -5] for "ebu", within [-99, 0] otherwise
            loudness_range_target -- Within [1, 20]
            true_peak -- Within [-9, 0]
            offset -- Within [-99, 99]
            dual_mono -- Enabled for True or the string "true", disabled otherwise
            sample_rate -- Converted to int unless None
            output_format -- Must not be in PCM_INCOMPATIBLE_FORMATS when the
                audio codec is PCM or None
            ffmpeg_exe -- Passed to ffmpeg_has_loudnorm() to detect the
                loudnorm filter

        Raises:
            FFmpegNormalizeError -- unknown normalization type, "ebu" requested
                without loudnorm support, a numeric option out of range, or an
                output format that cannot be combined with PCM audio
            ValueError, TypeError -- a numeric option that cannot be converted
        """
        # Reject an unknown type first: it decides which target range applies
        # below, so validating it later would let it silently pick a range.
        if normalization_type not in NORMALIZATION_TYPES:
            raise FFmpegNormalizeError(
                f"Normalization type must be one of {NORMALIZATION_TYPES}"
            )
        self.normalization_type = normalization_type

        self.ffmpeg_exe = ffmpeg_exe
        self.has_loudnorm_capabilities = ffmpeg_has_loudnorm(self.ffmpeg_exe)
        if not self.has_loudnorm_capabilities and self.normalization_type == "ebu":
            raise FFmpegNormalizeError(
                "Your ffmpeg version does not support the 'loudnorm' EBU R128 filter. "
                "Please install ffmpeg v3.1 or above, or choose another normalization type."
            )

        # check_range() always returns a float and both upper bounds are <= 0,
        # so no separate "is a number" / "below 0" checks are needed afterwards.
        if self.normalization_type == "ebu":
            self.target_level = check_range(target_level, -70, -5, name="target_level")
        else:
            self.target_level = check_range(target_level, -99, 0, name="target_level")

        self.print_stats = print_stats

        self.loudness_range_target = check_range(
            loudness_range_target, 1, 20, name="loudness_range_target"
        )
        self.true_peak = check_range(true_peak, -9, 0, name="true_peak")
        self.offset = check_range(offset, -99, 99, name="offset")

        # Accepts the boolean as well as the string "true"
        self.dual_mono = dual_mono in ("true", True)
        self.audio_codec = audio_codec
        self.audio_bitrate = audio_bitrate
        self.sample_rate = int(sample_rate) if sample_rate is not None else None
        self.keep_original_audio = keep_original_audio
        self.video_codec = video_codec
        self.video_disable = video_disable
        self.subtitle_disable = subtitle_disable
        self.metadata_disable = metadata_disable
        self.chapters_disable = chapters_disable
        self.extra_input_options = extra_input_options
        self.extra_output_options = extra_output_options
        self.pre_filter = pre_filter
        self.post_filter = post_filter
        self.output_format = output_format
        self.dry_run = dry_run
        self.debug = debug
        self.progress = progress

        # Printed as JSON by run_normalization() when print_stats is set;
        # nothing in this class appends to it.
        self.stats = []
        self.media_files = []
        self.file_count = 0

        if (
            self.output_format
            and self._audio_codec_is_pcm()
            and self.output_format in PCM_INCOMPATIBLE_FORMATS
        ):
            raise FFmpegNormalizeError(
                f"Output format {self.output_format} does not support PCM audio. "
                + "Please choose a suitable audio codec with the -c:a option."
            )

    def _audio_codec_is_pcm(self):
        """
        Return True if the audio codec is unset (None) or its name contains "pcm"
        """
        return self.audio_codec is None or "pcm" in self.audio_codec

    def add_media_file(self, input_file, output_file):
        """
        Add a media file to normalize

        Arguments:
            input_file {str} -- Path to input file
            output_file {str} -- Path to output file

        Raises:
            FFmpegNormalizeError -- if the input file does not exist, or the
                output extension is in PCM_INCOMPATIBLE_EXTS while the audio
                codec is PCM or None
        """
        if not os.path.exists(input_file):
            raise FFmpegNormalizeError(f"file {input_file} does not exist")

        ext = os.path.splitext(output_file)[1][1:]
        # Compare case-insensitively so that e.g. "out.MP3" is caught here
        # rather than failing later during the ffmpeg run.
        if self._audio_codec_is_pcm() and ext.lower() in PCM_INCOMPATIBLE_EXTS:
            raise FFmpegNormalizeError(
                f"Output extension {ext} does not support PCM audio. "
                + "Please choose a suitable audio codec with the -c:a option."
            )

        self.media_files.append(MediaFile(self, input_file, output_file))
        self.file_count += 1

    def run_normalization(self):
        """
        Run the normalization procedures

        Processes every added media file in order. With more than one file a
        failure is logged and the batch continues; with a single file the
        error is re-raised. Prints the collected stats as JSON afterwards when
        print_stats is set and stats are available.
        """
        for index, media_file in enumerate(
            tqdm(self.media_files, desc="File", disable=not self.progress, position=0)
        ):
            logger.info(
                f"Normalizing file {media_file} ({index + 1} of {self.file_count})"
            )

            try:
                media_file.run_normalization()
            except Exception as e:
                if len(self.media_files) > 1:
                    # simply warn and do not die
                    logger.error(
                        f"Error processing input file {media_file}, "
                        f"will continue batch-processing. Error was: {e}"
                    )
                else:
                    # re-raise the error so the program will exit
                    raise
            else:
                # Only report success when the file was actually processed
                logger.info(f"Normalized file written to {media_file.output_file}")

        if self.print_stats and self.stats:
            print(json.dumps(self.stats, indent=4))