Source code for poseinterface.io

"""Functions to convert annotations and videos to ``poseinterface`` format."""

import copy
import json
import logging
import re
import shutil
from pathlib import Path
from typing import Literal, TypeAlias

import numpy as np
import sleap_io as sio
import xarray as xr
from movement.io import load_dataset
from sleap_io.io import coco
from sleap_io.io.cli import _get_video_encoding_info, _is_ffmpeg_available
from sleap_io.io.dlc import is_dlc_file

PoseInterfaceFormat: TypeAlias = Literal["clip", "frame"]

_EMPTY_LABELS_ERROR_MSG = {
    "default": (
        "No annotations could be extracted from the input file. "
        "Please check that the input file contains labeled frames. "
    ),
    "dlc": (
        "Ensure that the paths to the labelled frames are in the "
        "standard DLC project format: "
        "labeled-data / <video-name> / "
        "<filename-with-frame-number>.<extension> "
        "and that the frames files exist."
    ),
}
POSEINTERFACE_FRAME_REGEXP = r"frame-(\d+)"
DLC_FRAME_REGEXP = r"(\d+)"

# We support sleap's MediaVideo files
EXPECTED_SUFFIX = ".mp4"
EXPECTED_ENCODING = {
    "pixelformat": "yuv420p",
    "codec": "h264",  # codec name
}
REENCODING_PARAMS = {
    **EXPECTED_ENCODING,
    "codec": "libx264",  # overwrite with encoder to use
    "crf": 25,
    "preset": "superfast",
}


def annotations_to_poseinterface(
    input_path: Path | str,
    output_dir: Path | str,
    *,
    sub_id: str,
    ses_id: str,
    cam_id: str,
    format: PoseInterfaceFormat = "frame",
) -> Path:
    """Export annotations file from a single video to ``poseinterface`` format.

    Parameters
    ----------
    input_path
        Path to the input annotations file.
    output_dir
        Directory where the output ``poseinterface`` COCO JSON file will be
        saved. It is created if it does not exist.
    sub_id
        Subject ID to include in the generated filenames.
    ses_id
        Session ID to include in the generated filenames.
    cam_id
        Camera ID to include in the generated filenames.
    format
        Whether to generate :ref:`frame labels<target-framelabels>` or
        :ref:`clip labels<target-cliplabels>`. Default is "frame".

    Returns
    -------
    pathlib.Path
        Path to the saved ``poseinterface`` COCO JSON file.

    Raises
    ------
    ValueError
        If no labeled frames could be extracted from the input file,
        or if the annotations refer to multiple videos.

    Notes
    -----
    The format of the input annotations file is automatically inferred based
    on its extension. See :func:`sleap_io.io.main.load_file` for supported
    formats.

    See Also
    --------
    sleap_io.io.main.load_file
        The underlying function used to load the input annotations file as a
        SLEAP labels object.
    sleap_io.io.coco.convert_labels
        The underlying function used to convert SLEAP labels to COCO format.

    Example
    -------
    >>> from pathlib import Path
    >>> from poseinterface.io import annotations_to_poseinterface
    >>> coco_json_path = annotations_to_poseinterface(
    ...     input_path=Path("path/to/annotations.slp"),
    ...     output_dir=Path("path/to/output_directory"),
    ...     sub_id="testSub123",
    ...     ses_id="testSes123",
    ...     cam_id="testCam123",
    ... )

    """
    # Accept plain strings as well as ``Path`` objects, like the other public
    # converters in this module; the helpers below rely on ``Path`` methods.
    input_path = Path(input_path)
    output_dir = Path(output_dir)

    labels = sio.load_file(input_path)

    if len(labels.labeled_frames) == 0:
        error_msg = _EMPTY_LABELS_ERROR_MSG["default"]
        if is_dlc_file(input_path):
            # DLC files get an extra hint about the expected project layout
            error_msg += _EMPTY_LABELS_ERROR_MSG["dlc"]
        raise ValueError(error_msg)

    if len(labels.videos) > 1:
        raise ValueError(
            "The annotations refer to multiple videos "
            f"(n={len(labels.videos)}). "
            "Please check that the input file contains annotations "
            "for a single video only."
        )

    # Generate image filenames in the poseinterface format
    # (the file extension is only included for frame labels)
    image_filenames = _generate_poseinterface_filenames(
        labels,
        sub_id=sub_id,
        ses_id=ses_id,
        cam_id=cam_id,
        include_file_extension=(format == "frame"),
    )

    # Generate COCO dict
    coco_data = coco.convert_labels(labels, image_filenames=image_filenames)

    # Update image IDs in coco_data
    coco_data = _update_image_ids(coco_data, format=format)

    output_json_path = _build_output_json_path(
        output_dir=output_dir,
        coco_data=coco_data,
        sub_id=sub_id,
        ses_id=ses_id,
        cam_id=cam_id,
        format=format,
    )
    with open(output_json_path, "w", encoding="utf-8") as f:
        json.dump(coco_data, f)

    return output_json_path
