I’m currently building a compute module with our ontology and am having problems running it. The module boots up successfully, but there’s a problem running the main function, which is asynchronous. I need to make several async calls within my main function because the code would run far too slowly otherwise. The problem is that I constantly receive this error:
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
I believe this is because the event loop the container creates is closed while my async function is still executing, causing a race condition. I tried defining the function as an async function, but that didn’t work, and a couple of other variations were no more successful.
Has anyone found a way to handle async calls, or is this simply something that’s not supported by compute modules? My alternative idea is to create a FastAPI endpoint that links to my function, but I have no idea how I would hook that into a compute module. Thanks all!
The @function decorator will not work if you want to use threading; in that case you’ll need to use the alternative method for registering functions and starting your compute module, sketched below.
The reason is that the @function decorator uses atexit under the hood, and atexit is not compatible with multithreading: https://docs.python.org/3/library/atexit.html#atexit.register
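Roughly, the non-decorator pattern looks like this. It’s only a minimal sketch assuming the add_function / start_compute_module entry points used in the reply below; the exact import path may differ depending on your version of the compute modules SDK.

from compute_modules import add_function, start_compute_module

def my_function(context, event):
    # plain entry point, registered explicitly instead of via the @function decorator
    return {"result": "ok"}

if __name__ == "__main__":
    # register the function explicitly, then start the module
    add_function(my_function)
    start_compute_module()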
@lauraaf THANK YOU SO MUCH, I was going over this problem for legit hours! I also had to run my async function to completion with run_until_complete on a persistent global event loop inside my compute module function, to make sure it finished before returning. The code looked something like this, for anyone who is interested:
from compute_modules import add_function, start_compute_module

import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict

from .main import main_async_function

# One long-lived event loop for the whole process, so coroutines are never
# scheduled on a loop that has already been torn down at shutdown.
GLOBAL_EXECUTOR = ThreadPoolExecutor()
GLOBAL_LOOP = asyncio.new_event_loop()
asyncio.set_event_loop(GLOBAL_LOOP)

# Shut the executor's worker threads down cleanly when the interpreter exits.
atexit.register(GLOBAL_EXECUTOR.shutdown)


@dataclass
class MyInput:
    arg1: str
    arg2: str


def compute_module_fn(context, event: MyInput):
    # Run the coroutine explicitly on the global event loop and block until
    # it completes, so the async work cannot outlive the function call.
    retval = GLOBAL_LOOP.run_until_complete(main_async_function(event.arg1, event.arg2))
    # retval is a dataclass and needs to be made JSON serializable
    return asdict(retval)


if __name__ == "__main__":
    add_function(compute_module_fn)
    start_compute_module()
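For reference, the async function itself can make its several calls concurrently with asyncio.gather rather than awaiting them one at a time, which is what kept the runtime acceptable for me. The sketch below uses a made-up fetch_details helper and PlanResult dataclass, since the real main_async_function isn’t shown here; it returns a dataclass so the asdict(retval) call above still works.

import asyncio
from dataclasses import dataclass


@dataclass
class PlanResult:
    arg1: str
    arg2: str
    details: list


async def fetch_details(source: str) -> dict:
    # stand-in for one of the slow, I/O-bound async calls
    await asyncio.sleep(0.1)
    return {"source": source}


async def main_async_function(arg1: str, arg2: str) -> PlanResult:
    # run the independent calls concurrently instead of sequentially
    details = await asyncio.gather(fetch_details(arg1), fetch_details(arg2))
    return PlanResult(arg1=arg1, arg2=arg2, details=list(details))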