import shutil

from transforms.api import transform, Input, Output


@transform(
tf_output=Output(output),
tf_input=Input(rid_dataset_with_files)
)
def compute(ctx, tf_output, tf_input):
    # One task per file in the input dataset's filesystem.
    tasks = [_file.path for _file in tf_input.filesystem().ls(show_hidden=False)]
    (
        ctx
        .spark_session
        .sparkContext
        .parallelize(tasks)
        # Alternatively, key the paths and call .partitionBy(NUMBER, lambda x: hash(x) % NUMBER)
        # to distribute the files across the cluster when their sizes are skewed
        # (a sketch of that variant follows copy_files below).
        .persist()
        .foreach(lambda file_path: copy_files(file_path, tf_input, tf_output))
    )


def copy_files(filename, transform_input, transform_output):
    # Stream a single file from the input dataset's filesystem into the output dataset.
    with transform_input.filesystem().open(filename, 'rb') as tmp:
        with transform_output.filesystem().open(filename, 'wb') as output_file:
            shutil.copyfileobj(tmp, output_file)
            output_file.flush()
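

# --- Optional skew-handling variant (a sketch under assumptions, not the original code) ---
# If file sizes are heavily skewed, hash-partitioning the paths spreads the copy work more
# evenly across executors. PySpark's partitionBy() only works on pair RDDs, so each path is
# keyed first. The function name and the num_partitions default are illustrative choices,
# not part of the Foundry API.
def copy_files_partitioned(ctx, tf_input, tf_output, num_partitions=64):
    tasks = [_file.path for _file in tf_input.filesystem().ls(show_hidden=False)]
    (
        ctx
        .spark_session
        .sparkContext
        .parallelize(tasks)
        .map(lambda path: (path, None))  # key each path so partitionBy can hash it
        .partitionBy(num_partitions, lambda key: hash(key) % num_partitions)
        .foreach(lambda pair: copy_files(pair[0], tf_input, tf_output))
    )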