Hey @95ac6765511dd7a8c058 ,
I’ve just been working on something like this and have some handy code snippets for you!
First, here’s a transform to extract scene frames, using this API:
https://www.palantir.com/docs/foundry/transforms-python/media-set-transforms-api#get_scene_frame_timestamps
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from transforms.api import configure, transform
from transforms.mediasets import MediaSetInput, MediaSetOutput
log = logging.getLogger(__name__)


@configure(profile=['DRIVER_MEMORY_EXTRA_LARGE', 'DRIVER_CORES_EXTRA_LARGE'])
@transform(
    video_input=MediaSetInput("ri.mio.main.media-set.AAAAA"),
    scene_frames_output=MediaSetOutput("ri.mio.main.media-set.BBBBBB"),
)
def compute(ctx, video_input, scene_frames_output):
    """Extract individual scene frames from video segments — parallelised version.

    1. Detect scene transitions using STANDARD sensitivity to get timestamps (bulk)
    2. Parallelise frame extraction across videos using ThreadPoolExecutor

    Frames are written to the output media set as PNGs named
    ``<base_name>_scene_<idx:04d>.png``. Failures on individual frames are
    logged and counted, but do not abort the whole run.
    """
    # Step 1: one bulk call returns a row per video with its scene-change timestamps.
    timestamps_df = video_input.transform().get_scene_frame_timestamps(
        scene_sensitivity="STANDARD"
    )
    rows = timestamps_df.collect()
    log.info("Found %d videos to process for scene frames", len(rows))

    # Step 2: flatten into work items — (media_item_rid, base_name, timestamp, idx)
    work_items = []
    for row in rows:
        media_item_rid = row["media_item_rid"]
        # removesuffix only strips a trailing ".mp4"; str.replace would also
        # mangle a ".mp4" occurring elsewhere in the path.
        base_name = row["path"].removesuffix(".mp4")
        scene_data = json.loads(row["scene_frames"])
        timestamps = [float(f["timestamp"]) for f in scene_data.get("frames", [])]
        for idx, ts in enumerate(timestamps):
            work_items.append((media_item_rid, base_name, ts, idx))
    log.info("Total scene frames to extract: %d", len(work_items))

    # Step 3: fan extraction out across threads — the work is I/O-bound
    # (service calls), so a thread pool overlaps the waits.
    def extract_single_frame(item):
        """Extract one frame and write it to the output; return the frame path."""
        media_item_rid, base_name, ts, idx = item
        single_frame = video_input.transform().extract_frames_at_timestamp(
            ts,
            media_item_rid=media_item_rid,
            output_format="png",
        )
        frame_path = f"{base_name}_scene_{idx:04d}.png"
        # Bind frame_path as a lambda default so each closure keeps its own value.
        scene_frames_output.write(
            single_frame,
            lambda _input_path, _page=None, fp=frame_path: fp,
        )
        return frame_path

    completed = 0
    errors = 0
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(extract_single_frame, item): item
            for item in work_items
        }
        for future in as_completed(futures):
            item = futures[future]
            try:
                future.result()  # re-raises any exception from the worker
                completed += 1
                if completed % 50 == 0:
                    log.info("Progress: %d/%d frames extracted", completed, len(work_items))
            except Exception as e:
                # Best-effort: record the failure and keep processing the rest.
                errors += 1
                log.warning("Failed to extract frame %s: %s", item, e)
    log.info("Scene frame extraction complete: %d succeeded, %d failed", completed, errors)
Once you have those scene frames in a media set, I would recommend building a pipeline in Pipeline Builder that runs a VLM to do detections on those scene frames.
Good luck! And let us know if you have any issues with this.