Batching Large Historical Data On The First Run Of Lightweight Transform

Hey everyone,
I have a pipeline that processes relatively small increments of data (only tens of thousands of rows per run) and doesn’t involve complex logic, so I’m using a lightweight transform with pandas.
However, for the initial historical data load (hundreds of millions of rows), the lightweight transform can’t handle it due to its 10-million-row limit.

What would be the best approach to process this historical data in batches? Is there a way to efficiently load and process it without switching my code to PySpark just for this one-time initial run?

Would appreciate any recommendations!

This is the first time I’ve heard of a 10 million row limit. In fact, we have lightweight builds running with 24 cores and 192 GB of memory - resources like that should comfortably handle 10 million rows.

What’s your compressed (parquet) and in-memory dataset size?

@nicornk the 10 million row ‘limit’ is from the docs:

  • Lightweight transforms: Generally recommended for transforms that operate on datasets of less than 10 million rows.
  • Spark transforms: Leverage distributed computing to enable better performance for transforms on datasets of more than 10 million rows.

@StamDataEngineer but it’s not a hard limit, and it’s very dependent on what your dataset consists of. The dataset size on disk isn’t always a good guide to the in-memory size, because of the snappy compression applied to the saved data, but you can process datasets with more than 10 million rows. I’d say just try it, perhaps with a more powerful profile as nicornk suggests, and see what happens.
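
As a rough check you could do something like this (a sketch in plain pandas, nothing Foundry-specific; "sample.parquet" is just a hypothetical local sample - inside your lightweight transform you can run the same memory_usage check on the dataframe you already read):

import os

import pandas as pd

# hypothetical local sample of your dataset
sample_path = "sample.parquet"

df = pd.read_parquet(sample_path)

on_disk = os.path.getsize(sample_path)
# deep=True also counts the Python strings behind object columns,
# which is usually where the gap between parquet-on-disk and in-memory size comes from
in_memory = df.memory_usage(deep=True).sum()

print(f"{len(df)} rows: {on_disk / 1e6:.0f} MB on disk, "
      f"{in_memory / 1e6:.0f} MB in memory "
      f"(~{in_memory / max(len(df), 1):.0f} bytes/row)")

The bytes/row figure lets you extrapolate from a sample to the full historical load and compare it against the memory of the profile you pick.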

OK, thank you very much! @nicornk @green
I will try.
But do you know of any way to batch it, in case we have to?

It depends on whether your incoming data can be split into separate datasets in some way, i.e. you don’t need to perform any operations that rely on all rows knowing about all the other rows.

If you can split them, you could write a function that generates transform functions, collect those into a list, and register them with the pipeline in your repository. Each generated transform first filters down to some fraction of the total that a single transform can deal with. Then you can process the whole lot in pieces and union the results together at the end in another transform (which may have to be a Spark-backed one).

I’ve done this with PySpark transforms, and not with lightweight, but I don’t think there’s anything in lightweight stopping you from doing so.

I’m not sure I completely understand you.
Could you maybe share the code as an example? That would be very helpful.

Hi @StamDataEngineer, do you mind sharing the current status of your project? Could you try running a lightweight build and a non-lightweight build and then post any errors the jobs throw? I have a similar challenge: a large historical dataset that needs to be processed once and then only needs small, incremental builds. Currently I’m trying to process the whole thing (see screenshot below), but I expect to run into errors and go (further) down a troubleshooting path.

@StamDataEngineer So I’ve used this pattern before, with PySpark, to split very large datasets into smaller ‘chunks’ that you can run as parallel jobs. It does rely on you being able to partition your dataset into independent chunks, meaning that each one doesn’t need to know about rows in any of the others, which is very dependent on your data.

Suppose you have a large input dataset that looks like this:

id  data
0   c8e1eb22
1   9311744c
2   4bb9c1a0
3   78f8cf0b
4   c8f0193b
etc…

Spark Version

Then you create a function that generates your (PySpark, in this case) transform, and another that generates a list of those transforms. This is the file gen_spark_parallel_batches.py:

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import logging


def create_spark_transform(in_dataset, output, total_splits, split):
    """ Create the transform that will be repeated total_splits times
    """
    @transform(
        out_df=Output(output),
        source_df=Input(in_dataset),
    )
    def one_split_spark_transform(source_df, out_df):
        """ Select only one split's worth of data, and then
            do the work on it here.
        """
        log = logging.getLogger(__name__)

        df = (source_df
              .dataframe()
              .filter(F.col("id") % total_splits == split)
              )
        # Now have a smaller dataframe and do your operations here
        # This is just a silly example regex extract.
        df = df.withColumn("matches", F.regexp_extract(F.col("data"), r"8.", 0))

        log.info(f"Row Count: {df.count()}")

        out_df.write_dataframe(df)

        return None

    return one_split_spark_transform


def pipeline_spark_split_generator(in_dataset, number_of_splits):
    """ generate the split transforms """

    transforms = []
    for i in range(number_of_splits):
        output = f"/<path_to_output_folder>/spark_split_{i}"
        transforms.append(create_spark_transform(in_dataset, output, number_of_splits, i))

    return transforms

(Ignore the inclusion of logging for now.)

Then you need to add this to your pipeline in pipeline.py (we’ve chosen 8 splits in this case):

from transforms.api import Pipeline
from myproject import datasets
from myproject.datasets.gen_spark_parallel_batches import pipeline_spark_split_generator

# This is the default pipeline.py code that finds all transforms kept in separate files:
my_pipeline = Pipeline()
my_pipeline.discover_transforms(datasets)


# Below is in addition to the default for generating our new transforms:
my_pipeline.add_transforms(*pipeline_spark_split_generator(
    "/<path_to_input_dataset>/large_input",
    8))

When the checks have completed you’ll see empty datasets ‘spark_split_0’, ‘spark_split_1’, etc. appear. You can’t build them from the repository, but you can just click “Build” on each dataset, or add them all to a schedule and then build them.

Afterwards you can create a transform that unions them all together for a final output.
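
That union transform could look something like this. It’s a sketch, assuming the transforms API accepts a dict of Inputs unpacked into the decorator and passes them to the compute function as keyword arguments (which **splits then collects); if not, just list the eight inputs explicitly. The paths are the same placeholders as above, and the output name is made up:

from functools import reduce

from transforms.api import transform, Input, Output

NUMBER_OF_SPLITS = 8  # must match the value passed to the generator

# one Input per generated split dataset
split_inputs = {
    f"split_{i}": Input(f"/<path_to_output_folder>/spark_split_{i}")
    for i in range(NUMBER_OF_SPLITS)
}


@transform(
    out_df=Output("/<path_to_output_folder>/spark_splits_unioned"),
    **split_inputs,
)
def union_splits(out_df, **splits):
    # union by column name so the splits don't have to agree on column order
    dfs = [split.dataframe() for split in splits.values()]
    unioned = reduce(lambda left, right: left.unionByName(right), dfs)
    out_df.write_dataframe(unioned)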

Lightweight Version

So I tried the above with a Polars lightweight transform. It’s an identical pattern, except the transform function gets an additional @lightweight decorator and the code inside it is Polars code (which runs fine when used as a stand-alone transform).

For reasons I don’t understand, this doesn’t work. It creates the empty datasets as expected after the checks run, and when I build one of those datasets something runs, but it somehow doesn’t seem to pick up the transform, and I don’t get any useful errors or log messages. After the build I just have an empty dataset that’s 0 KB in size and says it doesn’t have a schema.

In the above example I added logging, and in the Spark version the row count is printed to the logs as expected. In the lightweight version it doesn’t appear either, which makes me think the lightweight transform isn’t really being run.

I don’t know if anyone from the lightweight team @palantir might read this and can explain what I’m doing wrong. For completeness, here is the lightweight version of the same transform generator (plus the imports it needs); all the other code is the same:

import logging

import polars as pl
from transforms.api import transform, Input, Output, lightweight


def create_transform(in_dataset, output, total_splits, split):
    """ Create the transform that will be repeated total_splits times
    """
    @lightweight
    @transform(
        out_df=Output(output),
        source_df=Input(in_dataset),
    )
    def one_split_transform(source_df, out_df):
        """ Select only one split's worth of data, and then
            do the work on it here.
        """
        log = logging.getLogger(__name__)

        df = (source_df
              .polars(lazy=True)
              .filter(pl.col("id").mod(total_splits) == split)
              )
        # Now have a smaller dataframe and do your operations here
        # This is just a silly example regex extract.
        df = (df.with_columns(pl.col("data").str.extract(r"(8.)", 1).alias("matched")))

        log.info(f"Row Count: {df.count().collect().item(0, 'id')}")

        out_df.write_table(df.collect())

        return None

    return one_split_transform

Conclusion

So what I proposed doesn’t work directly for what you asked, but depending on your data you might be able to use the Spark pattern above to split your data into smaller ‘chunks’, process each one as a lightweight transform (which would probably still be faster), and then union them all together at the end. You just don’t have the convenience of generating all the split transforms automatically.
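
If you go that route, a hand-written version of one chunk might look like this. It’s a sketch reusing the modulo-split idea and the Polars code from above; the paths are the same placeholders, and you’d copy the transform once per split, changing SPLIT, the output path, and the function name each time:

import logging

import polars as pl
from transforms.api import transform, Input, Output, lightweight

TOTAL_SPLITS = 8
SPLIT = 0  # copy this transform per split and change this value (and the output path)


@lightweight
@transform(
    out_df=Output("/<path_to_output_folder>/lightweight_split_0"),
    source_df=Input("/<path_to_input_dataset>/large_input"),
)
def lightweight_split_0(source_df, out_df):
    log = logging.getLogger(__name__)

    df = (
        source_df
        .polars(lazy=True)
        .filter(pl.col("id").mod(TOTAL_SPLITS) == SPLIT)
    )
    # your real processing goes here; same silly regex example as above
    df = df.with_columns(pl.col("data").str.extract(r"(8.)", 1).alias("matched"))
    df = df.collect()

    log.info(f"Row Count: {df.height}")

    out_df.write_table(df)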