def _build_output_json_path(
    *,
    output_dir: Path,
    coco_data: dict,
    sub_id: str,
    ses_id: str,
    cam_id: str,
    format: PoseInterfaceFormat,
) -> Path:
    """Build output JSON path using poseinterface naming conventions.

    For the "frame" format the filename is
    ``sub-<sub_id>_ses-<ses_id>_cam-<cam_id>_framelabels.json``. For any other
    format (i.e. "clip") it is
    ``sub-<sub_id>_ses-<ses_id>_cam-<cam_id>_start-<start>_dur-<n>_cliplabels.json``
    where ``<start>`` is the smallest frame number found in the image
    filenames (zero-padded to the width of the largest one) and ``<n>`` is
    the number of images in ``coco_data``.

    The output directory is created (including parents) if it does not
    exist, but only once the filename could be determined.

    Raises
    ------
    ValueError
        If the clip format is requested and ``coco_data`` has no images, or
        if a frame number cannot be extracted from an image filename.

    """
    prefix = f"sub-{sub_id}_ses-{ses_id}_cam-{cam_id}"

    if format == "frame":
        output_name = f"{prefix}_framelabels.json"
    else:
        if len(coco_data["images"]) == 0:
            raise ValueError(
                "No images were found in the COCO data. "
                "Cannot infer start frame and duration for cliplabels format."
            )
        frame_numbers = [
            _extract_frame_number(img["file_name"])
            for img in coco_data["images"]
        ]
        start_frame = min(frame_numbers)
        # NOTE: the duration is the number of images, not the frame span
        n_frames = len(frame_numbers)
        padded_start = str(start_frame).zfill(len(str(max(frame_numbers))))
        output_name = (
            f"{prefix}_start-{padded_start}_dur-{n_frames}_cliplabels.json"
        )

    # Validate first, create the directory last: a failed call should not
    # leave an empty output directory behind.
    output_dir.mkdir(parents=True, exist_ok=True)
    return output_dir / output_name


def _update_image_ids(
    coco_data: dict, format: PoseInterfaceFormat = "frame"
) -> dict:
    """Assign new image IDs based on the format.

    For frame format, each image ID is set to the session-video frame number
    extracted from the filename. For clip format, images are sorted by frame
    number and assigned 0-based indices within the clip.

    The input dictionary is not modified; an updated deep copy is returned,
    in which the ``image_id`` of every annotation is remapped accordingly.

    Raises
    ------
    ValueError
        If the image filenames are not unique, if the new image IDs are not
        unique, or if a frame number cannot be extracted from a filename.

    """
    file_names = [img["file_name"] for img in coco_data["images"]]
    if len(file_names) != len(set(file_names)):
        raise ValueError(
            "Duplicate image filenames were found. Please check that the "
            "input annotations do not contain duplicate frames."
        )

    data = copy.deepcopy(coco_data)

    # Map every original image ID to its new value
    old_to_new_id = {}
    if format == "frame":
        for img in data["images"]:
            old_to_new_id[img["id"]] = _extract_frame_number(img["file_name"])
    else:
        data["images"].sort(
            key=lambda img: _extract_frame_number(img["file_name"])
        )
        for idx, img in enumerate(data["images"]):
            old_to_new_id[img["id"]] = idx

    if len(old_to_new_id) != len(set(old_to_new_id.values())):
        raise ValueError(
            "Extracted image IDs are not unique. Please check that the frame "
            "numbers as specified in the filename are unique."
        )

    # Apply the mapping to the images and to the annotations that refer
    # to them
    for img in data["images"]:
        img["id"] = old_to_new_id[img["id"]]
    for annot in data["annotations"]:
        annot["image_id"] = old_to_new_id[annot["image_id"]]

    return data


