How to run external transforms in chunks

I have a external transform that would take well over 24 hours to run on the entire dataset. What is the best way to partition it? I only need to run it through one time.

The below code illustrates the most typical approach. It assumes that the input data has a primary key named record_id.

@external_systems(
    my_source=Source("...")
)
@incremental()
@transform(
    my_input=Input("...")
    output=Output("...")
)
def process_in_batches(my_source, my_input, output):
    output_schema = T.StructType([
        T.StructField("record_id", T.StringType())
    ])
    existing_output_data = output.dataframe(mode="previous", schema=output_schema)
    unprocessed_records = my_input.dataframe(mode="current").join(
        existing_output_data,
        on="record_id",
        how="left_anti"
    )
    records_to_process = unprocessed_records.limit(5000).checkpoint()

    if records_to_process.count() == 0:
        output.abort()
        return

    # Interact with the external system here. 

    output.write_dataframe(records_to_process.select("record_id"))

Tweak the code as necessary if you want to capture the results of interacting with the external system, not just the fact that you processed a record.

You’ll want to configure your schedule to force-build so that it will keep running even if the input dataset hasn’t updated since the last build. Most folks also configure the schedule to run every minute so it keeps running “in a loop.”

Once the transform starts aborting, you know that the “work is done” and all of the input records have been processed. You can then get rid of the every-minute force-build schedule.

I believe that the use of checkpoint guarantees that the records you process and the records you write to the output should be the same, but in the interest of full disclosure, I personally usually use take(5000) to get a Python list of rows instead of limit(5000).checkpoint() to get a row-limited Spark dataframe. I then process the data in parallel on the driver using Python’s ThreadPoolExecutor, and finally convert the processed data back into a Spark dataframe before writing it to the output. This gives me more confidence that I’m definitely writing the same data that I processed, and for interactions with an external system, using threads on the driver is generally preferable to using a Spark UDF anyway. Threads give you better control over the degree of concurrency and are also more efficient in this case, because Spark is designed for distributing and parallelizing compute, not for performing concurrent requests.