And I am getting the following error message:
py4j.protocol.Py4JJavaError: An error occurred while calling o241.writeDataset.
: org.apache.spark.sql.AnalysisException: [NOT_A_PARTITIONED_TABLE] Operation MSCK REPAIR TABLE is not allowed for … because it is not a partitioned table.
Reading the existing output dataframe with the current read mode and then writing it with newly added partitioning settings, as your code sample does, can cause this issue. You should rewrite your code as below:
Per the recently updated documentation, it is no longer necessary to pass a schema when reading from the output with the previous read mode, as long as the transform is running incrementally (and your code is already assuming that the transform is running incrementally). You may need to upgrade your repository to benefit from this recent improvement in behavior. Note that as a result of this improvement, there is no longer any valid use-case for reading data from the output with the current read mode before new data has been written.
As a side-note, you do not actually need to include salt in the partition_cols to achieve the behavior that you want (20 output dataset files per value of actual_recording_date). I also believe that you will get a more even distribution into twenty files with the following alternative expression for the salt:
F.abs(F.hash(F.rand())) % 20
So the following code would be my final recommendation:
Thank you so much, it worked. I really appreciate your point on how to define the salt; I have used it and it’s better than what I had.
Do you think there is a better way to snapshot with partitioning by date (even without using salting)? The problem is that on some dates we receive a lot of data (7 GB, for example) and on other dates only 200 MB or less. A fixed salt factor works well for the “normal” days, but it creates a lot of small files on the days when we receive less data. If you see a better way of doing it, I would love to know.
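To make the trade-off concrete, here is a rough back-of-the-envelope sketch in plain Python. It assumes a fixed salt factor of 20 and the two daily volumes mentioned above; the 128 MB target file size and the helper names are illustrative assumptions, not anything Spark or the platform prescribes.

```python
import math

SALT_FACTOR = 20       # fixed number of files per date with the salting approach
TARGET_FILE_MB = 128   # assumed target file size, for illustration only


def fixed_salt_file_size_mb(day_size_mb: float, salt_factor: int = SALT_FACTOR) -> float:
    """Average file size when every date is split into the same number of files."""
    return day_size_mb / salt_factor


def size_based_file_count(day_size_mb: float, target_mb: int = TARGET_FILE_MB) -> int:
    """Number of files a date would need if files were sized to the target instead."""
    return max(1, math.ceil(day_size_mb / target_mb))


for label, size_mb in [("busy day (7 GB)", 7 * 1024), ("quiet day (200 MB)", 200)]:
    print(
        label,
        "| avg file size with fixed salt:", fixed_salt_file_size_mb(size_mb), "MB",
        "| files needed at target size:", size_based_file_count(size_mb),
    )
```

With these assumed numbers, the fixed salt yields files of roughly 358 MB on the busy day but only 10 MB on the quiet day, whereas sizing by volume would call for about 56 files on the busy day and 2 on the quiet one.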
This should give you an average of 15 files per day, with more files for days with more data and fewer files for days with less data. It also has the added benefit of optimizing queries on the timestamp column: row groups (if your files are <= 128 MB, row groups will be contiguous with files) will contain non-overlapping timestamp ranges, so Spark can use the row group metadata (which includes the min and max values of each column in each group) to quickly prune irrelevant row groups without reading the actual data. The sortWithinPartitions operation is essentially “free” because Spark would sort on the partition_cols anyway as part of writing the data, and it will skip that sort if you’ve already sorted in this way.
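The row-group pruning idea can be illustrated with a small plain-Python model. This is only a sketch of the principle, not Spark's or Parquet's actual implementation: the RowGroupStats class, the prune_row_groups helper, and the sample timestamp values are all hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroupStats:
    """Hypothetical stand-in for the min/max metadata kept per row group."""
    name: str
    min_ts: int
    max_ts: int


def prune_row_groups(groups: List[RowGroupStats], lo: int, hi: int) -> List[str]:
    """Return names of row groups whose [min_ts, max_ts] range overlaps [lo, hi]."""
    return [g.name for g in groups if g.max_ts >= lo and g.min_ts <= hi]


# Sorted data: each row group covers a disjoint timestamp range.
sorted_groups = [
    RowGroupStats("rg0", 0, 99),
    RowGroupStats("rg1", 100, 199),
    RowGroupStats("rg2", 200, 299),
]

# Unsorted data (for example, randomly salted): every group spans almost the full range.
unsorted_groups = [
    RowGroupStats("rg0", 2, 297),
    RowGroupStats("rg1", 0, 299),
    RowGroupStats("rg2", 5, 290),
]

# A query for timestamps between 120 and 150 only has to read one sorted group,
# but cannot skip any of the unsorted ones.
print(prune_row_groups(sorted_groups, 120, 150))
print(prune_row_groups(unsorted_groups, 120, 150))
```

In this toy model, the same filter touches one row group when the ranges do not overlap and all three when they do, which is why sorting on the timestamp before writing can pay off at read time.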