Multiple transforms in one external transform

I have an external transform that needs to do two things. The first is to append new rows, and the second is to re-process any rows from the previous output that have a specific status.

How would I combine these two in one transform?

@incremental()
@transform(
    input=Input('ri.foundry.main.dataset.617be4f8-164e-4ed5-8d49-03e59314ad76'),
    output=Output('ri.foundry.main.dataset.d8593d84-90aa-4a22-ac9d-59964768c1ad'),
)
def create_status(input, output, poke_source, ctx):

    input_df = input.dataframe()
    previous = output.dataframe('previous', schema)

    filtered_previous_df = previous[~previous["status"].isin(["REDACTED", "REDACTED2"])]

    # Get all new records
    new_records = input_df.join(
        previous,
        on="REDACTED",
        how="left_anti" 
    )

    combined_df = filtered_previous_df.join(new_records)

    # Processing code runs here

    result_df = ctx.spark_session.createDataFrame(results, schema)
    output.set_mode("modify")
    output.write_dataframe(result_df)

What am I missing? I can't test this in preview because the transform is incremental, and I'm not getting the results I expect: either no new data appears, or the old data is replaced completely.

A few open questions need answering before the code can behave exactly the way you want:

  • Is your input snapshotting or incremental?
  • What exactly do you want the behavior to be:
    • If a row is new on the input, should it always be processed, or only if it is not already present on the output?
    • If a row has failed and is stored on the output, should it be reprocessed? Always?
    • If a row has failed, is on the output, and is now present again in the input, should you take the one from the input, the failed one from the output, or both?
  • What should the output be? Do you want to keep appending (which means that for one primary key you will have multiple failures and potentially a success piled up together) or to replace (so you will have only the "latest" row, whether it is a success or not)?
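
To make that last question concrete, here is a minimal plain-Python sketch (no Foundry or Spark involved) of the difference between appending and replacing. The row layout, the `id` key, the status values, and the helper names `append_rows` and `replace_rows` are all assumptions made up for illustration.

```python
def append_rows(existing, new):
    """Append semantics: every attempt is kept, so one key can appear several times."""
    return existing + new


def replace_rows(existing, new, key="id"):
    """Replace semantics: only the latest row per key survives."""
    latest = {row[key]: row for row in existing}
    latest.update({row[key]: row for row in new})
    return list(latest.values())


# Hypothetical data: id 1 failed earlier and has now succeeded.
existing = [{"id": 1, "status": "FAILED"}, {"id": 2, "status": "SUCCESS"}]
new = [{"id": 1, "status": "SUCCESS"}]

appended = append_rows(existing, new)   # id 1 appears twice (FAILED and SUCCESS)
replaced = replace_rows(existing, new)  # id 1 appears once, with the latest status
```

With appending, downstream consumers have to pick the right row per key themselves; with replacing, the output holds one row per key but the history of failures is lost.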

Making a lot of assumptions, your code could look like this:

from pyspark.sql import functions as F
from transforms.api import Input, Output, incremental, transform


@incremental()
@transform(
    input=Input('ri.foundry.main.dataset.xxx'),
    output=Output('ri.foundry.main.dataset.yyy'),
)
def create_status(ctx, input, output, poke_source):
    # Read the input
    # Note: assuming this is an incremental ingest
    input_df = input.dataframe("added")

    if ctx.is_incremental:
        # Read the current output
        output_df = output.dataframe('current')

        # Split the output on the status column
        values_to_filter = ["REDACTED", "REDACTED2"]
        condition = F.col("status").isin(values_to_filter)
        output_not_in_specific_state = output_df.filter(~condition)
        output_in_specific_state = output_df.filter(condition)

        # Get all new records
        # This is a bit weird to me. Are you sure your original code means to anti-join this way?
        # I'm missing information about your assumptions, but usually you have "new stuff to process"
        # and "old broken rows to reprocess", and you want to drop the old rows when an updated
        # version of them has arrived in the input.
        output_not_in_specific_state_without_updated_rows = output_not_in_specific_state.join(
            input_df, how="left_anti", on=["REDACTED"]
        )

        # Process everything: the new rows + the old rows without updates that failed
        # (process_rows stands for your own processing logic)
        input_processed_rows = process_rows(input_df)
        output_processed_rows = process_rows(output_not_in_specific_state_without_updated_rows)

        # Union everything: untouched old rows + processed new rows + reprocessed old rows
        all_rows = output_in_specific_state.unionByName(input_processed_rows).unionByName(output_processed_rows)

    else:
        # Process input only
        input_processed_rows = process_rows(input_df)
        all_rows = input_processed_rows

    # Write on output
    output.set_mode("replace")  # this will snapshot the output, and so "update" the existing rows
    output.write_dataframe(all_rows)
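
Since the incremental transform can't be previewed, one option is to check the merge logic on its own first. The following is a rough plain-Python sketch of the incremental branch above, using lists of dicts instead of Spark DataFrames. The `id` key, the `DONE`/`FAILED`/`NEW` status values, and the helpers `process_row` and `merge_incremental` are hypothetical names chosen for illustration, not part of any Foundry API.

```python
def process_row(row):
    # Hypothetical stand-in for the real processing step: mark the row as done.
    return {**row, "status": "DONE"}


def merge_incremental(previous_output, new_input, keep_statuses, key="id"):
    """Mirror the incremental branch on plain lists of dicts."""
    # Rows in a "kept" status pass through untouched; the rest are retry candidates.
    kept = [r for r in previous_output if r["status"] in keep_statuses]
    to_retry = [r for r in previous_output if r["status"] not in keep_statuses]

    # Anti-join: drop old rows for which the input carries an updated version.
    input_keys = {r[key] for r in new_input}
    to_retry = [r for r in to_retry if r[key] not in input_keys]

    # Process the new rows and the old rows that still need a retry.
    processed = [process_row(r) for r in new_input] + [process_row(r) for r in to_retry]
    return kept + processed


# Hypothetical data: id 3 failed before and shows up again in the input.
previous_output = [
    {"id": 1, "status": "DONE"},
    {"id": 2, "status": "FAILED"},
    {"id": 3, "status": "FAILED"},
]
new_input = [{"id": 3, "status": "NEW"}, {"id": 4, "status": "NEW"}]

result = merge_incremental(previous_output, new_input, keep_statuses={"DONE"})
```

Here id 3 is taken from the input rather than retried from the old output, id 2 is retried, and id 1 is carried over unchanged. Note that, as in the transform above, an input row whose key matches a kept row would end up duplicated, so that case depends on the assumptions discussed earlier.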