SQLFrame blowing up in transform

I have a transform that previously worked fine but is now throwing the following error:

Error <IndexError: list index out of range> raised from /home/user/repo/transforms-python/src/myproject/datasets/hospital_beds.py:72

Traceback from Python:
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/bin/preview_daemon.py", line 83, in main
    success, data = process_request(request)
                    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/bin/dispatcher.py", line 26, in process_request
    execute_preview(request)
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/handlers/preview_handler.py", line 58, in execute_preview
    outputs, code_checks = execute_lightweight(
                           ^^^^^^^^^^^^^^^^^^^^
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/handlers/lightweight_handler.py", line 96, in execute_lightweight
    transform.compute()
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms/api/_lightweight/_transform.py", line 201, in compute
    self._compute()
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms/api/_lightweight/_transform.py", line 292, in _compute
    self._user_code(**kwargs)
  File "/home/user/repo/transforms-python/src/myproject/datasets/hospital_beds.py", line 72, in analyze_employees
    df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/sqlframe/base/operations.py", line 69, in wrapper
    df = func(self, *args, **kwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/repo/.maestro/lib/python3.12/site-packages/sqlframe/base/dataframe.py", line 1106, in join
    join_expression = join_expression.join(join_expression.ctes[-1].alias, join_type=join_type)
                                           ~~~~~~~~~~~~~~~~~~~~^^^^
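The failing frame indexes `join_expression.ctes[-1]`. If SQLGlot stops producing any CTEs for this expression (a plausible regression), `ctes` is an empty list and `[-1]` raises exactly this `IndexError`. A minimal illustration of the failure mode (the empty list is a stand-in, not the actual SQLFrame object):

```python
# Stand-in for join_expression.ctes coming back empty:
ctes = []
try:
    last_cte = ctes[-1]
except IndexError as exc:
    print(exc)  # list index out of range
```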

The code for the transform is below:


from transforms.api import LightweightOutput
from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F
from transforms.api import LightweightInput, transform, Output, Input


@transform.using(
    staff=Input("ri.foundry.main.dataset.xxx"),
    patients=Input("ri.foundry.main.dataset.xxx"),
    staff_stats=Output(
        "ri.foundry.main.dataset.xxx"
    ),
    employee_stats=Output(
        "ri.foundry.main.dataset.xxx"
    ),
)
def analyze_employees(
    staff: LightweightInput,
    patients: LightweightInput,
    staff_stats: LightweightOutput,
    employee_stats: LightweightOutput,
):
    # Documentation on the open-source dataset can be found here:
    # https://www.kaggle.com/datasets/jaderz/hospital-beds-management?resource=download
    # Documentation on SQLFrame can be found here:
    # https://github.com/eakmanrq/sqlframe
    # Documentation on Foundry Transforms API can be found here:
    # https://www.palantir.com/docs/foundry/transforms-python/advanced-compute

    # 1. Initialize the SQLFrame DuckDB session
    session = DuckDBSession.builder.getOrCreate()

    # 2. Read inputs as Arrow (works in lightweight); kept here for reference
    # staff_arrow = staff.arrow()         # pyarrow.Table
    # patients_arrow = patients.arrow()   # pyarrow.Table

    # 3. Create SQLFrame DataFrames (Arrow -> pandas -> SQLFrame)
    df_staff = session.createDataFrame(staff.pandas())
    df_patients = session.createDataFrame(patients.pandas())

    # Ensure column types are correct if necessary (SQLFrame infers schema automatically)
    df_staff = df_staff.withColumn("staff_id", F.col("staff_id").cast("string"))
    df_patients = df_patients.withColumn(
        "age", F.col("age").cast("integer")
    ).withColumn("satisfaction", F.col("satisfaction").cast("integer"))

    # 4. Perform an AGGREGATION operation and select specific columns
    df_agg_patients = (
        df_patients.groupBy("service")
        .agg(
            F.count("*").alias("total_patients"),
            F.avg("age").alias("average_age"),
        )
        .orderBy(F.col("average_age").desc())
    )

    # 5. Perform a second aggregation (rank-like stats); the original window
    # spec is kept below for reference
    # window_spec = Window.partitionBy("service").orderBy(F.col("satisfaction").desc())
    df_ranked = (
        df_patients
        .groupBy("service")
        .agg(
            F.count("*").alias("service_count"),
            F.avg("satisfaction").alias("average_satisfaction"),  # 0–100
        )
    )

    # Example Join to test more of the Spark API
    df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")

    # 6. Write results back to Foundry: Collect results into a Pandas DataFrame and write it
    staff_stats.write_table(df_agg_with_rank.toPandas())

    # Write ranked employee results: Collect results into an Arrow Table and write it
    # arrow_table_employee_stats = df_ranked.to_arrow()
    # employee_stats.write_table(arrow_table_employee_stats)
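As a stopgap, assuming the versions can't be pinned right away, one option is to sidestep the SQLFrame join entirely and merge the two aggregates in pandas before writing. The helper below is a sketch, not part of the original transform; the names mirror the DataFrames above:

```python
import pandas as pd


def join_aggregates(agg_pdf: pd.DataFrame, ranked_pdf: pd.DataFrame) -> pd.DataFrame:
    """Left-join the two per-service aggregates in pandas, mirroring
    df_agg_patients.join(df_ranked, on="service", how="left")."""
    return agg_pdf.merge(ranked_pdf, on="service", how="left")


# Usage inside the transform would then be:
# staff_stats.write_table(
#     join_aggregates(df_agg_patients.toPandas(), df_ranked.toPandas())
# )
```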

As part of troubleshooting I rewrote the transform locally with all Foundry dependencies removed, to check whether this is a bug in SQLFrame itself:

import os
import pandas as pd
from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F
# from sqlframe.duckdb import Window


def analyze_employees():
    """
    Standalone version of the hospital beds transform.

    - Reads `staff.csv` and `patients.csv` from the same directory as this file.
    - Uses SQLFrame (DuckDB backend) to perform groupBy/agg and join operations.
    - Writes:
        - staff_stats.csv (joined aggregate)
        - employee_stats.csv (ranked aggregate)
    """

    # ----------------------------------------------------------------------------
    # 0. Resolve file paths for local CSVs
    # ----------------------------------------------------------------------------
    base_dir = os.path.dirname(os.path.abspath(__file__))
    staff_path = os.path.join(base_dir, "staff.csv")
    patients_path = os.path.join(base_dir, "patients.csv")

    # ----------------------------------------------------------------------------
    # 1. Initialize the SQLFrame DuckDB session
    # ----------------------------------------------------------------------------
    # Equivalent to what you had in Foundry
    session = DuckDBSession.builder.getOrCreate()

    # ----------------------------------------------------------------------------
    # 2. Read CSV inputs with pandas
    # ----------------------------------------------------------------------------
    staff_pdf = pd.read_csv(staff_path)
    patients_pdf = pd.read_csv(patients_path)

    # ----------------------------------------------------------------------------
    # 3. Create SQLFrame DataFrames from pandas DataFrames
    # ----------------------------------------------------------------------------
    df_staff = session.createDataFrame(staff_pdf)
    df_patients = session.createDataFrame(patients_pdf)

    # Ensure column types are correct if necessary (SQLFrame infers schema automatically)
    df_staff = df_staff.withColumn("staff_id", F.col("staff_id").cast("string"))
    df_patients = (
        df_patients
        .withColumn("age", F.col("age").cast("integer"))
        .withColumn("satisfaction", F.col("satisfaction").cast("integer"))
    )

    # ----------------------------------------------------------------------------
    # 4. Perform an AGGREGATION operation and select specific columns
    # ----------------------------------------------------------------------------
    df_agg_patients = (
        df_patients
        .groupBy("service")
        .agg(
            F.count("*").alias("total_patients"),
            F.avg("age").alias("average_age"),
        )
        .orderBy(F.col("average_age").desc())
    )

    # ----------------------------------------------------------------------------
    # 5. Perform a second aggregation (rank-like stats)
    # ----------------------------------------------------------------------------
    # window_spec = Window.partitionBy("service").orderBy(F.col("satisfaction").desc())
    df_ranked = (
        df_patients
        .groupBy("service")
        .agg(
            F.count("*").alias("service_count"),
            F.avg("satisfaction").alias("average_satisfaction"),  # 0–100
        )
        .orderBy(F.col("average_satisfaction").desc())
    )

    # ----------------------------------------------------------------------------
    # 6. Join the two aggregates on "service"
    # ----------------------------------------------------------------------------
    df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")

    # Optional: debug prints (uncomment if you want to see them in the console)
    # df_agg_patients.show()
    # df_ranked.show()
    # df_agg_with_rank.show()

    # ----------------------------------------------------------------------------
    # 7. Write results out to CSV locally
    # ----------------------------------------------------------------------------
    staff_stats_path = os.path.join(base_dir, "staff_patients_stats.csv")
    # employee_stats_path = os.path.join(base_dir, "employee_stats.csv")

    df_agg_with_rank.toPandas().to_csv(staff_stats_path, index=False)
    # df_ranked.toPandas().to_csv(employee_stats_path, index=False)

    print(f"Wrote staff_stats to: {staff_stats_path}")
    # print(f"Wrote employee_stats to: {employee_stats_path}")


if __name__ == "__main__":
    analyze_employees()

The transform works fine locally, and the DuckDB and SQLFrame versions are the same in Foundry as they are locally:
DuckDB 1.4.2
SQLFrame 3.43.8

This feels like an environment issue in Foundry that recently emerged. Any ideas on what could be causing this? If it’s operator error/a skill issue, please LMK how to resolve it.


Check out the difference in the SQLGlot versions and pin Foundry to the version you use locally. I think there might be a regression in SQLGlot that is causing this.
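A quick way to compare the resolved versions in both environments is to print them from the running interpreter (the names below are the distribution names as published on conda/PyPI):

```python
from importlib.metadata import PackageNotFoundError, version


def report_version(dist: str) -> str:
    # Return the installed version, or a marker if the distribution is absent.
    try:
        return version(dist)
    except PackageNotFoundError:
        return "not installed"


for dist in ("sqlframe", "sqlglot", "duckdb"):
    print(dist, report_version(dist))
```

Run this once locally and once in a Foundry preview; any mismatch stands out immediately.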


Hey @CodeStrap,

Thanks for posting on Community.

If you require a response to this, I would suggest filing a Foundry Issue in your enrollment to help with code debugging. I also wanted to highlight that we don’t officially support SQLFrame.

Thanks!


The difference in version between what? I haven’t changed anything.

yep I filed a ticket on my enrollment

My local SQLGlot is 27.29.0. In the Foundry repo in VS Code I see 28.0.0, so you may be right. However, when I run conda install sqlglot=27.29.0 I get the error below:

ERROR ❌  HTTP status client error (403 Forbidden) for url (https://waypoint-envoy.rubix-system.svc.cluster.local:8443/compute/fc27e2/foundry/foundry-artifacts-api-mesh/artifacts/api/repositories/ri.containers-addons-bundle.main.artifacts.repository/contents/isolated/conda/linux-64/repodata.json)
  Hawk version 0.446.0
ERROR ❌  Hawk error: GenericError HTTP status client error (403 Forbidden) for url (https://waypoint-envoy.rubix-system.svc.cluster.local:8443/compute/fc27e2/foundry/foundry-artifacts-api-mesh/artifacts/api/repositories/ri.containers-addons-bundle.main.artifacts.repository/contents/isolated/conda/linux-64/repodata.json)

When I pin the dependency in the conda recipe file:

run:
    - python
    - transforms {{ PYTHON_TRANSFORMS_VERSION }}
    - transforms-expectations
    - transforms-preview
    - transforms-verbs
    - foundry-transforms-lib-python
    - sqlframe
    - duckdb
    - sqlglot ==27.29.0

I also get:

Solved conda environment in 1.65s
ERROR ❌  HTTP status client error (403 Forbidden) for url (https://waypoint-envoy.rubix-system.svc.cluster.local:8443/compute/fc27e2/foundry/foundry-artifacts-api-mesh/artifacts/api/repositories/ri.containers-addons-bundle.main.artifacts.repository/contents/isolated/conda/linux-64/repodata.json)
  Hawk version 0.446.0
ERROR ❌  Hawk error: GenericError HTTP status client error (403 Forbidden) for url (https://waypoint-envoy.rubix-system.svc.cluster.local:8443/compute/fc27e2/foundry/foundry-artifacts-api-mesh/artifacts/api/repositories/ri.containers-addons-bundle.main.artifacts.repository/contents/isolated/conda/linux-64/repodata.json)

  Maestro version 0.669.0

Do you know how to pin this dependency in a Foundry code repo?

Not sure these are related, but in any case I fixed the SQLGlot upper version bound in the SQLFrame conda-forge recipe, which should fix this once you rebuild your environment.

There might be a propagation delay from conda-forge to your Foundry environment.


It’s fixed! TY! This would have been impossible to fix on my own.

There is indeed such a delay. I hit a similar error when I updated some packages in Python functions: while the code assistant in Code Repositories worked fine, the function publish failed due to missing packages that had been released one day prior.

So the version resolution that writes the lock files seems to use a different conda repository than the function publish, and the two were out of sync. It happened to me with at least two distinct secondary dependencies. Maybe there is another explanation, but in all cases waiting half a day solved the issue.

It would be interesting to get some insights from the product team regarding this.


Yeah, this broke again for me. I really need to be able to pin to a specific version.