Partition a big incremental dataset

I am having a problem with one table, we recently discovered it is not partitioned, and now we are in the middle of a development and it requires to be properly partitioned to make a project “future proof”

Input dataset (df_raw): +200Gb
Output dataset (“df_daily” the one we want to partition): +100Gb

It has been running incrementally for a very long time…and recomputing those 200Gb is not an option… so we decided to try the following approach:

1- Stop all schedules
2- Change code to just read the previous output and write back the output in mode “replace” partitioning by date using the following code (the original code is commented)

But now we are getting the error:
The build failed due to an org.apache.spark.sql.AnalysisException. The error message indicates that the operation MSCK REPAIR TABLE is not allowed because the table in question is not a partitioned table. This operation is typically used to recover partitions and data associated with partitions.

The error occurred in the file df_daily.py at line 32, where the function write_partitioned_dataframe_row_based is called. This function seems to be trying to write to a non-partitioned table as if it were partitioned.

To fix this issue, you can either convert the table to a partitioned table or modify your code to not treat the table as partitioned. If you’re unsure how to proceed, please consult with Palantir support.

It’s not allowing us to partition because there is “MSCK REPAIR TABLE” which seems that in the metadata of the table, it is flagged as “non-partitioned” and you can not make it partitioned even if you are completely replacing the data inside…

We also considered the idea of making the following:

current status:
<df_raw> → <df_daily (non partitioned)>

​Potential Solution:
1st step: “create a copy of df_daily partitioned with @incremental

<df_raw> → <df_daily (non partitioned)> → <df_daily_2 (partitioned)>

2nd step: “rewire to make the copy read from raw” WE DONT KNOW IF IT WOULD WORK
<df_raw> → <df_daily_2 (partitioned)>

The question at the end is: How I can partition by date an BIG incremental dataset avoiding recompute all the input dataset?

Error:

Hello ALAIZA,

Without seeing your code it is hard for me to guess what is going wrong. I think your intuition of pausing schedules and running one of logic to properly partition your data sounds correct.

Have you seen this code example: https://www.palantir.com/docs/foundry/transforms-python/create-historical-dataset/#increased-resource-consumption ?

In it you can see some code examples of re-writing data using the ‘replace’ mode.

You will have to read in all the historical data by using the read mode ‘previous’ .

previous_df = output.dataframe('previous', schema)

Once you have done that, and set the write mode to replace, you can re-write the files in a new partitioning style.

added more context to the post with screenshot of the code and error output

Hi @ALAIZA,

Could you try adding the localCheckpoint() call as in this example in the docs?

previous_output = output.dataframe(...).localCheckpoint()

Copying in the docs:

The output of the read of the dataframe is evaluated according to the write mode of the dataset during the write_dataframe call, such that previous write modes are ignored. Calling .localCheckpoint(eager=True) forces the data to be read and the output write mode to be evaluated at that point, and never recomputed.

1 Like

Tried but output is the same:

It seems that the solution is to run a command to refresh the Hive metastore, but this is hidden on Palantir side…

# Run MSCK REPAIR TABLE to update Hive metastore with new partitions
output_table_name = "<your_output_table_name>"  # Replace with the actual table name
spark.sql(f"MSCK REPAIR TABLE {output_table_name}")

Hi @ALAIZA -

Since the evaluation of read mode within a dataframe() call is lazy, you may have an issue with the invocation of output.dataframe("current").schema when creating previous_output.

Can we try eagerly evaluating the dataframe off of “current”?

This would look like:

previous_output = output.dataframe("current").localCheckpoint()

See the example in the docs: Merge and replace with schema change.