Deleting Files in historic transactions without breaking the incremental

I have an incremental transform writing out daily partitioned parquet files based on an incremental ingestion. (>10TB)

Due to Business Requirements there might be corrections for past dates.

Therefore, I would like to - within my incremental transform - delete special parquet files and add others replacing the data.

To my understanding this common requirement is not supported and there is also no workaround possible that keeps the incremental behaviour intact.

Did anyone solve this challenge?

1 Like

I’ve got a response in the past from Palantir that it’s not possible, but actually there is a workaround that we are utilising in one of our pipelines where we need to re-compute some already processed historical data, and the easiest is to remove the corrupted files from the view.

Let’s say your dataset is ri.foundry.main.dataset.c4385d1d-67d0-46f5-984f-b391sdada3 and you want to remove a list of files or a full partition
so that the incrementallity is preserved, then you need to:

  1. Have a process that performs the delete
    delete_files(
        dataset_rid="ri.foundry.main.dataset.c4385d1d-67d0-46f5-984f-b391sdada3",
        branch="master",
        remove_files=[
            "spark/year=2024/month=4/day=29/hour=15/part-00000-b33bb585-e338-4c31-adbb-431bed04bbb5.c000.snappy.parquet"
        ]

where the delete_files does:

  • Remove files from a datasets view with a DELETE transaction:
  def remove_files(
        self,
        dataset: "Dataset",
        logical_paths: List[str]
):
        transaction = self.start_transaction(
            dataset=dataset,
            txn_type=TransactionType.DELETE
        )
        self._catalog_service.add_files_to_delete_transaction(
            auth_header=self.ctx.auth_token,
            dataset_rid=str(dataset.rid),
            transaction_rid=str(transaction.rid),
            request=LogicalFilesRequest(
                logical_paths=logical_paths
            )
        )
        self.commit_transaction(txn=transaction)
  • Forces a new build on your dataset
client._build_manager_service.submit_build(
auth_header=token,
submit_build_request=create_submit_build_request(_dataset.rid)
)

the force build is needed as the dataset where you delete files from needs to have an empty append transaction to preserve the incrementality

so in your transformer you would need to add logic that checks if the last transaction was delete and just do an empty append transaction if so (you end-up with two transactions! DELETE + APPEND), with next scheduled run you’ll end up in the else condition and the job will work as usual

    if is_last_transaction_delete(
        input_dataset=input_dataset, token=ctx.auth_header
    ):
        output.set_mode("append")
    else:
        # all my regular spark logic

hope this helps, and that you’ll be able to fill the missing calls and decrypt the provided snippets :wink:

ps: @palantir make this somehow easier for us :pray: a simple parameter allow_delete for @incremental would be the best

3 Likes

Thanks for providing this solution. It seems like the incremental of the “large” dataset is still working with your workaround, however incremental pipelines build on top of that fail :frowning:

So I agree that Palantir should offer an allow_delete parameter to make this workflow possible.

3 Likes

A flag called allow_retention exists in python transforms and IGNORE_INCREMENTAL_DELETES in java transform that if added to transform does not break incrementality if DELETE transaction is added to the dataset.

Does this work for all DELETES or only for deletes by the retention service?

It should work for any DELETE as long as you write metadata to the deleted transaction that it was deleted via the retention service.

{
  "deletion-metadata": {
    "reason": "EXPIRED"
  }
}

It’s easier if you want to delete the entire transaction (i.e. all files in it). If you use retention service to mark the transaction, it will perform all the steps you described above (creating txn/adding files/committing txn) and writing appropriate transaction metadata to allow allow_retention and IGNORE_INCREMENTAL_DELETES to work as expected.