Existing Batch Load to Incremental Fails with SCD Implementation

I encountered an issue where an existing batch-loaded dataset fails when switched to incremental with a Slowly Changing Dimension (SCD) implementation. I found a workaround: saving the output as a brand-new dataset works. In other words, creating a new dataset with a fresh load works, but with an existing batch-loaded dataset, reading the previous output (for the SCD type 2 implementation) fails. I would like to know whether this is a bug, or whether there are docs or another way to resolve it. In my case there was no new data coming in that day (I didn't refresh the input dataset), which was intentional. Below is the code snippet (details removed).

```
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from transforms.api import transform, Input, Output, Check, incremental
from transforms import expectations as E

@incremental()
@transform(
    output=Output(
        "/new_dataset_path",
        checks=[
            Check(E.primary_key("surrogate_key"), "Primary Key", on_error="FAIL"),
            Check(E.col("primary_id").non_null(), "primary_id must not be null"),
            Check(E.col("primary_update_date").non_null(), "primary_update_date must not be null"),
        ],
    ),
    source=Input("input dataset RID"),
)
def compute(source, output, ctx):
    df = source.dataframe().select(
        "cols you need"
    )

    if ctx.is_incremental:
        previous_df = output.dataframe('previous')
        if not previous_df.isEmpty():
            # Filter out records that already exist in the previous output
            new_df = df.join(previous_df, ["primary_id", "primary_update_date"], "left_anti")
            
            if new_df.isEmpty():
                # No new data, use previous data
                df = previous_df
                # enable it after first load
                # if "surrogate_key" in previous_df.columns:
                    # return "No changes needed, surrogate key exists in previous data"
            else:
                # Combine previous and new data
                df = previous_df.union(new_df)

    # Create surrogate key and update is_latest flag for all records
    window_spec = Window.partitionBy("primary_id").orderBy(F.desc("primary_update_date"))
    
    df = df.withColumn("row_num", F.row_number().over(window_spec))
    df = df.withColumn("surrogate_key", F.concat(
        F.col("primary_id"), 
        F.lit("_"), 
        F.col("primary_update_date").cast("string"),
        F.lit("_"),
        F.col("row_num").cast("string"),
        F.lit("_"),
        F.monotonically_increasing_id().cast("string")  # Add a unique identifier
    ))
    
    df = df.withColumn("is_latest", F.row_number().over(window_spec) == 1)

    # Drop rows where surrogate_key is null
    df = df.filter(F.col("surrogate_key").isNotNull())

    # Ensure final dataframe has the correct schema and is deduplicated
    df = df.select(
        # final output columns (details removed)
    ).dropDuplicates(["surrogate_key"])

    output.write_dataframe(df)
```

Hello

Please could you elaborate on how the transform fails? Is there an error message, or some unexpected behavior?

Also, I note that your code to read the previous output looks incorrect…

output.dataframe('previous')

Should be:

output.dataframe('previous', schema)

where the schema matches that of the output dataset (under an empty-output condition, the transform needs to know the schema). The schema is a PySpark StructType.
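
For example, a minimal sketch; the column names and types below are placeholders, so match them to your actual output schema:

```
from pyspark.sql import types as T

# Hypothetical schema: the fields below are placeholders; use the real output columns and types
previous_schema = T.StructType([
    T.StructField("primary_id", T.StringType()),
    T.StructField("primary_update_date", T.TimestampType()),
    T.StructField("surrogate_key", T.StringType()),
    T.StructField("is_latest", T.BooleanType()),
])

previous_df = output.dataframe('previous', previous_schema)
```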

The primary key check fails with the current (existing batch-loaded) dataset; it was fine when I created a new dataset. Thanks for the syntax correction, I will try it and see if it works for me.

From your code, it looks like you are trying to:

  • Read new rows incrementally if possible
  • Remove from the new rows any rows which match a composite primary key in the previous output
  • Union previous data with new data to get a full representation of new data that should be written out
  • Make a “surrogate key” which appends a row number across a primary-key window spec, and a monotonically increasing id, to the existing key

I’m not sure I understand the value of adding the monotonically increasing ID, since adding the row number from the window function to the surrogate key should ensure uniqueness.
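
For illustration, a key built only from the existing columns and the window row number would already be unique. This is just a sketch based on the columns in your snippet:

```
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_spec = Window.partitionBy("primary_id").orderBy(F.desc("primary_update_date"))

# Sketch: primary_id + primary_update_date + row number within the window is already
# unique per row, so the monotonically increasing id can be dropped
df = df.withColumn("row_num", F.row_number().over(window_spec))
df = df.withColumn(
    "surrogate_key",
    F.concat_ws(
        "_",
        F.col("primary_id"),
        F.col("primary_update_date").cast("string"),
        F.col("row_num").cast("string"),
    ),
)
```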

I think the main problem here is that when the build system runs in incremental mode, it will read incrementally and write incrementally. This is a sensible default for “read new data only, write new data only”. However, you’re not doing this - by retrieving the entire previous output and union-ing with new data, your output isn’t really incremental because each time it contains all of the output rows.

Put another way, for every build except the first, you’re writing an APPEND transaction. The output dataset will contain all of its previous data, plus the new output rows that you have processed. This is unlikely to be what you want, and is probably the cause of your primary key errors.

Take a look at the incremental transforms docs, and consider setting output.set_mode("replace") before you write the dataframe. Please test on a branch first to ensure this behaves as expected, noting that the first such build on a branch will not run incrementally. You’ll need to add more input data and run a second time to observe incremental behaviour.
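
As a rough sketch of what the end of the transform could look like with that change, assuming (as in your snippet) that df already holds the previous rows unioned with the new rows:

```
# Sketch: df contains the full previous + new output, so write it as a full
# replacement (snapshot) rather than appending it to the previous output
output.set_mode("replace")
output.write_dataframe(df)
```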