def _extract_frame_number(
    filename: str, frame_regexp: str = POSEINTERFACE_FRAME_REGEXP
) -> int:
    """Extract the frame number in the input filename.

    The frame number is the first capture group of the first match of
    ``frame_regexp`` in ``filename``, converted to an integer.

    Raises
    ------
    ValueError
        If the pattern does not match anywhere in the filename.

    """
    match = re.search(frame_regexp, filename)
    if match is None:
        raise ValueError(
            "No frame number could be extracted from filename "
            f"{filename}. Please check that the filename contains a "
            "frame number matching the provided regexp pattern "
            rf"'{frame_regexp}'."
        )
    return int(match.group(1))


def _generate_poseinterface_filenames(
    labels: sio.Labels,
    *,
    sub_id: str,
    ses_id: str,
    cam_id: str,
    include_file_extension: bool = False,
) -> list[str]:
    """Generate PoseInterface image filenames for frames in the input labels.

    The generated filenames are in the format:
    sub-{sub_id}_ses-{ses_id}_cam-{cam_id}_frame-{0-padded_frame_number}

    If `include_file_extension` is True, the generated filenames will
    include the file extension of the original frame files, in the format:
    sub-{sub_id}_ses-{ses_id}_cam-{cam_id}_frame-{0-padded_frame_number}{file_extension}

    Only the first video in ``labels.videos`` is considered.

    Parameters
    ----------
    labels
        SLEAP labels object containing the annotations and video information.
    sub_id
        Subject ID to include in the generated filenames.
    ses_id
        Session ID to include in the generated filenames.
    cam_id
        Camera ID to include in the generated filenames.
    include_file_extension
        Whether to include the file extension of the original frame files
        in the generated filenames. Default is False.

    Returns
    -------
    list[str]
        List of generated COCO image filenames. All frame numbers are
        zero-padded to the width of the largest one.

    Raises
    ------
    ValueError
        If there are no frames to generate filenames for, or if no frame
        number can be extracted from the name of an image file.

    Notes
    -----
    When the SLEAP labels video object is a video file, per-frame file
    extensions are not available. Therefore, when ``include_file_extension``
    is True, the generated filenames assume a ``.png`` extension.

    """
    video_filenames = labels.videos[0].filename

    if isinstance(video_filenames, list):
        # Sequence of frame images: one filename per image file.
        # NOTE(review): this assumes that the image sequence holds exactly
        # the labeled frames, in the same order as ``labels.labeled_frames``
        # - TODO confirm against the sleap-io loaders.
        frame_numbers = [
            _extract_frame_number(Path(fn).stem, frame_regexp=DLC_FRAME_REGEXP)
            for fn in video_filenames
        ]
        original_suffixes = [Path(fn).suffix for fn in video_filenames]
    else:
        # Video file: one filename per labeled frame
        frame_numbers = [lf.frame_idx for lf in labels.labeled_frames]
        original_suffixes = [".png"] * len(frame_numbers)

    padded_frame_numbers = _pad_integers_to_same_width(frame_numbers)
    prefix = f"sub-{sub_id}_ses-{ses_id}_cam-{cam_id}_frame-"

    if not include_file_extension:
        return [prefix + frame_id for frame_id in padded_frame_numbers]
    return [
        prefix + frame_id + ext
        for frame_id, ext in zip(padded_frame_numbers, original_suffixes)
    ]


