PySpark Incremental Transform: write_dataframe schema Error

Hi Everyone,

I’m encountering a persistent schema mismatch error when writing a DataFrame in a PySpark transform decorated with @incremental(). My goal is to append new rows to an existing output dataset.

The transform fails with the following error:

com.palantir.logsafe.exceptions.SafeIllegalStateException: Attempting to write a schema with smaller amount of columns than schema of the existing dataset: {existingSchema=StructType(... actual schema, 14 columns ...), newSchema=StructType(... actual schema, 6 columns ...)}

When I print the schema right before the output_table.write_dataframe(df) call, it shows all 14 expected columns, yet the newSchema reported in the error contains only 6.

I suspect the error is related to a pivot operation performed earlier in the transformation, somehow affecting the schema that Spark internally tracks, even though printSchema() displays the correct schema at the end.

Here’s a breakdown of my transformation steps and what I’ve tried:

Transformation Steps (Simplified):

  1. Join: I join several DataFrames together.
  2. Transformations: Apply various column-level transformations (e.g., withColumn, cast).
  3. Explode: An array column is exploded, potentially creating many rows from a single original row.
  4. Pivot: I perform a pivot operation on a categorical column, which expands distinct values of that column into new columns.
  5. Schema Check & Alignment: Before the final write, I have logic to explicitly align the DataFrame’s schema with my schema_variable. This includes adding missing columns with null values and ensuring data types match.

What I’ve Tried:

  • Confirmed the schema with df.printSchema(): it consistently shows all 14 columns with the correct data types right before the write statement.
  • Explicit Schema Definition and Alignment:
    • I explicitly define the target schema using T.StructType(all_fields), which correctly displays all 14 columns when printed.
    • I also implemented a schema alignment step, as shown in the code snippet below.
  • Broke Down Transformation into Multiple Spark Jobs: I persisted the DataFrame after the Join and Transformations steps by writing it to an intermediate dataset, then read it back in for the subsequent Explode and Pivot operations. The error persists in the Explode and Pivot step.
  • Referenced Foundry Incremental Examples: I’ve tried to adapt patterns from the incremental-examples#merge-and-append-with-varying-schemas documentation.

Specific Concerns about the Pivot Step:

My primary suspicion is the pivot step. While df.printSchema() after the pivot and subsequent alignment looks correct, I’m wondering if the underlying Spark plan or some internal metadata related to the pivot operation is retaining a “slimmer” schema, which then gets reported to the Foundry backend.

Questions:

  1. Given that df.printSchema() reports the correct 14-column schema, what could cause the Foundry backend to see a “slimmer” (6-column) schema during the @incremental() write? Is there a known issue with PySpark’s schema inference or internal representation after complex operations like pivot that manifests specifically with Foundry’s write_dataframe?
  2. Are there any specific considerations or best practices when using pivot in PySpark transforms that are then written incrementally to Foundry datasets that could lead to this kind of schema mismatch?
  3. Are there any other known causes, debugging tips, or workarounds I should look into?

Thanks for any insights!