How can I archive data in Transforms?

I want to record all the different versions of the data I’m processing.
For example, every day I have a dataset that updates with new data. I want to keep each day’s data as a historical log/archive of what the data looked like at a given point in time.

How can I archive the data, day after day?

You can use this community post’s code to generate fake data, if you want to test the behavior end-to-end: https://community.palantir.com/t/how-can-i-mock-incremental-behavior-in-code-repository/1381/2
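
If you just want a rough idea of such a generator, a minimal stand-in could look like the sketch below (the path and columns are illustrative assumptions; the linked post has a more complete version):

from datetime import datetime

from transforms.api import transform, Output

@transform(
    out=Output("/path/example_output"),  # assumed path, matching the Input of the archival transform below
)
def generate_fake_data(ctx, out):
    # Each build writes a fresh batch of rows tagged with the build time,
    # simulating a source dataset that receives new data every day.
    df = ctx.spark_session.createDataFrame(
        [(i, datetime.now().isoformat()) for i in range(5)],
        ["id", "generated_at"],
    )
    out.write_dataframe(df)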

Then you can archive the data by continuously appending it to a given dataset, as in the code below.

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output, incremental

@incremental(snapshot_inputs=["source_dataset"], require_incremental=True)
@transform(
    out=Output("/path/example_archival"),
    source_dataset=Input("/path/example_output"),
)
def compute(source_dataset, out):
    # To copy the entire input on each run, the input must be declared in
    # @incremental(snapshot_inputs=["source_dataset"]), as done above.
    # The dataset is then read in full on every run; otherwise only the new data is read.
    new_data = source_dataset.dataframe()

    # Note: We can add a timestamp or additional information here
    new_data = new_data.withColumn("archival_ts", F.current_timestamp())

    # We "append" to the output
    # Note: If you want to make sure that the output will never snapshot, or be replaced,
    # you can add an argument to the @incremental decorator to make any snapshot build to fail
    # @incremental(require_incremental=True)
    out.set_mode("modify")
    out.write_dataframe(new_data)

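Once the archive has accumulated several runs, the archival_ts column tells the runs apart. As a hypothetical downstream example (dataset paths are placeholders), you could rebuild the latest version of the source like this:

from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output

@transform_df(
    Output("/path/example_archival_latest"),
    archive=Input("/path/example_archival"),
)
def latest_version(archive):
    # Keep only the rows written by the most recent archival run,
    # i.e. reconstruct what the source looked like at that point in time.
    latest_ts = archive.agg(F.max("archival_ts")).collect()[0][0]
    return archive.filter(F.col("archival_ts") == latest_ts)
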
Note: If you hit the error "Profiles with invalid permissions: KUBERNETES_NO_EXECUTORS" on build, import the KUBERNETES_NO_EXECUTORS Spark profile in the settings of your Code Repository.