ffmpeg_progress_yield

 1from importlib import metadata
 2
 3from .ffmpeg_progress_yield import FfmpegProgress
 4
# Expose the installed distribution's version as ``__version__``; fall back to
# the placeholder "unknown" when no metadata for "ffmpeg-progress-yield" is found.
try:
    __version__ = metadata.version("ffmpeg-progress-yield")
except metadata.PackageNotFoundError:
    __version__ = "unknown"
 9
10__all__ = ["FfmpegProgress"]
class FfmpegProgress:
 20class FfmpegProgress:
 21    DUR_REGEX = re.compile(
 22        r"Duration: (?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
 23    )
 24    TIME_REGEX = re.compile(
 25        r"out_time=(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
 26    )
 27    PROGRESS_REGEX = re.compile(r"[a-z0-9_]+=.+")
 28
 29    def __init__(
 30        self,
 31        cmd: List[str],
 32        dry_run: bool = False,
 33        exclude_progress: bool = False,
 34        ffprobe_path: str = "ffprobe",
 35    ) -> None:
 36        """Initialize the FfmpegProgress class.
 37
 38        Args:
 39            cmd (List[str]): A list of command line elements, e.g. ["ffmpeg", "-i", ...]
 40            dry_run (bool, optional): Only show what would be done. Defaults to False.
 41            exclude_progress (bool, optional): Exclude progress lines from output. Defaults to False.
 42            ffprobe_path (str, optional): Path to ffprobe executable. Defaults to "ffprobe".
 43        """
 44        self.cmd = cmd
 45        self.stderr: Union[str, None] = None
 46        self.dry_run = dry_run
 47        self.exclude_progress = exclude_progress
 48        self.ffprobe_path = ffprobe_path
 49        self.process: Any = None
 50        self.stderr_callback: Union[Callable[[str], None], None] = None
 51        self.base_popen_kwargs = {
 52            "stdin": subprocess.PIPE,  # Apply stdin isolation by creating separate pipe.
 53            "stdout": subprocess.PIPE,
 54            "stderr": subprocess.STDOUT,
 55            "universal_newlines": False,
 56        }
 57
 58        self.cmd_with_progress = (
 59            [self.cmd[0]] + ["-progress", "-", "-nostats"] + self.cmd[1:]
 60        )
 61        self.inputs_with_options = FfmpegProgress._get_inputs_with_options(self.cmd)
 62
 63        self.current_input_idx: int = 0
 64        self.total_dur: Union[None, int] = None
 65        # Skip probing duration in dry-run mode to avoid running ffprobe
 66        if not self.dry_run and FfmpegProgress._uses_error_loglevel(self.cmd):
 67            self.total_dur = self._probe_duration(self.cmd)
 68
 69        # Set up cleanup on garbage collection as a fallback
 70        self._cleanup_ref = weakref.finalize(self, self._cleanup_process, None)
 71
 72    @staticmethod
 73    def _cleanup_process(process: Any) -> None:
 74        """Clean up a process if it's still running."""
 75        if process is not None and hasattr(process, "poll"):
 76            try:
 77                if process.poll() is None:  # Process is still running
 78                    process.kill()
 79                    if hasattr(process, "wait"):
 80                        try:
 81                            process.wait(timeout=1.0)
 82                        except subprocess.TimeoutExpired:
 83                            pass  # Process didn't terminate gracefully, but we killed it
 84            except Exception:
 85                pass  # Ignore any errors during cleanup
 86
 87    def __del__(self) -> None:
 88        """Fallback cleanup when object is garbage collected."""
 89        if hasattr(self, "process") and self.process is not None:
 90            self._cleanup_process(self.process)
 91
 92    def __enter__(self) -> "FfmpegProgress":
 93        """Context manager entry."""
 94        return self
 95
 96    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
 97        """Context manager exit - ensures process cleanup."""
 98        if self.process is not None:
 99            try:
