I’d like to have a cleaning build that only reads in the newest transaction’s worth of data, but I’m struggling to figure out how to do this. If I look at the Spark performance of my current job, it spends an awfully long time reading in the files:
Where are you transforming this data? In Pipeline Builder, or in transforms?
While the setup is a bit more involved, using Code Repositories / transforms will give you more control. The best way to grab only the new rows is to use the “added” read mode on the input (docs).
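As a rough sketch (dataset paths here are placeholders, and this assumes the Python transforms API with the `@incremental` decorator, where `dataframe("added")` returns only the rows appended since the last successful build of the output):

```python
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    out=Output("/Project/clean/my_clean_dataset"),  # placeholder path
    source=Input("/Project/raw/my_raw_dataset"),    # placeholder path
)
def compute(source, out):
    # "added" reads only the rows appended to the input since the last
    # time this output was successfully built.
    new_rows = source.dataframe("added")

    # ... cleaning logic goes here ...

    # Under @incremental, this appends to the existing output rather
    # than replacing it with a snapshot.
    out.write_dataframe(new_rows)
```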
Pipeline Builder is a bit easier to set up but does not allow you to read the previous output (docs). Make sure you have “incremental” selected on the input:
One debugging step, if you’re already using transforms, would be to log the files present in your transaction to confirm you’re only reading the latest files.
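Something like the sketch below (placeholder paths again; this assumes that in an incremental transform the input’s `filesystem()` is scoped to the files not yet processed):

```python
import logging

from transforms.api import transform, incremental, Input, Output

log = logging.getLogger(__name__)


@incremental()
@transform(
    out=Output("/Project/clean/my_clean_dataset"),  # placeholder path
    source=Input("/Project/raw/my_raw_dataset"),    # placeholder path
)
def compute(source, out):
    # List the files this build will actually see; in incremental mode
    # this should only cover files added since the last successful build.
    for f in source.filesystem().ls():
        log.info("input file: %s (%d bytes)", f.path, f.size)

    out.write_dataframe(source.dataframe("added"))
```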
Do you have a lot of files per transaction? In the ingest settings, compacting the input into a single file per transaction might help (if you don’t care about the file names).
Are there multiple transactions on the input dataset since your last build? “Added” will include everything since the last transaction on the output, not just the most recent transaction on the input.
Nope, typically only 2 files per txn (a data ingestion log and the parquet file with the data)
Hmm, there may be multiple transactions, but the downstream builds roughly right after the upstream finishes (we build the entire clean schedule right after the raw schedule, so sometimes the raw runs a few times before the whole clean finishes). At most there’d be four transactions, which still doesn’t account for the ~30k files it’s reading in right now.
Are you by any chance referring to the following UI element in the history tab for the transaction/build?
If so, you may be misinterpreting what this means: it simply tells you the latest dataset view that the build used, not the data that was actually “read in.”
A lot goes on in the gap between the start of the timeline and the first Spark task execution stages (query plan generation, etc.), so you can’t assume that time is spent just reading in the files. In fact, the actual reading of files should happen in the subsequent Spark tasks, unless you’re doing something custom with the filesystem API.