Compute Module Async Function

I’m currently building a compute module with our ontology and am having problems running it. While I can get the module to boot up successfully, there’s a problem running the main function, which is an asynchronous call. I need to make several async calls within my main function as the code would run far too slowly if I didn’t. The problem is I constantly receive this error:

raise RuntimeError('cannot schedule new futures after '\nRuntimeError: cannot schedule new futures after interpreter shutdown\n"

I believe this is because the event loop the container generates closes while my async function is executing, causing a race condition. I tried initializing the function as an async function, but this didn’t work. I tried the following as well to no success:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from .main_function import main_async_function
import atexit

GLOBAL_EXECUTOR = ThreadPoolExecutor()
GLOBAL_LOOP = asyncio.new_event_loop()
asyncio.set_event_loop(GLOBAL_LOOP)

atexit.register(GLOBAL_EXECUTOR.shutdown)

@function
def predict_resupply(context, event):
    coroutine = main_function_wrapper(context, event)
    return GLOBAL_LOOP.run_until_complete(coroutine)

async def main_function_wrapper(context, event):
    return await main_async_function(event["property"])

Is there a way people have handled async calls? Or is this simply something that’s not supported by compute modules? My alternative idea is to create a FastAPI endpoint that links to my function but I have no idea how I would hook that into a compute module. Thanks all!

The @function decorator will not work if you want to use threading, if you want to use threading you’ll need to use the alternative method for registering functions & starting your compute module:

add_functions( hello, add, ) start_compute_module()

The reason behind this is because the decorator @function uses atexit under the hood. And atexit is not compatible with multithreading https://docs.python.org/3/library/atexit.html#atexit.register

1 Like

@lauraaf THANK YOU SO MUCH, I was going over this problem for legit hours! I also had to wrap my async function in ThreadPoolExecutor.run_until_complete within my function in order to ensure it completed. The code looked something like this, for anyone who is interested:

from compute_modules import add_function, start_compute_module
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict
import atexit
from .main import main_async_function

GLOBAL_EXECUTOR = ThreadPoolExecutor()
GLOBAL_LOOP = asyncio.new_event_loop()
asyncio.set_event_loop(GLOBAL_LOOP)

atexit.register(GLOBAL_EXECUTOR.shutdown)

@dataclass
class MyInput
   arg1: str
   arg2: str

def compute_module_fn(context, event: MyInput):    
    # Run the coroutine explicitly in the event loop
    retval = GLOBAL_LOOP.run_until_complete(create_resupply_plan(event.arg1, event.arg2))
    # retrval is a dataclass and needs to be made json serializable
    return asdict(retval)


if __name__ == "__main__":
    add_function(compute_module_fn)
    start_compute_module()
1 Like