Utils
Utility modules for use in the GLLM Core package.
ArgUsages
Bases: BaseModel
Model representing the different types of argument usage in a run.
Attributes:
| Name | Type | Description |
|---|---|---|
| required | list[str] | A list of argument names that are required. |
| optional | list[str] | A list of argument names that are optional. |
| unresolvable | list[str] | A list of unresolvable key patterns encountered during analysis. |
BinaryHandlingStrategy
Bases: StrEnum
Enum for binary data handling options.
Attributes:
| Name | Type | Description |
|---|---|---|
| SKIP | str | Skip binary data. |
| BASE64 | str | Encode binary data to base64. |
| HEX | str | Encode binary data to hex. |
| SHOW_SIZE | str | Show the size of binary data. |
ChunkMetadataMerger(merger_func_map=None, default_merger_func=None, retained_keys=None)
A helper class to merge metadata from multiple chunks.
Attributes:
| Name | Type | Description |
|---|---|---|
| merger_func_map | dict[str, Callable[[list[Any]], Any]] | A mapping of metadata keys to merger functions. |
| default_merger_func | Callable[[list[Any]], Any] | The default merger function for metadata keys that are not present in the merger_func_map. |
| retained_keys | set[str] \| None | The keys that should be retained in the merged metadata. If None, all intersection keys are retained. |
Initializes a new instance of the ChunkMetadataMerger class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| merger_func_map | dict[str, Callable[[list[Any]], Any]] \| None | A mapping of metadata keys to merger functions. Defaults to None, in which case a default merger map is used. The default merger map (1) picks the first value of the PREV_CHUNK_ID key and (2) picks the last value of the NEXT_CHUNK_ID key. | None |
| default_merger_func | Callable[[list[Any]], Any] \| None | The default merger for metadata keys that are not present in the merger_func_map. Defaults to None, in which case a default merger that picks the first value is used. | None |
| retained_keys | set[str] \| None | The keys that should be retained in the merged metadata. Defaults to None, in which case all intersection keys are retained. | None |
merge(metadatas)
Merges metadata from multiple chunks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| metadatas | list[dict[str, Any]] | The metadata to merge. | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | The merged metadata. |
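The merge behaviour described above can be approximated with a small standalone function. This is only a sketch, not the library's implementation: the name `merge_metadata_sketch` is hypothetical, and it assumes that "intersection keys" means keys present in every metadata dictionary and that the fallback merger picks the first value.

```python
from __future__ import annotations

from typing import Any, Callable

MergerFunc = Callable[[list[Any]], Any]


def merge_metadata_sketch(
    metadatas: list[dict[str, Any]],
    merger_func_map: dict[str, MergerFunc] | None = None,
    default_merger_func: MergerFunc | None = None,
    retained_keys: set[str] | None = None,
) -> dict[str, Any]:
    """Merge chunk metadata dicts key by key (illustrative only)."""
    if not metadatas:
        return {}
    merger_func_map = merger_func_map or {}
    # Assumption: the fallback merger picks the first value.
    default_merger_func = default_merger_func or (lambda values: values[0])

    # Assumption: only keys present in every dict are candidates.
    common_keys = set(metadatas[0])
    for metadata in metadatas[1:]:
        common_keys &= set(metadata)
    if retained_keys is not None:
        common_keys &= retained_keys

    merged: dict[str, Any] = {}
    for key in sorted(common_keys):
        values = [metadata[key] for metadata in metadatas]
        merged[key] = merger_func_map.get(key, default_merger_func)(values)
    return merged
```

Passing a per-key function in `merger_func_map` overrides the fallback for that key only, which mirrors how the constructor parameters are documented.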
LoggerManager
A singleton class to manage logging configuration.
This class ensures that the root logger is initialized only once and is used across the application.
Basic usage
The LoggerManager can be used to get a logger instance as follows:
    logger = LoggerManager().get_logger()
    logger.info("This is an info message")
Set logging configuration
The LoggerManager also provides capabilities to set the logging configuration:
    import logging

    manager = LoggerManager()
    manager.set_level(logging.DEBUG)
    manager.set_log_format(custom_log_format)
    manager.set_date_format(custom_date_format)
Add custom handlers
The LoggerManager also provides capabilities to add custom handlers to the root logger:
    manager = LoggerManager()
    handler = logging.FileHandler("app.log")
    manager.add_handler(handler)
Extra error information
When logging errors, extra error information can be added as follows:
    logger.error("I am dead!", extra={"error_code": "ERR_CONN_REFUSED"})
Logging modes
The LoggerManager supports three logging modes:
- Text: Logs in a human-readable format with RichHandler column-based formatting. Used when the LOG_FORMAT environment variable is set to "text". Output example:

      2025-10-08T09:26:16 DEBUG [LoggerName] This is a debug message.
      2025-10-08T09:26:17 INFO [LoggerName] This is an info message.
      2025-10-08T09:26:18 WARNING [LoggerName] This is a warning message.
      2025-10-08T09:26:19 ERROR [LoggerName] This is an error message.
      2025-10-08T09:26:20 CRITICAL [LoggerName] This is a critical message.

- Simple: Logs in a human-readable format with Rich colors but without column-based formatting. Used when the LOG_FORMAT environment variable is set to "simple". Output example:

      [2025-10-08T09:26:16.123 LoggerName DEBUG] This is a debug message.
      [2025-10-08T09:26:17.456 LoggerName INFO] This is an info message.
      [2025-10-08T09:26:18.789 LoggerName WARNING] This is a warning message.
      [2025-10-08T09:26:19.012 LoggerName ERROR] This is an error message.
      [2025-10-08T09:26:20.345 LoggerName CRITICAL] This is a critical message.

- JSON: Logs in a JSON format, recommended for easy parsing due to the structured nature of the log records. Used when the LOG_FORMAT environment variable is set to "json". Output example:

      {"timestamp": "2025-10-08T11:23:43+0700", "name": "LoggerName", "level": "DEBUG", "message": "..."}
      {"timestamp": "2025-10-08T11:23:44+0700", "name": "LoggerName", "level": "INFO", "message": "..."}
      {"timestamp": "2025-10-08T11:23:45+0700", "name": "LoggerName", "level": "WARNING", "message": "..."}
      {"timestamp": "2025-10-08T11:23:46+0700", "name": "LoggerName", "level": "ERROR", "message": "..."}
      {"timestamp": "2025-10-08T11:23:47+0700", "name": "LoggerName", "level": "CRITICAL", "message": "..."}

When the LOG_FORMAT environment variable is not set, the LoggerManager defaults to "text" mode.
__new__()
Initialize the singleton instance.
Returns:
| Name | Type | Description |
|---|---|---|
| LoggerManager | LoggerManager | The singleton instance. |
add_handler(handler)
Add a custom handler to the root logger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| handler | Handler | The handler to add to the root logger. | required |
get_logger(name=None)
Get a logger instance.
This method returns a logger instance that is a child of the root logger. If name is not provided, the root logger will be returned instead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | The name of the child logger. If None, the root logger will be returned. Defaults to None. | None |

Returns:

| Type | Description |
|---|---|
| Logger | Configured logging.Logger instance. |
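The singleton and child-logger behaviour described for `__new__()` and `get_logger()` can be illustrated with the standard `logging` module. This is a minimal sketch under assumptions, not the LoggerManager source: the class name `SingletonLoggerSketch` and the root logger name `"sketch_root"` are hypothetical.

```python
import logging


class SingletonLoggerSketch:
    """Hypothetical stand-in for a singleton logging manager."""

    _instance = None

    def __new__(cls):
        # Create and configure the shared instance only on the first call.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._root = logging.getLogger("sketch_root")
        return cls._instance

    def get_logger(self, name=None):
        # Without a name, hand back the managed root logger;
        # otherwise return a child that inherits its configuration.
        if name is None:
            return self._root
        return self._root.getChild(name)
```

Because every call to the constructor returns the same object, configuration applied through one reference is visible through all others.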
set_date_format(date_format)
Set date format for all loggers in the hierarchy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| date_format | str | The date format to set. | required |
set_level(level)
Set logging level for all loggers in the hierarchy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| level | int | The logging level to set (e.g., logging.INFO, logging.DEBUG). | required |
set_log_format(log_format)
Set logging format for all loggers in the hierarchy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_format | str | The log format to set. | required |
MergerMethod
Deprecated. Use functions from gllm_core.utils.mergers directly.
concatenate(delimiter='-')
staticmethod
Deprecated. Use gllm_core.utils.mergers.concatenate instead.
merge_overlapping_strings(delimiter='\n')
staticmethod
Deprecated. Use gllm_core.utils.mergers.merge_overlapping_strings instead.
pick_first(values)
staticmethod
Deprecated. Use gllm_core.utils.mergers.pick_first instead.
pick_last(values)
staticmethod
Deprecated. Use gllm_core.utils.mergers.pick_last instead.
MethodSignature
Bases: BaseModel
Model representing the signature of a method.
Attributes:
| Name | Type | Description |
|---|---|---|
| parameters | dict[str, ParameterInfo] | A dictionary of parameter names to their information. |
| is_async | bool | Whether the method is asynchronous. |
ParameterInfo
Bases: BaseModel
Model representing information about a method parameter.
Attributes:
| Name | Type | Description |
|---|---|---|
| kind | ParameterKind | The kind of the parameter. |
| default | str | The default value of the parameter, if any. |
| annotation | str | The type annotation of the parameter, if any. |
ParameterKind
Bases: StrEnum
Enum representing the different kinds of parameters a method can have.
RetryConfig
Bases: BaseModel
Configuration for retry behavior.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_retries | int | Maximum number of retry attempts. |
| base_delay | float | Base delay in seconds between retries. |
| max_delay | float | Maximum delay in seconds between retries. |
| jitter | bool | Whether to add random jitter to delays. |
| timeout | float \| None | Overall timeout in seconds for the entire operation. If None, timeout is disabled. |
| retry_on_exceptions | tuple[type[Exception], ...] | Tuple of exception types to retry on. |
validate_delay_constraints()
Validates that max_delay is greater than or equal to base_delay.
Returns:
| Name | Type | Description |
|---|---|---|
| RetryConfig | RetryConfig | The validated configuration. |

Raises:

| Type | Description |
|---|---|
| ValueError | If max_delay is less than base_delay. |
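The delay constraint can be sketched with a plain dataclass. This is an illustration only: the real RetryConfig is a Pydantic model, and the class name `RetryConfigSketch` and the default values shown here are assumptions.

```python
from dataclasses import dataclass


@dataclass
class RetryConfigSketch:
    """Hypothetical stand-in that only models the delay constraint."""

    max_retries: int = 3      # assumed default
    base_delay: float = 1.0   # assumed default
    max_delay: float = 10.0   # assumed default

    def __post_init__(self):
        # Mirrors the documented rule: max_delay must be >= base_delay.
        if self.max_delay < self.base_delay:
            raise ValueError("max_delay must be greater than or equal to base_delay")
```

Equal values are accepted; only a `max_delay` strictly below `base_delay` is rejected.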
RunAnalyzer(cls)
Bases: NodeVisitor
AST NodeVisitor that analyzes a class to build a RunProfile.
The run analyzer visits the AST nodes of a class to analyze the _run method and build a RunProfile. It will look for the usage of the **kwargs parameter in method calls and subscript expressions. The traversal result is stored as a RunProfile object.
Attributes:
| Name | Type | Description |
|---|---|---|
| cls | type | The class to analyze. |
| profile | RunProfile | The profile of the run being analyzed. |
Initialize the RunAnalyzer with a class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cls | type | The class to analyze. | required |
visit_Call(node)
Visit a Call node in the AST.
This node represents a function call in the source code. Here, we are looking for calls to methods that fully pass the kwargs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node | Call | The Call node to visit. | required |
visit_Subscript(node)
Visit a Subscript node in the AST.
The Subscript node represents a subscripted value in the source code. Example: kwargs["key"]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node | Subscript | The Subscript node to visit. | required |
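To make the two visitor hooks concrete, the following sketch uses the standard `ast` module to detect `kwargs["key"]` subscripts and calls that forward `**kwargs`. It is not the RunAnalyzer implementation: the class `KwargsUsageSketch` and its attributes are hypothetical, and it assumes Python 3.9 or later (for `ast.unparse` and constant subscript slices).

```python
import ast


class KwargsUsageSketch(ast.NodeVisitor):
    """Hypothetical visitor that records how **kwargs is used."""

    def __init__(self):
        self.subscript_keys = []
        self.full_pass_calls = []

    def visit_Subscript(self, node):
        # Matches kwargs["key"] where the key is a string constant.
        if (
            isinstance(node.value, ast.Name)
            and node.value.id == "kwargs"
            and isinstance(node.slice, ast.Constant)
            and isinstance(node.slice.value, str)
        ):
            self.subscript_keys.append(node.slice.value)
        self.generic_visit(node)

    def visit_Call(self, node):
        # A keyword whose arg is None is a ** expansion, e.g. helper(**kwargs).
        for keyword in node.keywords:
            if (
                keyword.arg is None
                and isinstance(keyword.value, ast.Name)
                and keyword.value.id == "kwargs"
            ):
                self.full_pass_calls.append(ast.unparse(node.func))
        self.generic_visit(node)


source = '''
def _run(self, **kwargs):
    query = kwargs["query"]
    return self.helper(**kwargs)
'''
visitor = KwargsUsageSketch()
visitor.visit(ast.parse(source))
```

After the traversal, `visitor.subscript_keys` holds the keys read by subscript and `visitor.full_pass_calls` holds the callees that receive the full `**kwargs`.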
RunArgumentUsageType
Bases: StrEnum
Enum representing the different types of argument usage in a run.
RunProfile(**data)
Bases: BaseModel
Model representing the profile of a run.
Attributes:
| Name | Type | Description |
|---|---|---|
| arg_usages | ArgUsages | A dictionary mapping argument usage types to lists of argument names. |
| full_pass_methods | list[str] | A list of method names that fully pass the kwargs. |
| method_signatures | dict[str, MethodSignature] | A dictionary mapping method names to their signatures. |
Initialize the RunProfile with the given data.
This is to circumvent Pylint false positives due to the usage of Field(default_factory=...).
analyze_method(cls, method)
Analyze a method using RunAnalyzer.
This function encapsulates the common analysis logic used by both Component._analyze_run_method() and schema_generator._generate_from_analyzer().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cls | type | The class containing the method (for analyzer context). | required |
| method | Callable | The method to analyze. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| RunProfile | RunProfile | The analysis results. |
asyncify(func, *, cancellable=False, limiter=None)
Wrap a sync function into an awaitable callable using a worker thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable[P, R] | Synchronous function to wrap. | required |
| cancellable | bool | If True, allow cancellation of the awaiter while running in a worker thread. Defaults to False. | False |
| limiter | CapacityLimiter \| None | Capacity limiter to throttle concurrent thread usage. Defaults to None. | None |

Returns:

| Type | Description |
|---|---|
| Callable[P, Awaitable[R]] | An async function that, when awaited, executes func in a worker thread and returns its result. |
Usage
    async def handler() -> int:
        wrapped = asyncify(blocking_func)
        return await wrapped(1, 2)
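For readers who want to see the wrapping idea in isolation, here is a rough standard-library analogue built on `asyncio.to_thread`. It is a sketch only: the documented function accepts `cancellable` and a `CapacityLimiter`, which this version does not model, and the names `asyncify_sketch` and `blocking_add` are hypothetical.

```python
import asyncio
import functools


def asyncify_sketch(func):
    """Wrap a blocking function so it can be awaited (illustrative only)."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # asyncio.to_thread runs the blocking call in a worker thread,
        # so the event loop stays free while it executes.
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def blocking_add(a, b):
    return a + b


result = asyncio.run(asyncify_sketch(blocking_add)(1, 2))
```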
check_optional_packages(packages, error_message=None, install_instructions=None, extras=None)
Check if optional packages are available and raise ImportError if not.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packages | str \| list[str] | Package name or list of package names to check. | required |
| error_message | str \| None | Custom error message. If None, a default message is used. Defaults to None. | None |
| install_instructions | str \| None | Installation instructions. If None, generates uv sync command. Defaults to None. | None |
| extras | str \| list[str] \| None | Extras that contain the required packages. If provided, generates specific installation instructions using uv sync. If install_instructions is None, it will create default instructions based on the extras. If install_instructions is not None, it will use the provided instructions directly and ignore this argument. Defaults to None. | None |

Raises:

| Type | Description |
|---|---|
| ImportError | If any of the required packages are not installed. |
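The availability check itself can be sketched with `importlib.util.find_spec`. This is an assumption about one reasonable way to detect missing packages, not the library's code: `check_packages_sketch` is a hypothetical name, it handles top-level module names only, and it does not generate uv sync instructions.

```python
import importlib.util


def check_packages_sketch(packages, error_message=None):
    """Raise ImportError if any top-level package cannot be found."""
    names = [packages] if isinstance(packages, str) else list(packages)
    # find_spec returns None when a top-level module is not importable.
    missing = [name for name in names if importlib.util.find_spec(name) is None]
    if missing:
        default_message = f"Missing optional packages: {', '.join(missing)}"
        raise ImportError(error_message or default_message)
```

Calling the check at the top of a feature module lets the failure surface with a clear message before any optional dependency is actually used.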
concatenate(delimiter='-')
Creates a function that concatenates a list of values with a delimiter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| delimiter | str | The delimiter to use when concatenating the values. Defaults to "-". | '-' |

Returns:

| Type | Description |
|---|---|
| Callable[[list[Any]], str] | A function that concatenates a list of values with the delimiter. |
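Since `concatenate` returns a function rather than a string, a short sketch of the factory pattern may help. The name `concatenate_sketch` is hypothetical, and converting each value with `str()` is an assumption about how non-string values are handled.

```python
def concatenate_sketch(delimiter="-"):
    """Return a merger that joins values with the given delimiter."""

    def merge(values):
        # Assumption: non-string values are converted with str().
        return delimiter.join(str(value) for value in values)

    return merge


join_with_dash = concatenate_sketch()
joined = join_with_dash(["chunk", 1, 2])
```

A function built this way has the `Callable[[list[Any]], Any]` shape expected in a merger function map.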
cosine(vector, matrix)
Calculate cosine similarities between a vector and a matrix of vectors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| vector | list[float] | The input vector to compare against. | required |
| matrix | list[list[float]] | The matrix of vectors to compare with. | required |

Returns:

| Type | Description |
|---|---|
| list[float] | The list of cosine similarities between the vector and each row of the matrix. |
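The computation is the usual dot product divided by the product of the norms, applied row by row. The pure-Python sketch below shows the arithmetic; `cosine_sketch` is a hypothetical name, and returning 0.0 for a zero-length vector is an assumption, since the documented behaviour for that case is not stated.

```python
import math


def cosine_sketch(vector, matrix):
    """Cosine similarity between `vector` and each row of `matrix`."""
    vector_norm = math.sqrt(sum(x * x for x in vector))
    similarities = []
    for row in matrix:
        row_norm = math.sqrt(sum(x * x for x in row))
        dot = sum(a * b for a, b in zip(vector, row))
        denominator = vector_norm * row_norm
        # Assumption: a zero vector yields 0.0 instead of dividing by zero.
        similarities.append(dot / denominator if denominator else 0.0)
    return similarities


scores = cosine_sketch([1.0, 0.0], [[1.0, 0.0], [0.0, 1.0], [-1.0, 0.0]])
```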
deprecated(deprecated_in, removed_in, current_version=None, details='')
Decorator to mark functions as deprecated.
This is currently implemented as a thin wrapper around deprecation.deprecated for consistency, since deprecation may be deprecated when we move into Python 3.13, where @warnings.deprecated will be available.
Usage example:
    @deprecated(deprecated_in="0.1.0", removed_in="0.2.0", current_version="0.1.1")
    def old_function():
        pass
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| deprecated_in | str | The version when the function was deprecated. | required |
| removed_in | str | The version when the function will be removed. | required |
| current_version | str \| None | The current version of the package. Defaults to None. | None |
| details | str | Additional details about the deprecation. Defaults to an empty string. | '' |

Returns:

| Name | Type | Description |
|---|---|---|
| Callable | Callable | The decorated function. |
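As a rough picture of what such a decorator does, the sketch below emits a `DeprecationWarning` through the standard `warnings` module. It is not the wrapper around the `deprecation` package described above: `deprecated_sketch` is a hypothetical name, `current_version` is not modelled, and the message format is an assumption.

```python
import functools
import warnings


def deprecated_sketch(deprecated_in, removed_in, details=""):
    """Mark a function as deprecated (illustrative only)."""

    def decorator(func):
        message = (
            f"{func.__name__} is deprecated as of {deprecated_in} "
            f"and will be removed in {removed_in}. {details}"
        ).strip()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # stacklevel=2 attributes the warning to the caller's line.
            warnings.warn(message, DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)

        return wrapper

    return decorator


@deprecated_sketch(deprecated_in="0.1.0", removed_in="0.2.0")
def old_function():
    return 1
```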
format_chunk_for_logging(chunk, rank=None, include_score=True, include_metadata=True)
Formats a log to display a single chunk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chunk | Chunk | The chunk to be formatted. | required |
| rank | int \| None | The optional rank of the formatted chunk. Defaults to None. | None |
| include_score | bool | Whether to include the score in the formatted message. Defaults to True. | True |
| include_metadata | bool | Whether to include the metadata in the formatted message. Defaults to True. | True |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | A formatted log message that displays information about the logged chunk. |
format_chunk_message(chunk, rank=None, include_score=True, include_metadata=True)
Formats a log to display a single chunk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chunk | Chunk | The chunk to be formatted. | required |
| rank | int \| None | The optional rank of the formatted chunk. Defaults to None. | None |
| include_score | bool | Whether to include the score in the formatted message. Defaults to True. | True |
| include_metadata | bool | Whether to include the metadata in the formatted message. Defaults to True. | True |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | A formatted log message that displays information about the logged chunk. |
get_default_portal()
Return the shared default BlockingPortal.
Returns:
| Name | Type | Description |
|---|---|---|
| BlockingPortal | BlockingPortal | A process-wide portal running on a background thread. |
get_value_repr(value)
Get the string representation of a value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | Any | The value to get the string representation of. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Any | Any | The string representation of the value. |
load_gsheets(client_email, private_key, sheet_id, worksheet_id)
Loads data from a Google Sheets worksheet.
This function retrieves data from a Google Sheets worksheet using service account credentials. It authorizes the client, selects the specified worksheet, and reads the worksheet data. The first row of the worksheet will be treated as the column names.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client_email | str | The client email associated with the service account. | required |
| private_key | str | The private key used for authentication. | required |
| sheet_id | str | The ID of the Google Sheet. | required |
| worksheet_id | str | The ID of the worksheet within the Google Sheet. | required |

Returns:

| Type | Description |
|---|---|
| list[dict[str, str]] | A list of dictionaries containing the Google Sheets content. |
merge_overlapping_strings(delimiter='\n', min_overlap=1, max_window=200)
Creates a function that merges a list of strings, handling common prefixes and overlaps.
The created function will:
1. Identify and remove any common prefix shared by the strings.
2. Process each pair of adjacent strings to remove overlapping strings.
3. Join the cleaned strings together, including the common prefix at the beginning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| delimiter | str | The delimiter to use when merging the values. Defaults to "\n". | '\n' |
| min_overlap | int | Minimum overlap length to consider valid. Defaults to 1. | 1 |
| max_window | int | Maximum window size to search for overlaps. Defaults to 200. | 200 |

Returns:

| Type | Description |
|---|---|
| Callable[[list[str]], str] | A function that merges a list of strings, handling common prefixes and overlaps. |
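The three documented steps (strip the common prefix, trim adjacent overlaps, join) can be sketched as follows. This is one plausible reading and not the library's algorithm: `merge_overlapping_sketch` is a hypothetical name, an overlap is assumed to mean a suffix of one string that equals a prefix of the next, and empty pieces are assumed to be dropped before joining.

```python
import os


def merge_overlapping_sketch(delimiter="\n", min_overlap=1, max_window=200):
    """Return a merger for strings that share a prefix or overlap."""

    def overlap_length(left, right):
        # Longest suffix of `left` that is also a prefix of `right`,
        # searched within max_window and at least min_overlap long.
        limit = min(len(left), len(right), max_window)
        for size in range(limit, max(min_overlap, 1) - 1, -1):
            if left.endswith(right[:size]):
                return size
        return 0

    def merge(strings):
        if not strings:
            return ""
        # Step 1: remove the prefix shared by all strings.
        prefix = os.path.commonprefix(strings) if len(strings) > 1 else ""
        bodies = [text[len(prefix):] for text in strings]
        # Step 2: trim what each string repeats from its predecessor.
        cleaned = [bodies[0]]
        for previous, current in zip(bodies, bodies[1:]):
            cleaned.append(current[overlap_length(previous, current):])
        # Step 3: join the pieces and restore the prefix once.
        return prefix + delimiter.join(part for part in cleaned if part)

    return merge


merge_text = merge_overlapping_sketch(delimiter="")
merged_text = merge_text(["Title: hello world", "Title: world peace"])
```

Here the shared prefix `"Title: "` is kept once and the repeated word `"world"` is trimmed from the second string before joining.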
pick_first(values)
Picks the first value from a list of values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| values | list[Any] | The values to pick from. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Any | Any | The first value from the list. |
pick_last(values)
Picks the last value from a list of values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| values | list[Any] | The values to pick from. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Any | Any | The last value from the list. |
syncify(async_func, *, portal=None)
Wrap an async function to be callable from synchronous code.
Lifecycle and portals:
1. This helper uses an already running AnyIO BlockingPortal to execute the coroutine.
2. If portal is not provided, a process-wide shared portal is used. Its lifecycle is
managed internally: it is created lazily on first use and shut down automatically at process exit.
3. If you provide a portal, you are expected to manage its lifecycle, typically with a
context manager. This is recommended when making many calls in a bounded scope since it
avoids per-call startup costs while allowing deterministic teardown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| async_func | Callable[P, Awaitable[R]] | Asynchronous function to wrap. | required |
| portal | BlockingPortal \| None | Portal to use for calling the async function from sync code. Defaults to None, in which case a shared default portal is used. | None |

Returns:

| Type | Description |
|---|---|
| Callable[P, R] | A synchronous function that runs the coroutine and returns its result. |
Usage
    # Use the default shared portal (most convenient)
    def do_work(x: int) -> int:
        sync_call = syncify(async_func)
        return sync_call(x)

    # Reuse a scoped portal for multiple calls (deterministic lifecycle)
    from anyio.from_thread import start_blocking_portal

    with start_blocking_portal() as portal:
        sync_call = syncify(async_func, portal=portal)
        a = sync_call(1)
        b = sync_call(2)
Notes
Creating a brand-new portal per call is discouraged due to the overhead of spinning up and tearing down a background event loop/thread. Prefer the shared portal or a scoped portal reused for a batch of calls.
validate_enum(enum_type, value)
Validates that the provided value is a valid enum value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| enum_type | type[E] | The type of the enum. | required |
| value | object | The value to validate. | required |

Raises:

| Type | Description |
|---|---|
| ValueError | If the provided value is not a valid enum value. |
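A validation of this kind can be sketched with the standard `enum` module. The function name `validate_enum_sketch`, the `Color` enum, and the wording of the error message are all hypothetical; comparing against member values is an assumption about what "valid enum value" means here.

```python
from enum import Enum


def validate_enum_sketch(enum_type, value):
    """Raise ValueError unless `value` matches a member value."""
    valid_values = [member.value for member in enum_type]
    if value not in valid_values:
        raise ValueError(
            f"Invalid value {value!r}; expected one of {valid_values}"
        )


class Color(Enum):
    # Hypothetical enum used only for this example.
    RED = "red"
    BLUE = "blue"


validate_enum_sketch(Color, "red")  # passes silently
```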
validate_string_enum(enum_type, value)
Validates that the provided value is a valid string enum value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| enum_type | type[StrEnum] | The type of the string enum. | required |
| value | str | The value to validate. | required |

Raises:

| Type | Description |
|---|---|
| ValueError | If the provided value is not a valid string enum value. |