Palantir told me in the past that this is not possible, but there is a workaround that we use in one of our pipelines, where we need to recompute some already processed historical data. The easiest way to do that is to remove the corrupted files from the view.
Let's say your dataset is ri.foundry.main.dataset.c4385d1d-67d0-46f5-984f-b391sdada3
and you want to remove a list of files or a full partition
while preserving incrementality. Then you need to:
- Have a process that performs the delete
    delete_files(
        dataset_rid="ri.foundry.main.dataset.c4385d1d-67d0-46f5-984f-b391sdada3",
        branch="master",
        remove_files=[
            "spark/year=2024/month=4/day=29/hour=15/part-00000-b33bb585-e338-4c31-adbb-431bed04bbb5.c000.snappy.parquet"
        ]
    )
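If you want to drop a full partition rather than individual files, the list passed as remove_files can be built by filtering the dataset's file listing on the partition directory. This is only a sketch: files_in_partition is a hypothetical helper of mine, and how you obtain the list of logical paths for your dataset is left out.

```python
from typing import Iterable, List


def files_in_partition(all_paths: Iterable[str], partition_prefix: str) -> List[str]:
    """Return the logical paths that sit under the given partition directory.

    Hypothetical helper: `all_paths` is whatever file listing you already
    have for the dataset; nothing here talks to Foundry.
    """
    # Force a trailing slash so "hour=1" does not also match "hour=15".
    prefix = partition_prefix.rstrip("/") + "/"
    return [path for path in all_paths if path.startswith(prefix)]


if __name__ == "__main__":
    listing = [
        "spark/year=2024/month=4/day=29/hour=15/part-00000.snappy.parquet",
        "spark/year=2024/month=4/day=29/hour=16/part-00000.snappy.parquet",
    ]
    print(files_in_partition(listing, "spark/year=2024/month=4/day=29/hour=15"))
```

The trailing-slash normalisation matters for Hive-style paths, where one partition value can be a string prefix of another.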
Here, delete_files does two things:
- Removes the files from the dataset's view with a DELETE transaction:
    def remove_files(
        self,
        dataset: "Dataset",
        logical_paths: List[str]
    ):
        # Open a DELETE transaction on the dataset.
        transaction = self.start_transaction(
            dataset=dataset,
            txn_type=TransactionType.DELETE
        )
        # Register the logical paths that should disappear from the view.
        self._catalog_service.add_files_to_delete_transaction(
            auth_header=self.ctx.auth_token,
            dataset_rid=str(dataset.rid),
            transaction_rid=str(transaction.rid),
            request=LogicalFilesRequest(
                logical_paths=logical_paths
            )
        )
        # Committing is what actually removes the files from the view.
        self.commit_transaction(txn=transaction)
- Forces a new build on your dataset:
    client._build_manager_service.submit_build(
        auth_header=token,
        submit_build_request=create_submit_build_request(_dataset.rid)
    )
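Put together, the two steps could be wired up roughly like this. It is a sketch under assumptions, not our actual implementation: run_delete_transaction and trigger_build are hypothetical callables standing in for the remove_files and submit_build calls above, injected so the ordering logic can be read (and tested) without a Foundry client.

```python
from typing import Callable, List


def delete_files(
    dataset_rid: str,
    branch: str,
    remove_files: List[str],
    run_delete_transaction: Callable[[str, str, List[str]], None],
    trigger_build: Callable[[str], None],
) -> bool:
    """Delete files from the dataset view, then force a build.

    Returns True if a DELETE transaction and a build were submitted.
    Both callables are hypothetical stand-ins for the real service calls.
    """
    if not remove_files:
        # Nothing to delete: skip both the DELETE transaction and the build.
        return False
    # Step 1: commit the DELETE transaction that drops the files from the view.
    run_delete_transaction(dataset_rid, branch, remove_files)
    # Step 2: only after the delete is committed, force a new build.
    trigger_build(dataset_rid)
    return True
```

The build is triggered strictly after the delete has been committed, so the forced run always sees the DELETE as the latest transaction.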
The forced build is needed because the dataset you delete files from must get an empty append transaction after the DELETE in order to preserve incrementality.
So in your transform you need to add logic that checks whether the last transaction was a DELETE and, if so, just performs an empty append (you end up with two transactions: DELETE + APPEND). On the next scheduled run you land in the else branch and the job works as usual:
    if is_last_transaction_delete(
        input_dataset=input_dataset, token=ctx.auth_header
    ):
        # Last transaction was a DELETE: write nothing, just append.
        output.set_mode("append")
    else:
        # all my regular spark logic
        ...
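The check itself is one of the missing pieces. A minimal sketch of the decision, assuming you have already fetched the dataset's transaction history as a list of records ordered oldest to newest: the record shape (the "type" and "status" keys and their values) and the function name are my assumptions for illustration, and the call that fetches the history is not shown.

```python
from typing import Mapping, Sequence


def last_committed_is_delete(transactions: Sequence[Mapping[str, str]]) -> bool:
    """Return True if the newest committed transaction is a DELETE.

    Assumed input: records ordered oldest to newest, each carrying a
    "type" (e.g. "APPEND", "DELETE") and a "status" (e.g. "COMMITTED").
    """
    # Walk backwards and ignore anything that never got committed.
    for txn in reversed(transactions):
        if txn.get("status") == "COMMITTED":
            return txn.get("type") == "DELETE"
    # No committed transaction at all: treat it as a normal run.
    return False
```

Skipping uncommitted transactions keeps an aborted or still open transaction after the DELETE from hiding it.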
Hope this helps, and that you'll be able to fill in the missing calls and decipher the provided snippets.
PS: @palantir, please make this easier for us. A simple allow_delete parameter for @incremental
would be ideal.