# File: ai/sandbox/dexorder/event_loop.py (55 lines, 2.1 KiB, Python)

"""
Thread-safe asyncio.run() for the sandbox.
Installs a global replacement for asyncio.run() that, when called from a
non-async thread while uvicorn's event loop is running, dispatches the
coroutine to that loop via run_coroutine_threadsafe(). The calling thread
blocks on future.result() — releasing the GIL — so uvicorn's loop runs
freely (health checks, MCP requests, etc.).
Usage:
from dexorder.event_loop import install_thread_safe_asyncio_run
install_thread_safe_asyncio_run(asyncio.get_running_loop()) # call once at startup
"""
import asyncio
import logging
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Event loop that the patched asyncio.run() dispatches coroutines to.
# Stays None until install_thread_safe_asyncio_run() stores a loop here.
_main_loop: asyncio.AbstractEventLoop | None = None
# asyncio.run as it was bound when this module was imported; the patched
# version falls back to it when the main loop is not available.
_original_asyncio_run = asyncio.run
def install_thread_safe_asyncio_run(loop: asyncio.AbstractEventLoop) -> None:
    """
    Patch asyncio.run globally to cooperate with uvicorn's event loop.

    Call once from the lifespan startup (main thread, loop already running).

    Args:
        loop: The event loop that coroutines are dispatched to when
            asyncio.run() is called from a thread that has no running loop.
    """
    global _main_loop
    _main_loop = loop

    def _thread_safe_run(coro, *, debug=None, **run_kwargs):
        """
        Replacement for asyncio.run().

        Args:
            coro: Coroutine object to execute.
            debug: Forwarded to the original asyncio.run() on the fallback
                path; not used when dispatching to the main loop.
            **run_kwargs: Any further keyword arguments accepted by the
                original asyncio.run() (e.g. ``loop_factory`` on Python
                3.12+). Forwarded on the fallback path only.

        Returns:
            The coroutine's result.

        Raises:
            RuntimeError: If called from a thread that already has a running
                event loop.
        """
        # get_running_loop() raises RuntimeError when this thread has no
        # running loop. Using try/except/else keeps that "expected" error
        # apart from the one raised below without inspecting message text.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            # We're already inside an async context — asyncio.run() is not
            # valid here regardless.
            raise RuntimeError(
                "asyncio.run() cannot be called when another event loop is running "
                "in the same thread."
            )

        # Read the global once so the check and the dispatch use the same loop.
        target_loop = _main_loop
        if target_loop is not None and target_loop.is_running():
            # No running loop in this thread — hand the coroutine to the main
            # loop and block this thread until it finishes.
            log.debug("asyncio.run() from thread → run_coroutine_threadsafe")
            return asyncio.run_coroutine_threadsafe(coro, target_loop).result()

        # Fallback: main loop not available (e.g., called before startup or in tests)
        return _original_asyncio_run(coro, debug=debug, **run_kwargs)

    asyncio.run = _thread_safe_run
    log.info("Installed thread-safe asyncio.run()")