How to snapshot a dataset from time to time, automatically?

I have an incremental pipeline that keeps appending data. From time to time I need to retire data that is no longer relevant.
How can I “snapshot” my pipeline automatically, from time to time?


High level: you can use a “control dataset” as a way to “queue” a particular behavior to execute (an alternative processing path, a snapshot, …), and rely on incremental transforms to keep state, since they track which rows have already been consumed and therefore which “behavior orders” have already been acted on. In practice, schedule the control dataset to build at the cadence at which you want the snapshot to happen: each build appends a new “order” row, and the downstream transform consumes it and switches behavior.

Typical code that implements this looks like the following:

from transforms.api import transform, Input, Output, incremental, configure
from pyspark.sql import types as T
from pyspark.sql import functions as F


# Builds a one-row dataframe with a dummy key and the current timestamp
def get_empty_df(ctx):
    schema = T.StructType([T.StructField("key", T.StringType(), True)])
    df = ctx.spark_session.createDataFrame([("dummy_key",)], schema)
    df = df.withColumn('when', F.current_timestamp())
    return df

@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental()
@transform(
    out=Output("/path/control_dataset")
)
def out_C(ctx, out):
    df = get_empty_df(ctx)

    # Runs on every build and appends a new row with the current timestamp
    out.set_mode("modify")

    # Custom output format: JSON. Otherwise it defaults to Parquet, which benefits from compression, partitioning, etc.
    # See https://www.palantir.com/docs/foundry/transforms-python/transforms-python-api-classes/#transformoutput
    out.write_dataframe(df, output_format="json")


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental()
@transform(
    out=Output("/path/dataset_A_to_process")
)
def out_A(ctx, out):
    df = get_empty_df(ctx)

    # Stand-in for the incremental input dataset: runs on every build and appends a new row with the current timestamp
    out.set_mode("modify")

    # Custom output format: JSON. Otherwise it defaults to Parquet, which benefits from compression, partitioning, etc.
    # See https://www.palantir.com/docs/foundry/transforms-python/transforms-python-api-classes/#transformoutput
    out.write_dataframe(df, output_format="json")




@incremental()
@transform(
    output_dataset=Output("/path/output_dataset"),
    input_dataset=Input("/path/dataset_A_to_process"),
    control_dataset=Input("/path/control_dataset")
)
def conditional_logic(ctx, input_dataset, control_dataset, output_dataset):
    control_df = control_dataset.dataframe()  # New control rows if the control dataset was built since the last run, otherwise empty

    # Note: you could also check whether this build runs incrementally with
    # ctx.is_incremental; it is not needed in this particular implementation.

    if control_df.count() > 0:
        # Alternative behavior: in our case, snapshot the output
        input_df = input_dataset.dataframe("current") # All the data, including historical and new data if any
        modified_df = input_df.withColumn("processing_ts", F.current_timestamp())
        # Snapshot write on the output (replace the full output)
        output_dataset.set_mode("replace")
        output_dataset.write_dataframe(modified_df)
    else:
        # Default behavior: the day-to-day processing, here forwarding the new input rows to the output
        input_df = input_dataset.dataframe() # Only the new data if any
        modified_df = input_df.withColumn("processing_ts", F.current_timestamp())

        # Incrementally write on the output
        output_dataset.set_mode("modify")
        output_dataset.write_dataframe(modified_df)
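

Since the original question is about retiring data that is no longer relevant, the snapshot branch is a natural place to apply the retention rule: when the control dataset triggers a snapshot, rewrite the output with only the rows you want to keep. Below is a minimal sketch of that variant; the event_ts column and the 90-day window are assumptions, to be replaced by your own retention criterion.

from transforms.api import transform, Input, Output, incremental
from pyspark.sql import functions as F


@incremental()
@transform(
    output_dataset=Output("/path/output_dataset"),
    input_dataset=Input("/path/dataset_A_to_process"),
    control_dataset=Input("/path/control_dataset")
)
def conditional_logic_with_retention(ctx, input_dataset, control_dataset, output_dataset):
    control_df = control_dataset.dataframe()  # New control rows since the last build, if any

    if control_df.count() > 0:
        # Snapshot branch: rebuild the whole output and drop rows older than the retention window
        input_df = input_dataset.dataframe("current")  # Full current view of the input
        retained_df = input_df.filter(
            # "event_ts" and the 90-day window are hypothetical; use your own retention rule
            F.col("event_ts") >= F.expr("current_timestamp() - INTERVAL 90 DAYS")
        )
        output_dataset.set_mode("replace")
        output_dataset.write_dataframe(retained_df.withColumn("processing_ts", F.current_timestamp()))
    else:
        # Day-to-day branch: forward only the newly appended rows to the output
        input_df = input_dataset.dataframe()
        output_dataset.set_mode("modify")
        output_dataset.write_dataframe(input_df.withColumn("processing_ts", F.current_timestamp()))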


