Is there a way to have a pipeline periodically snapshot? We use the option to append only new rows to read a dataset as incremental downstream, but there are some mapping tables that will change every so often, and we’d like changes to that mapping table to persist through all historic data. I know we can do this with code repos, but wondering if it’s also possible through pipeline builder?
The below solution 1 should work with code repo and pipeline builder.
Solution 2, because it’s more advanced in the control of the input/output, is only for code repository.
Solution 1 - rely on the incremental system of Foundry
You can add to your pipeline a “snapshot trigger” input, with a dedicated schedule. Whenever it builds, as a snapshot, it will make a snapshot to propagate in your pipeline.
Solution 2 - More fine grained control
You might not always want to rely on the incremental behavior for the snapshot to propagate, but you might want to keep the actual build incremental to have access to the incremental benefits: reading your output etc.
A typical example would be to periodically “resnapshot” your output to optimize the partitioning. In that case, you want to read your output, partition it, re-write it.
Full code example
from transforms.api import transform, Input, Output, incremental, configure
from pyspark.sql import types as T
from pyspark.sql import functions as F
# Generation of empty dataframe, that just appends the current timestamp
def get_empty_df(ctx):
schema = T.StructType([T.StructField("key", T.StringType(), True)])
df = ctx.spark_session.createDataFrame([("dummy_key",)], schema)
df = df.withColumn('when', F.current_timestamp())
return df
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental()
@transform(
out=Output("/path/control_dataset")
)
def out_C(ctx, out):
df = get_empty_df(ctx)
# build all the time and append the timestamp
out.set_mode("modify")
# Custom type writing : JSON. Otherwise defaults to parquet which benefits from compression, partitioning, etc.
# See https://www.palantir.com/docs/foundry/transforms-python/transforms-python-api-classes/#transformoutput
out.write_dataframe(df, output_format="json")
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@transform(
out=Output("/path/dataset_A_to_process")
)
def out_A(ctx, out):
df = get_empty_df(ctx)
# build all the time and append the timestamp
out.set_mode("modify")
# Custom type writing : JSON. Otherwise defaults to parquet which benefits from compression, partitioning, etc.
# See https://www.palantir.com/docs/foundry/transforms-python/transforms-python-api-classes/#transformoutput
out.write_dataframe(df, output_format="json")
@incremental()
@transform(
output_dataset=Output("/path/output_dataset"),
input_dataset=Input("/path/dataset_A_to_process"),
control_dataset=Input("/path/control_dataset")
)
def conditional_logic(ctx, input_dataset, control_dataset, output_dataset):
control_df = control_dataset.dataframe() # Maybe one row (if it ran since last time) otherwise empty
# Note: Can as well check if the incremental inputs are incremental with
# if ctx.is_incremental:
# No need in this particular implementation
if control_df.count() > 0:
# Some other logic = e.g. in our case, we snapshot
input_df = input_dataset.dataframe("current") # All the data, including historical and new data if any
modified_df = input_df.withColumn("processing_ts", F.current_timestamp())
# Snapshot write on the output (replace the full output)
output_dataset.set_mode("replace")
output_dataset.write_dataframe(modified_df)
else:
# TODO : Custom logic here :) = e.g. in our case, the day to day processing where we forward input to output
input_df = input_dataset.dataframe() # Only the new data if any
modified_df = input_df.withColumn("processing_ts", F.current_timestamp())
# Incrementally write on the output
output_dataset.set_mode("modify")
output_dataset.write_dataframe(modified_df)
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental(snapshot_inputs=["input_dataset"])
@transform(
output_dataset=Output("/path/output_dataset_lf_example"),
output_aggregated_dataset=Output("/path/output_dataset_lf_example_aggregated"),
input_dataset=Input("/path /dataset_A_to_process"),
control_dataset=Input("/path/control_dataset")
)
def conditional_logic_lf(ctx, input_dataset, control_dataset, output_dataset, output_aggregated_dataset):
input_df = input_dataset.dataframe() # All the data
control_df = control_dataset.dataframe() # Maybe one row (if we rare logic) other empty
# TODO : Custom logic here :)
modified_df = input_df.withColumn("key", F.lit("simple logic executed"))
# Base case
output_dataset.set_mode("replace")
output_dataset.write_dataframe(modified_df)
if control_df.count() > 0:
# Some other logic
modified_df = input_df.withColumn("key", F.lit("other logic executed"))
# Base case
output_aggregated_dataset.set_mode("replace")
output_aggregated_dataset.write_dataframe(modified_df)
A more extensive explanation, with transaction-level explanation:
You can use a “control dataset” as a way to “queue” a particular behavior to execute (like an alternative processing, snapshotting, …) and use incremental transforms to keep state, as they keep track of which row was consumed (and so which “behavior order” got already “consumed”).
Can be used with any custom logic (clean-up, retention, repartitioning of the output, aggregations …) at a specific controlled schedule, on top of the standard logic. Resilient to failures. In essence: we queue an order to “process differently” a dataset.
Just to clarify for the pipeline builder answer (understanding this is a unique case) - as the inputs are already snapshots and the output is set to append only new rows, the snapshot trigger adding a snapshot input won’t really change anything, right? I’m using that write mode to pick the output up as incremental downstream, but i’m not sure if I can reconfigure the pipeline to get the desired results or if in this case it’s best to just periodically redeploy as a full snapshot output and then switch back or to convert the pipeline to code. Thanks for the detailed answer!
Hey @jaufderheide! Instead of setting the output to append only new rows, you could also make the entire pipeline incremental (see image below or docs here: https://www.palantir.com/docs/foundry/building-pipelines/create-incremental-pipeline-pb/
)
This way if any of the input datasets snapshot, the pipeline will automatically snapshot instead of running incrementally