Previous build timestamp in transforms

I have a workflow where I need to get only the rows from the input that were added after the last build. The input is a snapshot (non-negotiable), so incremental doesn’t help.

Currently, I make this work by storing the last build timestamp in a column of the output dataset. At the start of every build, I take the max of that column and filter the input data on it.
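
As a rough illustration of that workaround, here is a plain-Python sketch of the logic, independent of the Foundry API. The function name and the column names `created_at` and `last_build_ts` are hypothetical, and it assumes the input carries a timestamp column that can be compared against the stored build timestamp.

```python
from datetime import datetime


def rows_added_since_last_build(input_rows, previous_output_rows, build_time):
    """Return input rows newer than the stored watermark, stamped with build_time."""
    # Watermark: latest build timestamp stored in the previous output
    # (None on the first build, when there is no previous output).
    watermark = max(
        (row["last_build_ts"] for row in previous_output_rows), default=None
    )
    new_rows = [
        row for row in input_rows
        if watermark is None or row["created_at"] > watermark
    ]
    # Stamp each emitted row so the next build can recover the watermark.
    return [{**row, "last_build_ts": build_time} for row in new_rows]


# Two consecutive builds over a growing snapshot (hypothetical data).
snapshot_v1 = [{"id": 1, "created_at": datetime(2024, 1, 1)}]
snapshot_v2 = snapshot_v1 + [{"id": 2, "created_at": datetime(2024, 1, 3)}]

first = rows_added_since_last_build(snapshot_v1, [], datetime(2024, 1, 2))
second = rows_added_since_last_build(snapshot_v2, first, datetime(2024, 1, 4))
```

In this sketch, a build that emits no rows leaves nothing to recover the watermark from, so the output would need to retain at least one stamped row between builds.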

Is there a better way to achieve this? Would it be possible to expose the last build timestamp as metadata on the TransformOutput or IncrementalTransformOutput object?

You could create an incremental output from the snapshot input.
In an incremental build you can read what the previous build wrote to the output.

The disadvantage, of course, is that you would essentially store the input dataset twice.

It would look something like this:

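from transforms.api import transform, incremental, Input, Output


@incremental(snapshot_inputs=["source_df"])
@transform(
    output=Output("output_path"),
    new_rows=Output("new_rows_path"),
    source_df=Input("input_path"),
)
def compute(output, new_rows, source_df):
    source_df = source_df.dataframe()
    # Read what the previous build wrote; the schema is needed when there is no previous output.
    previous_df = output.dataframe("previous", source_df.schema)
    # Keep only rows of the current snapshot that are absent from the previous one.
    # Note: rows with a null in any column never match in the join, so they always count as new.
    new_rows_df = source_df.join(previous_df, on=source_df.columns, how="left_anti")

    # Overwrite the stored copy with the current snapshot for the next build.
    output.set_mode("replace")
    output.write_dataframe(source_df)

    new_rows.set_mode("replace")
    new_rows.write_dataframe(new_rows_df)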
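
To illustrate what comparing each snapshot against the previous one yields across consecutive builds, here is a plain-Python sketch with made-up data. The function name and the tuples are hypothetical, and it only mimics the comparison on all columns; it is not Foundry or Spark code.

```python
def new_rows_since_previous(current_snapshot, previous_snapshot):
    """Rows in the current snapshot that were absent from the previous one."""
    seen = set(previous_snapshot)
    return [row for row in current_snapshot if row not in seen]


# Three consecutive snapshots of the same input (hypothetical data).
build_1 = [(1, "a"), (2, "b")]
build_2 = [(1, "a"), (2, "b"), (3, "c")]
build_3 = [(1, "a"), (2, "x"), (3, "c")]

new_in_first = new_rows_since_previous(build_1, [])        # no previous output yet
new_in_second = new_rows_since_previous(build_2, build_1)  # one row was appended
new_in_third = new_rows_since_previous(build_3, build_2)   # one row was modified
```

Because whole rows are compared, a row whose values changed between builds also shows up as new, not only rows that were appended.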