I want to record all the different versions of the data I'm processing.
For example, every day I have a dataset that updates with new data. I want to keep each day's data as a historical log/archive of what the data was at a given point in time.
How can I archive the data day after day?
If you want to test the behavior end-to-end, you can use the code from this community post to generate fake data: https://community.palantir.com/t/how-can-i-mock-incremental-behavior-in-code-repository/1381/2
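If you just want a quick input to build against, a minimal sketch of a source transform that changes on every build could look like this (hypothetical path and columns, not the code from the linked post):

from pyspark.sql import functions as F
from transforms.api import transform, Output


@transform(
    out=Output("/path/example_output"),
)
def generate_fake_data(ctx, out):
    # Hypothetical fake-data generator: a tiny dataset with a timestamp that
    # changes on every build, so each archival run has something new to record
    df = ctx.spark_session.createDataFrame(
        [(1, "a"), (2, "b"), (3, "c")],
        ["id", "value"],
    ).withColumn("generated_at", F.current_timestamp())
    out.write_dataframe(df)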
Then you can archive the data by continually appending it to a given dataset, as in the code below.
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output, incremental


@incremental(snapshot_inputs=["source_dataset"], require_incremental=True)
@transform(
    out=Output("/path/example_archival"),
    source_dataset=Input("/path/example_output"),
)
def compute(source_dataset, out):
    # To copy the entire input on each run, the input must be listed in
    # snapshot_inputs on the @incremental decorator, as done above.
    # The dataset is then read in full on every run; otherwise, only
    # the new data would be read.
    new_data = source_dataset.dataframe()

    # Note: we can add a timestamp or additional information here
    new_data = new_data.withColumn("archival_ts", F.current_timestamp())

    # We "append" to the output.
    # Note: to make sure the output is never snapshotted or replaced,
    # require_incremental=True (already set on the @incremental decorator
    # above) makes any snapshot build fail.
    out.set_mode("modify")
    out.write_dataframe(new_data)
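Downstream, you can then read the archive as it was on a given day by filtering on the archival timestamp. A minimal sketch (hypothetical paths and date, assuming the archival_ts column added above):

from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output


@transform_df(
    Output("/path/example_archival_snapshot"),
    archive=Input("/path/example_archival"),
)
def snapshot_of_a_day(archive):
    # Keep only the rows that were appended to the archive on 2024-01-15
    return archive.filter(F.to_date("archival_ts") == F.lit("2024-01-15"))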
Note: if the build fails with an error like Profiles with invalid permissions: "KUBERNETES_NO_EXECUTORS", import the KUBERNETES_NO_EXECUTORS Spark profile in the settings of your Code Repository.