Hello,
I am writing a Python transform that fetches and generates a large number of rows from an API, large enough that collecting them all in an in-memory list causes the build to crash. While I could probably get away with a large driver memory profile such as DRIVER_MEMORY_EXTRA_LARGE in this particular situation, I'm interested in solutions that scale better.
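(For reference, I believe the bigger-driver workaround would just mean adding a memory profile to the transform's configuration, something like the line below, but that only postpones the problem as the row count grows:)

@configure(profile=["KUBERNETES_NO_EXECUTORS", "DRIVER_MEMORY_EXTRA_LARGE"])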
My current solution writes batches of N rows into individual CSV files, which are gzipped for storage efficiency. Once the dataset is built, I can manually apply a schema to the output dataset and use it downstream in Contour, Pipeline Builder, and other applications.
However, whenever the dataset is rebuilt, the schema on the output dataset is deleted and I have to add it back manually. How can I write the schema on my output dataset from my Python transform, or prevent it from being deleted on rebuild?
Alternatively, are there other ways to write transforms like this in a memory-efficient and scalable manner?
My current code looks like this:
from transforms.api import Output, transform, configure
from transforms.external.systems import EgressPolicy, use_external_systems, Credential
import csv
import gzip
import io
import itertools
@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@use_external_systems(
    egress=EgressPolicy(
        "ri.resource-policy-manager.global.network-egress-policy.12345678"
    ),
    creds=Credential("ri.credential..credential.12345678"),
)
@transform(
    output=Output("ri.foundry.main.dataset.12345678")
)
def compute(egress, creds, output):
    # fetch rows and group into chunks of 1000000
    row_generator = get_rows_from_api(creds.get("api_key"))
    chunk_generator = itertools.batched(row_generator, 1000000)

    # write each chunk into a gzipped csv file
    for chunk_number, rows in enumerate(chunk_generator):
        with (
            output.filesystem().open(f"output-{chunk_number}.csv.gz", "wb") as output_file,
            gzip.GzipFile(fileobj=output_file, mode="wb") as gzip_file,
            io.TextIOWrapper(gzip_file, encoding="utf-8") as textio_file,
        ):
            csv.writer(textio_file).writerows(rows)

def get_rows_from_api(api_key):
    # yields a large number of rows from the API (implementation omitted here)
    ...
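(Side note for anyone copying this: itertools.batched requires Python 3.12 or newer. On an older runtime I'd substitute a small itertools.islice-based helper, roughly the recipe from the itertools docs, in the same file, since itertools is already imported, and call batched(row_generator, 1000000) instead:)

def batched(iterable, n):
    # yields successive tuples of up to n items from the iterable,
    # without materializing the whole input in memory
    iterator = iter(iterable)
    while batch := tuple(itertools.islice(iterator, n)):
        yield batch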