Many small files

I have a dataset with 144,000 small files, and we get a new file every 5 minutes. Is there a way to repartition a dataset in place? Does anyone have a first-class solution to this problem?
The best idea I’ve had: in an incremental transform, read the number of files in the previous output; if it is above a certain threshold, switch the output’s write mode to ‘replace’ so the next build rewrites (and repartitions) the whole dataset as a snapshot.

My first attempt:

from transforms.api import transform, incremental, Input, Output
import logging

log = logging.getLogger(__name__)


@incremental(semantic_version=1)
@transform(
    output=Output("output"),
    source=Input("input"),
)
def compute(ctx, source, output):
    # First (snapshot) run: write the whole input, partitioned by part_col.
    if not ctx.is_incremental:
        source_df = source.dataframe().repartition("part_col")
        output.write_dataframe(source_df)
        return

    # Incremental run: only the newly appended rows arrive here.
    source_df = source.dataframe().repartition(1, "part_col")
    schema = source_df.schema
    previous_output = output.dataframe('previous', schema)
    num_files = len(list(output.filesystem().ls()))
    log.info(f"num_files: {num_files}")

    # Too many small files: trigger a repartition snapshot.
    if num_files > 500:
        # union() returns a new DataFrame, so the result must be assigned;
        # repartition the combined data so the rewrite actually compacts it.
        out_df = previous_output.union(source_df).repartition("part_col")
        output.set_mode('replace')
    else:
        out_df = source_df
        output.set_mode('modify')

    output.write_dataframe(out_df)
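
For comparison, here is a minimal sketch of a blunter alternative, assuming a compacted downstream copy is acceptable instead of an in-place rewrite: a separate, non-incremental transform that snapshots the whole dataset on its own (infrequent) schedule. The dataset paths and part_col are placeholders.

from transforms.api import transform_df, Input, Output


@transform_df(
    Output("compacted_output"),  # placeholder dataset path
    source=Input("input"),       # placeholder dataset path
)
def compact(source):
    # Every build is a full snapshot: the many small incremental files are
    # rewritten, hash-partitioned on part_col, into a bounded number of
    # larger files (spark.sql.shuffle.partitions by default).
    return source.repartition("part_col")

Scheduled infrequently (e.g. nightly), this keeps the file count bounded, at the cost of rewriting all the data on each run.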

Creating a projection is the standard, first-class solution to this problem in Foundry: a projection maintains an optimized, compacted copy of the dataset in the background, so downstream reads avoid the small-files problem without any change to the transform itself.
