@StamDataEngineer So I’ve used this pattern before, with PySpark, to split very large datasets into smaller ‘chunks’ that you can run as parallel jobs. It does rely on you being able to partition your dataset into independent chunks, meaning that each one doesn’t need to know about rows in any of the others, which is very dependent on your data.
Suppose you have a large input dataset that looks like this:
| id | data |
| --- | --- |
| 0 | c8e1eb22 |
| 1 | 9311744c |
| 2 | 4bb9c1a0 |
| 3 | 78f8cf0b |
| 4 | c8f0193b |
| … | etc… |
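The splitting itself just uses the modulo of the id column, so every row ends up in exactly one chunk. As a quick illustration in plain Python (the ids beyond the table are made up, and 8 splits matches the choice further down):

# Illustration only: which chunk each row would land in with 8 splits.
total_splits = 8
for row_id in [0, 1, 2, 3, 4, 9, 17]:  # 9 and 17 are made-up extra ids
    print(f"id {row_id} -> spark_split_{row_id % total_splits}")
# id 0 -> spark_split_0, ..., id 9 -> spark_split_1, id 17 -> spark_split_1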
Spark Version
Then you create a function that generates your (PySpark in this case) transform, and another that generates a list of those transforms. This is the file gen_spark_parallel_batches.py:
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import logging


def create_spark_transform(in_dataset, output, total_splits, split):
    """Create the transform that will be repeated total_splits times."""
    @transform(
        out_df=Output(output),
        source_df=Input(in_dataset),
    )
    def one_split_spark_transform(source_df, out_df):
        """Select only one split's worth of data, then do the work on it here."""
        log = logging.getLogger(__name__)
        df = (source_df
              .dataframe()
              .filter(F.col("id") % total_splits == split)
              )
        # Now we have a smaller dataframe; do your operations here.
        # This is just a silly example regex extract.
        df = df.withColumn("matches", F.regexp_extract(F.col("data"), r"8.", 0))
        log.info(f"Row Count: {df.count()}")
        out_df.write_dataframe(df)
        return None

    return one_split_spark_transform


def pipeline_spark_split_generator(in_dataset, number_of_splits):
    """Generate the split transforms."""
    transforms = []
    for i in range(number_of_splits):
        output = f"/<path_to_output_folder>/spark_split_{i}"
        transforms.append(create_spark_transform(in_dataset, output, number_of_splits, i))
    return transforms
(Ignore the inclusion of logging for now.)
Then you need to add this to your pipeline in pipeline.py (we’ve chosen 8 splits in this case):
from transforms.api import Pipeline
from myproject import datasets
from myproject.datasets.gen_spark_parallel_batches import pipeline_spark_split_generator

# This is the default pipeline.py code that finds all transforms kept in separate files:
my_pipeline = Pipeline()
my_pipeline.discover_transforms(datasets)

# Below is in addition to the default, for generating our new transforms:
my_pipeline.add_transforms(*pipeline_spark_split_generator(
    "/<path_to_input_dataset>/large_input",
    8))
When the checks have completed you’ll see empty datasets ‘spark_split_0’, ‘spark_split_1’, etc. appear. You can’t build them from the repository; instead, click “Build” on each dataset, or add them all to a schedule and then build them.
Afterwards you can create a transform that unions them all together for a final output.
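For example, the union transform could look roughly like this (a minimal sketch assuming 8 splits and the output folder used above; the transform and dataset names are made up, and unionByName is just one way to combine the chunks):

from functools import reduce
from transforms.api import transform, Input, Output


@transform(
    out_df=Output("/<path_to_output_folder>/spark_split_union"),
    split_0=Input("/<path_to_output_folder>/spark_split_0"),
    split_1=Input("/<path_to_output_folder>/spark_split_1"),
    split_2=Input("/<path_to_output_folder>/spark_split_2"),
    split_3=Input("/<path_to_output_folder>/spark_split_3"),
    split_4=Input("/<path_to_output_folder>/spark_split_4"),
    split_5=Input("/<path_to_output_folder>/spark_split_5"),
    split_6=Input("/<path_to_output_folder>/spark_split_6"),
    split_7=Input("/<path_to_output_folder>/spark_split_7"),
)
def union_all_splits(out_df, split_0, split_1, split_2, split_3,
                     split_4, split_5, split_6, split_7):
    """Union the 8 split outputs back into a single dataset."""
    dfs = [s.dataframe() for s in (split_0, split_1, split_2, split_3,
                                   split_4, split_5, split_6, split_7)]
    out_df.write_dataframe(reduce(lambda a, b: a.unionByName(b), dfs))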
Lightweight Version
So I tried the above with a Polars lightweight transform. It’s an identical pattern, except that the function one_split_spark_transform() gets an additional @lightweight decorator and the code inside it is Polars code (which runs fine when written as a stand-alone transform).
For reasons I don’t understand, this doesn’t work. It creates the empty datasets as expected after the checks run, and when I build one of those datasets, something runs, but it somehow doesn’t seem to pick up the transform, and I don’t get any useful errors or log messages. After the build I just have an empty dataset that’s 0 KB in size and says it doesn’t have a schema.
In the above example I added logging, and in the Spark version the row count is then printed to the logs as expected. In the lightweight version this also doesn’t appear, which makes me think that the lightweight transform isn’t really being run.
I don’t know if anyone from the lightweight team @palantir might read this and can explain what I’m doing wrong. For completeness, here is the lightweight version of the same transform generator; all the other code is the same apart from the extra imports:
import logging

import polars as pl
from transforms.api import lightweight, transform, Input, Output


def create_transform(in_dataset, output, total_splits, split):
    """Create the transform that will be repeated total_splits times."""
    @lightweight
    @transform(
        out_df=Output(output),
        source_df=Input(in_dataset),
    )
    def one_split_transform(source_df, out_df):
        """Select only one split's worth of data, then do the work on it here."""
        log = logging.getLogger(__name__)
        df = (source_df
              .polars(lazy=True)
              .filter(pl.col("id").mod(total_splits) == split)
              )
        # Now we have a smaller (lazy) dataframe; do your operations here.
        # This is just a silly example regex extract.
        df = df.with_columns(pl.col("data").str.extract(r"(8.)", 1).alias("matched"))
        log.info(f"Row Count: {df.count().collect().item(0, 'id')}")
        out_df.write_table(df.collect())
        return None

    return one_split_transform
Conclusion
So what I proposed doesn’t work directly for what you asked, but depending on your data you might be able to use the Spark pattern above to split your data into smaller ‘chunks’, then process each one as a lightweight transform (which would probably still be faster), and then union them all together at the end. You just don’t have the convenience of generating all the split transforms automatically.
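For completeness, each of those hand-written per-chunk transforms could look something like this (a minimal sketch only; the paths and names are made up, it reuses the Polars calls from the generator version above, and you would declare one of these per split):

import logging

import polars as pl
from transforms.api import lightweight, transform, Input, Output


@lightweight
@transform(
    out_df=Output("/<path_to_output_folder>/lightweight_split_0"),
    source_df=Input("/<path_to_output_folder>/spark_split_0"),
)
def process_split_0(source_df, out_df):
    """Hand-written lightweight transform for one pre-split chunk."""
    log = logging.getLogger(__name__)
    df = source_df.polars(lazy=True)
    # Same silly example regex extract as before.
    df = df.with_columns(pl.col("data").str.extract(r"(8.)", 1).alias("matched"))
    log.info("Processed split 0")
    out_df.write_table(df.collect())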