SQLFrame blowing up in transform

I have a transform that previously worked fine but is now throwing the following error:

Error <IndexError: list index out of range> raised from /home/user/repo/transforms-python/src/myproject/datasets/hospital_beds.py:72

Traceback from Python:
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/bin/preview_daemon.py", line 83, in main
    success, data = process_request(request)
                    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/bin/dispatcher.py", line 26, in process_request
    execute_preview(request)
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/handlers/preview_handler.py", line 58, in execute_preview
    outputs, code_checks = execute_lightweight(
                           ^^^^^^^^^^^^^^^^^^^^
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/handlers/lightweight_handler.py", line 96, in execute_lightweight
    transform.compute()
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms/api/_lightweight/_transform.py", line 201, in compute
    self._compute()
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms/api/_lightweight/_transform.py", line 292, in _compute
    self._user_code(**kwargs)
  File "/home/user/repo/transforms-python/src/myproject/datasets/hospital_beds.py", line 72, in analyze_employees
    df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/sqlframe/base/operations.py", line 69, in wrapper
    df = func(self, *args, **kwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/sqlframe/base/dataframe.py", line 1106, in join
    join_expression = join_expression.join(join_expression.ctes[-1].alias, join_type=join_type)
                                           ~~~~~~~~~~~~~~~~~~~~^^^^

The code for the transform is below:


from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F
from transforms.api import Input, LightweightInput, LightweightOutput, Output, transform


@transform.using(
    staff=Input("ri.foundry.main.dataset.xxx"),
    patients=Input("ri.foundry.main.dataset.xxx"),
    staff_stats=Output(
        "ri.foundry.main.dataset.xxx"
    ),
    employee_stats=Output(
        "ri.foundry.main.dataset.xxx"
    ),
)
def analyze_employees(
    staff: LightweightInput,
    patients: LightweightInput,
    staff_stats: LightweightOutput,
    employee_stats: LightweightOutput,
):
    # Documentation on the open-source dataset can be found here:
    # https://www.kaggle.com/datasets/jaderz/hospital-beds-management?resource=download
    # Documentation on SQLFrame can be found here:
    # https://github.com/eakmanrq/sqlframe
    # Documentation on Foundry Transforms API can be found here:
    # https://www.palantir.com/docs/foundry/transforms-python/advanced-compute

    # 1. Initialize the SQLFrame DuckDB session
    session = DuckDBSession.builder.getOrCreate()

    # 2. Read inputs as Arrow (works in lightweight), for development in the lesson
    # staff_arrow = staff.arrow()         # pyarrow.Table
    # patients_arrow = patients.arrow()   # pyarrow.Table

    # 3. Create SQLFrame DataFrames (Arrow -> pandas -> SQLFrame)
    df_staff = session.createDataFrame(staff.pandas())
    df_patients = session.createDataFrame(patients.pandas())

    # Ensure column types are correct if necessary (SQLFrame infers schema automatically)
    df_staff = df_staff.withColumn("staff_id", F.col("staff_id").cast("string"))
    df_patients = df_patients.withColumn(
        "age", F.col("age").cast("integer")
    ).withColumn("satisfaction", F.col("satisfaction").cast("integer"))

    # 4. Perform an AGGREGATION operation and select specific columns
    df_agg_patients = (
        df_patients.groupBy("service")
        .agg(
            F.count("*").alias("total_patients"),
            F.avg("age").alias("average_age"),
        )
        .orderBy(F.col("average_age").desc())
    )

    # 5. Perform a WINDOW FUNCTION operation
    # window_spec = Window.partitionBy("service").orderBy(F.col("satisfaction").desc())
    df_ranked = (
        df_patients
        .groupBy("service")
        .agg(
            F.count("*").alias("service_count"),
            F.avg("satisfaction").alias("average_satisfaction"),  # 0–100
        )
    )

    # Example Join to test more of the Spark API
    df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")

    # 6. Write results back to Foundry: Collect results into a Pandas DataFrame and write it
    staff_stats.write_table(df_agg_with_rank.toPandas())

    # Write ranked employee results: Collect results into an Arrow Table and write it
    # arrow_table_employee_stats = df_ranked.to_arrow()
    # employee_stats.write_table(arrow_table_employee_stats)

As part of troubleshooting, I rewrote the transform locally and removed all Foundry dependencies to rule out a bug in SQLFrame itself:

import os
import pandas as pd
from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F
# from sqlframe.duckdb import Window


def analyze_employees():
    """
    Standalone version of the hospital beds transform.

    - Reads `staff.csv` and `patients.csv` from the same directory as this file.
    - Uses SQLFrame (DuckDB backend) to perform groupBy/agg and join operations.
    - Writes:
        - staff_stats.csv (joined aggregate)
        - employee_stats.csv (ranked aggregate; currently commented out)
    """

    # ----------------------------------------------------------------------------
    # 0. Resolve file paths for local CSVs
    # ----------------------------------------------------------------------------
    base_dir = os.path.dirname(os.path.abspath(__file__))
    staff_path = os.path.join(base_dir, "staff.csv")
    patients_path = os.path.join(base_dir, "patients.csv")

    # ----------------------------------------------------------------------------
    # 1. Initialize the SQLFrame DuckDB session
    # ----------------------------------------------------------------------------
    # Equivalent to the session setup in the Foundry transform
    session = DuckDBSession.builder.getOrCreate()

    # ----------------------------------------------------------------------------
    # 2. Read CSV inputs with pandas
    # ----------------------------------------------------------------------------
    staff_pdf = pd.read_csv(staff_path)
    patients_pdf = pd.read_csv(patients_path)

    # ----------------------------------------------------------------------------
    # 3. Create SQLFrame DataFrames from pandas DataFrames
    # ----------------------------------------------------------------------------
    df_staff = session.createDataFrame(staff_pdf)
    df_patients = session.createDataFrame(patients_pdf)

    # Ensure column types are correct if necessary (SQLFrame infers schema automatically)
    df_staff = df_staff.withColumn("staff_id", F.col("staff_id").cast("string"))
    df_patients = (
        df_patients
        .withColumn("age", F.col("age").cast("integer"))
        .withColumn("satisfaction", F.col("satisfaction").cast("integer"))
    )

    # ----------------------------------------------------------------------------
    # 4. Perform an AGGREGATION operation and select specific columns
    # ----------------------------------------------------------------------------
    df_agg_patients = (
        df_patients
        .groupBy("service")
        .agg(
            F.count("*").alias("total_patients"),
            F.avg("age").alias("average_age"),
        )
        .orderBy(F.col("average_age").desc())
    )

    # ----------------------------------------------------------------------------
    # 5. Perform a second aggregation (rank-like stats)
    # ----------------------------------------------------------------------------
    # window_spec = Window.partitionBy("service").orderBy(F.col("satisfaction").desc())
    df_ranked = (
        df_patients
        .groupBy("service")
        .agg(
            F.count("*").alias("service_count"),
            F.avg("satisfaction").alias("average_satisfaction"),  # 0–100
        )
        .orderBy(F.col("average_satisfaction").desc())
    )

    # ----------------------------------------------------------------------------
    # 6. Join the two aggregates on "service"
    # ----------------------------------------------------------------------------
    df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")

    # Optional: debug prints (uncomment if you want to see them in the console)
    # df_agg_patients.show()
    # df_ranked.show()
    # df_agg_with_rank.show()

    # ----------------------------------------------------------------------------
    # 7. Write results out to CSV locally
    # ----------------------------------------------------------------------------
    staff_stats_path = os.path.join(base_dir, "staff_pateints_stats.csv")
    # employee_stats_path = os.path.join(base_dir, "employee_stats.csv")

    df_agg_with_rank.toPandas().to_csv(staff_stats_path, index=False)
    # df_ranked.toPandas().to_csv(employee_stats_path, index=False)

    print(f"Wrote staff_stats to: {staff_stats_path}")
    # print(f"Wrote employee_stats to: {employee_stats_path}")


if __name__ == "__main__":
    analyze_employees()

The transform works fine locally. The DuckDB and SQLFrame versions are the same in Foundry as they are locally: DuckDB 1.4.2 and SQLFrame 3.43.8.
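
In case it helps, here is a minimal sketch of how the versions can be compared; importlib.metadata just reports what is installed in the running environment, so the same snippet can be run locally and from inside the transform/preview in Foundry:

from importlib.metadata import version

# Print the installed versions of the packages behind the failing join.
# Run this both locally and inside the Foundry transform to compare.
for pkg in ("duckdb", "sqlframe"):
    print(f"{pkg}=={version(pkg)}")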

This feels like an environment issue in Foundry that emerged recently. Any ideas on what could be causing this? If it's operator error / a skill issue on my part, please let me know how to resolve it.


Check whether the SQLGlot version differs between Foundry and your local environment, and pin Foundry to the version you use locally. I think there might be a regression in SQLGlot that is causing this.
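
For example, something like this (a minimal sketch; run it both locally and in a Foundry preview) should show whether the SQLGlot versions diverge:

from importlib.metadata import version

# SQLFrame compiles DataFrame operations to SQL via SQLGlot, so its version matters here too.
print("sqlglot==" + version("sqlglot"))

Once you know the local version, pin sqlglot to that exact version wherever your repository declares Python dependencies (for transforms-python repos that is typically the conda recipe / Libraries panel; the exact mechanism can vary by setup).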