I often store zip files in Foundry datasets for processing, and in PySpark this works fine.
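For context, the working Spark version follows the usual raw-file pattern, roughly this shape (a simplified sketch; the zip/CSV parsing is elided here, but it is the same logic as in the lightweight attempt below, except that it yields pyspark.sql.Row objects instead of dicts):

from transforms.api import transform, Input, Output

@transform(
    output_dataset=Output("<RID>"),
    input_dataset=Input("<RID>"),
)
def compute(input_dataset, output_dataset):
    fs = input_dataset.filesystem()

    def process_file(file_status):
        # Copy to a temp file, open the zip, parse each CSV, and
        # yield pyspark.sql.Row objects (same parsing as in the
        # lightweight version below).
        ...

    files_df = fs.files("**/*.zip")
    output_dataset.write_dataframe(files_df.rdd.flatMap(process_file).toDF())

I've since attempted to port that code over to lightweight transforms as follows: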
import csv
import shutil
import zipfile
import io
import tempfile
import logging
import pandas as pd
from transforms.api import Input, Output, transform, LightweightInput, LightweightOutput, LightweightContext
logging.basicConfig(level=logging.INFO)

def process_file(fs, file_status):
    """
    Iterate over all rows in all CSVs inside a single zip file.
    - Skips the first line of each CSV (as in the original Spark code).
    - Uses the next line as the header row.
    - Yields dicts mapping header -> value.
    """
    with fs.open(file_status.path, "rb") as f:
        with tempfile.NamedTemporaryFile() as tmp:
            shutil.copyfileobj(f, tmp)
            tmp.flush()
            with zipfile.ZipFile(tmp) as archive:
                for filename in archive.namelist():
                    with archive.open(filename) as f2:
                        br = io.BufferedReader(f2)
                        # Try the main encoding, then the fallback, same pattern as before
                        for encoding, label in (("ISO-8859-1", ""), ("latin1", " (latin1)")):
                            try:
                                tw = io.TextIOWrapper(br, encoding=encoding)
                                # Skip the first line of each CSV
                                tw.readline()
                                r = csv.reader(tw)
                                header = next(r, None)
                                if header is None:
                                    logging.warning(
                                        f"Skipping empty/1-line CSV{label} in archive {file_status.path}: {filename}"
                                    )
                                    break  # go to the next file in the archive
                                for row in r:
                                    # Yield a plain Python dict instead of a Spark Row
                                    yield dict(zip(header, row))
                                break  # successfully processed with this encoding
                            except UnicodeDecodeError:
                                # Reset the buffer and retry with the next encoding
                                br.seek(0)
                                continue

@transform.using(
    output_dataset=Output("<RID>"),
    input_dataset=Input("<RID>"),
)
def compute(ctx: LightweightContext, input_dataset: LightweightInput, output_dataset: LightweightOutput) -> None:
    fs = input_dataset.filesystem()
    zip_files = list(fs.ls("**/*.zip"))
    logging.info(f"Found zip files: {[status.path for status in zip_files]}")
    # Collect rows from all zip files in the input dataset
    rows = []
    for file_status in fs.ls("**/*.zip"):
        logging.info(f"Processing zip file: {file_status.path}")
        for row in process_file(fs, file_status):
            rows.append(row)

    if not rows:
        logging.warning("No rows parsed from any zip files. Writing empty output dataset.")
        # Create an empty DataFrame with the expected schema if you know the columns:
        columns = [
            "alliance",
            "alliance_revenue",
            "yoy_revenue",
            "engagement_margin",
            "yoy_engagement_margin",
            "tytf_win",
            "yoy_wins",
            "win_rate",
            "pipeline",
        ]
        df = pd.DataFrame(columns=columns)
    else:
        df = pd.DataFrame(rows)

    # Register the DataFrame as a DuckDB relation and write it out
    conn = ctx.duckdb().conn
    conn.register("alliances", df)
    rel = conn.sql("SELECT * FROM alliances")
    output_dataset.write_table(rel)
But I keep getting this error:
Error <ValueError: Unable to retrieve schema for input alias 'input_dataset' when configuring DuckDB connection. Please build the dataset or apply a schema..> raised from /home/user/repo/transforms-python/src/myproject/datasets/examples.py:93
When I use datasets to store raw files, no schema is applied to the input dataset, and I don't know whether I can apply one. Is it possible to use lightweight transforms when the input dataset just stores files? If not, should I first process the files in Spark?