def _pad_integers_to_same_width(input: list[int]) -> list[str]:
    """Pad a list of integers to the same width with leading zeros.

    The common width is the number of characters of the largest value.

    Raises
    ------
    ValueError
        If the input list is empty.

    """
    if not input:
        # Explicit message instead of the opaque error raised by ``max([])``
        raise ValueError(
            "Cannot pad an empty list of integers: "
            "at least one frame number is required."
        )
    width = len(str(max(input)))
    return [str(number).zfill(width) for number in input]
def video_to_poseinterface(
    input_video: Path | str,
    output_video_dir: Path | str,
    *,
    sub_id: str,
    ses_id: str,
    cam_id: str,
) -> Path:
    """Reencode and rename a video to ``poseinterface`` format.

    The result is written to ``output_video_dir`` under the name
    ``sub-<sub_id>_ses-<ses_id>_cam-<cam_id>.mp4``. A video that is already
    an ``.mp4`` file encoded as H.264 + yuv420p is simply copied; any other
    video is re-encoded on the way.

    Parameters
    ----------
    input_video
        Path to the video to convert.
    output_video_dir
        Directory where the converted video will be written
        (created automatically if it does not exist).
    sub_id
        Subject ID used to build the output filename.
    ses_id
        Session ID used to build the output filename.
    cam_id
        Camera ID used to build the output filename.

    Returns
    -------
    Path
        Path to the saved ``.mp4`` file.

    Raises
    ------
    RuntimeError
        If ffmpeg is not available.

    """
    # ffmpeg is required both for inspecting and for re-encoding the video
    _check_ffmpeg()

    target_dir = Path(output_video_dir)
    target_path = target_dir / f"sub-{sub_id}_ses-{ses_id}_cam-{cam_id}.mp4"
    target_dir.mkdir(parents=True, exist_ok=True)

    if _needs_reencoding(input_video):
        _reencode_video(input_video, target_path)
    else:
        # Already in the expected container and encoding: plain copy
        shutil.copy(input_video, target_path)

    return target_path
def _check_ffmpeg() -> None:
    """Check ffmpeg is available and can be executed.

    As a side effect, ffmpeg is set as the default video plugin of sleap-io.

    Raises
    ------
    RuntimeError
        If ffmpeg is not available.

    """
    if not _is_ffmpeg_available():
        raise RuntimeError("ffmpeg is required but not found")
    sio.set_default_video_plugin("ffmpeg")


def _needs_reencoding(input_video_path: str | Path) -> bool:
    """Check if reencoding is required for input video.

    Reencoding is required when the file suffix (compared
    case-insensitively) is not ``EXPECTED_SUFFIX``, or when the codec and
    pixel format of the video differ from ``EXPECTED_ENCODING``.
    """
    input_video_path = Path(input_video_path)
    logging.info(f"Input video: {input_video_path}")

    # A different container always requires reencoding; in that case the
    # encoding does not need to be inspected at all
    if input_video_path.suffix.lower() != EXPECTED_SUFFIX:
        return True

    encoding = _get_codec_pixelformat(input_video_path)
    if encoding != EXPECTED_ENCODING:
        logging.info(
            f"Video encoding ({encoding}) does not match "
            f"the expected values ({EXPECTED_ENCODING}). "
            "The video will be reencoded."
        )
        return True

    return False


