Skip to content

Video Encoder

renderkit.processing.video_encoder.VideoEncoder

Video encoder for creating MP4 files using FFmpeg.

Source code in src/renderkit/processing/video_encoder.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
class VideoEncoder:
    """Video encoder for creating MP4 files using FFmpeg.

    Typical use: call ``initialize()`` with the frame size, feed frames through
    ``write_frame()`` and finish with ``close()``. The encoder also works as a
    context manager, in which case ``close()`` runs when the block exits.
    """

    def __init__(
        self,
        output_path: Path,
        fps: float,
        codec: str = "libx264",
        bitrate: Optional[int] = None,
        quality: Optional[int] = 10,
        macro_block_size: int = 16,
    ) -> None:
        """Initialize video encoder.

        No writer is created here; call ``initialize()`` before writing frames.

        Args:
            output_path: Path to output video file (stored as an absolute path)
            fps: Frame rate
            codec: Video codec (FFmpeg name like 'libx264', 'libx265', 'libaom-av1')
            bitrate: Video bitrate in kbps (optional)
            quality: Video quality (0-10), 10 is best (optional, used only if bitrate is None).
                     Values outside 0-10 are clamped when the encoder is initialized.
            macro_block_size: Macro block size for codec compatibility (default: 16)
                             Frame dimensions will be rounded up to multiples of this value
        """
        self.output_path = output_path.absolute()
        self.fps = fps
        self.codec = codec
        self.bitrate = bitrate
        self.quality = quality
        self.macro_block_size = macro_block_size
        self._writer = None
        self._width: Optional[int] = None
        self._height: Optional[int] = None
        self._adjusted_width: Optional[int] = None
        self._adjusted_height: Optional[int] = None
        self._ffmpeg_report_path: Optional[Path] = None
        # Value of FFREPORT before this encoder overrode it (None if it was unset).
        self._ffreport_prev: Optional[str] = None
        # True while this encoder's FFREPORT override is active.
        self._ffreport_set: bool = False

    def _configure_ffmpeg_report(self) -> Optional[Path]:
        """Enable ffmpeg report logging (on by default).

        Controlled by the ``RENDERKIT_FFMPEG_LOG`` environment variable:
        "0"/"false"/"no"/"off" disables the report, "1"/"true"/"yes"/"on" (the
        default) writes a timestamped log into the system temp directory, and any
        other value is used as the log file path.

        Returns:
            Path of the report file, or None when reporting is disabled.
        """
        value = os.environ.get("RENDERKIT_FFMPEG_LOG", "1")
        setting = value.lower()
        if setting in {"0", "false", "no", "off"}:
            return None
        if setting in {"1", "true", "yes", "on"}:
            stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            path = Path(tempfile.gettempdir()) / f"renderkit-ffmpeg-{stamp}.log"
        else:
            path = Path(value).expanduser()
        path.parent.mkdir(parents=True, exist_ok=True)

        # Capture the caller's FFREPORT only when our override is not already
        # active; otherwise a repeated call would save our own value and the
        # original setting could never be restored.
        if not self._ffreport_set:
            self._ffreport_prev = os.environ.get("FFREPORT")
        escaped_path = _escape_ffreport_path(path)
        os.environ["FFREPORT"] = f"file={escaped_path}:level=48"
        self._ffreport_set = True
        return path

    def _restore_ffmpeg_report_env(self) -> None:
        """Put FFREPORT back to the value it had before this encoder changed it."""
        if not self._ffreport_set:
            return
        if self._ffreport_prev is None:
            os.environ.pop("FFREPORT", None)
        else:
            os.environ["FFREPORT"] = self._ffreport_prev
        self._ffreport_set = False

    def _read_ffmpeg_report_tail(self, max_lines: int = 80) -> Optional[str]:
        """Return the last non-blank lines of the ffmpeg report, if one is readable.

        Args:
            max_lines: Maximum number of trailing non-blank lines to include

        Returns:
            A formatted block containing the report path and its tail, or None when
            no report is configured, the file is missing/unreadable, or it is empty.
        """
        if self._ffmpeg_report_path is None:
            return None
        if not self._ffmpeg_report_path.exists():
            return None
        try:
            content = self._ffmpeg_report_path.read_text(errors="ignore")
        except Exception:
            # Best effort: this only enriches an error message.
            return None
        lines = [line for line in content.splitlines() if line.strip()]
        if not lines:
            return None
        tail = "\n".join(lines[-max_lines:])
        return f"FFMPEG REPORT (tail) [{self._ffmpeg_report_path}]:\n{tail}"

    @staticmethod
    def _make_divisible(dimension: int, divisor: int = 16) -> int:
        """Round up dimension to be divisible by divisor for codec compatibility.

        Args:
            dimension: Original dimension
            divisor: Divisor (typically 16 for macro block size)

        Returns:
            Dimension rounded up to nearest multiple of divisor
        """
        return ((dimension + divisor - 1) // divisor) * divisor

    @staticmethod
    def _rate_control_args(
        ffmpeg_codec: str, bitrate: Optional[int], quality: Optional[int]
    ) -> tuple:
        """Translate the bitrate/quality settings into FFmpeg arguments.

        Args:
            ffmpeg_codec: FFmpeg encoder name
            bitrate: Target bitrate in kbps, or None to use quality-based tuning
            quality: Quality on the 0-10 scale (10 is best), or None for codec defaults

        Returns:
            Tuple ``(extra_params, bitrate_arg, summary)``: the extra FFmpeg
            parameters, the bitrate string for the writer (or None) and a
            debug-log summary (or None when the codec has no specific tuning).
        """
        # Bitrate takes precedence over quality tuning when provided.
        if bitrate is not None:
            target = f"{bitrate}k"
            extra = ["-cpu-used", "6"] if ffmpeg_codec == "libaom-av1" else []
            return extra, target, f"{ffmpeg_codec} tuning: bitrate={target}"

        # Keep quality inside the documented 0-10 range so the mappings below
        # cannot produce out-of-range (or negative) CRF / q:v values.
        level = None if quality is None else min(10, max(0, quality))

        if ffmpeg_codec in ("libx264", "libx265"):
            # Map quality (0-10) to CRF (35-18): 10 -> 18, 0 -> 35; 23 if unspecified.
            crf = int(18 + (10 - level) * 1.7) if level is not None else 23
            return ["-crf", f"{crf}"], None, f"{ffmpeg_codec} tuning: crf={crf}"

        if ffmpeg_codec == "libaom-av1":
            # AV1 is extremely slow by default. Use -cpu-used 6 for better speed.
            # Map 0-10 quality to CRF 50-20 (lower is better); 32 if unspecified.
            crf = int(20 + (10 - level) * 3) if level is not None else 32
            # libaom-av1 needs bitrate=0 to enable CRF mode in FFmpeg
            return (
                ["-crf", f"{crf}", "-cpu-used", "6"],
                "0",
                f"AV1 tuning: crf={crf}, cpu-used=6",
            )

        if ffmpeg_codec == "mpeg4":
            # Map quality (0-10) to -q:v (31-2). Higher -q:v is lower quality.
            qv = int(2 + (10 - level) * 2.9) if level is not None else 4
            return ["-q:v", f"{qv}"], None, f"MPEG-4 tuning: q:v={qv}"

        return [], None, None

    @staticmethod
    def _to_rgb8(frame: np.ndarray) -> np.ndarray:
        """Convert a float frame to an 8-bit, three-channel array.

        Values are clamped to [0, 1] and scaled by 255. NaN becomes 0 so the
        integer cast is well defined. Single-channel and 2-D frames are
        replicated to three channels; frames with more than three channels keep
        only the first three.

        Args:
            frame: Pixel array, either (height, width) or (height, width, channels)

        Returns:
            ``uint8`` array holding the converted pixels
        """
        data = np.nan_to_num(
            frame.astype(np.float32, copy=False), nan=0.0, posinf=1.0, neginf=0.0
        )
        data = (np.clip(data, 0.0, 1.0) * np.float32(255.0)).astype(np.uint8)

        # If Grayscale, ensure 3D
        if data.ndim == 2:
            return np.stack([data] * 3, axis=-1)
        if data.ndim == 3:
            channels = data.shape[2]
            if channels == 1:
                return np.repeat(data, 3, axis=2)
            if channels > 3:
                # Drop alpha and any further channels; copy so the result is contiguous.
                return np.ascontiguousarray(data[:, :, :3])
        return data

    def __enter__(self) -> "VideoEncoder":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit - close video writer."""
        self.close()

    def initialize(self, width: int, height: int) -> None:
        """Initialize the video writer with frame dimensions.

        If a writer is already open it is closed first.

        Args:
            width: Frame width
            height: Frame height

        Raises:
            VideoEncodingError: If encoder cannot be initialized
        """
        if self._writer is not None:
            # Otherwise the assignment below would replace the open writer
            # without ever closing it.
            logger.warning("Video encoder re-initialized; closing the previous writer.")
            self.close()

        self._width = width
        self._height = height

        # Adjust dimensions to be divisible by macro_block_size for codec compatibility
        # This prevents ffmpeg from needing to auto-resize frames.
        self._adjusted_width = self._make_divisible(width, self.macro_block_size)
        self._adjusted_height = self._make_divisible(height, self.macro_block_size)

        if self._adjusted_width != width or self._adjusted_height != height:
            logger.warning(
                f"Frame dimensions adjusted from {width}x{height} to "
                f"{self._adjusted_width}x{self._adjusted_height} for codec compatibility"
            )

        # Ensure output directory exists
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

        # Map UI/OpenCV-style codecs to FFmpeg codecs
        codec_map = {
            "avc1": "libx264",
            "hevc": "libx265",
            "mp4v": "mpeg4",
            "XVID": "mpeg4",
        }
        ffmpeg_codec = codec_map.get(self.codec, self.codec)
        available_encoders = get_available_encoders()
        ffmpeg_codec, fallback_warning = select_available_encoder(ffmpeg_codec, available_encoders)
        if fallback_warning:
            logger.warning(fallback_warning)
        if available_encoders and ffmpeg_codec not in available_encoders:
            available = ", ".join(sorted(available_encoders))
            raise VideoEncodingError(
                f"Requested FFmpeg encoder '{ffmpeg_codec}' is not available. "
                f"Available encoders: {available}"
            )

        # Set default FFmpeg parameters for broad compatibility and web optimization
        # -movflags +faststart enables progressive loading for web playback
        # Add explicit SDR color tags for predictable playback across platforms
        ffmpeg_params = [
            "-movflags",
            "+faststart",
            "-color_primaries",
            "bt709",
            "-color_trc",
            "bt709",
            "-colorspace",
            "bt709",
        ]

        rate_params, bitrate, tuning = self._rate_control_args(
            ffmpeg_codec, self.bitrate, self.quality
        )
        ffmpeg_params.extend(rate_params)
        if tuning is not None:
            logger.debug(tuning)

        # Override FFREPORT as late as possible so an earlier failure cannot
        # leave the process environment modified.
        ffmpeg_log_level = "warning"
        self._ffmpeg_report_path = self._configure_ffmpeg_report()
        if self._ffmpeg_report_path is not None:
            ffmpeg_log_level = "info"
            logger.info("Logging to file: %s", self._ffmpeg_report_path)
            ffmpeg_params.extend(["-report", "-loglevel", "info"])

        try:
            self._writer = _RawFfmpegPipeWriter(
                self.output_path,
                self._adjusted_width,
                self._adjusted_height,
                self.fps,
                ffmpeg_codec,
                "rgb24",
                "yuv420p",
                ffmpeg_params,
                ffmpeg_log_level,
                bitrate,
            )
            logger.debug(
                f"Initialized FFmpeg pipe writer: {width}x{height} @ {self.fps}fps, "
                f"codec={ffmpeg_codec}, quality={self.quality}, bitrate={bitrate}, "
                f"params={ffmpeg_params}"
            )
        except RuntimeError as e:
            msg = str(e)
            self._restore_ffmpeg_report_env()
            if "ffmpeg" in msg.lower():
                raise VideoEncodingError(
                    "FFmpeg backend not found. Ensure a bundled FFmpeg is available or ffmpeg is on PATH."
                ) from e
            raise VideoEncodingError(f"Failed to initialize video encoder: {e}") from e
        except Exception as e:
            self._restore_ffmpeg_report_env()
            raise VideoEncodingError(f"Failed to initialize video encoder: {e}") from e

    def write_frame(self, buf: "oiio.ImageBuf") -> None:
        """Write an ImageBuf frame to the video.

        Frames whose size differs from the adjusted encoder size are rescaled with
        the "lanczos3" filter before being converted to 8-bit RGB.

        Args:
            buf: Frame to append

        Raises:
            VideoEncodingError: If the encoder is not initialized, no pixel data can
                be extracted, or the writer rejects the frame
        """
        if self._writer is None:
            raise VideoEncodingError("Video encoder not initialized. Call initialize() first.")

        spec = buf.spec()
        if spec.width != self._adjusted_width or spec.height != self._adjusted_height:
            buf = ImageScaler.scale_buf(
                buf,
                width=self._adjusted_width,
                height=self._adjusted_height,
                filter_name="lanczos3",
            )
            spec = buf.spec()

        pixels = buf.get_pixels(oiio.FLOAT)
        if pixels is None or pixels.size == 0:
            raise VideoEncodingError("Failed to extract pixel data from ImageBuf.")
        if pixels.ndim == 1:
            pixels = pixels.reshape((spec.height, spec.width, spec.nchannels))

        # FFmpeg rawvideo expects RGB (standard)
        frame = self._to_rgb8(pixels)

        try:
            self._writer.append_data(frame)
        except Exception as e:
            # Attach the end of the ffmpeg report (when available) to aid diagnosis.
            report_tail = self._read_ffmpeg_report_tail()
            if report_tail:
                raise VideoEncodingError(
                    f"Failed to write frame to video: {e}\n\n{report_tail}"
                ) from e
            raise VideoEncodingError(f"Failed to write frame to video: {e}") from e

    def close(self) -> None:
        """Close the video writer and restore the FFREPORT environment variable.

        Errors raised while closing the writer are logged, not propagated.
        """
        if self._writer is not None:
            try:
                self._writer.close()
                logger.info("Video encoding completed.")
            except Exception as e:
                logger.error(f"Error closing video writer: {e}")
            finally:
                self._writer = None
        self._restore_ffmpeg_report_env()

    def is_initialized(self) -> bool:
        """Check if encoder is initialized."""
        return self._writer is not None

__enter__()

Context manager entry.

Source code in src/renderkit/processing/video_encoder.py
254
255
256
def __enter__(self) -> "VideoEncoder":
    """Enter the ``with`` block and hand back this encoder unchanged."""
    return self

__exit__(exc_type, exc_val, exc_tb)

Context manager exit - close video writer.

Source code in src/renderkit/processing/video_encoder.py
258
259
260
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Leave the ``with`` block by closing the video writer.

    The exception details are ignored and ``None`` is returned, so an exception
    raised inside the block keeps propagating.
    """
    self.close()
    return None

__init__(output_path, fps, codec='libx264', bitrate=None, quality=10, macro_block_size=16)

Initialize video encoder.

Parameters:

Name Type Description Default
output_path Path

Path to output video file

required
fps float

Frame rate

required
codec str

Video codec (FFmpeg name like 'libx264', 'libx265', 'libaom-av1')

'libx264'
bitrate Optional[int]

Video bitrate in kbps (optional)

None
quality Optional[int]

Video quality (0-10), 10 is best (optional, used only if bitrate is None)

10
macro_block_size int

Macro block size for codec compatibility (default: 16). Frame dimensions will be rounded up to multiples of this value.

16
Source code in src/renderkit/processing/video_encoder.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def __init__(
    self,
    output_path: Path,
    fps: float,
    codec: str = "libx264",
    bitrate: Optional[int] = None,
    quality: Optional[int] = 10,
    macro_block_size: int = 16,
) -> None:
    """Store the encoder configuration; no writer is created yet.

    Args:
        output_path: Destination video file; kept as an absolute path
        fps: Frame rate of the output video
        codec: Video codec (FFmpeg name like 'libx264', 'libx265', 'libaom-av1')
        bitrate: Video bitrate in kbps (optional)
        quality: Video quality (0-10), 10 is best (optional, used only if bitrate is None)
        macro_block_size: Macro block size for codec compatibility (default: 16)
                         Frame dimensions will be rounded up to multiples of this value
    """
    self.output_path = output_path.absolute()

    # Encoding settings, kept exactly as supplied by the caller.
    self.fps, self.codec = fps, codec
    self.bitrate, self.quality = bitrate, quality
    self.macro_block_size = macro_block_size

    # Writer and frame geometry stay unset until initialize() is called.
    self._writer = None
    self._width: Optional[int] = None
    self._height: Optional[int] = None
    self._adjusted_width: Optional[int] = None
    self._adjusted_height: Optional[int] = None

    # Bookkeeping for the ffmpeg report file and the FFREPORT override.
    self._ffmpeg_report_path: Optional[Path] = None
    self._ffreport_prev: Optional[str] = None
    self._ffreport_set: bool = False

close()

Close the video writer.

Source code in src/renderkit/processing/video_encoder.py
441
442
443
444
445
446
447
448
449
450
451
def close(self) -> None:
    """Shut down the video writer, if any, and restore the FFREPORT variable.

    A failure while closing the writer is logged rather than raised; the writer
    reference is cleared either way.
    """
    writer = self._writer
    if writer is not None:
        try:
            writer.close()
            logger.info("Video encoding completed.")
        except Exception as exc:
            logger.error(f"Error closing video writer: {exc}")
        finally:
            self._writer = None
    self._restore_ffmpeg_report_env()

initialize(width, height)

Initialize the video writer with frame dimensions.

Parameters:

Name Type Description Default
width int

Frame width

required
height int

Frame height

required

Raises:

Type Description
VideoEncodingError

If encoder cannot be initialized

Source code in src/renderkit/processing/video_encoder.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def initialize(self, width: int, height: int) -> None:
    """Initialize the video writer with frame dimensions.

    If a writer is already open it is closed first. Quality values outside the
    0-10 range are clamped before being mapped to codec parameters.

    Args:
        width: Frame width
        height: Frame height

    Raises:
        VideoEncodingError: If encoder cannot be initialized
    """
    if self._writer is not None:
        # Otherwise the assignment below would replace the open writer without
        # ever closing it.
        logger.warning("Video encoder re-initialized; closing the previous writer.")
        self.close()

    self._width = width
    self._height = height

    # Adjust dimensions to be divisible by macro_block_size for codec compatibility
    # This prevents ffmpeg from needing to auto-resize frames.
    self._adjusted_width = self._make_divisible(width, self.macro_block_size)
    self._adjusted_height = self._make_divisible(height, self.macro_block_size)

    if self._adjusted_width != width or self._adjusted_height != height:
        logger.warning(
            f"Frame dimensions adjusted from {width}x{height} to "
            f"{self._adjusted_width}x{self._adjusted_height} for codec compatibility"
        )

    # Ensure output directory exists
    self.output_path.parent.mkdir(parents=True, exist_ok=True)

    # Map UI/OpenCV-style codecs to FFmpeg codecs
    codec_map = {
        "avc1": "libx264",
        "hevc": "libx265",
        "mp4v": "mpeg4",
        "XVID": "mpeg4",
    }
    ffmpeg_codec = codec_map.get(self.codec, self.codec)
    available_encoders = get_available_encoders()
    ffmpeg_codec, fallback_warning = select_available_encoder(ffmpeg_codec, available_encoders)
    if fallback_warning:
        logger.warning(fallback_warning)
    if available_encoders and ffmpeg_codec not in available_encoders:
        available = ", ".join(sorted(available_encoders))
        raise VideoEncodingError(
            f"Requested FFmpeg encoder '{ffmpeg_codec}' is not available. "
            f"Available encoders: {available}"
        )

    # Set default FFmpeg parameters for broad compatibility and web optimization
    # -movflags +faststart enables progressive loading for web playback
    # Add explicit SDR color tags for predictable playback across platforms
    ffmpeg_params = [
        "-movflags",
        "+faststart",
        "-color_primaries",
        "bt709",
        "-color_trc",
        "bt709",
        "-colorspace",
        "bt709",
    ]

    bitrate: Optional[str] = None

    # Keep quality inside the documented 0-10 range so the mappings below cannot
    # produce out-of-range (or negative) CRF / q:v values.
    quality = self.quality
    if quality is not None:
        quality = min(10, max(0, quality))

    # Bitrate takes precedence over quality tuning when provided.
    if self.bitrate is not None:
        bitrate = f"{self.bitrate}k"
        if ffmpeg_codec == "libaom-av1":
            ffmpeg_params.extend(["-cpu-used", "6"])
        logger.debug("%s tuning: bitrate=%s", ffmpeg_codec, bitrate)
    # Codec-specific tuning and quality mapping (used only when bitrate is not provided).
    elif ffmpeg_codec in ["libx264", "libx265"]:
        # Map quality (0-10) to CRF (35-18)
        # 10 -> 18 (Excellent), 0 -> 35 (Low quality)
        # Default to 23 if not specified
        crf = 18 + (10 - quality) * 1.7 if quality is not None else 23
        ffmpeg_params.extend(["-crf", f"{int(crf)}"])
        logger.debug(f"{ffmpeg_codec} tuning: crf={int(crf)}")

    elif ffmpeg_codec == "libaom-av1":
        # AV1 is extremely slow by default. Use -cpu-used 6 for better speed.
        # Map 0-10 quality to CRF 50-20 (lower is better)
        crf = 20 + (10 - quality) * 3 if quality is not None else 32
        ffmpeg_params.extend(["-crf", f"{int(crf)}", "-cpu-used", "6"])
        # libaom-av1 needs bitrate=0 to enable CRF mode in FFmpeg
        bitrate = "0"
        logger.debug(f"AV1 tuning: crf={int(crf)}, cpu-used=6")

    elif ffmpeg_codec == "mpeg4":
        # Map quality (0-10) to -q:v (31-2)
        # Higher -q:v is lower quality.
        qv = 2 + (10 - quality) * 2.9 if quality is not None else 4
        ffmpeg_params.extend(["-q:v", f"{int(qv)}"])
        logger.debug(f"MPEG-4 tuning: q:v={int(qv)}")

    # Override FFREPORT as late as possible so an earlier failure cannot leave the
    # process environment modified.
    ffmpeg_log_level = "warning"
    self._ffmpeg_report_path = self._configure_ffmpeg_report()
    if self._ffmpeg_report_path is not None:
        ffmpeg_log_level = "info"
        logger.info("Logging to file: %s", self._ffmpeg_report_path)
        ffmpeg_params.extend(["-report", "-loglevel", "info"])

    try:
        self._writer = _RawFfmpegPipeWriter(
            self.output_path,
            self._adjusted_width,
            self._adjusted_height,
            self.fps,
            ffmpeg_codec,
            "rgb24",
            "yuv420p",
            ffmpeg_params,
            ffmpeg_log_level,
            bitrate,
        )
        logger.debug(
            f"Initialized FFmpeg pipe writer: {width}x{height} @ {self.fps}fps, "
            f"codec={ffmpeg_codec}, quality={self.quality}, bitrate={bitrate}, "
            f"params={ffmpeg_params}"
        )
    except RuntimeError as e:
        msg = str(e)
        self._restore_ffmpeg_report_env()
        if "ffmpeg" in msg.lower():
            raise VideoEncodingError(
                "FFmpeg backend not found. Ensure a bundled FFmpeg is available or ffmpeg is on PATH."
            ) from e
        raise VideoEncodingError(f"Failed to initialize video encoder: {e}") from e
    except Exception as e:
        self._restore_ffmpeg_report_env()
        raise VideoEncodingError(f"Failed to initialize video encoder: {e}") from e

is_initialized()

Check if encoder is initialized.

Source code in src/renderkit/processing/video_encoder.py
453
454
455
def is_initialized(self) -> bool:
    """Report whether a video writer is currently attached to this encoder."""
    writer = self._writer
    return writer is not None

write_frame(buf)

Write an ImageBuf frame to the video.

Source code in src/renderkit/processing/video_encoder.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def write_frame(self, buf: oiio.ImageBuf) -> None:
    """Write an ImageBuf frame to the video.

    Frames whose size differs from the adjusted encoder size are rescaled with the
    "lanczos3" filter. Pixel values are clamped to [0, 1] (NaN becomes 0) and
    converted to 8-bit; the frame is then reduced or expanded to three channels.

    Args:
        buf: Frame to append

    Raises:
        VideoEncodingError: If the encoder is not initialized, no pixel data can be
            extracted, or the writer rejects the frame
    """
    if self._writer is None:
        raise VideoEncodingError("Video encoder not initialized. Call initialize() first.")

    spec = buf.spec()
    if spec.width != self._adjusted_width or spec.height != self._adjusted_height:
        buf = ImageScaler.scale_buf(
            buf,
            width=self._adjusted_width,
            height=self._adjusted_height,
            filter_name="lanczos3",
        )
        spec = buf.spec()

    pixels = buf.get_pixels(oiio.FLOAT)
    if pixels is None or pixels.size == 0:
        raise VideoEncodingError("Failed to extract pixel data from ImageBuf.")
    if pixels.ndim == 1:
        frame = pixels.reshape((spec.height, spec.width, spec.nchannels))
    else:
        frame = pixels

    # Replace NaN/inf before the integer cast: casting NaN to uint8 is undefined
    # and triggers a NumPy RuntimeWarning.
    frame_f32 = np.nan_to_num(
        frame.astype(np.float32, copy=False), nan=0.0, posinf=1.0, neginf=0.0
    )
    frame = (np.clip(frame_f32, 0.0, 1.0) * np.float32(255.0)).astype(np.uint8)

    # FFmpeg rawvideo expects RGB (standard)
    # Drop alpha and any further channels; copy so the result is contiguous.
    if frame.ndim == 3 and frame.shape[2] > 3:
        frame = np.ascontiguousarray(frame[:, :, :3])
    elif frame.ndim == 3 and frame.shape[2] == 1:
        frame = np.repeat(frame, 3, axis=2)
    # If Grayscale, ensure 3D
    elif frame.ndim == 2:
        frame = np.stack([frame] * 3, axis=-1)

    try:
        self._writer.append_data(frame)
    except Exception as e:
        # Attach the end of the ffmpeg report (when available) to aid diagnosis.
        report_tail = self._read_ffmpeg_report_tail()
        if report_tail:
            raise VideoEncodingError(
                f"Failed to write frame to video: {e}\n\n{report_tail}"
            ) from e
        raise VideoEncodingError(f"Failed to write frame to video: {e}") from e