100                if hasattr(self.process, "poll") and self.process.poll() is None:
101                    self.quit()
102            except Exception:
103                pass  # Ignore errors during cleanup
104
105    async def __aenter__(self) -> "FfmpegProgress":
106        """Async context manager entry."""
107        return self
108
109    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
110        """Async context manager exit - ensures process cleanup."""
111        if self.process is not None:
112            try:
113                if (
114                    hasattr(self.process, "returncode")
115                    and self.process.returncode is None
116                ):
117                    await self.async_quit()
118            except Exception:
119                pass  # Ignore errors during cleanup
120
121    def _process_output(
122        self,
123        stderr_line: str,
124        stderr: List[str],
125        duration_override: Union[float, None],
126    ) -> Union[float, None]:
127        """
128        Process the output of the ffmpeg command.
129
130        Args:
131            stderr_line (str): The line of stderr output.
132            stderr (List[str]): The list of stderr output.
133            duration_override (Union[float, None]): The duration of the video in seconds.
134
135        Returns:
136            Union[float, None]: The progress in percent.
137        """
138
139        if self.stderr_callback:
140            self.stderr_callback(stderr_line)
141
142        stderr.append(stderr_line.strip())
143        self.stderr = "\n".join(
144            filter(
145                lambda line: not (
146                    self.exclude_progress and self.PROGRESS_REGEX.match(line)
147                ),
148                stderr,
149            )
150        )
151
152        progress: Union[float, None] = None
153        # assign the total duration if it was found. this can happen multiple times for multiple inputs,
154        # in which case we have to determine the overall duration by taking the min/max (dependent on -shortest being present)
155        if (
156            current_dur_match := self.DUR_REGEX.search(stderr_line)
157        ) and duration_override is None:
158            input_options = self.inputs_with_options[self.current_input_idx]
159            current_dur_ms: int = to_ms(**current_dur_match.groupdict())
160            # if the previous line had "image2", it's a single image and we assume a really short intrinsic duration (4ms),
161            # but if it's a loop, we assume infinity
162            if "image2" in stderr[-2] and "-loop 1" in " ".join(input_options):
163                current_dur_ms = 2**64
164            if "-shortest" in self.cmd:
165                self.total_dur = (
166                    min(self.total_dur, current_dur_ms)
167                    if self.total_dur is not None
168                    else current_dur_ms
169                )
170            else:
171                self.total_dur = (
172                    max(self.total_dur, current_dur_ms)
173                    if self.total_dur is not None
174                    else current_dur_ms
175                )
176            self.current_input_idx += 1
177
178        if (
179            progress_time := self.TIME_REGEX.search(stderr_line)
180        ) and self.total_dur is not None:
181            elapsed_time = to_ms(**progress_time.groupdict())
182            progress = min(max(round(elapsed_time / self.total_dur * 100, 2), 0), 100)
183
184        return progress
185
186    def _probe_duration(self, cmd: List[str]) -> Optional[int]:
187        """
188        Get the duration via ffprobe from input media file
189        in case ffmpeg was run with loglevel=error.
190
191        Args:
192            cmd (List[str]): A list of command line elements, e.g. ["ffmpeg", "-i", ...]
193
194        Returns:
195            Optional[int]: The duration in milliseconds.
196        """
197        file_names = []
198        for i, arg in enumerate(cmd):
199            if arg == "-i":
200                file_name = cmd[i + 1]
201
202                # filter for filenames that we can probe, i.e. regular files
203                if os.path.isfile(file_name):
204                    file_names.append(file_name)
205
206        if len(file_names) == 0:
207            return None
208
209        durations = []
210
211        for file_name in file_names:
212            try:
213                output = subprocess.check_output(
214                    [
215                        self.ffprobe_path,
216                        "-loglevel",
217                        "error",
218                        "-hide_banner",
219                        "-show_entries",
220                        "format=duration",
221                        "-of",
222                        "default=noprint_wrappers=1:nokey=1",
223                        file_name,
224                    ],
225                    universal_newlines=True,
226                )
227                durations.append(int(float(output.strip()) * 1000))
228            except Exception:
229                # TODO: add logging
230                return None
231
232        return max(durations) if "-shortest" not in cmd else min(durations)
233
234    @staticmethod
235    def _uses_error_loglevel(cmd: List[str]) -> bool:
236        try:
237            idx = cmd.index("-loglevel")
238            if cmd[idx + 1] == "error":
239                return True
240            else:
241                return False
242        except ValueError:
243            return False
244
245    @staticmethod
246    def _get_inputs_with_options(cmd: List[str]) -> List[List[str]]:
247        """
248        Collect all inputs with their options.
249        For example, input is:
250
251            ffmpeg -i input1.mp4 -i input2.mp4 -i input3.mp4 -filter_complex ...
252
253        Output is:
254
255            [
256                ["-i", "input1.mp4"],
257                ["-i", "input2.mp4"],
258                ["-i", "input3.mp4"],
259            ]
260
261        Another example:
262
263            ffmpeg -f lavfi -i color=c=black:s=1920x1080 -loop 1 -i image.png -filter_complex ...
264
265        Output is:
266
267            [
268                ["-f", "lavfi", "-i", "color=c=black:s=1920x1080"],
269                ["-loop", "1", "-i", "image.png"],
270            ]
271        """
272        inputs = []
273        prev_index = 0
274        for i, arg in enumerate(cmd):
275            if arg == "-i":
276                inputs.append(cmd[prev_index : i + 2])
277                prev_index = i + 2
278
279        return inputs
280
281    def run_command_with_progress(
282        self, popen_kwargs=None, duration_override: Union[float, None] = None
283    ) -> Iterator[float]:
284        """
285        Run an ffmpeg command, trying to capture the process output and calculate
286        the duration / progress.
287        Yields the progress in percent.
288
289        Args:
290            popen_kwargs (dict, optional): A dict to specify extra arguments to the popen call, e.g. { creationflags: CREATE_NO_WINDOW }
291            duration_override (float, optional): The duration in seconds. If not specified, it will be calculated from the ffmpeg output.
292
293        Raises:
294            RuntimeError: If the command fails, an exception is raised.
295
296        Yields:
297            Iterator[float]: A generator that yields the progress in percent.
298        """
299        if self.dry_run:
300            yield from [0, 100]
301            return
302
303        if duration_override:
304            self.total_dur = int(duration_override * 1000)
305
306        base_popen_kwargs = self.base_popen_kwargs.copy()
307        if popen_kwargs is not None:
308            base_popen_kwargs.update(popen_kwargs)
309
310        self.process = subprocess.Popen(self.cmd_with_progress, **base_popen_kwargs)  # type: ignore
311
312        # Update the cleanup finalizer with the actual process
313        self._cleanup_ref.detach()
314        self._cleanup_ref = weakref.finalize(self, self._cleanup_process, self.process)
315
316        try:
317            yield 0
318
319            stderr: List[str] = []
320            while True:
321                if self.process.stdout is None:
322                    continue
323
324                stderr_line: str = (
325                    self.process.stdout.readline()
326                    .decode("utf-8", errors="replace")
327                    .strip()
328                )
329
330                if stderr_line == "" and self.process.poll() is not None:
331                    break
332
333                progress = self._process_output(stderr_line, stderr, duration_override)
334                if progress is not None:
335                    yield progress
336
337            if self.process.returncode != 0:
338                raise RuntimeError(f"Error running command {self.cmd}: {self.stderr}")
339
340            yield 100
341        finally:
342            # Ensure process cleanup even if an exception occurs
343            if self.process is not None:
344                try:
345                    if self.process.poll() is None:  # Process is still running
346                        self.process.kill()
347                        try:
348                            self.process.wait(timeout=1.0)
349                        except subprocess.TimeoutExpired:
350                            pass  # Process didn't terminate gracefully, but we killed it
351                except Exception:
352                    pass  # Ignore any errors during cleanup
353                finally:
354                    self.process = None
355                    # Detach the finalizer since we've cleaned up manually
356                    if hasattr(self, "_cleanup_ref"):
357                        self._cleanup_ref.detach()
358
359    async def async_run_command_with_progress(
360        self, popen_kwargs=None, duration_override: Union[float, None] = None
361    ) -> AsyncIterator[float]:
362        """
363        Asynchronously run an ffmpeg command, trying to capture the process output and calculate
364        the duration / progress.
365        Yields the progress in percent.
366
367        Args:
368            popen_kwargs (dict, optional): A dict to specify extra arguments to the popen call, e.g. { creationflags: CREATE_NO_WINDOW }
369            duration_override (float, optional): The duration in seconds. If not specified, it will be calculated from the ffmpeg output.
370
371        Raises:
372            RuntimeError: If the command fails, an exception is raised.
373        """
374        if self.dry_run:
375            yield 0
376            yield 100
377            return
378
379        if duration_override:
380            self.total_dur = int(duration_override * 1000)
381
382        base_popen_kwargs = self.base_popen_kwargs.copy()
383        if popen_kwargs is not None:
384            base_popen_kwargs.update(popen_kwargs)
385
386        # Remove stdout and stderr from base_popen_kwargs as we're setting them explicitly
387        base_popen_kwargs.pop("stdout", None)
388        base_popen_kwargs.pop("stderr", None)
389
390        self.process = await asyncio.create_subprocess_exec(
391            *self.cmd_with_progress,
392            stdout=asyncio.subprocess.PIPE,
393            stderr=asyncio.subprocess.STDOUT,
394            **base_popen_kwargs,  # type: ignore
395        )
396
397        # Update the cleanup finalizer with the actual process
398        self._cleanup_ref.detach()
399        self._cleanup_ref = weakref.finalize(self, self._cleanup_process, self.process)
400
401        try:
402            yield 0
403
404            stderr: List[str] = []
405            while True:
406                if self.process.stdout is None:
407                    continue
408
409                stderr_line: Union[bytes, None] = await self.process.stdout.readline()
410                if not stderr_line:
411                    # Process has finished, check the return code
412                    await self.process.wait()
413                    if self.process.returncode != 0:
414                        raise RuntimeError(
415                            f"Error running command {self.cmd}: {self.stderr}"
416                        )
417                    break
418                stderr_line_str = stderr_line.decode("utf-8", errors="replace").strip()
419
420                progress = self._process_output(
421                    stderr_line_str, stderr, duration_override
422                )
423                if progress is not None:
424                    yield progress
425
426            yield 100
427        except GeneratorExit:
428            # Handle case where async generator is closed prematurely
429            await self._async_cleanup_process()
430            raise
431        except Exception:
432            # Handle any other exception
433            await self._async_cleanup_process()
434            raise
435        finally:
436            # Normal cleanup
437            await self._async_cleanup_process()
438
439    async def _async_cleanup_process(self) -> None:
440        """Clean up the async process."""
441        if self.process is not None:
442            try:
443                if self.process.returncode is None:  # Process is still running
444                    self.process.kill()
445                    try:
446                        await self.process.wait()
447                    except Exception:
448                        pass  # Ignore any errors during cleanup
449            except Exception:
450                pass  # Ignore any errors during cleanup
451            finally:
452                self.process = None
453                # Detach the finalizer since we've cleaned up manually
454                if hasattr(self, "_cleanup_ref"):
455                    self._cleanup_ref.detach()
456
457    def quit_gracefully(self) -> None:
458        """
459        Quit the ffmpeg process by sending 'q'
460
461        Raises:
462            RuntimeError: If no process is found.
463        """
464        if self.process is None:
465            raise RuntimeError("No process found. Did you run the command?")
466
467        self.process.communicate(input=b"q")
468        self.process.kill()
469        self.process = None
470
471    def quit(self) -> None:
472        """
473        Quit the ffmpeg process by sending SIGKILL.
474
475        Raises:
476            RuntimeError: If no process is found.
477        """
478        if self.process is None:
479            raise RuntimeError("No process found. Did you run the command?")
480
481        self.process.kill()
482        self.process = None
483
484    async def async_quit_gracefully(self) -> None:
485        """
486        Quit the ffmpeg process by sending 'q' asynchronously
487
488        Raises:
489            RuntimeError: If no process is found.
490        """
491        if self.process is None:
492            raise RuntimeError("No process found. Did you run the command?")
493
494        self.process.stdin.write(b"q")
495        await self.process.stdin.drain()
496        await self.process.wait()
497        self.process = None
498
499    async def async_quit(self) -> None:
500        """
501        Quit the ffmpeg process by sending SIGKILL asynchronously.
502
503        Raises:
504            RuntimeError: If no process is found.
505        """
506        if self.process is None:
507            raise RuntimeError("No process found. Did you run the command?")
508
509        self.process.kill()
510        await self.process.wait()
511        self.process = None
512
513    def set_stderr_callback(self, callback: Callable[[str], None]) -> None:
514        """
515        Set a callback function to be called on stderr output.
516        The callback function must accept a single string argument.
517        Note that this is called on every line of stderr output, so it can be called a lot.
518        Also note that stdout/stderr are joined into one stream, so you might get stdout output in the callback.
519
520        Args:
521            callback (Callable[[str], None]): A callback function that accepts a single string argument.
522        """
523        if (
524            not callable(callback)
525            or not isinstance(callback, types.FunctionType)
526            or len(callback.__code__.co_varnames) != 1
527        ):
528            raise ValueError(
529                "Callback must be a function that accepts only one argument"
530            )
531
532        self.stderr_callback = callback
FfmpegProgress( cmd: List[str], dry_run: bool = False, exclude_progress: bool = False, ffprobe_path: str = 'ffprobe')
29    def __init__(
30        self,
31        cmd: List[str],
32        dry_run: bool = False,
33        exclude_progress: bool = False,
34        ffprobe_path: str = "ffprobe",
35    ) -> None:
36        """Initialize the FfmpegProgress class.
37
38        Args:
39            cmd (List[str]): A list of command line elements, e.g. ["ffmpeg", "-i", ...]
40            dry_run (bool, optional): Only show what would be done. Defaults to False.
41            exclude_progress (bool, optional): Exclude progress lines from output. Defaults to False.
42            ffprobe_path (str, optional): Path to ffprobe executable. Defaults to "ffprobe".
43        """
44        self.cmd = cmd
45        self.stderr: Union[str, None] = None
46        self.dry_run = dry_run
47        self.exclude_progress = exclude_progress
48        self.ffprobe_path = ffprobe_path
49        self.process: Any = None
50        self.stderr_callback: Union[Callable[[str], None], None] = None
51        self.base_popen_kwargs = {
52            "stdin": subprocess.PIPE,  # Apply stdin isolation by creating separate pipe.
53            "stdout": subprocess.PIPE,
54            "stderr": subprocess.STDOUT,
55            "universal_newlines": False,
56        }
57
58        self.cmd_with_progress = (
59            [self.cmd[0]] + ["-progress", "-", "-nostats"] + self.cmd[1:]
60        )
61        self.inputs_with_options = FfmpegProgress._get_inputs_with_options(self.cmd)
62
63        self.current_input_idx: int = 0
64        self.total_dur: Union[None, int] = None
65        # Skip probing duration in dry-run mode to avoid running ffprobe
66        if not self.dry_run and FfmpegProgress._uses_error_loglevel(self.cmd):
67            self.total_dur = self._probe_duration(self.cmd)
68
69        # Set up cleanup on garbage collection as a fallback
70        self._cleanup_ref = weakref.finalize(self, self._cleanup_process, None)

