I have a transform that previouls worked fine that now is throwing the following error:
Error <IndexError: list index out of range> raised from /home/user/repo/transforms-python/src/myproject/datasets/hospital_beds.py:72
Traceback from Python:
File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/bin/preview_daemon.py", line 83, in main
success, data = process_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/bin/dispatcher.py", line 26, in process_request
execute_preview(request)
File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/handlers/preview_handler.py", line 58, in execute_preview
outputs, code_checks = execute_lightweight(
^^^^^^^^^^^^^^^^^^^^
File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms_preview/handlers/lightweight_handler.py", line 96, in execute_lightweight
transform.compute()
File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms/api/_lightweight/_transform.py", line 201, in compute
self._compute()
File "/home/user/repo/.maestro/lib/python3.12/site-packages/transforms/api/_lightweight/_transform.py", line 292, in _compute
self._user_code(**kwargs)
File "/home/user/repo/transforms-python/src/myproject/datasets/hospital_beds.py", line 72, in analyze_employees
df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/repo/.maestro/lib/python3.12/site-packages/sqlframe/base/operations.py", line 69, in wrapper
df = func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/repo/.maestro/lib/python3.12/site-packages/sqlframe/base/dataframe.py", line 1106, in join
join_expression = join_expression.join(join_expression.ctes[-1].alias, join_type=join_type)
~~~~~~~~~~~~~~~~~~~~^^^^
The code for the transform is below:
from transforms.api import LightweightOutput
from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F
from transforms.api import LightweightInput, transform, Output, Input
@transform.using(
staff=Input("ri.foundry.main.dataset.xxx"),
patients=Input("ri.foundry.main.dataset.xxx"),
staff_stats=Output(
"ri.foundry.main.dataset.xxx"
),
employee_stats=Output(
"ri.foundry.main.dataset.xxx"
),
)
def analyze_employees(
staff: LightweightInput,
patients: LightweightInput,
staff_stats: LightweightOutput,
employee_stats: LightweightOutput,
):
# Documenttion on the opensource dataset can be found here:
# https://www.kaggle.com/datasets/jaderz/hospital-beds-management?resource=download
# Documentation on SQLFrame can be found here:
# https://github.com/eakmanrq/sqlframe
# Documentation on Foundry Transforms API can be found here:
# https://www.palantir.com/docs/foundry/transforms-python/advanced-compute
# 1. Initialize the SQLFrame DuckDB session
session = DuckDBSession.builder.getOrCreate()
# 2) Read inputs as Arrow (works in lightweight), for development in the lesson
# staff_arrow = staff.arrow() # pyarrow.Table
# patients_arrow = patients.arrow() # pyarrow.Table
# 3) Create SQLFrame DFs (Arrow -> pandas -> SQLFrame)
df_staff = session.createDataFrame(staff.pandas())
df_patients = session.createDataFrame(patients.pandas())
# Ensure column types are correct if necessary (SQLFrame infers schema automatically)
df_staff = df_staff.withColumn("staff_id", F.col("staff_id").cast("string"))
df_patients = df_patients.withColumn(
"age", F.col("age").cast("integer")
).withColumn("satisfaction", F.col("satisfaction").cast("integer"))
# 4. Perform an AGGREGATION operation and select specific columns
df_agg_patients = (
df_patients.groupBy("service")
.agg(
F.count("*").alias("total_patients"),
F.avg("age").alias("average_age"),
)
.orderBy(F.col("average_age").desc())
)
# 5. Perform a WINDOW FUNCTION operation
# window_spec = Window.partitionBy("service").orderBy(F.col("satisfaction").desc())
df_ranked = (
df_patients
.groupBy("service")
.agg(
F.count("*").alias("service_count"),
F.avg("satisfaction").alias("average_satisfaction"), # 0β100
)
)
# Example Join to test more of the Spark API
df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")
# 6. Write results back to Foundry: Collect results into a Pandas DataFrame and write it
staff_stats.write_table(df_agg_with_rank.toPandas())
# Write ranked employee results: Collect results into an Arrow Table and write it
# arrow_table_employee_stats = df_ranked.to_arrow()
# employee_stats.write_table(arrow_table_employee_stats)
As part of troubleshooting I rewrote the transform locally and removed all Foundry deps to make sure this is not a bug in SQLFrame:
import os
import pandas as pd
from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F
# from sqlframe.duckdb import Window
def analyze_employees():
"""
Standalone version of the hospital beds transform.
- Reads `staff.csv` and `patients.csv` from the same directory as this file.
- Uses SQLFrame (DuckDB backend) to perform groupBy/agg and join operations.
- Writes:
- staff_stats.csv (joined aggregate)
- employee_stats.csv (ranked aggregate)
"""
# ----------------------------------------------------------------------------
# 0. Resolve file paths for local CSVs
# ----------------------------------------------------------------------------
base_dir = os.path.dirname(os.path.abspath(__file__))
staff_path = os.path.join(base_dir, "staff.csv")
patients_path = os.path.join(base_dir, "patients.csv")
# ----------------------------------------------------------------------------
# 1. Initialize the SQLFrame DuckDB session
# ----------------------------------------------------------------------------
# Equivalent to what you had in Foundry
session = DuckDBSession.builder.getOrCreate()
# ----------------------------------------------------------------------------
# 2. Read CSV inputs with pandas
# ----------------------------------------------------------------------------
staff_pdf = pd.read_csv(staff_path)
patients_pdf = pd.read_csv(patients_path)
# ----------------------------------------------------------------------------
# 3. Create SQLFrame DataFrames from pandas DataFrames
# ----------------------------------------------------------------------------
df_staff = session.createDataFrame(staff_pdf)
df_patients = session.createDataFrame(patients_pdf)
# Ensure column types are correct if necessary (SQLFrame infers schema automatically)
df_staff = df_staff.withColumn("staff_id", F.col("staff_id").cast("string"))
df_patients = (
df_patients
.withColumn("age", F.col("age").cast("integer"))
.withColumn("satisfaction", F.col("satisfaction").cast("integer"))
)
# ----------------------------------------------------------------------------
# 4. Perform an AGGREGATION operation and select specific columns
# ----------------------------------------------------------------------------
df_agg_patients = (
df_patients
.groupBy("service")
.agg(
F.count("*").alias("total_patients"),
F.avg("age").alias("average_age"),
)
.orderBy(F.col("average_age").desc())
)
# ----------------------------------------------------------------------------
# 5. Perform a second aggregation (rank-like stats)
# ----------------------------------------------------------------------------
# window_spec = Window.partitionBy("service").orderBy(F.col("satisfaction").desc())
df_ranked = (
df_patients
.groupBy("service")
.agg(
F.count("*").alias("service_count"),
F.avg("satisfaction").alias("average_satisfaction"), # 0β100
)
.orderBy(F.col("average_satisfaction").desc())
)
# ----------------------------------------------------------------------------
# 6. Join the two aggregates on "service"
# ----------------------------------------------------------------------------
df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")
# Optional: debug prints (uncomment if you want to see them in the console)
# df_agg_patients.show()
# df_ranked.show()
# df_agg_with_rank.show()
# ----------------------------------------------------------------------------
# 7. Write results out to CSV locally
# ----------------------------------------------------------------------------
staff_stats_path = os.path.join(base_dir, "staff_pateints_stats.csv")
# employee_stats_path = os.path.join(base_dir, "employee_stats.csv")
df_agg_with_rank.toPandas().to_csv(staff_stats_path, index=False)
# df_ranked.toPandas().to_csv(employee_stats_path, index=False)
print(f"Wrote staff_stats to: {staff_stats_path}")
# print(f"Wrote employee_stats to: {employee_stats_path}")
if __name__ == "__main__":
analyze_employees()
The transform works fine locally. The duckdb versions are the same in Foundry as they are locally:
DuckDB 1.4.2.
SQLFrame 3.43.8.
This feels like there is some environment issue in Foundry that recently emerged. Any ideas on what could be causing this? If itβs operator error/skill issue please LMK how to resolve.