Compute Module - Pipeline Mode Trigger

When does a compute module in pipeline mode run?

Does it run when the inputs update? Or for however long I let the code run?

I find it confusing.

A compute module is essentially a “containerized application,” which you may want to research further.

A single container will run indefinitely when scaled to 1, and will not run when scaled to 0. You can also set the minimum number of replicas to greater than zero to ensure that at least one instance of your containerized application is running at all times, even during periods of inactivity. If you set the minimum to zero, your application can scale to zero (turn off) when no active requests exist. However, the application will scale up from zero when a request is received, upon initial deployment, and whenever load is predicted. Thus, compute modules allow for predictive scale-up and scale-down; it just depends on how you tweak the settings.
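To make the scaling rule concrete, here is a toy model of the behavior described above (this is just an illustration of the logic, not Foundry's actual scheduler, and `target_replicas` is a hypothetical name):

```python
def target_replicas(active_requests: int, min_replicas: int, max_replicas: int) -> int:
    """Toy model of the replica count the platform would aim for."""
    if active_requests == 0:
        # Idle: scale down to the configured floor (possibly zero).
        return min_replicas
    # Under load: roughly one replica per request, clamped to the configured
    # floor and ceiling.
    return max(min_replicas, min(active_requests, max_replicas))
```

With `min_replicas=0` the module turns off when idle; with `min_replicas=1` at least one instance always stays up.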

Hope this helps!

Pipeline mode compute modules are designed to maintain high data integrity and traceability, which is essential for Foundry’s data provenance control and security. Here are some key points to understand how and when they run:

  • Once initiated, pipeline modules will run continuously until they are manually stopped. In pipeline mode, scaling is not configurable; it defaults to a maximum of one replica and a minimum of one replica. Since scaling is based on query load and pipeline mode does not support queries, defaulting to 1,1 prevents the module from scaling down to zero, ensuring that your code runs indefinitely.

As for what a compute module in pipeline mode runs, that is up to you. Since a compute module running in pipeline mode is meant to run indefinitely, you are responsible for writing your own processing loop, for example one that pulls the latest records from a stream. There is no built-in mechanism to automatically detect when the input has changed.

Here is an example of what your code might look like:

import time

while True:
    # Your logic for fetching the inputs and checking if something has changed
    records = get_stream_latest_records()
    processed_records = [process_record(record) for record in records['records']]
    for record in processed_records:
        put_record_to_stream(record)
    time.sleep(60)

In this example, the module continuously fetches the latest records, processes them, and then outputs the processed records to a stream, pausing for 60 seconds between iterations.
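Since there is no built-in change detection, one pattern is to track a checkpoint (such as the last offset you processed) yourself, so each iteration only handles new records. Here is a minimal sketch of that idea; `fetch_records`, `process_record`, and `emit_record` are hypothetical placeholders for your own I/O logic, and the assumed record shape (an `'offset'` field) is an assumption for illustration:

```python
import time

def process_once(fetch_records, process_record, emit_record, last_offset):
    """Run one polling iteration: fetch records newer than last_offset,
    process and emit each one, and return the new high-water mark."""
    for record in fetch_records(after_offset=last_offset):
        emit_record(process_record(record))
        last_offset = record['offset']
    return last_offset

def run_pipeline(fetch_records, process_record, emit_record, poll_seconds=60):
    """Indefinite loop suitable for a pipeline-mode compute module."""
    last_offset = None
    while True:
        last_offset = process_once(
            fetch_records, process_record, emit_record, last_offset
        )
        time.sleep(poll_seconds)
```

Splitting the single iteration out of the infinite loop also makes the processing logic easy to test in isolation.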