Initialize the FfmpegProgress class.

Arguments:
  • cmd (List[str]): A list of command line elements, e.g. ["ffmpeg", "-i", ...]
  • dry_run (bool, optional): Only show what would be done. Defaults to False.
  • exclude_progress (bool, optional): Exclude progress lines from output. Defaults to False.
  • ffprobe_path (str, optional): Path to ffprobe executable. Defaults to "ffprobe".
DUR_REGEX = re.compile('Duration: (?P<hour>\\d{2}):(?P<min>\\d{2}):(?P<sec>\\d{2})\\.(?P<ms>\\d{2})')
TIME_REGEX = re.compile('out_time=(?P<hour>\\d{2}):(?P<min>\\d{2}):(?P<sec>\\d{2})\\.(?P<ms>\\d{2})')
PROGRESS_REGEX = re.compile('[a-z0-9_]+=.+')
cmd
stderr: Optional[str]
dry_run
exclude_progress
ffprobe_path
process: Any
stderr_callback: Optional[Callable[[str], NoneType]]
base_popen_kwargs
cmd_with_progress
inputs_with_options
current_input_idx: int
total_dur: Optional[int]
def run_command_with_progress( self, popen_kwargs=None, duration_override: Optional[float] = None) -> Iterator[float]:
281    def run_command_with_progress(
282        self, popen_kwargs=None, duration_override: Union[float, None] = None
283    ) -> Iterator[float]:
284        """
285        Run an ffmpeg command, trying to capture the process output and calculate
286        the duration / progress.
287        Yields the progress in percent.
288
289        Args:
290            popen_kwargs (dict, optional): A dict to specify extra arguments to the popen call, e.g. { creationflags: CREATE_NO_WINDOW }
291            duration_override (float, optional): The duration in seconds. If not specified, it will be calculated from the ffmpeg output.
292
293        Raises:
294            RuntimeError: If the command fails, an exception is raised.
295
296        Yields:
297            Iterator[float]: A generator that yields the progress in percent.
298        """
299        if self.dry_run:
300            yield from [0, 100]
301            return
302
303        if duration_override:
304            self.total_dur = int(duration_override * 1000)
305
306        base_popen_kwargs = self.base_popen_kwargs.copy()
307        if popen_kwargs is not None:
308            base_popen_kwargs.update(popen_kwargs)
309
310        self.process = subprocess.Popen(self.cmd_with_progress, **base_popen_kwargs)  # type: ignore
311
312        # Update the cleanup finalizer with the actual process
313        self._cleanup_ref.detach()
314        self._cleanup_ref = weakref.finalize(self, self._cleanup_process, self.process)
315
316        try:
317            yield 0
318
319            stderr: List[str] = []
320            while True:
321                if self.process.stdout is None:
322                    continue
323
324                stderr_line: str = (
325                    self.process.stdout.readline()
326                    .decode("utf-8", errors="replace")
327                    .strip()
328                )
329
330                if stderr_line == "" and self.process.poll() is not None:
331                    break
332
333                progress = self._process_output(stderr_line, stderr, duration_override)
334                if progress is not None:
335                    yield progress
336
337            if self.process.returncode != 0:
338                raise RuntimeError(f"Error running command {self.cmd}: {self.stderr}")
339
340            yield 100
341        finally:
342            # Ensure process cleanup even if an exception occurs
343            if self.process is not None:
344                try:
345                    if self.process.poll() is None:  # Process is still running
346                        self.process.kill()
347                        try:
348                            self.process.wait(timeout=1.0)
349                        except subprocess.TimeoutExpired:
350                            pass  # Process didn't terminate gracefully, but we killed it
351                except Exception:
352                    pass  # Ignore any errors during cleanup
353                finally:
354                    self.process = None
355                    # Detach the finalizer since we've cleaned up manually
356                    if hasattr(self, "_cleanup_ref"):
357                        self._cleanup_ref.detach()

