I ran into an issue where an existing batch-loaded dataset fails once it is switched to an incremental transform with a Slowly Changing Dimension (SCD) type 2 implementation. I found a workaround: saving the output as a brand-new dataset works, i.e. a fresh load into a new dataset succeeds. But with an existing batch-loaded dataset, reading the previous output (which the SCD type 2 logic needs) fails. I'd like to know whether this is a bug or whether there are docs / a known way to resolve it. In my case there was no new data coming in that day (I deliberately did not refresh the input dataset), which was the point of the test. Below is the code snippet (details removed):
```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from transforms.api import transform, Input, Output, Check, incremental
from transforms import expectations as E


@incremental()
@transform(
    output=Output(
        "/new_dataset_path",
        checks=[
            Check(E.primary_key("surrogate_key"), "Primary Key", on_error="FAIL"),
            Check(E.col("primary_id").non_null(), "primary_id must not be null"),
            Check(E.col("primary_update_date").non_null(), "primary_update_date must not be null"),
        ],
    ),
    source=Input("input dataset RID"),
)
def compute(source, output, ctx):
    df = source.dataframe().select(
        "cols you need"
    )
    if ctx.is_incremental:
        previous_df = output.dataframe('previous')
        if not previous_df.isEmpty():
            # Filter out records that already exist in the previous output
            new_df = df.join(previous_df, ["primary_id", "primary_update_date"], "left_anti")
            if new_df.isEmpty():
                # No new data, so keep the previous data as-is
                df = previous_df
                # enable it after first load
                # if "surrogate_key" in previous_df.columns:
                #     return "No changes needed, surrogate key exists in previous data"
            else:
                # Combine previous and new data
                df = previous_df.union(new_df)
    # Create surrogate key and update is_latest flag for all records
    window_spec = Window.partitionBy("primary_id").orderBy(F.desc("primary_update_date"))
    df = df.withColumn("row_num", F.row_number().over(window_spec))
    df = df.withColumn("surrogate_key", F.concat(
        F.col("primary_id"),
        F.lit("_"),
        F.col("primary_update_date").cast("string"),
        F.lit("_"),
        F.col("row_num").cast("string"),
        F.lit("_"),
        F.monotonically_increasing_id().cast("string")  # add a unique identifier
    ))
df = df.withColumn("is_latest", F.row_number().over(window_spec) == 1)
    # Drop rows where surrogate_key is null
    df = df.filter(F.col("surrogate_key").isNotNull())
    # Ensure the final dataframe has the correct schema and is deduplicated
    df = df.select(
        "cols you need"
    ).dropDuplicates(["surrogate_key"])
    output.write_dataframe(df)
```
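
For reference, the shape I thought the previous-output read should take is below, loosely following the merge-and-replace style example from the incremental transforms docs. The path, column names, and schema are placeholders (not my real ones), and my SCD logic is omitted. If an explicit schema on the `'previous'` read and/or `set_mode('replace')` is what's required after an existing batch load, that is exactly the part I'm unsure about:

```python
# Minimal sketch only - placeholder path/schema; the real SCD type 2 merge is omitted.
from pyspark.sql import types as T
from transforms.api import transform, Input, Output, incremental

# Illustrative schema of the previously written output (placeholder columns).
PREVIOUS_SCHEMA = T.StructType([
    T.StructField("primary_id", T.StringType()),
    T.StructField("primary_update_date", T.TimestampType()),
    T.StructField("surrogate_key", T.StringType()),
    T.StructField("is_latest", T.BooleanType()),
])


@incremental()
@transform(
    output=Output("/placeholder/output_path"),
    source=Input("/placeholder/input_rid"),
)
def compute(source, output, ctx):
    df = source.dataframe()
    if ctx.is_incremental:
        # Pass the schema explicitly so the 'previous' read does not depend on
        # inferring it from whatever the last (batch-loaded) transaction contains.
        previous_df = output.dataframe("previous", PREVIOUS_SCHEMA)
        # ... SCD type 2 merge of previous_df and df would go here ...
        df = previous_df.unionByName(df, allowMissingColumns=True)
        # Rewrite the full SCD table each run instead of appending to the output.
        output.set_mode("replace")
    output.write_dataframe(df)
```

The `set_mode("replace")` is there because, like in my real transform, the whole SCD table is rebuilt and rewritten each run rather than only new rows being appended.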