I have an incremental pipeline that keeps appending data. From time to time I need to retire (apply retention to) data that is no longer relevant.
How can I "snapshot" my pipeline from time to time, automatically?
High level: you can use a "control dataset" as a way to "queue" a particular behavior to execute (an alternative processing path, a snapshot, …). Incremental transforms keep state by tracking which rows have already been consumed, so they also track which "behavior order" has already been processed.
Typical code that enables this pattern:
from transforms.api import transform, Input, Output, incremental, configure
from pyspark.sql import types as T
from pyspark.sql import functions as F


# Generate a one-row dataframe carrying the current timestamp
def get_empty_df(ctx):
    schema = T.StructType([T.StructField("key", T.StringType(), True)])
    df = ctx.spark_session.createDataFrame([("dummy_key",)], schema)
    df = df.withColumn("when", F.current_timestamp())
    return df
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental()
@transform(
    out=Output("/path/control_dataset")
)
def out_C(ctx, out):
    df = get_empty_df(ctx)
    # Runs on every build and appends one row with the current timestamp
    out.set_mode("modify")
    # Custom output format: JSON. Otherwise it defaults to Parquet, which benefits from compression, partitioning, etc.
    # See https://www.palantir.com/docs/foundry/transforms-python/transforms-python-api-classes/#transformoutput
    out.write_dataframe(df, output_format="json")
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental()  # needed so the "modify" write mode below appends instead of failing
@transform(
    out=Output("/path/dataset_A_to_process")
)
def out_A(ctx, out):
    df = get_empty_df(ctx)
    # Runs on every build and appends one row with the current timestamp
    out.set_mode("modify")
    # Custom output format: JSON. Otherwise it defaults to Parquet, which benefits from compression, partitioning, etc.
    # See https://www.palantir.com/docs/foundry/transforms-python/transforms-python-api-classes/#transformoutput
    out.write_dataframe(df, output_format="json")
@incremental()
@transform(
    output_dataset=Output("/path/output_dataset"),
    input_dataset=Input("/path/dataset_A_to_process"),
    control_dataset=Input("/path/control_dataset")
)
def conditional_logic(ctx, input_dataset, control_dataset, output_dataset):
    # One row if the control dataset was built since the last run, otherwise empty
    control_df = control_dataset.dataframe()
    # Note: you could also check whether the inputs resolved incrementally with
    # if ctx.is_incremental:
    # but that is not needed in this particular implementation.
    if control_df.count() > 0:
        # Alternative behavior was queued: in our case, snapshot the output
        input_df = input_dataset.dataframe("current")  # All the data: historical plus any new rows
        modified_df = input_df.withColumn("processing_ts", F.current_timestamp())
        # Snapshot write on the output (replace the full output)
        output_dataset.set_mode("replace")
        output_dataset.write_dataframe(modified_df)
    else:
        # TODO: custom logic here :) e.g. in our case, the day-to-day processing that forwards input to output
        input_df = input_dataset.dataframe()  # Only the new rows, if any
        modified_df = input_df.withColumn("processing_ts", F.current_timestamp())
        # Incremental write on the output
        output_dataset.set_mode("modify")
        output_dataset.write_dataframe(modified_df)
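To tie this back to the original question (retiring data that is no longer relevant), the snapshot branch is the natural place to apply the retention filter: when a control row has been queued, rebuild the output from the full input minus the expired rows, and put the control dataset's build on a time-based schedule so this happens from time to time automatically. Below is a minimal sketch of that idea using the same API calls as above; the transform name retention_snapshot, the event_ts column, and the 90-day window are hypothetical placeholders, not part of the example above:

from transforms.api import transform, Input, Output, incremental
from pyspark.sql import functions as F


@incremental()
@transform(
    output_dataset=Output("/path/output_dataset"),
    input_dataset=Input("/path/dataset_A_to_process"),
    control_dataset=Input("/path/control_dataset")
)
def retention_snapshot(ctx, input_dataset, control_dataset, output_dataset):
    control_df = control_dataset.dataframe()  # New control rows since the last build, if any
    if control_df.count() > 0:
        # Snapshot run: read the full history and keep only rows inside the retention window
        input_df = input_dataset.dataframe("current")
        retained_df = input_df.filter(
            F.col("event_ts") >= F.date_sub(F.current_date(), 90)  # hypothetical column and window
        )
        output_dataset.set_mode("replace")  # Replace the whole output
        output_dataset.write_dataframe(retained_df)
    else:
        # Normal incremental run: append only the newly arrived rows
        output_dataset.set_mode("modify")
        output_dataset.write_dataframe(input_dataset.dataframe())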