Run an ffmpeg command, trying to capture the process output and calculate the duration / progress. Yields the progress in percent.

Arguments:
  • popen_kwargs (dict, optional): A dict to specify extra arguments to the popen call, e.g. { creationflags: CREATE_NO_WINDOW }
  • duration_override (float, optional): The duration in seconds. If not specified, it will be calculated from the ffmpeg output.
Raises:
  • RuntimeError: If the command fails, an exception is raised.
Yields:
  • Iterator[float]: A generator that yields the progress in percent.

async def async_run_command_with_progress( self, popen_kwargs=None, duration_override: Optional[float] = None) -> AsyncIterator[float]:
359    async def async_run_command_with_progress(
360        self, popen_kwargs=None, duration_override: Union[float, None] = None
361    ) -> AsyncIterator[float]:
362        """
363        Asynchronously run an ffmpeg command, trying to capture the process output and calculate
364        the duration / progress.
365        Yields the progress in percent.
366
367        Args:
368            popen_kwargs (dict, optional): A dict to specify extra arguments to the popen call, e.g. { creationflags: CREATE_NO_WINDOW }
369            duration_override (float, optional): The duration in seconds. If not specified, it will be calculated from the ffmpeg output.
370
371        Raises:
372            RuntimeError: If the command fails, an exception is raised.
373        """
374        if self.dry_run:
375            yield 0
376            yield 100
377            return
378
379        if duration_override:
380            self.total_dur = int(duration_override * 1000)
381
382        base_popen_kwargs = self.base_popen_kwargs.copy()
383        if popen_kwargs is not None:
384            base_popen_kwargs.update(popen_kwargs)
385
386        # Remove stdout and stderr from base_popen_kwargs as we're setting them explicitly
387        base_popen_kwargs.pop("stdout", None)
388        base_popen_kwargs.pop("stderr", None)
389
390        self.process = await asyncio.create_subprocess_exec(
391            *self.cmd_with_progress,
392            stdout=asyncio.subprocess.PIPE,
393            stderr=asyncio.subprocess.STDOUT,
394            **base_popen_kwargs,  # type: ignore
395        )
396
397        # Update the cleanup finalizer with the actual process
398        self._cleanup_ref.detach()
399        self._cleanup_ref = weakref.finalize(self, self._cleanup_process, self.process)
400
401        try:
402            yield 0
403
404            stderr: List[str] = []
405            while True:
406                if self.process.stdout is None:
407                    continue
408
409                stderr_line: Union[bytes, None] = await self.process.stdout.readline()
410                if not stderr_line:
411                    # Process has finished, check the return code
412                    await self.process.wait()
413                    if self.process.returncode != 0:
414                        raise RuntimeError(
415                            f"Error running command {self.cmd}: {self.stderr}"
416                        )
417                    break
418                stderr_line_str = stderr_line.decode("utf-8", errors="replace").strip()
419
420                progress = self._process_output(
421                    stderr_line_str, stderr, duration_override
422                )
423                if progress is not None:
424                    yield progress
425
426            yield 100
427        except GeneratorExit:
428            # Handle case where async generator is closed prematurely
429            await self._async_cleanup_process()
430            raise
431        except Exception:
432            # Handle any other exception
433            await self._async_cleanup_process()
434            raise
435        finally:
436            # Normal cleanup
437            await self._async_cleanup_process()

