I have a dataset with 144,000 small files, and we get a new file every 5 minutes. Is there a way to repartition a dataset in place? Does anyone have a first-class solution to this problem?
The best idea I’ve had is to read the number of files in the ‘previous’ output dataset from within an incremental transform, and if it’s above a certain threshold, switch the output mode to ‘replace’ to force a repartitioning snapshot.
My first attempt:
from transforms.api import transform, incremental, Input, Output
import logging

log = logging.getLogger(__name__)


@incremental(semantic_version=1)
@transform(
    output=Output("output"),
    source=Input("input"),
)
def compute(ctx, source, output):
    if not ctx.is_incremental:
        # First (snapshot) run: write the full input, partitioned by part_col
        source_df = source.dataframe().repartition("part_col")
        output.write_dataframe(source_df)
        return

    # Incremental run: source.dataframe() contains only the newly added rows
    source_df = source.dataframe().repartition(1, "part_col")
    schema = source_df.schema
    previous_output = output.dataframe('previous', schema)

    num_files = len(list(output.filesystem().ls()))
    log.info(f"num_files: {num_files}")

    # trigger a repartition snapshot once the output is too fragmented
    if num_files > 500:
        out_df = previous_output.union(source_df).repartition("part_col")
        output.set_mode('replace')
    else:
        # otherwise just append the new rows
        out_df = source_df
        output.set_mode('modify')
    output.write_dataframe(out_df)
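One refinement I’ve been considering (a hedged sketch, not tested): instead of hardcoding the partition counts, derive the target number of partitions from the previous output’s on-disk size, so the snapshot produces files near a target size. TARGET_FILE_BYTES is a made-up knob here, and I’m assuming the FileStatus entries returned by output.filesystem().ls() expose a size attribute in bytes.

    # Hedged sketch: size-based partition count for the snapshot branch.
    # Assumes FileStatus entries from output.filesystem().ls() have a
    # `size` attribute in bytes; TARGET_FILE_BYTES is a hypothetical knob.
    TARGET_FILE_BYTES = 128 * 1024 * 1024  # aim for ~128 MiB output files

    files = list(output.filesystem().ls())
    num_files = len(files)
    total_bytes = sum(f.size for f in files)
    num_partitions = max(1, total_bytes // TARGET_FILE_BYTES)

    if num_files > 500:
        out_df = previous_output.union(source_df) \
                                .repartition(num_partitions, "part_col")
        output.set_mode('replace')

That would keep the compacted output at a roughly constant file size as the dataset grows, rather than tying it to a fixed partition count.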