I want to robustify my code for a “Rolling History”. Currently I take the output dataset in my repo, filter it to the retention interval and then union the input.
@transform_df(
    Output(HISTORY_DS, sever_permissions=True),
    df=Input(INPUT_DS),
)
def my_compute_function(ctx, df):
    inputWithDateAddCol = df.withColumn(DATE_ADD_COLUMN_NAME, current_timestamp())
    historized_ds = ctx._foundry.input(HISTORY_DS, branch="master").dataframe()
    historized_ds = historized_ds.filter(datediff(current_timestamp(), col(DATE_ADD_COLUMN_NAME)) < 7)
    return inputWithDateAddCol.unionAll(historized_ds.select(inputWithDateAddCol.columns)).drop_duplicates()
But on the first execution (e.g. on a new branch) there is no output dataset to grab yet. Of course I could comment out the history access, run the build once, and then uncomment it, but it would be far more elegant to have a piece of code that checks whether the old output exists and applies the filter/union logic only if it does.
Any idea how to implement this?
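Roughly, I imagine something like this replacing the body of my_compute_function above (just a sketch; wrapping the private ctx._foundry accessor in a try/except is a guess on my part, not something I know to be supported):

    inputWithDateAddCol = df.withColumn(DATE_ADD_COLUMN_NAME, current_timestamp())
    try:
        # this raises when the output does not exist yet (first run on a branch)
        historized_ds = ctx._foundry.input(HISTORY_DS, branch="master").dataframe()
        historized_ds = historized_ds.filter(
            datediff(current_timestamp(), col(DATE_ADD_COLUMN_NAME)) < 7)
        return inputWithDateAddCol.unionAll(
            historized_ds.select(inputWithDateAddCol.columns)).drop_duplicates()
    except Exception:
        # no previous history available, start fresh
        return inputWithDateAddCol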
A few things are missing in this code, and I’m not sure it does exactly what you describe.
In order to read your own output, you need the @incremental() decorator. You can’t read your output without it (I’m curious whether you’ve ever seen this code succeed on any run?).
To know whether there already is an output, you can check via the ctx context whether the transform is running as a snapshot or incrementally. It’s not a 1:1 match with your need (it doesn’t literally “check” whether an output was already written), but it should be a good enough proxy (“Has this ever run before? Is it running incrementally?”), and it also lets you snapshot, and so reset your output, with the standard parameters of the decorator (semantic_version=…).
from transforms.api import transform, Input, Output, incremental
from pyspark.sql.functions import current_timestamp, datediff, col


@incremental()
@transform(
    out=Output(HISTORY_DS, sever_permissions=True),
    df=Input(INPUT_DS),
)
def my_compute_function(ctx, df, out):
    # under @transform, df is a TransformInput, so call .dataframe() first
    inputWithDateAddCol = df.dataframe().withColumn(DATE_ADD_COLUMN_NAME, current_timestamp())
    if ctx.is_incremental:
        historized_ds = out.dataframe()
        historized_ds = historized_ds.filter(datediff(current_timestamp(), col(DATE_ADD_COLUMN_NAME)) < 7)
        to_write = inputWithDateAddCol.unionAll(historized_ds.select(inputWithDateAddCol.columns)).drop_duplicates()
    else:
        to_write = inputWithDateAddCol
    out.write_dataframe(to_write)
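As noted above, bumping semantic_version in the decorator is the standard knob to force the next run back to a snapshot and thereby rebuild the history from scratch; a minimal sketch:

    # Changing this number invalidates the previous incremental state, so the
    # next build runs as a snapshot (ctx.is_incremental will be False):
    @incremental(semantic_version=2)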
I think this is what I need. The build is running in a test environment. But wouldn’t I need to tell the ctx to do an update? If it goes through the incremental path, will it just append a file with the data, or not?
In another context where I wanted to force the update I did

mode = 'replace'
history.set_mode(mode)
history.write_dataframe(df)

Why don’t I need this here?
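You do need it here as well, as far as I understand the write modes: an incremental run writes with mode 'modify' by default (i.e. it appends), while a snapshot run writes with 'replace'. So for the filtered union to replace the old rows instead of being appended to them, an explicit set_mode is needed; a minimal sketch, assuming the ctx.is_incremental branch above:

    # without this, the union would be appended to the existing history
    # (incremental default mode 'modify') and duplicate the old rows:
    out.set_mode('replace')
    out.write_dataframe(to_write)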
So finally, with the examples and the guidance I was able to create a version that does everything needed, including the goodie that the cleanup only runs on a given day, so that for the rest of the week the performance of incremental appends is preserved.
from transforms.api import transform, Input, Output, incremental
from pyspark.sql.functions import current_timestamp, datediff, col
from pyspark.sql import types as T
from datetime import datetime
DATE_ADD_COLUMN_NAME = "date_of_adding"
schema = T.StructType([
    T.StructField('xxx', T.StringType()),
    # ..... remaining fields copied from the Compass view
    T.StructField('date_of_adding', T.TimestampType())
])
@incremental(semantic_version=1, snapshot_inputs=['df'])
@transform(
    output=Output("ri.foundry.main.dataset.xxxx", sever_permissions=True),
    df=Input("ri.foundry.main.dataset.xxxxy"),
)
def my_compute_function(df, output):
    inputWithDateAddCol = df.dataframe().withColumn(DATE_ADD_COLUMN_NAME, current_timestamp())
    dt = datetime.now()
    weekday = dt.isoweekday()  # Monday is 1, Sunday is 7
    # the cleanup shall run only on Sunday
    if weekday == 7:
        # get the previous dataset; thanks to the explicit schema this also
        # works if no data is there yet (e.g. on the first run)
        previous = output.dataframe('previous', schema)
        # now calculate the delta to now and exclude rows that are too old
        # for testing, a short interval (here 20 minutes) instead of days:
        """
        previous = previous.withColumn(
            'minutes_diff',
            (current_timestamp().cast('long') - col(DATE_ADD_COLUMN_NAME).cast('long')) / 60)
        previous = previous.filter(col("minutes_diff") < 20)
        previous = previous.drop("minutes_diff")
        """
        # production code: filter out rows older than 14 days
        previous = previous.filter(datediff(current_timestamp(), col(DATE_ADD_COLUMN_NAME)) < 14)
        # essential, to not get an append!
        output.set_mode('replace')
        output.write_dataframe(inputWithDateAddCol.unionAll(previous))
    else:
        output.set_mode('modify')
        output.write_dataframe(inputWithDateAddCol)
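One small defensive variant, in case the column order of the history ever drifts from the input: unionAll matches columns by position, while PySpark’s unionByName matches them by name, so the cleanup write could also be done like this (untested sketch):

    # match history columns by name instead of position
    output.write_dataframe(inputWithDateAddCol.unionByName(previous))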