How to trigger a schedule based on new rows in a CDC stream ingest (aka, how to get a batch CDC)?

I set up a CDC ingest from a data source in Foundry, which means it is a stream ingest.
However, the updates are quite irregular (a few times per day), and it is important to process each update as a batch, relatively quickly.

Is it possible to have a batch CDC ingest?

CDC is a stream. But when the upstream system's updates are infrequent and the goal is to consume the records in batch pipelines, the effective latency is actually >10 minutes: records ingested as a stream first need to be written to the backing archive dataset, and that flush happens roughly every 10 minutes.

To get quicker ingestion and processing, one idea is to use the CDC stream ingest as a signaling mechanism: whenever a row is ingested, it triggers a batch ingest of the upstream system and kicks off the downstream pipeline.

One way to achieve this is via a compute module that regularly polls the stream for new records and, when a new record is added to the stream, triggers the schedule via the Platform API.

Technically, this could be achieved without a compute module, but the compute module's main advantage is limiting the number of downstream executions: it can "sleep" for some time before it triggers the schedule again.

Alternatively, the compute module can expose a function that, when a new record is added to the stream, triggers the schedule via the Platform API.
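As a hedged sketch of that function-mode variant: the function name, the payload shape, and the injected `post` callable are my assumptions, not SDK requirements. With the compute-modules SDK the handler would additionally carry the `@function` decorator so it can be called by `queryType`.

```python
def trigger_schedule_on_record(context, payload, post):
    """Hypothetical compute-module function: trigger a schedule run.

    `post` is the HTTP POST callable to use (e.g. requests.post); it is
    injected here so the sketch stays easy to test without a live stack.
    """
    base_url = payload["base_url"]          # e.g. https://<stack>.palantirfoundry.com
    schedule_rid = payload["schedule_rid"]  # RID of the schedule to run
    token = payload["token"]                # bearer token of the service user
    url = f"{base_url}/api/v2/orchestration/schedules/{schedule_rid}/run"
    response = post(url, headers={"Authorization": f"Bearer {token}"})
    response.raise_for_status()
    return response.json()  # contains the RID of the new schedule run
```

The endpoint path mirrors the one used in the full example below.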

The token is obtained by exchanging the client_id/client_secret of a service user defined in Developer Console. Depending on the APIs being hit, the corresponding permissions need to be granted to the service user.

At one extreme, one can leave the Developer Console application unscoped, essentially granting the token every permission the service user holds, without an additional layer of control.

Otherwise, one can reduce the scopes of the Platform API token to what is required, and grant permissions only on the required resources.

Namely:

  • “api:streams-read”
  • “api:orchestration-write”

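Concretely, those scopes are passed along in the client_credentials exchange against the Multipass token endpoint. A minimal sketch of just the request construction (the helper name is mine; the endpoint path matches the one used in the code below):

```python
def build_token_request(foundry_url, client_id, client_secret, scopes):
    """Build the URL and form body for a client_credentials token exchange."""
    url = f"https://{foundry_url}/multipass/api/oauth2/token"
    body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": " ".join(scopes),  # OAuth2 scopes are space-separated
    }
    return url, body
```

POST this as form-encoded data and read `access_token` from the JSON response, as the full example below does via requests-oauthlib.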
Here is an example of code:

"""
Compute Module — Stream Poller & Schedule Trigger
Polls a Foundry streaming dataset for new records. When new records are
detected, triggers a Foundry schedule run, then waits before polling again.
Runs as a long-lived loop (Pipeline-style execution inside a Compute Module).
"""
# - This file is the main entry point into your Compute Module
# - All functions defined in this file are callable using the function name as the 'queryType'
# - Each function must take 2 args, with the 2nd param being the actual "payload" of your function
# - Each function must return something serializable by `json.dumps`
#
# See the README page of this repo for more details
from compute_modules.logging.common import ComputeModulesLoggerAdapter

import logging
import os
import time
import requests
from dataclasses import dataclass
from compute_modules.annotations import function
from compute_modules.logging import get_logger
from compute_modules.auth import oauth

# ---------------------------------------------------------------------------
# Logger
# ---------------------------------------------------------------------------
logger: ComputeModulesLoggerAdapter = get_logger(__name__)
# Sets the minimum log level to INFO
logger.setLevel(logging.INFO)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# [SETUP REQUIRED] Add Foundry URL to your container's env vars
FOUNDRY_URL = os.getenv("FOUNDRY_URL")
BASE_URL = f"https://{FOUNDRY_URL}"
logger.info(f"Foundry URL: {FOUNDRY_URL}")
logger.info(f"Base URL: {BASE_URL}")


# ---------------- Approach 1 -----------------------------
# OAuth token scoped for streams + orchestration
# [SETUP REQUIRED] Share the dataset you want to read/write from with your service user inside Foundry
# This is supposed to work, but in my case it fails with a permission error,
# hence Approach 2 below.
# TOKEN = oauth(
#     FOUNDRY_URL,
#     [
#         # no scope = max scope
#         # "api:datasets-read",
#         # "api:datasets-write",
#         # "api:streams-read",
#         # "api:orchestration-write",
#     ]
# )
# ---------------- End of Approach 1 -----------------------------

# ---------------- Approach 2 -----------------------------
# So I'm sharing this second approach, which should work in all cases but
# requires passing the credentials via env variables
from requests_oauthlib import OAuth2Session  # requires the requests-oauthlib package
from oauthlib.oauth2 import BackendApplicationClient

# === Manual OAuth ===
OAUTH_TOKEN_URL = f"{BASE_URL}/multipass/api/oauth2/token"  # Change to your OAuth2 endpoint
CLIENT_ID = os.getenv("CLIENT_ID_CUSTOM") # Note: You can't use "CLIENT_ID" as it is reserved
CLIENT_SECRET = os.getenv("CLIENT_SECRET_CUSTOM") # Note: You can't use "CLIENT_SECRET" as it is reserved
client = BackendApplicationClient(client_id=CLIENT_ID)
# Named oauth_session to avoid shadowing the `oauth` helper imported above.
# Optionally restrict scopes: OAuth2Session(client=client, scope=["api:streams-read", "api:orchestration-write"])
oauth_session = OAuth2Session(client=client)
# NOTE: verify=False disables TLS certificate verification; prefer verify=True
# unless your stack uses certificates the container does not trust.
token = oauth_session.fetch_token(
    token_url=OAUTH_TOKEN_URL,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    verify=False,
)
logger.info("OAuth token fetched")  # avoid logging the token itself
TOKEN = token["access_token"]
# ---------------- End of Approach 2 -----------------------------

# Stream settings
STREAM_DATASET_RID = os.getenv(
    "STREAM_DATASET_RID",
    "ri.foundry.main.dataset.afc11963-823b-4533-b345-5366b8fdb25d",
)
STREAM_BRANCH = os.getenv("STREAM_BRANCH", "master")
# Note: The partition ID refers to the partitions configured for the ingest
# stream: 0 if there is only one partition, up to N-1 otherwise.
STREAM_PARTITION_ID: str = os.getenv("STREAM_PARTITION_ID", "0")
# Schedule to trigger
SCHEDULE_RID = os.getenv(
    "SCHEDULE_RID",
    "ri.scheduler.main.schedule.af92394a-ee15-4407-b160-9fd8623eaa09",
)
# How many records to fetch per poll & how long to sleep between polls
POLL_LIMIT = int(os.getenv("POLL_LIMIT", "100"))
POLL_INTERVAL_SECONDS = int(os.getenv("POLL_INTERVAL_SECONDS", "30"))
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------

def _auth_headers() -> dict:
    """Return the standard Authorization header."""
    return {"Authorization": f"Bearer {TOKEN}"}

def poll_stream_records(start_offset: int | None = None) -> dict:
    """
    Fetch up to POLL_LIMIT records from the configured stream partition.
    Returns the raw JSON response which contains a ``records`` list and
    metadata such as ``nextOffset``.
    """
    logger.info("Polling new records")
    url = f"{BASE_URL}/api/v2/highScale/streams/datasets/{STREAM_DATASET_RID}/streams/{STREAM_BRANCH}/getRecords"
    params: dict = {
        # "viewRid": "ri.foundry-streaming.main.view.6e39d7c8-738e-4815-9058-f006a49163d4",
        "partitionId": STREAM_PARTITION_ID,
        "limit": POLL_LIMIT,
        "preview": "true",
    }
    if start_offset is not None:
        params["startOffset"] = start_offset
    logger.warning(f"calling URL: {url}")
    response = requests.get(url, params=params, headers=_auth_headers())
    logger.warning(f"Polling raw response: {response.text} / {response.status_code}")
    response.raise_for_status()
    return response.json()

def trigger_schedule() -> dict:
    """
    Trigger a run of the configured Foundry schedule.
    Returns the JSON response from the orchestration API containing
    the ``rid`` of the new schedule run.
    """
    logger.info("Triggering schedule")
    url = f"{BASE_URL}/api/v2/orchestration/schedules/{SCHEDULE_RID}/run"
    logger.warning(f"calling URL: {url}")
    response = requests.post(url, headers=_auth_headers())
    logger.warning(f"Schedule raw response: {response.text}")
    response.raise_for_status()
    return response.json()

# ---------------------------------------------------------------------------
# Main polling loop
# ---------------------------------------------------------------------------
logger.info("Initialization of the app")
# Overall goal: check for new records, and trigger at most one ingest
# every POLL_INTERVAL_SECONDS seconds.
# Track our position in the stream so we only react to *new* records.
last_known_offset: int | None = None
while True:
    try:
        ## Step 1 – Poll new records on the input stream
        result = poll_stream_records(start_offset=last_known_offset)
        logger.info(f"Polling response: {result}")
        # The API returns a flat list of record objects, each with
        # "offset" and "value" keys (not a wrapper dict).
        records = result if isinstance(result, list) else result.get("records", [])
        if records:
            # Compute next offset: max offset seen + 1
            next_offset = max(int(r["offset"]) for r in records) + 1
            logger.info(
                "Received %d new record(s) (offsets %s -> %s)",
                len(records),
                last_known_offset,
                next_offset,
            )
            ## Step 2 – Trigger a schedule run
            run_result = trigger_schedule()
            logger.info(
                "Schedule triggered - run RID: %s",
                run_result.get("rid", "unknown"),
            )
            # Advance the offset so the next poll starts after these records
            last_known_offset = next_offset
        else:
            logger.info("No new records found - sleeping.")
    except requests.HTTPError as exc:
        logger.error("HTTP error during poll/trigger cycle: %s", exc)
    except Exception:
        logger.exception("Unexpected error during poll/trigger cycle")
    time.sleep(POLL_INTERVAL_SECONDS)
# This line is unreachable but left for clarity of intent.
logger.info("Shutdown of the app")

To set it up, you should:

  • Create a Developer Console application with a service user, granting it the permissions described above
  • Create a Compute Module in Function mode (yes, even in the case where we won’t expose a function)
  • Create the code repository backing the compute module
  • Copy-paste the above code and add the required libraries (e.g. requests, requests-oauthlib) as dependencies
  • Commit and tag a new released version
  • Configure the compute module to use the released version, inject the parameters into the compute module configuration, and inject the credentials of the service user (if you use “Approach 2” in the code)
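Since most of these steps boil down to wiring env vars into the container, a small startup check can catch configuration mistakes early. The variable names match the code above; the helper itself is only a suggestion:

```python
import os

REQUIRED_ENV_VARS = [
    "FOUNDRY_URL",
    "CLIENT_ID_CUSTOM",      # only needed with "Approach 2"
    "CLIENT_SECRET_CUSTOM",  # only needed with "Approach 2"
    "STREAM_DATASET_RID",
    "SCHEDULE_RID",
]

def missing_env_vars(env=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

Calling this at module load and failing fast with a clear message beats debugging a `None` in an f-string URL later.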