def _get_codec_pixelformat(
    input_video_path: str | Path,
) -> dict[str, str | None]:
    """Get relevant video encoding parameters as a dictionary.

    It wraps sleap-io's `_get_video_encoding_info`, which uses
    `ffmpeg -i` to extract metadata without requiring `ffprobe` to
    be in PATH.

    Returns
    -------
    dict
        Dictionary with the keys "codec" and "pixelformat", i.e. the same
        keys as ``EXPECTED_ENCODING``.

    Raises
    ------
    RuntimeError
        If no encoding information could be read from the file.

    Notes
    -----
    `_get_video_encoding_info` returns a `VideoEncodingInfo` object with
    the following attributes:

    - codec: Video codec name (e.g., "h264", "hevc").
    - codec_profile: Codec profile (e.g., "Main", "High").
    - pixel_format: Pixel format (e.g., "yuv420p").
    - bitrate_kbps: Bitrate in kilobits per second.
    - fps: Frames per second.
    - gop_size: Group of pictures size (keyframe interval).
    - container: Container format (e.g., "mov", "avi").

    """
    info = _get_video_encoding_info(input_video_path)
    if info is None:
        raise RuntimeError(
            f"Could not read encoding info from {input_video_path}. "
            "Ensure ffmpeg is installed and the file is a valid video."
        )
    return {
        "codec": info.codec,
        "pixelformat": info.pixel_format,
    }


def _reencode_video(
    input_video_path: str | Path,
    output_video_path: str | Path,
) -> Path:
    """Reencode video to default format.

    The video is loaded with sleap-io and saved to ``output_video_path``
    using ``REENCODING_PARAMS`` and the frame rate of the input video.

    Returns
    -------
    Path
        Path to the re-encoded video, i.e. ``output_video_path``.

    """
    output_video_path = Path(output_video_path)
    video = sio.load_video(Path(input_video_path))
    sio.save_video(
        video,
        filename=output_video_path,
        fps=video.fps,
        **REENCODING_PARAMS,
    )
    # Report and return the path that was requested, rather than the return
    # value of ``sio.save_video``, which is not known to be a path
    logging.info(f"Re-encoded video saved to {output_video_path}")
    return output_video_path
def predictions_to_poseinterface(
    input_path: Path | str,
    video_path: Path | str,
    output_dir: Path | str,
    *,
    sub_id: str,
    ses_id: str,
    cam_id: str,
) -> Path:
    """Convert a prediction file to ``poseinterface`` COCO JSON format.

    This function reads predictions for a given video and writes the
    corresponding "video-level" COCO JSON labels in the ``poseinterface``
    format (i.e. a
    ``sub-<sub_id>_ses-<ses_id>_cam-<cam_id>_videolabels.json`` file).
    The output JSON file is meant to facilitate the extraction of
    "clip-level" labels (i.e. files of the format
    ``sub-<sub_id>_ses-<ses_id>_cam-<cam_id>_start-<frame_id>_dur-<n_frames>_cliplabels.json``).

    Parameters
    ----------
    input_path
        Path to the predictions file. It should be one of the formats
        supported by ``movement`` (see `movement supported formats`_).
    video_path
        Path to the corresponding video file. Used to attach video
        metadata (resolution) to the COCO output.
    output_dir
        Path to the directory where to save the output JSON file
        (created automatically if it does not exist).
    sub_id
        Subject ID to include in the generated filenames.
    ses_id
        Session ID to include in the generated filenames.
    cam_id
        Camera ID to include in the generated filenames.

    Returns
    -------
    Path
        Path to the saved COCO JSON file.

    Raises
    ------
    FileNotFoundError
        If ``video_path`` does not point to an existing file.
    ValueError
        If the shape of the video could not be determined.

    Notes
    -----
    For the full list of supported formats for the input file, see
    `movement supported formats`_.

    .. _movement supported formats:
        https://movement.neuroinformatics.dev/dev/user_guide/input_output.html#supported-third-party-formats

    """
    # Validate the video first, so that a wrong video path is reported
    # before the predictions file is read
    video_path = Path(video_path)
    if not video_path.is_file():
        raise FileNotFoundError(
            f"Input video file does not exist: {video_path}"
        )
    video = sio.load_video(video_path)

    # Get video image width and height
    # (the video shape is unpacked as: frames, height, width, channels)
    if video.shape is None:
        raise ValueError(f"Could not extract video shape from {video_path}. ")
    _, img_h, img_w, _ = video.shape

    # Read input file as movement dataset
    # NOTE: fps=None is ignored with NWB files
    ds = load_dataset(
        file=input_path,
        source_software="auto",  # infer from validators
        fps=None,
    )

    # Convert movement dataset to videolabels dict
    coco_data = _convert_movement_ds_to_videolabels(
        ds,
        sub_id=sub_id,
        ses_id=ses_id,
        cam_id=cam_id,
        img_h=img_h,
        img_w=img_w,
    )

    # Export dict as JSON
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    output_json_path = (
        output_dir / f"sub-{sub_id}_ses-{ses_id}_cam-{cam_id}_videolabels.json"
    )
    with open(output_json_path, "w", encoding="utf-8") as f:
        json.dump(coco_data, f)

    return output_json_path
