Fetching and writing large amounts of rows efficiently

Hello,

I am writing a Python transform which fetches and generates a large number of rows from an API, large enough that collecting them all in a list in memory causes the build to crash. While I probably could get away with a large memory profile for the driver like DRIVER_MEMORY_EXTRA_LARGE in this particular situation, I’m interested in solutions that scale better.

My current solution writes batches of N rows into individual CSV files, which are also gzipped for storage efficiency. Once the dataset is built, I can manually apply a schema on the output dataset and use it downstream in Contour, Pipeline Builder, and other applications.

However, when the dataset is rebuilt, the schema on the output dataset is deleted, and I need to add it back manually. How can I write, or avoid deleting, the schema on my output dataset from my Python transform?

Alternatively, are there other ways to write transforms like this in a memory-efficient and scalable manner?

My current code looks like this:

from transforms.api import Output, transform, configure
from transforms.external.systems import EgressPolicy, use_external_systems, Credential

import csv
import gzip
import io
import itertools


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@use_external_systems(
    egress=EgressPolicy(
        "ri.resource-policy-manager.global.network-egress-policy.12345678"
    ),
    creds=Credential("ri.credential..credential.12345678"),
)
@transform(
    output=Output("ri.foundry.main.dataset.12345678")
)
def compute(egress, creds, output):
    # fetch rows and group into chunks of 1000000
    row_generator = get_rows_from_api(creds.get("api_key"))
    chunk_generator = itertools.batched(row_generator, 1000000)

    # write each chunk into a gzipped csv file
    for chunk_number, rows in enumerate(chunk_generator):
        with (
            output.filesystem().open(f"output-{chunk_number}.csv.gz", "wb") as output_file,
            gzip.GzipFile(fileobj=output_file, mode="wb") as gzip_file,
            io.TextIOWrapper(gzip_file, encoding="utf-8", newline="") as textio_file,
        ):
            csv.writer(textio_file).writerows(rows)


def get_rows_from_api(api_key):
    # yield large number of rows (body omitted in this post)
    yield from ()  # placeholder so the function is a valid generator
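
Note that itertools.batched is only available from Python 3.12. On an older interpreter, an equivalent helper can be sketched with itertools.islice. This is a minimal sketch rather than part of the original transform; it keeps only one batch in memory at a time, like the built-in.

```python
from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], n: int) -> Iterator[Tuple[T, ...]]:
    """Yield successive tuples of up to n items from iterable."""
    if n < 1:
        raise ValueError("n must be at least one")
    iterator = iter(iterable)
    # islice pulls at most n items per pass; an empty tuple ends the loop.
    while batch := tuple(islice(iterator, n)):
        yield batch
```

With this helper in scope, the call in the transform would become batched(row_generator, 1000000), and the rest of the loop is unchanged.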

My preferred method would be to use Spark here. Does the API use pagination? Do you have a way to determine the total number of pages or rows?

If so you could do something like:

  • Create a Spark DF with one row per item and a page number (use F.sequence and F.explode; don’t generate it on the driver)
  • df.groupBy(['page']).apply(my_pandas_udf) (on recent Spark versions, groupBy('page').applyInPandas(...) is the preferred form)
  • The pandas UDF calls the API to get the rows for that page and returns them

You can do this with a standard (non-pandas) UDF, but that won’t let you batch easily (you may find that you can process more than one page per executor).
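
The steps above could be sketched roughly as follows. This is only an illustration under assumptions: it uses one row per page rather than per item, the column names and output schema are made up, and the body of fetch_page is a hypothetical stand-in for the real API call, which would normally request the given page.

```python
import pandas as pd


def fetch_page(page_df: pd.DataFrame) -> pd.DataFrame:
    """Grouped-map function: gets the rows for one page number, returns that page's rows."""
    page = int(page_df["page"].iloc[0])
    # Hypothetical stand-in for the API call; replace with a real request for `page`.
    rows = [
        {"page": page, "item_id": page * 100 + i, "value": f"item-{i}"}
        for i in range(3)
    ]
    return pd.DataFrame(rows, columns=["page", "item_id", "value"])


def build_rows_df(spark, total_pages: int):
    """Create one row per page with Spark, then fetch each page in a pandas function."""
    # Imported here so fetch_page can be tried out without a Spark installation.
    from pyspark.sql import functions as F

    # Generate the page numbers with Spark instead of building a list on the driver.
    pages = spark.range(1).select(
        F.explode(F.sequence(F.lit(1), F.lit(total_pages))).alias("page")
    )
    # Each group (one page) is handed to fetch_page as a pandas DataFrame.
    return pages.groupBy("page").applyInPandas(
        fetch_page, schema="page long, item_id long, value string"
    )
```

In a transform this would be called as build_rows_df(ctx.spark_session, total_pages), where total_pages comes from whatever the API reports.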

If you want to keep your current approach, build DataFrames instead of writing CSVs, then use union_many from transforms.verbs.dataframes to union them into a tabular output. Creating the DataFrames allows Spark to persist them to disk, so this should effectively do the same as your current method of writing CSVs.

NB: your idea of using DRIVER_MEMORY_EXTRA_LARGE will not be efficient. You’re proposing to do everything on a single node, so the additional memory won’t really be used. You want to use the DRIVER_MEMORY_OVERHEAD... profiles, which give memory to the Python process rather than the Spark driver process.

Hey Ben! Thanks for the detailed response. Unfortunately this API does not have predictable and parallelisable paging – otherwise I agree with your Spark and Pandas UDF-based approach.

I was able to get this working with Spark DataFrames and union_many as you suggested, but with one additional trick: checkpointing each DataFrame to force it to be written out to disk. Without this, my build would still OOM. I also had to decrease the number of rows per DataFrame. New version of the code:

from transforms.api import Output, transform_df, configure
from transforms.external.systems import EgressPolicy, use_external_systems, Credential
from transforms.verbs.dataframes import union_many

import itertools


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@use_external_systems(
    egress=EgressPolicy(
        "ri.resource-policy-manager.global.network-egress-policy.12345678"
    ),
    creds=Credential("ri.credential..credential.12345678"),
)
@transform_df(
    output=Output("ri.foundry.main.dataset.12345678")
)
def compute(ctx, egress, creds):
    # fetch rows and group into chunks of 250000
    row_generator = get_rows_from_api(creds.get("api_key"))
    chunk_generator = itertools.batched(row_generator, 250000)
    
    dataframes = []

    for rows in chunk_generator:
        # create dataframe from rows and checkpoint to disk
        # (my_schema is the row schema; its definition is not shown in this post)
        df = ctx.spark_session.createDataFrame(rows, my_schema).checkpoint()
        dataframes.append(df)

    return union_many(*dataframes)


def get_rows_from_api(api_key):
    # yield large number of rows (body omitted in this post)
    yield from ()  # placeholder so the function is a valid generator