Media analysis of the video to detect labels

Looking for a solution for media analysis of a video: detect pre-defined labels and report the time each label was detected in the video.

To perform video recognition of warning signs:

  1. Use the Video Display widget or a data pipeline to extract key frames from your videos.

  2. Apply a machine learning model (such as an object detection or image classification model) to those frames, running inference with a pre-trained or custom-trained model.

  3. Store and visualize the results in Foundry, linking detected warning signs back to the original video data.
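The three steps above can be sketched end-to-end in plain Python. This is only a shape sketch: `detect_labels` is a placeholder for whatever model you actually run, and the frame tuples stand in for frames extracted in step 1.

```python
from dataclasses import dataclass


@dataclass
class Detection:
    video: str
    label: str
    timestamp_s: float


def detect_labels(frame_bytes):
    # Placeholder: swap in a real object-detection or classification model here.
    # Returns the list of labels found in a single frame.
    return ["warning_sign"]


def analyse(frames):
    # frames: list of (video_name, timestamp_s, frame_bytes) from step 1.
    # Each detection keeps the timestamp, so every label links back to the
    # moment it appeared in the original video (step 3).
    results = []
    for video, ts, data in frames:
        for label in detect_labels(data):
            results.append(Detection(video, label, ts))
    return results


detections = analyse([("cam1.mp4", 12.4, b""), ("cam1.mp4", 37.9, b"")])
```

The key design point is carrying the timestamp alongside each frame from extraction onwards, so the final rows already answer "which label, at what time, in which video".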

Hey @95ac6765511dd7a8c058 ,

I’ve just been working on something like this and have some handy code snippets for you!

First, here’s a transform to extract scene frames using the media set transforms API:

https://www.palantir.com/docs/foundry/transforms-python/media-set-transforms-api#get_scene_frame_timestamps

import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from transforms.api import configure, transform
from transforms.mediasets import MediaSetInput, MediaSetOutput

log = logging.getLogger(__name__)


@configure(profile=['DRIVER_MEMORY_EXTRA_LARGE', 'DRIVER_CORES_EXTRA_LARGE'])
@transform(
    video_input=MediaSetInput("ri.mio.main.media-set.AAAAA"),
    scene_frames_output=MediaSetOutput("ri.mio.main.media-set.BBBBBB"),
)
def compute(ctx, video_input, scene_frames_output):
    """Extract individual scene frames from video segments — parallelised version.

    1. Detect scene transitions using STANDARD sensitivity to get timestamps (bulk)
    2. Parallelise frame extraction across videos using ThreadPoolExecutor
    """

    # Step 1: Get scene frame timestamps for all videos in one bulk call
    timestamps_df = video_input.transform().get_scene_frame_timestamps(
        scene_sensitivity="STANDARD"
    )
    rows = timestamps_df.collect()
    log.info("Found %d videos to process for scene frames", len(rows))

    # Step 2: Build work items — (media_item_rid, base_name, timestamp, idx)
    work_items = []
    for row in rows:
        media_item_rid = row["media_item_rid"]
        path = row["path"]
        base_name = path.replace(".mp4", "")
        scene_data = json.loads(row["scene_frames"])
        timestamps = [float(f["timestamp"]) for f in scene_data.get("frames", [])]
        for idx, ts in enumerate(timestamps):
            work_items.append((media_item_rid, base_name, ts, idx))

    log.info("Total scene frames to extract: %d", len(work_items))

    # Step 3: Process frames in parallel
    def extract_single_frame(item):
        media_item_rid, base_name, ts, idx = item
        single_frame = video_input.transform().extract_frames_at_timestamp(
            ts,
            media_item_rid=media_item_rid,
            output_format="png",
        )
        frame_path = f"{base_name}_scene_{idx:04d}.png"
        # Bind frame_path as a default argument so each lambda captures its
        # own path rather than the loop variable.
        scene_frames_output.write(
            single_frame,
            lambda _input_path, _page=None, fp=frame_path: fp,
        )
        return frame_path

    completed = 0
    errors = 0
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(extract_single_frame, item): item
            for item in work_items
        }
        for future in as_completed(futures):
            item = futures[future]
            try:
                future.result()  # re-raises if the extraction failed
                completed += 1
                if completed % 50 == 0:
                    log.info("Progress: %d/%d frames extracted", completed, len(work_items))
            except Exception as e:
                errors += 1
                log.warning("Failed to extract frame %s: %s", item, e)

    log.info("Scene frame extraction complete: %d succeeded, %d failed", completed, errors)

Once you have those scene frames in a media set, I would recommend starting with a Pipeline Builder pipeline that runs a VLM over those scene frames to detect your labels.
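Because the transform above encodes the scene index in each frame filename (`{base_name}_scene_{idx:04d}.png`), you can map any detection on a frame back to its timestamp in the source video. A minimal sketch in plain Python, assuming you keep the per-video `timestamps` lists built in the transform:

```python
import re

# Matches the frame naming scheme used by the extraction transform above.
SCENE_RE = re.compile(r"^(?P<base>.+)_scene_(?P<idx>\d{4})\.png$")


def frame_to_timestamp(frame_path, timestamps_by_video):
    """Map an extracted frame path back to its source video and timestamp.

    timestamps_by_video: {base_name: [ts0, ts1, ...]} in the same order the
    frames were extracted (i.e. the `timestamps` list from the transform).
    """
    m = SCENE_RE.match(frame_path)
    if not m:
        raise ValueError(f"Unexpected frame path: {frame_path}")
    base, idx = m.group("base"), int(m.group("idx"))
    return base, timestamps_by_video[base][idx]


# e.g. frame_to_timestamp("dashcam_scene_0002.png", {"dashcam": [0.0, 5.2, 9.8]})
# → ("dashcam", 9.8)
```

Joining the VLM's per-frame labels through this mapping gives you exactly what the original question asked for: each detected label with the time it appears in the video.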

Good luck! And let us know if you have any issues with this.