Asynchronously run an ffmpeg command, trying to capture the process output and calculate the duration / progress. Yields the progress in percent.

Arguments:
  • popen_kwargs (dict, optional): A dict to specify extra arguments to the popen call, e.g. { creationflags: CREATE_NO_WINDOW }
  • duration_override (float, optional): The duration in seconds. If not specified, it will be calculated from the ffmpeg output.
Raises:
  • RuntimeError: If the command fails, an exception is raised.
def quit_gracefully(self) -> None:
457    def quit_gracefully(self) -> None:
458        """
459        Quit the ffmpeg process by sending 'q'
460
461        Raises:
462            RuntimeError: If no process is found.
463        """
464        if self.process is None:
465            raise RuntimeError("No process found. Did you run the command?")
466
467        self.process.communicate(input=b"q")
468        self.process.kill()
469        self.process = None

Quit the ffmpeg process by sending 'q'

Raises:
  • RuntimeError: If no process is found.
def quit(self) -> None:
471    def quit(self) -> None:
472        """
473        Quit the ffmpeg process by sending SIGKILL.
474
475        Raises:
476            RuntimeError: If no process is found.
477        """
478        if self.process is None:
479            raise RuntimeError("No process found. Did you run the command?")
480
481        self.process.kill()
482        self.process = None

