Can I copy a file within a transforms FileSystem?

Can I use filesystem operations to duplicate data within my dataset, but change values in columns that are encoded into the file path?

For example, copy the data in file “example.csv” but change the value of column a from 1 to 0.

spark/a=1/part-00000-7d755da9-bc41-4bad-8faf-7813d3b19454.c000.csv

You can do this using incremental transforms and shutil:


@incremental()
@transform(
    output=Output("ri.foundry.main.dataset.f98d00a9-0720-4a58-9722-ea04f40cb11f"),
)
def write_dataset(output):
    """Copy every file of the ``a=1`` partition into the ``a=0`` partition.

    The files currently listed by the output's filesystem are enumerated once,
    and each file whose path lies under ``spark/a=1/`` is copied byte-for-byte
    to the same relative location under ``spark/a=0/``. Files outside that
    partition are left untouched.

    Args:
        output: The transform output whose filesystem is both read from and
            written to.
    """
    fs = output.filesystem()

    def rename_file(file_status, var_name, var_val_before, var_val_after):
        """Copy one file to the path with the partition value swapped.

        Args:
            file_status: A row from ``fs.files()``; only ``.path`` is read.
            var_name: Name of the partition column encoded in the path.
            var_val_before: Partition value to look for in the path.
            var_val_after: Partition value to write the copy under.
        """
        # The trailing slash anchors the match to a whole path segment, so
        # "a=1" does not also match (and mangle) "a=10", "a=11", ...
        old_prefix = f"spark/{var_name}={var_val_before}/"
        new_prefix = f"spark/{var_name}={var_val_after}/"

        old_path = file_status.path
        if old_prefix not in old_path:
            # Without this guard new_path would equal old_path and the same
            # file would be opened for reading and writing at once.
            log.info(f"skipping {old_path}: not under {old_prefix}")
            return

        new_path = old_path.replace(old_prefix, new_prefix)
        with fs.open(old_path, "rb") as in_parq:
            with fs.open(new_path, "wb") as out_parq:
                log.info(f"renaming {old_path} to {new_path}")
                shutil.copyfileobj(in_parq, out_parq)

    # Collect once: the logged count is then exactly the set of files that is
    # processed, and the listing is not evaluated a second time.
    file_statuses = fs.files().collect()
    log.info(f"Number of files: {len(file_statuses)}")
    for file_status in file_statuses:
        log.info(f"processing path: {file_status.path}")
        rename_file(file_status, 'a', '1', '0')

Or something like this:

@transform(
    tf_output=Output(output),
    tf_input=Input(rid_dataset_with_files),
)
def compute(ctx, tf_output, tf_input):
    """Copy every visible file of the input dataset to the output dataset.

    The file paths are listed on the driver and then distributed as an RDD so
    that each file is copied by a Spark task via ``copy_files``.

    Args:
        ctx: Transform context; only ``ctx.spark_session`` is used.
        tf_output: Transform output that receives the copies.
        tf_input: Transform input whose files are copied.
    """
    tasks = [_file.path for _file in tf_input.filesystem().ls(show_hidden=False)]

    (
        ctx
        .spark_session
        .sparkContext
        .parallelize(tasks)
        # If file sizes are skewed, consider passing an explicit slice count
        # to parallelize(tasks, NUMBER) or calling .repartition(NUMBER) here
        # so the files are spread over more tasks.
        .persist()
        .foreach(lambda path: copy_files(path, tf_output, tf_input))
    )


def copy_files(filename, transform_output, transform_input=None):
    """Copy one file from the input filesystem to the same path in the output.

    Args:
        filename: Path of the file, relative to the dataset filesystem root.
        transform_output: Object whose ``filesystem()`` is written to.
        transform_input: Object whose ``filesystem()`` is read from. It has a
            default only to keep the two-argument call shape valid; it must
            be supplied.

    Raises:
        ValueError: If ``transform_input`` is not provided.
    """
    if transform_input is None:
        raise ValueError("transform_input is required to read the source file")

    with transform_input.filesystem().open(filename, 'rb') as tmp:
        with transform_output.filesystem().open(filename, 'wb') as output_file:
            shutil.copyfileobj(tmp, output_file)
            output_file.flush()


1 Like