Efficiently Preserving Partitioning After Transformation

I am writing a data pipeline that has as input a 2 TB dataset partitioned by a column my_col. I want to take this input, apply a regex to a column, and save the output still repartitioned by my_col, but with some of the files coalesced according to what spark thinks is optimal (since some of the files are just a couple hundred KB). Is there a more efficient way to do this than the following:

def compute(timeseries, out):
    df = timeseries.dataframe().withColumn(
        'channel_id',
        F.regexp_extract(F.col('timeseries_id'), r'^([^\|]*\|\|)', 1)
    )

    out.write_dataframe(df, partition_cols=['my_col']

The reason I am asking is because I am seeing a huge amount of disk spillage even though the data should already be partitioned correctly; and if I add df.repartition('my_col') the disk spillage goes to 0 whilst shuffle writes become very large.

When running a build like this I also see that the logical plan isn’t making use of the partitioning of the input
(I see: ..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: [], ...)

I would try to use coalesce that should only group files together rather than shuffle the data.
Since you’ll want to aim for ~128MB files, for a 2TB dataset you could try df.coalesce(15000).

There is no straightforward way to achieve it natively as far as I am aware.

You could loop over the partition values, load the specific partition into a dataframe, union them together into a single dataframe and write it again as you did. This will result into one MAP operation without any shuffle.
This will ensure that you will get one input partition per partition output.
You could play with maxPartitonBytes parameters to adapt the number of output partitions you want.
Let’s say you have 4 files of 64 MB for value A for my_col and you want to keep the same in your output. By default Spark will likely create 2 partitions of 128MB

As for disk spillage, could happen because spark likely coalesces multiple input partitions into one of 128MB and then it has to perform a sort to partition the data on the disk. Decreasing this parameter would likely help