from argparse import ArgumentParser from pathlib import Path import cv2 as cv parser = ArgumentParser(description="Subtract the background from a video with OpenCV MOG2.") parser.add_argument("input_video", help="Input video path.") parser.add_argument("mask_video", help="Output video path for the foreground mask.") parser.add_argument("--preview", default="output/foreground-mask-preview.png", help="Output PNG path for the strongest mask frame.") parser.add_argument("--history", type=int, default=80, help="Number of frames kept in the background model.") parser.add_argument("--var-threshold", type=float, default=25.0, help="MOG2 variance threshold for foreground decisions.") parser.add_argument("--warmup", type=int, default=5, help="Frames to ignore while the background model initializes.") args = parser.parse_args() capture = cv.VideoCapture(args.input_video) if not capture.isOpened(): raise SystemExit(f"Could not open video: {args.input_video}") fps = capture.get(cv.CAP_PROP_FPS) or 25.0 width = int(capture.get(cv.CAP_PROP_FRAME_WIDTH)) height = int(capture.get(cv.CAP_PROP_FRAME_HEIGHT)) if width <= 0 or height <= 0: raise SystemExit("Could not read frame size from the input video.") mask_path = Path(args.mask_video) preview_path = Path(args.preview) mask_path.parent.mkdir(parents=True, exist_ok=True) preview_path.parent.mkdir(parents=True, exist_ok=True) fourcc = cv.VideoWriter_fourcc(*"mp4v") writer = cv.VideoWriter(str(mask_path), fourcc, fps, (width, height)) if not writer.isOpened(): raise SystemExit(f"Could not open VideoWriter for: {mask_path}") subtractor = cv.createBackgroundSubtractorMOG2( history=args.history, varThreshold=args.var_threshold, detectShadows=True, ) frame_count = 0 active_frames = 0 peak_pixels = 0 peak_mask = None while True: ok, frame = capture.read() if not ok: break frame_count += 1 raw_mask = subtractor.apply(frame) _, foreground_mask = cv.threshold(raw_mask, 254, 255, cv.THRESH_BINARY) writer.write(cv.cvtColor(foreground_mask, cv.COLOR_GRAY2BGR)) if frame_count > args.warmup: foreground_pixels = cv.countNonZero(foreground_mask) if foreground_pixels: active_frames += 1 if foreground_pixels > peak_pixels: peak_pixels = foreground_pixels peak_mask = foreground_mask.copy() capture.release() writer.release() if frame_count == 0: raise SystemExit(f"No frames were read from: {args.input_video}") if peak_mask is None: peak_mask = foreground_mask if not cv.imwrite(str(preview_path), peak_mask): raise SystemExit(f"Could not write preview image: {preview_path}") print(f"input={args.input_video}") print(f"frames={frame_count} size={width}x{height} fps={fps:.2f}") print(f"warmup_frames={min(args.warmup, frame_count)}") print(f"active_frames={active_frames} peak_foreground_pixels={peak_pixels}") print(f"wrote_video={mask_path}") print(f"wrote_preview={preview_path}")