def _convert_movement_ds_to_videolabels(
    ds: xr.Dataset,
    *,
    sub_id: str,
    ses_id: str,
    cam_id: str,
    img_w: int,
    img_h: int,
) -> dict[str, list[dict]]:
    """Convert predictions in movement dataset to videolabels dict.

    The returned dictionary has three keys:

    - "images": one entry per frame, with a 0-based ``id`` and a
      zero-padded ``file_name``.
    - "annotations": one entry per frame and per individual, with 1-based
      ``id`` values.
    - "categories": one entry per individual, with 1-based ``id`` values.
    """
    # Assumed axis order: (time, space, keypoints, individuals)
    pose_array = ds["position"].values
    frame_count = pose_array.shape[0]
    kpt_names = ds.coords["keypoints"].values.tolist()
    ind_names = ds.coords["individuals"].values.tolist()

    # One category per individual.
    # NOTE: categories are 1-indexed to avoid conflicts
    # with models that treat category 0 as background.
    categories = []
    for category_id, ind_name in enumerate(ind_names, start=1):
        categories.append(
            {
                "id": category_id,
                "name": ind_name,
                "keypoints": kpt_names,
                "skeleton": [],
            }
        )

    # One image per frame.
    # NOTE: image id values are always 0-indexed
    pad_width = len(str(frame_count - 1))
    file_stem = f"sub-{sub_id}_ses-{ses_id}_cam-{cam_id}_frame-"
    images = []
    for frame_idx in range(frame_count):
        images.append(
            {
                "id": frame_idx,
                "file_name": file_stem + str(frame_idx).zfill(pad_width),
                "width": img_w,
                "height": img_h,
            }
        )

    # One annotation per frame and per individual
    annotations = []
    for frame_idx in range(frame_count):
        for category_id in range(1, len(ind_names) + 1):
            # Keypoint coordinates of this individual in this frame,
            # one row per keypoint
            xy = pose_array[frame_idx, :, :, category_id - 1].T

            # A keypoint counts as labeled when neither of its first two
            # coordinates is NaN. The mask is passed on as a third column;
            # the numeric COCO visibility flag is produced by
            # ``coco.encode_keypoints`` (presumably 0 for unlabeled and 2
            # for labeled keypoints - confirm against sleap-io).
            is_labeled = ~(np.isnan(xy[:, 0]) | np.isnan(xy[:, 1]))
            n_labeled = int(is_labeled.sum())

            # Bounding box around the labeled keypoints
            # (all zeros if there are none)
            if n_labeled == 0:
                bbox = [0.0, 0.0, 0.0, 0.0]
            else:
                xs = xy[is_labeled, 0]
                ys = xy[is_labeled, 1]
                left = float(xs.min())
                top = float(ys.min())
                bbox = [
                    left,
                    top,
                    float(xs.max()) - left,
                    float(ys.max()) - top,
                ]

            annotations.append(
                {
                    # annotation ids are consecutive and start at 1
                    "id": len(annotations) + 1,
                    "image_id": frame_idx,
                    "category_id": category_id,
                    # flattened keypoints [x1, y1, v1, x2, y2, v2, ...]
                    "keypoints": coco.encode_keypoints(
                        np.column_stack((xy, is_labeled))
                    ),
                    "num_keypoints": n_labeled,
                    "bbox": bbox,
                    "area": bbox[2] * bbox[3],
                    "iscrowd": 0,
                }
            )

    return {
        "images": images,
        "annotations": annotations,
        "categories": categories,
    }