I have an incremental transform in which I want to read the output dataset at the start to get the set of IDs it already contains, so that I don't create duplicate rows of data or do unnecessary work.
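To make the goal concrete, here is a minimal plain-Python sketch of the deduplication I have in mind, assuming the IDs already in the output are available as a set. `filter_new_rows` is a hypothetical helper for illustration only, not part of any Foundry API.

```python
from typing import Dict, Iterable, List, Set


def filter_new_rows(
    incoming: Iterable[Dict[str, str]],
    existing_ids: Set[str],
    id_field: str = "unique_id",
) -> List[Dict[str, str]]:
    """Keep only rows whose ID is not already in the output (hypothetical helper)."""
    seen = set(existing_ids)  # copy so the caller's set is not mutated
    kept = []
    for row in incoming:
        row_id = row[id_field]
        if row_id in seen:
            # Already written by a previous build, or earlier in this batch
            continue
        seen.add(row_id)
        kept.append(row)
    return kept


existing = {"a", "b"}
incoming = [{"unique_id": "a"}, {"unique_id": "c"}, {"unique_id": "c"}, {"unique_id": "d"}]
print(filter_new_rows(incoming, existing))
```

In PySpark the same idea would roughly be a left anti join against the existing IDs plus `dropDuplicates(["unique_id"])` for repeats within the batch, e.g. `new_df.join(existing_ids_df, on="unique_id", how="left_anti")`, where `new_df` and `existing_ids_df` are hypothetical names. That approach avoids collecting the IDs to the driver.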
Below is my current attempt to extract all unique IDs from the existing output dataset:
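```python
from pyspark.sql.types import StringType, StructField, StructType
from transforms.api import Input, Output, incremental, transform

@incremental()
@transform(
    output_dataset=Output("<rid>"),
    input_dataset=Input("<rid>"),
)
def extract_matches(ctx, input_dataset, output_dataset):
    schema = StructType([StructField("unique_id", StringType(), True)])
    # Read the previous output and collect every distinct ID already written
    unique_ids = set(
        output_dataset.dataframe(mode="previous", schema=schema)
        .select("unique_id").distinct()
        .rdd.map(lambda row: row[0]).collect()
    )
```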
However, this returns an empty set even though the output dataset contains data.
After reading the documentation more carefully, it seems that `dataframe()` only returns the previous output of the transform rather than the entire dataset (though I may well be misreading the documentation).
In any case, how can I read an existing dataset in its entirety within an incremental transform and then write to it?
I also tried adding the output dataset as an input, but that creates a cyclic dependency, which is not allowed.