Hello,
We have a very large dataset that we need to repartition by a column, and the repartitioning takes a very long time. Because this dataset is an input to our daily pipeline, we would like to run the repartitioning into a separate dataset (so that the daily pipeline is not blocked) and, once it has finished, copy the files from the repartitioned dataset into our real dataset. Is there a way to do that, and to do it relatively fast? We will have around 50,000 files of 128 MB each. Alternatively, do you see a better way to get from the repartitioned dataset to our dataset?
Best Regards,
Soufiane
Hi @Soufiane
To copy files from one dataset to another you can use the function below:
import shutil

from transforms.api import transform, Input, Output


@transform(
    tf_output=Output("rid"),
    tf_input=Input("rid"),
)
def compute(ctx, tf_output, tf_input):
    # List every non-hidden file in the input dataset.
    tasks = [i.path for i in tf_input.filesystem().ls(show_hidden=False)]
    (
        ctx
        .spark_session
        .sparkContext
        .parallelize(tasks)
        # Each element is a file path; the copy runs on the executors.
        .foreach(lambda path: copy_files(path, tf_input, tf_output))
    )


def copy_files(filename, tf_input, tf_output):
    """Create your own logic here, e.g. a regex if you want to filter something out."""
    with tf_input.filesystem().open(filename, 'rb') as tmp:
        with tf_output.filesystem().open(filename, 'wb') as output_file:
            shutil.copyfileobj(tmp, output_file)
            output_file.flush()
There’s an upper limit of 10 million files. Copying files puts pressure on the backend services that retrieve the metadata, and it is normally rather slow.
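If you want to filter files before copying, as the docstring hints, here is a minimal sketch in plain Python. The helper name `select_paths` and the default pattern are made up for illustration and are not part of any Foundry API; the assumption is that you would apply it to the `tasks` list before calling `parallelize`.

```python
import re


def select_paths(paths, pattern=r".*\.parquet$"):
    """Keep only the paths that fully match the given regex."""
    compiled = re.compile(pattern)
    return [p for p in paths if compiled.fullmatch(p)]


if __name__ == "__main__":
    # Hypothetical file names, only to show the filtering behaviour.
    sample = [
        "part-0000.snappy.parquet",
        "_SUCCESS",
        "spark/part-0001.snappy.parquet",
    ]
    print(select_paths(sample))
```

In the transform this would look like `tasks = select_paths(tasks)`, so only the matching files get distributed to the executors.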
I guess you want to repartition to enable faster filtering or joins on your dataset?
In that case I would suggest setting up a projection instead.
Your downstream transform will use the projection if it is ready, and the input dataset otherwise. It also works with incremental transforms.
https://www.palantir.com/docs/foundry/optimizing-pipelines/projections-overview/
Hello,
Thanks for the suggestion. The problem with projections is schema evolution: the schema of our data can change, and we also need to be able to snapshot it whenever we want, so projections are not really suitable for what we do. That is why we are simply trying to repartition our dataset properly. We managed to do that, but as I said, we don't do it directly on our dataset because of the time it takes, so it would be perfect to find a way to get from the repartitioned dataset to our dataset (that's why I suggested copying files).
Hello,
Thank you so much for the help! I tested it on a small dataset and it works perfectly fine, but on a large dataset it takes a lot of time. I don't know why, but I only have 2 tasks running in parallel although I have 32 executors with 8 cores each, which is weird. Are you using this code on something similar on your side?
Hi Soufiane, one possibility is to repartition the tasks:
.parallelize(tasks)
.repartition(num_partitions)  # an integer; RDD.repartition takes a partition count, not a key
.foreach(lambda path: copy_files(path, tf_input, tf_output))
This will distribute the workload better.
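As a rough way to pick that number, here is a small sketch. The helper `choose_num_partitions` and the default of 4 tasks per core are assumptions for illustration, not a Foundry or Spark recommendation; the idea is just to give every core several tasks while never creating more partitions than there are files.

```python
def choose_num_partitions(n_files, executors, cores_per_executor, tasks_per_core=4):
    """Pick a partition count: a few tasks per core, never more than one per file."""
    if n_files <= 0:
        return 1
    slots = executors * cores_per_executor
    return max(1, min(n_files, slots * tasks_per_core))


if __name__ == "__main__":
    # Numbers from this thread: about 50,000 files, 32 executors with 8 cores each.
    print(choose_num_partitions(50_000, 32, 8))  # prints 1024
```

The result can be passed to `.repartition(...)`, or directly as the second argument of `parallelize(tasks, n)`, which sets the number of slices up front and avoids the extra shuffle.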