Pipeline Builder incremental media sets PDF extraction

Hi,

Is incremental computation mode supported for media sets? I am not seeing the option under the media set icon in Pipeline Builder.

My workflow is PDF to text with OCR, and then text to a language model.

To avoid wasting tokens on documents that have already been processed, I have created an intermediary dataset that holds the extracted text, written with append rows. Because OCR is not guaranteed to be deterministic, I need to key on file name. This works, but it still involves redundant OCR extractions on documents that have already been processed.
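
Roughly, the dedup against the intermediary dataset looks like this (a simplified sketch; file_name stands in for whatever key column my media references actually expose):

from pyspark.sql import DataFrame

def filter_unprocessed(docs_df: DataFrame, extracted_df: DataFrame) -> DataFrame:
    # Anti-join on file name: only documents with no extracted text yet survive,
    # so only those get sent on to OCR and the language model.
    return docs_df.join(
        extracted_df.select("file_name").distinct(),
        on="file_name",
        how="left_anti",
    )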

Also worth noting: I am unable to extract the text from a media reference in a dataset due to permission issues.

Currently, incremental media sets are not supported in Pipeline Builder (https://community.palantir.com/t/does-pipeline-builder-support-incremental-builds-off-of-a-mediaset/182).

If you’re using the Use LLM node, you can also skip recomputing already-processed rows, which will help you save time and cost: https://www.palantir.com/docs/foundry/pipeline-builder/pipeline-builder-llm/#skip-computing-already-processed-rows

GPT-4o in Pipeline Builder also supports vision and will roll out next week!


Hi @jkishk

Is it required to process the media sets in Pipeline Builder? To complement @helenq 's answer: you could use @incremental(v2_semantics=True) in Code Repositories. It is still in beta, but it works.


Hi @Hugo, the transform does not have to be in Pipeline Builder. As we put the workflow into prod, we may have to move the transform into Code Repositories.

I’ve been setting up my own incremental logic to subtract rows based on a custom join. I use document ID and page number, but you could optionally check the extracted text to see if it has changed. Full credit to the Palantir team, who showed me this approach and provided the boilerplate:

from pyspark.sql import functions as F, types as T


def get_incremental_data(ctx, input_dataset, output_dataset, limit_rows=True, limit=2):
    # We enforce the read of the input dataframe as a snapshot, via the
    # snapshot_inputs argument of @incremental
    input_df_all_dataframe = input_dataset.dataframe(mode="current")

    if getattr(ctx, "is_incremental", False):
        # We read the current output to see what we already processed in previous builds
        # Note: we have to specify the schema for the first run
        # page_id is a hash of the page number and media set item RID, which should be
        # unique for each page and stable between builds
        out_schema = T.StructType([
            T.StructField("page_id", T.StringType()),
            # T.StructField("page_number", T.IntegerType()),
        ])
        output_df_previous_dataframe = output_dataset.dataframe("current", out_schema)

        # ==== Example processing here ====
        # We diff the input with the current output, to find the "new rows".
        # We do this with a LEFT ANTI join: A - B <==> A LEFT ANTI B
        KEY = ["page_id"]
        new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY)
    else:
        # On the first run, everything is new
        new_rows_df = input_df_all_dataframe

    # We add a timestamp for easier tracking/debugging/understanding of the example
    new_rows_df = new_rows_df.withColumn("incremental_ts", F.current_timestamp())

    # Optionally cap the number of rows per build (handy while testing)
    if limit_rows:
        new_rows_df = new_rows_df.limit(limit)

    return new_rows_df
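
For completeness, this is roughly how I wire the helper into the transform itself (a sketch: the dataset RIDs are placeholders, and I'm assuming the helper lives in the same file):

from transforms.api import transform, incremental, Input, Output

@incremental(snapshot_inputs=["input_dataset"])
@transform(
    output_dataset=Output("..."),
    input_dataset=Input("..."),
)
def compute(ctx, input_dataset, output_dataset):
    new_rows_df = get_incremental_data(ctx, input_dataset, output_dataset)
    # Write only the genuinely new rows; previously written rows stay untouched.
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(new_rows_df)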

Like you said, though, OCR isn’t deterministic. But if you are lucky enough to be extracting the text layer, you could know with a fair degree of certainty whether the text of the page has changed in any way. If you are processing images, audio, etc., you might be able to use checksums, as in the sketch below. Even with incremental support for media sets, it might not have the level of granularity you want.
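
For text, a content hash makes that check cheap (a minimal sketch, assuming the extracted text already sits in an extracted_text column):

from pyspark.sql import functions as F

# Hash the page text so "has this page changed?" becomes a join on
# (page_id, text_sha256) instead of a full string comparison.
df = df.withColumn("text_sha256", F.sha2(F.col("extracted_text"), 256))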

Hi @jkishk

We have been using v2_semantics and haven’t had issues (see code below).

from pyspark.sql import functions as F, types as T
from transforms.api import transform, incremental, Input, Output
from transforms.mediasets import MediaSetInput


@incremental(v2_semantics=True)
@transform(
    output=Output("..."),
    media_refs=Input("..."),
    mediaset=MediaSetInput("..."),
)
def compute(ctx, output, media_refs, mediaset):

    def extract_text_from_pdf(media_item_rid, page_number):
        response = mediaset.transform_document_to_text_raw(media_item_rid, page_number)
        return response.read().decode("utf-8")

    text_udf = F.udf(extract_text_from_pdf, T.StringType())

    df = media_refs.dataframe().withColumn(
        "extracted_text", text_udf(F.col("mediaItemRid"), F.col("page_number"))
    )

    output.write_dataframe(df)
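
If you want to combine this with the page_id key from the earlier snippet, one way to build it (a sketch, just reusing the hash-of-RID-plus-page-number idea described above) is:

df = df.withColumn(
    "page_id",
    F.sha2(F.concat_ws("-", F.col("mediaItemRid"), F.col("page_number").cast("string")), 256),
)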

I am not sure how close v2_semantics is to GA, but maybe your colleagues closer to the development team can elaborate further.
