# Concurrency

Concurrency utilities for bridging sync and async code.

This module provides two primary helpers:

- `asyncify`: Wrap a synchronous function so it can be awaited in async code by offloading it to a worker thread using AnyIO.
- `syncify`: Wrap an asynchronous function so it can be called from synchronous code. By default, this uses a shared AnyIO `BlockingPortal` running in a background thread.
## Usage

```python
from gllm_core.concurrency import asyncify, syncify

# Asyncify a sync function
async_op = asyncify(blocking_fn)
result = await async_op(arg1, arg2)

# Syncify an async function
sync_op = syncify(async_fn)
result = sync_op(arg1, arg2)
```
Notes:

1. For `asyncify`: Cancelling an await of an asyncified sync function cancels the awaiter, but the underlying thread cannot be forcibly interrupted; the function continues to run until it returns.
2. For `syncify`: A shared default `BlockingPortal` is lazily created on first use and shut down at process exit.
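Note 1 can be illustrated with the standard library's `asyncio.to_thread`, which offloads a sync function to a worker thread in the same spirit (a stdlib analogue for illustration, not this module's implementation): cancelling the awaiting task raises `CancelledError` promptly, yet the worker thread runs to completion.

```python
import asyncio
import threading
import time

finished = threading.Event()

def blocking() -> None:
    # Simulates work that cannot be interrupted once started.
    time.sleep(0.2)
    finished.set()

async def main() -> bool:
    task = asyncio.create_task(asyncio.to_thread(blocking))
    await asyncio.sleep(0.05)
    task.cancel()  # Cancels the awaiter...
    try:
        await task
    except asyncio.CancelledError:
        pass
    # ...but the underlying thread keeps running until the function returns.
    finished.wait(timeout=2.0)
    return task.cancelled()

cancelled = asyncio.run(main())
```

The awaiter observes cancellation immediately, while `finished` is still set by the thread afterwards.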
## `asyncify(func, *, cancellable=False, limiter=None)`

Wrap a sync function into an awaitable callable using a worker thread.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[P, R]` | Synchronous function to wrap. | *required* |
| `cancellable` | `bool` | If `True`, allow cancellation of the awaiter while running in a worker thread. | `False` |
| `limiter` | `CapacityLimiter \| None` | Capacity limiter to throttle concurrent thread usage. | `None` |
Returns:

| Type | Description |
|---|---|
| `Callable[P, Awaitable[R]]` | An async function that, when awaited, will execute the wrapped function in a worker thread and return its result. |
Usage:

```python
async def handler() -> int:
    wrapped = asyncify(blocking_func)
    return await wrapped(1, 2)
```
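The `limiter` parameter bounds how many offloaded calls may occupy worker threads at once. The same idea can be sketched with the standard library, using an `asyncio.Semaphore` in place of AnyIO's `CapacityLimiter` (names and structure here are illustrative, not this module's API):

```python
import asyncio
import threading
import time

# Track how many blocking calls run concurrently.
peak = 0
active = 0
lock = threading.Lock()

def blocking(x: int) -> int:
    global peak, active
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # Simulated blocking work.
    with lock:
        active -= 1
    return x * 2

async def main() -> list[int]:
    sem = asyncio.Semaphore(2)  # Plays the role of a capacity limiter.

    async def limited(x: int) -> int:
        async with sem:
            return await asyncio.to_thread(blocking, x)

    return await asyncio.gather(*(limited(i) for i in range(6)))

results = asyncio.run(main())
# peak never exceeds the semaphore's capacity of 2
```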
## `get_default_portal()`

Return the shared default `BlockingPortal`.
Returns:

| Name | Type | Description |
|---|---|---|
| `BlockingPortal` | `BlockingPortal` | A process-wide portal running on a background thread. |
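Conceptually, a blocking portal is an event loop running on a dedicated background thread that synchronous code can submit coroutines to. A minimal stdlib sketch of that idea (an analogue for illustration, not `BlockingPortal` itself):

```python
import asyncio
import threading

# Start an event loop on a background thread -- the essence of a "portal".
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

async def add(a: int, b: int) -> int:
    return a + b

# Submit a coroutine from sync code and block for its result.
future = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
result = future.result(timeout=1.0)

# Deterministic teardown.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```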
## `syncify(async_func, *, portal=None)`

Wrap an async function to be callable from synchronous code.
Lifecycle and portals:

1. This helper uses an already running AnyIO `BlockingPortal` to execute the coroutine.
2. If `portal` is not provided, a process-wide shared portal is used. Its lifecycle is managed internally: it is created lazily on first use and shut down automatically at process exit.
3. If you provide a `portal`, you are expected to manage its lifecycle, typically with a context manager. This is recommended when making many calls in a bounded scope, since it avoids per-call startup costs while allowing deterministic teardown.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `async_func` | `Callable[P, Awaitable[R]]` | Asynchronous function to wrap. | *required* |
| `portal` | `BlockingPortal \| None` | Portal to use for calling the async function from sync code. Defaults to `None`, in which case a shared default portal is used. | `None` |
Returns:

| Type | Description |
|---|---|
| `Callable[P, R]` | A synchronous function that runs the coroutine and returns its result. |
Usage:

```python
# Use the default shared portal (most convenient)
def do_work(x: int) -> int:
    sync_call = syncify(async_func)
    return sync_call(x)

# Reuse a scoped portal for multiple calls (deterministic lifecycle)
from anyio.from_thread import start_blocking_portal

with start_blocking_portal() as portal:
    sync_call = syncify(async_func, portal=portal)
    a = sync_call(1)
    b = sync_call(2)
```
Notes:

Creating a brand-new portal per call is discouraged due to the overhead of spinning up and tearing down a background event loop and thread on every invocation. Prefer the shared portal, or a scoped portal reused for a batch of calls.
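The recommended batching pattern can be sketched with a minimal stdlib "portal" that keeps one background loop alive for a whole batch of calls (`MiniPortal` is a hypothetical name used only for this sketch; AnyIO's `start_blocking_portal` is the real tool):

```python
import asyncio
import threading
from collections.abc import Awaitable, Callable
from typing import Any

class MiniPortal:
    """One background event loop reused for many sync->async calls."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def call(self, async_func: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        # Each call reuses the same loop -- no per-call startup cost.
        return asyncio.run_coroutine_threadsafe(async_func(*args), self._loop).result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

async def double(x: int) -> int:
    return x * 2

portal = MiniPortal()
try:
    results = [portal.call(double, i) for i in range(3)]  # many calls, one loop
finally:
    portal.close()
```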