One possible approach is to create an incremental dataset downstream of each of your snapshot inputs that contains only the new records (note: be careful if your inputs are joined together — the new records from each input need to be processed together so the join stays consistent). You can then feed those incremental datasets into your pipeline as inputs and process them incrementally. This community post walks through how to convert a snapshot dataset into an incremental one: How can I process a dataset by chunk?
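As a rough illustration of the snapshot-to-delta step, here is a minimal plain-Python sketch (not any specific platform API — the function name `snapshot_to_delta` and the `id` key column are assumptions for the example). It derives the "new records" dataset by comparing the current snapshot against the previously seen snapshot on a stable key:

```python
# Hypothetical sketch: compute the delta between two snapshots by keeping
# only rows whose key did not appear in the previous snapshot (an anti-join).

def snapshot_to_delta(previous_rows, current_rows, key="id"):
    """Return only the rows in current_rows whose key is absent from previous_rows."""
    seen = {row[key] for row in previous_rows}
    return [row for row in current_rows if row[key] not in seen]

previous = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
current = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}, {"id": 3, "value": "c"}]

delta = snapshot_to_delta(previous, current)
# delta holds only the record with id 3 — the one new row in this snapshot
```

The same idea scales up as a left anti-join in whatever engine your pipeline uses; the downstream incremental dataset then only ever receives `delta`, so later stages can run in incremental mode.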