Output several .parquet files from Code Workspaces using PySpark

I work in Code Workspaces (Jupyter) using PySpark. I write the result back to a Foundry dataset as the documentation suggests, like this:

from foundry.transforms import Dataset

spark_output = Dataset.get("spark_output")
spark_output.write_table(df_processed.toArrow())

I think there is a problem with toArrow(), as it collects all the data onto the driver. It outputs a single .parquet file even when my input dataset consists of many .parquet files, and even when I write the data back without any transformation, right after reading it.
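
For illustration, a small check (a sketch using the names above) that shows the collect:

from pyspark.sql.functions import spark_partition_id

# e.g. many partitions spread across the executors...
print(df_processed.select(spark_partition_id()).distinct().count())
# ...but toArrow() pulls every row into one local pyarrow.Table on the driver,
# so write_table() can only ever produce a single .parquet file
table = df_processed.toArrow()
print(type(table), table.num_rows)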

This is a bottleneck for bigger datasets, which then require a bigger driver just to be able to collect all the data once at the end. Is there another method to write into a Foundry dataset directly from the executors?

Hi @ZygD,

This is a bit hacky but will avoid collecting the data and hopefully do the trick (a consolidated sketch follows the steps):

  • Write the parquet files of your Spark dataframe to a local directory, such as df.write.parquet("/foundry/outputs/df")
  • Upload those raw files to your output dataset:
    • spark_output = Dataset.get("spark_output")
      spark_output.upload_directory("/foundry/outputs/df")
  • Create an empty Arrow df with the schema you need:
    • empty_df = spark.createDataFrame([], schema=df.schema).toArrow()
  • Commit that transaction with no data (the transaction may fail, but the schema still gets applied):
    • spark_output.write_table(empty_df)
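
Putting the steps together (a sketch, assuming the Code Workspaces imports and Spark 4's DataFrame.toArrow(); adapt the names df and spark_output to your project):

from pyspark.sql import SparkSession
from foundry.transforms import Dataset

spark = SparkSession.getActiveSession()

# 1. Write the partitioned parquet files locally (runs on the executors)
df.write.parquet("/foundry/outputs/df")

# 2. Upload the raw files into the output dataset
spark_output = Dataset.get("spark_output")
spark_output.upload_directory("/foundry/outputs/df")

# 3. Build a zero-row Arrow table that carries only the schema
empty_df = spark.createDataFrame([], schema=df.schema).toArrow()

# 4. Commit the schema-only transaction
spark_output.write_table(empty_df)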

Gabe


Thank you @gwalker.
I had to switch the order: first upload the schema, and only then the .parquet files. If we write the schema afterwards, we lose the data, because write_table then creates a snapshot transaction. The following code works well in Spark 4; lower versions need a small workaround for df.toArrow(), which is used here to upload the schema (see the sketch after the function).

from pyspark.sql import SparkSession
from foundry.transforms import Dataset


def write_ds_spark(out_str, df):
    spark = SparkSession.getActiveSession()
    out_dir = f"/foundry/outputs/{out_str}"
    # Write the partitioned .parquet files locally, in parallel on the executors
    df.write.mode("overwrite").parquet(out_dir)
    out = Dataset.get(out_str)
    # Apply the schema first, via a zero-row Arrow table (Spark 4's toArrow())
    df_empty = spark.createDataFrame([], df.schema).toArrow()
    out.write_table(df_empty)
    # Only then upload the raw files; the reverse order snapshots the dataset
    # and drops the uploaded data
    out.upload_directory(out_dir)
    print("Done writing.")
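
Usage, with the names from the question: write_ds_spark("spark_output", df_processed).

On Spark versions below 4, DataFrame.toArrow() does not exist. One possible stand-in (an assumption, not from this thread; to_arrow_schema is an internal PySpark helper and may change between versions) builds the empty Arrow table straight from the Spark schema:

from pyspark.sql.pandas.types import to_arrow_schema  # internal PySpark helper

def empty_arrow(df):
    # Map the Spark schema to an Arrow schema, then make a zero-row pyarrow.Table
    return to_arrow_schema(df.schema).empty_table()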