My incremental pipeline ran as a snapshot and now the build is too big

I have a pipeline where I pull in data from a bunch of different sources, parse the data using an LLM call, and then feed it into the backing dataset for an object. The output has 32 columns, 3,004 files, and 16 GB of data, but only 113,692 rows, which shouldn't need that many files.

Right now the build is way too big. How can we either force stack resources to prioritize this job, or find a quick way to make the job faster, so we can get this build back to incremental ASAP?

I need a short-term fix to get back to incremental, but I'm also curious about a long-term solution.

In short: You can process chunk by chunk, with some custom code.

The answer I'll give is not LLM-specific; this situation usually appears once you have a very large volume of data (TBs). With LLMs, since per-row processing is fairly slow, you can indeed hit the same kind of issues earlier.

If you use Code Repositories, you can "distill" an incremental pipeline into multiple builds.
The high-level principle is fairly simple. You will have 3 datasets:

  • [A] The original dataset with all the data
  • [B] The “distilling” dataset - which will have rows appended “by chunk”, e.g. 10k by 10k.
  • [C] The output dataset, with the result of the computation, always appending

The idea is that you regularly trigger a build containing B and C: a batch of new rows is appended to B, and C processes them. Repeat as many times as needed.

Example code that defines dataset B:

from transforms.api import transform, incremental, Input, Output
from pyspark.sql import functions as F
from pyspark.sql import types as T


@incremental(snapshot_inputs=["input_dataset"], semantic_version=1)
# @transform gives more control over inputs and outputs than @transform_df.
@transform(
    output_dataset=Output(".../rows_by_chunk"),
    input_dataset=Input(".../all_rows"),
)
def compute_snapshot_to_incremental(ctx, input_dataset, output_dataset):
    # We force reading the input dataframe as a full snapshot,
    # so that, if this input were one day to be read incrementally, the logic here won't break.
    input_df_all_dataframe = input_dataset.dataframe(mode="current")

    if ctx._is_incremental:
        # We read the current output to see what we already processed in previous builds
        # Note: We have to specify the schema for the first run.
        # You can get the schema below from the dataset preview: Schema > Copy > As Python StructTypes.
        # Here I leave the schema of a mediaset for reference.
        out_schema = T.StructType([
                                T.StructField('mediaItemRid', T.StringType()),
                                T.StructField('path', T.StringType()),
                                T.StructField('mediaReference', T.StringType()),
                                T.StructField('text', T.StringType()),
                                T.StructField('first_line', T.StringType()),
                                T.StructField("incremental_ts", T.TimestampType(), True)
                            ])
        output_df_previous_dataframe = output_dataset.dataframe('current', out_schema)

        # We diff the input with the current output, to find the "new rows".
        # We do this with a LEFT ANTI join : A - B <==> A LEFT ANTI B
        KEY = ["path"] # the primary keys of the rows
        new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY)
    else:
        # On first run
        new_rows_df = input_df_all_dataframe

    # We add a timestamp for easier tracking/debugging/understanding of the example
    new_rows_df = new_rows_df.withColumn('incremental_ts', F.current_timestamp())

    # We limit to 1,000 rows per run
    new_rows_df = new_rows_df.limit(1000)

    # ==== End of example processing ====

    # This will append rows
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(new_rows_df)

1,000 rows will be appended on each run (the limit is configurable).
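For completeness, dataset C can be another incremental transform that reads only the rows newly appended to B and runs the expensive per-row work on them. Below is a minimal sketch, not exact code to ship: the paths are placeholders and parse_with_llm is a hypothetical helper standing in for whatever per-row processing (e.g. the LLM call) you do.

from transforms.api import transform, incremental, Input, Output


@incremental(semantic_version=1)
@transform(
    output_dataset=Output(".../processed_rows"),  # placeholder path for dataset C
    input_dataset=Input(".../rows_by_chunk"),     # dataset B from the example above
)
def compute_chunk(ctx, input_dataset, output_dataset):
    # Read only the rows appended by the latest build of dataset B.
    new_rows_df = input_dataset.dataframe(mode="added")

    # Hypothetical helper standing in for your per-row processing (e.g. the LLM parsing).
    processed_df = parse_with_llm(new_rows_df)

    # Append the processed rows to the output.
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(processed_df)

You can then put B and C on the same schedule and let it repeat until C has caught up with A.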

Hello @naomicr,

It would be good to get a few more details to understand the problem.

You're saying that the job became way too big, but 113,692 rows is not that big, and neither is 3,004 files.

Are you seeing issues because re-running 113,692 rows through the LLM is too much? In that case, the long-term solution could be to enable memoization for the LLM board in Pipeline Builder, which re-uses results even across snapshot builds.

For the number of files, you can consider using a .repartition transform, which will force a shuffle and let Spark redistribute the data into fewer files.
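As a rough sketch of the .repartition idea in a code repository (the paths and the partition count below are placeholders, not a recommendation):

from transforms.api import transform_df, Input, Output


@transform_df(
    Output(".../all_rows_compacted"),  # placeholder path
    source=Input(".../all_rows"),      # placeholder path
)
def compact_files(source):
    # Force a shuffle and rewrite the data into a fixed number of partitions,
    # which caps the number of output files. Pick the count based on data size,
    # e.g. targeting roughly 128 MB per file.
    return source.repartition(128)

In practice you can also just add the .repartition(...) call at the end of your existing transform rather than creating a separate dataset.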

In any case, a few more details here will help us give a better answer.

Hi @mtelling,

Here's some more background. We have a Pipeline Builder incremental transform that unexpectedly ran as a snapshot because one of our upstream datasets was misconfigured. The snapshot build failed after 23 hours with the error below:

Spark module ‘ri.spark-module-manager.main.spark-module.’ died while job ‘ri.foundry.main.job.’ was using it. (ExitReason: UNEXPECTED_MODULE_EXIT)

We believe this error refers to the job being too large to process, and are looking for ways to complete this snapshot so the pipeline can go back to being incremental.

More context: Previously, due to the large number of files in the output dataset, we would snapshot the transform to repartition the files once a month. Those previous snapshots never took this long, and we’re wondering why this time it’s taking so long.

In the long term, we would look into enabling memoization and using a .repartition transform to force a shuffle instead of a snapshot. But is there any way for us to get out of this issue in the short term?

@charlesq you can try using an advanced Spark profile to give the job more executors and more memory.
It should be possible to scale up to the point where your transform can handle data at this size.
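If part of the pipeline lives in a code repository, profiles can be attached with @configure. A minimal sketch, where the profile names are placeholders (the profiles actually available depend on what is enabled on your enrollment):

from transforms.api import configure, transform_df, Input, Output


@configure(profile=["EXECUTOR_MEMORY_LARGE", "NUM_EXECUTORS_32"])  # placeholder profile names
@transform_df(
    Output(".../parsed_rows"),     # placeholder path
    source=Input(".../all_rows"),  # placeholder path
)
def heavy_transform(source):
    # Placeholder body; the point here is the @configure decorator above.
    return source

For a Pipeline Builder pipeline, the profiles are instead selected in the pipeline settings rather than in code.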

In Pipeline Builder, which profile was this using?