Lightweight transform breaks when using the Foundry Filesystem

I often store zip files in Foundry datasets for processing. In PySpark this works fine; a rough sketch of that working approach is below. I've since attempted to port the code to a lightweight transform, shown in full after the sketch.
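
The PySpark version follows the standard Foundry raw-file pattern; the dataset RIDs are placeholders and the unzip/CSV handling is simplified relative to my real code:

import csv
import io
import shutil
import tempfile
import zipfile

from pyspark.sql import Row
from transforms.api import Input, Output, transform


@transform(
    output_dataset=Output("<RID>"),
    input_dataset=Input("<RID>"),
)
def compute(ctx, input_dataset, output_dataset):
    def parse_zip(file_status):
        # Copy the zip to a local temp file so zipfile can seek inside it
        with input_dataset.filesystem().open(file_status.path, "rb") as f, \
                tempfile.NamedTemporaryFile() as tmp:
            shutil.copyfileobj(f, tmp)
            tmp.flush()
            with zipfile.ZipFile(tmp) as archive:
                for name in archive.namelist():
                    with archive.open(name) as member:
                        text = io.TextIOWrapper(member, encoding="ISO-8859-1")
                        text.readline()  # skip the first line of each CSV
                        for record in csv.DictReader(text):
                            yield Row(**record)

    files_df = input_dataset.filesystem().files("**/*.zip")
    output_dataset.write_dataframe(files_df.rdd.flatMap(parse_zip).toDF())

And here is the lightweight port: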

import csv
import shutil
import zipfile
import io
import tempfile
import logging

import pandas as pd
from transforms.api import Input, Output, transform, LightweightInput, LightweightOutput, LightweightContext

logging.basicConfig(level=logging.INFO)


def process_file(fs, file_status):
    """
    Iterate over all rows in all CSVs inside a single zip file.

    - Skips the first line of each CSV (as in the original PySpark code).
    - Uses the next line as the header row.
    - Yields dicts mapping header -> value.
    """
    with fs.open(file_status.path, "rb") as f:
        with tempfile.NamedTemporaryFile() as tmp:
            shutil.copyfileobj(f, tmp)
            tmp.flush()
            with zipfile.ZipFile(tmp) as archive:
                for filename in archive.namelist():
                    with archive.open(filename) as f2:
                        br = io.BufferedReader(f2)

                        # Try main encoding, then fallback, same pattern as before.
                        for encoding, label in (("ISO-8859-1", ""), ("latin1", " (latin1)")):
                            try:
                                tw = io.TextIOWrapper(br, encoding=encoding)
                                # Skip the first line of each CSV
                                tw.readline()
                                r = csv.reader(tw)

                                header = next(r, None)
                                if header is None:
                                    logging.warning(
                                        f"Skipping empty/1-line CSV{label} in archive {file_status.path}: {filename}"
                                    )
                                    break  # go to next file in the archive

                                for row in r:
                                    # Yield a plain Python dict instead of a Spark Row
                                    yield dict(zip(header, row))
                                break  # successfully processed with this encoding, break the encoding loop
                            except UnicodeDecodeError:
                                # Reset buffer for the next encoding attempt
                                br.seek(0)
                                continue


@transform.using(
    output_dataset=Output("<RID>"),
    input_dataset=Input("<RID>"),
)
def compute(ctx: LightweightContext, input_dataset: LightweightInput, output_dataset: LightweightOutput) -> None:
    fs = input_dataset.filesystem()

    zip_files = list(fs.ls("**/*.zip"))
    logging.info(f"Found {len(zip_files)} zip file(s) in the input dataset")

    # Collect rows from all zip files in the input dataset
    rows = []
    for file_status in zip_files:
        logging.info(f"Processing zip file: {file_status.path}")
        for row in process_file(fs, file_status):
            rows.append(row)

    if not rows:
        logging.warning("No rows parsed from any zip files. Writing empty output dataset.")
        # Create an empty DataFrame with the expected columns:
        columns = [
            "alliance",
            "alliance_revenue",
            "yoy_revenue",
            "engagement_margin",
            "yoy_engagement_margin",
            "tytf_win",
            "yoy_wins",
            "win_rate",
            "pipeline",
        ]
        df = pd.DataFrame(columns=columns)
    else:
        df = pd.DataFrame(rows)

    # Register the DataFrame as a DuckDB relation and write it out
    conn = ctx.duckdb().conn
    conn.register("alliances", df)
    rel = conn.sql("SELECT * FROM alliances")
    output_dataset.write_table(rel)

But I keep getting this error:

Error <ValueError: Unable to retrieve schema for input alias 'input_dataset' when configuring DuckDB connection. Please build the dataset or apply a schema..> raised from /home/user/repo/transforms-python/src/myproject/datasets/examples.py:93

When I use a dataset to store files, no schema is applied to the input dataset, and I don't know whether I can apply one. Is it possible to use lightweight transforms with datasets that just store files? If not, should I first process the files in Spark?

This looks like a bug in the duckdb bindings, unrelated to the filesystem operations.

However, why not just pass the DataFrame to write_table directly? The DuckDB step doesn't look necessary here.
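
For example, the end of compute() could become something like this; as far as I know, write_table in lightweight transforms accepts a pandas DataFrame directly, so no DuckDB connection (and hence no input-schema lookup) is needed:

    # Instead of registering the DataFrame with DuckDB first, hand the
    # pandas DataFrame straight to the lightweight output. Nothing here
    # calls ctx.duckdb(), so the schema lookup on the file-only input
    # should never be triggered.
    df = pd.DataFrame(rows) if rows else pd.DataFrame(columns=columns)
    output_dataset.write_table(df)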