Tutorial
A Compute Module is essentially a service running in Foundry, which can expose Foundry Functions or perform computation in an "always on" manner.
Scenario 1 - The base case will be:
- We have a datasource available on the public internet (for example, a website)
- We will develop the code of our compute module in Foundry
- We will push the updates to a stream, for further processing in Foundry (the processing itself is outside the scope of this tutorial)
Scenario 2 - On-prem datasource
We will want to reach a source that is on-premises and not reachable from the public internet
Scenario 3 - Local development
We will want to develop our compute module locally from our laptop instead of in Foundry
Scenario 1 - Base case
At a high level, the idea is to:
- Write some code (in a Code Repository) that fetches some data and pushes it to a stream in Foundry, in an infinite loop
Note: the code could do something else for other use cases (read from a dataset, write to a dataset, trigger API calls, etc.), and a variant of this setup can expose API endpoints like a Foundry Function, so that you can invoke your "service" endpoint from all the places in Foundry that can call a Function.
- Package this code into a container image (from the Code Repository)
- Run the container image as a service (via a "Compute Module")
Note: The Compute Module acts as a way to "deploy" container images. It handles the replica(s), gives us logging and visibility, upgrades to newer versions, etc.
Note bis: The code inside the container will be able to perform API calls, as we will grant it permission to use the source (including the associated egress policies)
If the website or service you are trying to pull data from is available on the public internet (it has a DNS name or an IP that can be resolved publicly), then you can use a direct connection instead of an agent. In this particular case, as an example, I will use the "api.weather.gov" API.
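As a quick sanity check before wiring anything into Foundry, you can query the API directly from any Python environment. A minimal sketch; the /points endpoint and the coordinates are an illustrative choice (the tutorial code later simply queries the API root):
import requests

# Illustrative call against the public API (api.weather.gov asks clients to identify themselves via User-Agent)
response = requests.get(
    "https://api.weather.gov/points/39.7456,-97.0892",
    headers={"User-Agent": "foundry-compute-module-tutorial"},
)
response.raise_for_status()
print(response.json()["properties"]["forecast"])  # URL of the forecast for these coordinates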
Scenario 1 - Steps
1. Create the Source
a. Navigate to Data Connection application > New Source
b. Select the Protocol source of type “REST API”
c. Select the "Direct connection" mode
d. Continue the configuration of the source
e. Configure the source domain you will access.
f. Toggle on the “Allow this source to be imported into compute modules”
g. Once toggled, you will need to set the API Name for the source, so that you can access it programmatically in your code, for example to fetch its credentials. In my case the source doesn't require credentials, but yours might (see the sketch after this step).
h. Finish the configuration of the source
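If your source does require credentials, the API Name is what lets you look them up at runtime. A minimal sketch, assuming a source whose API Name is "weather-api" and assuming the SOURCE_CREDENTIALS file maps each source API Name to its secrets (check the embedded Compute Module docs for the exact layout):
import json
import os

# SOURCE_CREDENTIALS points to a file mounted into the container by the Compute Module
with open(os.environ['SOURCE_CREDENTIALS']) as f:
    credentials = json.load(f)

# "weather-api" is a hypothetical API Name; "apiKey" is a hypothetical secret name
api_key = credentials["weather-api"].get("apiKey")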
2. Create and configure the output stream
a. Right click in the folder > New > Streaming Sync
b. Set up the Streaming sync by defining the schema of the JSON records you will push from your compute module. For example, a simple timestamp + value schema, or a richer one (timestamp + value + machine_id + type + any other fields you want to push); see the example record after this step.
c. You can key the stream and configure it further if you wish, but strictly speaking you just need a well-defined schema and to hit the "Create sync" button.
Note: If you make a mistake in the schema, you will need to recreate the sync. This is an easy/quick operation, but you will then need to update the configuration of your compute module, since you will write to a new Streaming sync. So it's not a big deal if you make a mistake here or want to "expand" the schema later. In a production setup, you would either migrate fully to the new schema (simply dropping the historical streaming sync) or, if the old stream has any value (e.g. you need it for training models), set it aside or even union it downstream.
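To make the schema concrete, a record pushed from the compute module is just a JSON object whose fields match the columns you declared. For the simple timestamp + value schema used in this tutorial it would look like the sketch below (the richer variant only applies if you declared those extra, hypothetical columns):
# Record matching a "timestamp (long) + value (string)" schema
record = {
    "timestamp": 1718000000000,  # epoch milliseconds
    "value": "payload fetched from the source",
}

# Richer schema variant, only if you declared these (hypothetical) columns in the Streaming sync
richer_record = {
    "timestamp": 1718000000000,
    "value": "23.7",
    "machine_id": "machine-42",
    "type": "temperature",
}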
3. Create a Compute Module in Foundry
a. Right click in the folder > New > Compute Module
Note: You have access to the Compute Module docs embedded in the app from there. Take a look at them, as they include code snippets and a ton of useful information to start, extend and enrich your compute module!
4. Write the logic of your service that will run in the compute module
a. Create a new code repository
b. Select “Pipeline” > “Compute modules” > “Python compute module”
Note: Even if we select Functions, what we really want is a Python compute module. We'll delete the Functions code and replace it with the pipeline code below. Just pick the options below to get started.
c. You can fully replace and edit the code of the “app.py” file and add a utils.py file.
The code can directly perform API calls (e.g. requests.get(...)) without additional setup.
Note: Don't forget to import the relevant libraries (e.g. requests, PyYAML) in the repository.
d. In app.py
import logging as log
import time
import os
import json

import requests
from requests import Session
from requests.adapters import HTTPAdapter

import utils as utils

# log.basicConfig(level=log.INFO)
log.basicConfig(level=log.DEBUG)
log.getLogger("requests").setLevel(log.DEBUG)
log.getLogger("urllib3").setLevel(log.DEBUG)

log.info("1. Initialization of the app")

# Hardcoded values
FOUNDRY_URL = "MYFOUNDRY.com"
ENDPOINT = "https://api.weather.gov/"

# Get environment information
bearer_token = utils.get_build_token()
resource_alias_map = utils.get_resources()

# Get output stream information
output_info = resource_alias_map['foundry-stream-sync']
output_rid = output_info['rid']
output_branch = output_info['branch'] or "master"

# Some logging for tracing down information (avoid logging tokens outside of test setups)
log.info(f"bearer_token {bearer_token}")
log.info(f"resource_alias_map {resource_alias_map}")

# Functions
def put_record_to_stream(record):
    try:
        url = f"https://{FOUNDRY_URL}/stream-proxy/api/streams/{output_rid}/branches/{output_branch}/jsonRecord"
        response = requests.post(url, json=record, headers={"Authorization": f"Bearer {bearer_token}"})
        log.info(f"Record pushed to stream {response.text}")
    except requests.exceptions.RequestException as e:
        log.error(f"An error occurred when pushing value to stream: {e}")
        return None

def get_value_from_source(session):
    try:
        response = session.get(ENDPOINT)
        response.raise_for_status()  # Raises an HTTPError if the response code was unsuccessful
        log.info(f"get value from source {response.text}")
        return response.text
    except requests.exceptions.RequestException as e:
        log.error(f"An error occurred when fetching value from source: {e}")
        return None

# Main event loop
while True:
    log.info("Loop iteration start")

    # Step A. Generate the current timestamp in milliseconds
    current_timestamp = int(time.time() * 1000)

    # Step B. Generate the payload
    # Fetch data from API
    session = Session()  # In case you want to implement retries, you can configure the session accordingly (see the sketch after this file)
    curr_result = get_value_from_source(session)

    # PLACEHOLDER - FOR TEST PURPOSES ONLY
    if curr_result is None:
        log.info("no value fetched! defaulting to NO_VALUE")
        curr_result = "NO_VALUE"

    curr_payload = {"timestamp": current_timestamp, "value": curr_result}
    log.info(f"payload sent: {curr_payload}")
    put_record_to_stream(curr_payload)

    log.info("Sleeping")
    time.sleep(5)
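The HTTPAdapter import in app.py is only needed if you want retries. As a sketch, you could replace the plain Session() in the loop with a session built like the one below (the retry counts and status codes are arbitrary choices):
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session_with_retries():
    # Retry transient failures a few times with exponential backoff
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session = Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session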
e. In utils.py
import yaml
import os
import json

def get_build_token():
    # Get the token to access inputs and outputs
    with open(os.environ['BUILD2_TOKEN']) as f:
        bearer_token = f.read()
    return bearer_token

def get_resources():
    # Get the map of resource aliases to the resources (rid + branch) accessible to the module
    with open(os.environ['RESOURCE_ALIAS_MAP']) as f:
        resource_alias_map = json.load(f)
    return resource_alias_map

def get_credentials():
    # Get the credentials of the sources imported into the compute module
    with open(os.environ['SOURCE_CREDENTIALS'], 'r') as f:
        credentials = json.load(f)
    return credentials

def get_source_configuration():
    with open(os.environ['SOURCE_CONFIGURATIONS_PATH'], 'r') as f:
        source_configuration = json.load(f)
    return source_configuration

def get_services():
    # Service discovery information (e.g. the on-prem proxy used in Scenario 2)
    with open(os.environ['FOUNDRY_SERVICE_DISCOVERY_V2'], 'r') as f:
        # service_discovery = json.load(f)
        service_discovery = yaml.safe_load(f)
    return service_discovery

def get_ca_path():
    with open(os.environ['EXTERNAL_CONNECTIONS_CA_PATH'], 'r') as f:
        return f.read()

def extract_proxy_URL(services_discovery):
    if "on_prem_proxy" in services_discovery:
        return services_discovery["on_prem_proxy"][0]
    else:
        raise Exception(f"no 'on_prem_proxy' in the list of services_discovery: {services_discovery}")

def extract_proxy_token_for_source(source_configuration):
    return source_configuration["proxyToken"]
f. Import the libraries required by the code, by adding them in the left sidebar, for example:
- requests
- PyYAML
g. Click the "Commit" button at the top right, and then the "Tag" button.
h. Wait for the checks to pass for the commit and the tag. You can see the progress in the "Branches" tab at the top.
5. Use the container you created as a compute module
a. Open your Compute Module
b. Set up the compute module as a "pipeline module" (it will run the code forever, and not expose functions)
c. Select the output stream you created previously as an output resource of your compute module (so that the compute module will have the permission to write to this resource)
d. Select the container by selecting the code repository, the name of the image, and the version you created previously.
Note: the name of the image is by default “myimage” and the version is the version you “tagged” previously.
Note bis: No other configuration should be required, given that all the environment variables we use are automatically populated by the Compute Module at runtime. You can tweak the resources allocated to the compute module at the bottom (e.g. in my case, for test purposes, 0.1 vCPU and 0.75 GB of RAM)
e. Select the Source you created previously
f. You can tweak the number of replicas, and additional configuration.
For test purposes, toggle off module persistence and set a timeout of 30 minutes, so that if you forget to turn it off, the module won't run all night before you notice.
6. Run your code (forever) by running the container via the Compute Module
a. Open your Compute Module and go to the "Overview" tab
b. Click “run” or “start”
c. You should shortly see logs being written, once the container has started.
7. Verify it works as expected
- Check the logs to verify it works as expected.
- Navigate to the Streaming sync output and check that new rows are being added
Well done! You are now streaming the content of a website into a stream in Foundry.