Quit the ffmpeg process by sending SIGKILL.

Raises:
  • RuntimeError: If no process is found.
async def async_quit_gracefully(self) -> None:
484    async def async_quit_gracefully(self) -> None:
485        """
486        Quit the ffmpeg process by sending 'q' asynchronously
487
488        Raises:
489            RuntimeError: If no process is found.
490        """
491        if self.process is None:
492            raise RuntimeError("No process found. Did you run the command?")
493
494        self.process.stdin.write(b"q")
495        await self.process.stdin.drain()
496        await self.process.wait()
497        self.process = None

Quit the ffmpeg process by sending 'q' asynchronously

Raises:
  • RuntimeError: If no process is found.
async def async_quit(self) -> None:
499    async def async_quit(self) -> None:
500        """
501        Quit the ffmpeg process by sending SIGKILL asynchronously.
502
503        Raises:
504            RuntimeError: If no process is found.
505        """
506        if self.process is None:
507            raise RuntimeError("No process found. Did you run the command?")
508
509        self.process.kill()
510        await self.process.wait()
511        self.process = None

Quit the ffmpeg process by sending SIGKILL asynchronously.

Raises:
  • RuntimeError: If no process is found.
def set_stderr_callback(self, callback: Callable[[str], NoneType]) -> None:
513    def set_stderr_callback(self, callback: Callable[[str], None]) -> None:
514        """
515        Set a callback function to be called on stderr output.
516        The callback function must accept a single string argument.
517        Note that this is called on every line of stderr output, so it can be called a lot.
518        Also note that stdout/stderr are joined into one stream, so you might get stdout output in the callback.
519
520        Args:
521            callback (Callable[[str], None]): A callback function that accepts a single string argument.
522        """
523        if (
524            not callable(callback)
525            or not isinstance(callback, types.FunctionType)
526            or len(callback.__code__.co_varnames) != 1
527        ):
528            raise ValueError(
529                "Callback must be a function that accepts only one argument"
530            )
531
532        self.stderr_callback = callback

Set a callback function to be called on stderr output. The callback function must accept a single string argument. Note that this is called on every line of stderr output, so it can be called a lot. Also note that stdout/stderr are joined into one stream, so you might get stdout output in the callback.

Arguments:
  • callback (Callable[[str], None]): A callback function that accepts a single string argument.