How to "jump over" transactions in pipeline transforms, for datasets and mediaset?

Hello,

I have a transform that processes files stored in a dataset, and another one that processes files stored in a mediaset.
Both transforms are incremental and use v2_semantics.

For some reason, I need to reprocess files that were already processed or “jump over” files to avoid processing them.

How can I do this?

The general idea is to filter “manually” for the exact cut of files you want to process, and to force the write on the output via set_mode("modify") on datasets and set_write_mode("modify") on mediasets. This is respected even when an incremental transform “wants” to snapshot for some reason.

If the incremental transform tries to snapshot (e.g. because you bumped the semantic version), you can still write your output as an append!
However, reading your output will yield an empty dataframe (e.g. my_output.filesystem().files() will be empty).
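
As a minimal sketch of the pattern (the output names mirror the full examples below; this is a sketch, not the only way to structure it):

# Sketch only: `output_dataset` is a dataset Output, `output_mediaset` a MediaSetOutput.
# Force an append-style write even if the incremental framework decided to snapshot.
output_dataset.set_mode("modify")
output_mediaset.set_write_mode("modify")

# Caveat: if the framework "wanted" to snapshot, reading the output back is empty,
# e.g. output_dataset.filesystem().files() yields an empty DataFrame.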

Case 1 - with a mediaset as input

The exact code snippet to do so is:
list_media_filtered: DataFrame = input_mediaset.list_media_items(mode="current").filter( F.col("logicalOrdering") >= 1742579979575000)

This allows you to filter the items of the mediaset by timestamp, and so to control granularly what to include or exclude.

To obtain the timestamp, you can create a quick Pipeline Builder pipeline and use the “convert media sets to table rows” transform.

You need to append three zeroes to the obtained timestamp, as sketched below the examples!

  • Correct: 1742495355003950
  • Incorrect: 1742546845951
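
In other words, the timestamp from Pipeline Builder has 13 digits (milliseconds) while the logicalOrdering filter expects three more digits, so the conversion is just a multiplication by 1000. A quick sketch (the values are only illustrative):

# Illustrative only: convert a millisecond timestamp obtained from Pipeline Builder
# into the value expected by the logicalOrdering filter.
timestamp_ms = 1742546845951        # 13 digits, as read from Pipeline Builder
timestamp_filter = timestamp_ms * 1000  # 16 digits, comparable to "logicalOrdering"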

The transform below copies files from an input mediaset to an output mediaset, and logs each file in an additional output dataset.

import logging

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# from pyspark.sql import functions as F
from transforms.api import Input, Output, incremental, transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


def process_normally(list_of_medias, input_mediaset, output_mediaset, output_dataset):
    # See https://www.palantir.com/docs/foundry/transforms-python/media-sets
    def upload_pdfs(media_item_rid, path):
        with input_mediaset.get_media_item(media_item_rid) as curr_file:
            response = output_mediaset.put_media_item(curr_file, path)
        return response.media_item_rid

    upload_udf = F.udf(upload_pdfs, StringType())

    media_reference_template = output_mediaset.media_reference_template()
    uploaded_pdfs = (
        list_of_medias.withColumn("uploaded_media_item_rid", upload_udf(F.col("mediaItemRid"), F.col("path")))
        .select("path", "uploaded_media_item_rid")
        .withColumn("mediaReference", F.format_string(media_reference_template, "uploaded_media_item_rid"))
    )

    # Add current timestamp as a new column
    uploaded_pdfs = uploaded_pdfs.withColumn("current_ts", F.current_timestamp())

    column_typeclasses = {"mediaReference": [{"kind": "reference", "name": "media_reference"}]}
    output_dataset.write_dataframe(uploaded_pdfs, column_typeclasses=column_typeclasses)


def fix_pointers(input_mediaset, output_mediaset, output_dataset):
    list_media_filtered: DataFrame = input_mediaset.list_media_items(mode="current").filter(
        F.col("logicalOrdering") >= 1742579979575000
    )

    logging.warning(f"show : {list_media_filtered.collect()}")
    logging.warning(f"column : {list_media_filtered.columns}")

    # As the transform is in "incremental" mode, but will try to snapshot the output, we enforce an "append" on the outputs
    output_dataset.set_mode("modify")
    output_mediaset.set_write_mode("modify")

    # do the code
    process_normally(list_media_filtered, input_mediaset, output_mediaset, output_dataset)


@incremental(v2_semantics=True, semantic_version=2)
@transform(
    output_dataset=Output("ri.foundry.main.dataset.11eff7fb-e9c6-4d13-9344-04736e6f46b6"),
    output_mediaset=MediaSetOutput("ri.mio.main.media-set.f60d8552-7d45-44b0-8752-d017e40526f1"),
    input_mediaset=MediaSetInput("ri.mio.main.media-set.bbcb55bc-eafc-4d34-80b9-3a010d324c0b"),
)
def compute(ctx, input_mediaset, output_dataset, output_mediaset):
    # Do the normal process like "everyday"
    # list_of_medias = input_mediaset.list_media_items_by_path(ctx)
    # process_normally(list_of_medias, input_mediaset, output_mediaset, output_dataset)

    # Alternative processing:
    fix_pointers(input_mediaset, output_mediaset, output_dataset)

Case 2 - with a dataset as input

The important line is input_dataset.filesystem().files().filter(F.col("modified") > 1742831163000), which allows you to filter for the raw input files that were modified “after” a specific timestamp. Of course, the transform needs to operate at the file level (e.g. via mapPartitions or, as in the example below, a UDF over file paths).
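
For reference, the value 1742831163000 is an epoch timestamp in milliseconds. A small sketch of deriving the cutoff from a calendar date (the date and variable names are only illustrative):

from datetime import datetime, timezone

from pyspark.sql import functions as F

# Illustrative only: compute the millisecond cutoff from a wall-clock date,
# then keep the raw input files modified after it.
cutoff_ms = int(datetime(2025, 3, 24, tzinfo=timezone.utc).timestamp() * 1000)
files_to_process = input_dataset.filesystem().files().filter(F.col("modified") > cutoff_ms)

The full transform below applies the same kind of filter inside process_normally: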

import logging
import os

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import StringType

# from pyspark.sql import functions as F
from transforms.api import Input, Output, incremental, transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


def process_normally(input_dataset, output_mediaset, output_dataset):
    # See https://www.palantir.com/docs/foundry/transforms-python/media-sets

    # all_files = list(input_dataset.filesystem().ls())
    # for current_file in all_files:
    #     with input_dataset.filesystem().open(current_file.path, 'rb') as f:
    #         filename = os.path.basename(current_file.path)
    #         response = output_mediaset.put_media_item(f, filename)
    #         # return response.media_item_rid

    def upload_pdf(current_file):
        with input_dataset.filesystem().open(current_file, "rb") as f:
            filename = os.path.basename(current_file)
            response = output_mediaset.put_media_item(f, filename)
            return response.media_item_rid

    upload_udf = F.udf(upload_pdf, T.StringType())

    media_reference_template = output_mediaset.media_reference_template()
    uploaded_pdfs = (
        input_dataset.filesystem().files().filter(F.col("modified") > 1742831163000)
        .withColumn("uploaded_media_item_rid", upload_udf(F.col("path")))
        .select("path", "uploaded_media_item_rid")
        .withColumn("mediaReference", F.format_string(media_reference_template, "uploaded_media_item_rid"))
    )

    # Add current timestamp as a new column
    uploaded_pdfs = uploaded_pdfs.withColumn("current_ts", F.current_timestamp())

    column_typeclasses = {"mediaReference": [{"kind": "reference", "name": "media_reference"}]}
    output_dataset.write_dataframe(uploaded_pdfs, column_typeclasses=column_typeclasses)


def fix_pointers(input_dataset, output_mediaset, output_dataset):
    # Note: the filtering could happen here instead:
    # input_dataset.filesystem().files().filter(F.col("modified") > 1742831163000)

    # As the transform is in "incremental" mode, but will try to snapshot the output, we enforce an "append" on the outputs
    output_dataset.set_mode("modify")
    output_mediaset.set_write_mode("modify")

    # do the code
    process_normally(input_dataset, output_mediaset, output_dataset)


@incremental(v2_semantics=True, semantic_version=3)
@transform(
    output_dataset=Output("ri.foundry.main.dataset.16d0fa2e-ff21-4341-bfed-6ce961984b8c"),
    output_mediaset=MediaSetOutput("ri.mio.main.media-set.39b7c986-1ca7-4256-ab55-0a61cbaa7143"),
    input_dataset=Input("ri.foundry.main.dataset.f20ec271-7b9b-4961-bda7-1b7f965b3a12"),
)
def compute(ctx, input_dataset, output_dataset, output_mediaset):
    # Do the normal process like "everyday"
    # process_normally(input_dataset, output_mediaset, output_dataset)

    # Alternative processing:
    fix_pointers(input_dataset, output_mediaset, output_dataset)
