Is it possible to share logic between batch and streaming pipelines?

If so, how? Can you point us to the appropriate documentation?
If not, should we expect to see this capability in the future? If so, on what general timeline can we expect those features to roll out?

Here’s a simplified example of a case that could motivate us to want to share logic between streaming and batch pipelines:

Suppose we have an input stream that serves as a streaming datasource for Object Type A. Prior to the sync layer, we have some logic in a streaming Pipeline Builder pipeline that takes columns A, B, C, …, J from the input stream as input and derives a new column X. For efficiency reasons, we sometimes need to replay this stream from a short time ago. This is no problem, since we get updates for each distinct key every day and the Object Type only needs to show the latest state of each object.

However, we also want to do some historical analysis on this data, specifically related to column X. In a perfect world, we would just feed the output of the streaming pipeline into a batch transform to format the data the way we need for our timeseries and other historical aggregation analyses. But if we did this and then replayed the stream from a short time ago, our historical analysis would no longer have much data from the past. So our historical analysis transforms need to consume the raw data and derive column X the same way the stream does.

Since we now have two desired deployment targets for that block of logic, we want both to point to a shared reference for the logic definition, so that if we change the logic, both deployments receive an identical update.
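For concreteness, this is a minimal sketch of the kind of shared definition we have in mind, assuming the derivation of X can be expressed with standard Spark column expressions (the column logic and names below are made up for illustration, not our real schema):

```python
# Hypothetical shared helper -- assumes the derivation of X can be written as
# ordinary Spark column expressions. Column names and logic are made up.
from pyspark.sql import DataFrame
import pyspark.sql.functions as F


def with_column_x(df: DataFrame) -> DataFrame:
    """Derive column X from the raw input columns.

    The intent is that this single definition would be referenced by both the
    streaming pipeline and the batch transform, so an edit here updates both.
    """
    return df.withColumn(
        "X",
        F.when(F.col("A") > 0, F.col("B") * F.col("C")).otherwise(F.col("J")),
    )
```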

Some potential solutions we’ve considered:

  1. Marketplace / Deployment Suite
    1. From what we can tell, it isn’t possible to package a pipeline that’s deployable as both a batch pipeline and a streaming pipeline
    2. This may not be the best method for our use case because, in some cases, we think we need portions of the pipelines to differ between streaming and batch
      1. In some cases more complex than the example above, batch requires preparation steps that can’t be done in Pipeline Builder, so we have to use PySpark transforms
      2. In some cases we want to join in data, and the join should likely differ between streaming and batch: the most correct join conditions aren’t expressible in streaming lookup joins, so in streaming we use a simpler, less reliable join condition that leverages the fact that the data is usually joined live (see the sketch after this list)
  2. Pipeline Builder Re-Usables
    1. From what we can tell, the only way to share re-usables across Pipeline Builder files is through User Defined Functions (UDFs), but it appears that when defining a UDF one must explicitly classify it as either a Batch or Streaming UDF, so we’re guessing a UDF can’t be used in both contexts.
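To illustrate the join difference described above, here’s a hedged PySpark sketch with made-up tables, columns, and data: in batch we can afford a temporal validity condition, while the streaming lookup join is limited to a plain key equality:

```python
# Hypothetical illustration of the join-condition difference (all names and
# data are made up). Batch can afford the "most correct" temporal condition;
# the streaming lookup join falls back to a plain key equality and relies on
# the dimension data being roughly current at join time.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

facts = spark.createDataFrame(
    [("k1", "2024-01-05"), ("k1", "2024-03-05")],
    ["key", "event_date"],
)
dims = spark.createDataFrame(
    [
        ("k1", "2024-01-01", "2024-02-01", "v1"),
        ("k1", "2024-02-01", "2099-01-01", "v2"),
    ],
    ["key", "valid_from", "valid_to", "value"],
)

# Batch: key equality plus a validity-window condition picks the right version.
batch_joined = facts.join(
    dims,
    on=(facts["key"] == dims["key"])
    & (facts["event_date"] >= dims["valid_from"])
    & (facts["event_date"] < dims["valid_to"]),
    how="left",
)

# Streaming lookup joins only let us express the simpler equality condition,
# which is usually fine live but less reliable on replayed/historical data.
stream_style_joined = facts.join(dims, on="key", how="left")
```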

Hi there! FE dev here, so if any BE folks see this feel free to correct me. Currently, I don’t believe we support sharing logic between batch and streaming pipelines. Batch and streaming pipelines rely on entirely different backends, and our supported transforms for batch vs streaming don’t always line up 1:1 (as you mentioned), meaning it may be difficult to share custom functions / UDFs / pipelines across BE types.

Hi @jsklan, if you’re doing this in Pipeline Builder and it’s a standard manipulation, both Java UDFs and Python Functions can be used in batch and streaming interchangeably.

If this is an async or stateful Java UDF, then it only applies in the streaming context and so wouldn’t work in batch. I believe the batch/streaming differentiation comes from the type of deployment repo you choose when you want to use a Java UDF in transform pipelines instead of Pipeline Builder.
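A stateless scalar manipulation, roughly like the sketch below, is the kind of thing that should be shareable across both contexts (the derivation is made up, and the registration decorator is omitted since it depends on your repository type, so check the Functions/UDF docs for the exact setup):

```python
# A purely stateless scalar derivation that either a batch or a streaming
# pipeline could call. The logic is illustrative only; the registration
# decorator/annotation is omitted because it depends on the repo type.
from typing import Optional


def derive_x(a: Optional[float], b: Optional[float], c: Optional[float]) -> Optional[float]:
    """Derive X from columns A, B, C; returns None when any input is missing."""
    if a is None or b is None or c is None:
        return None
    return a * b + c
```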