Skip to content

cortex.utils

utils

Utilities module for Cortex framework.

Functions

compute_fingerprint

compute_fingerprint(message_class: type[Message]) -> int

Compute a 64-bit fingerprint for a message class.

The fingerprint is based on the fully qualified class name and the field names/types to ensure type safety across processes.

Parameters:

Name Type Description Default
message_class type[Message]

The message class to compute fingerprint for.

required

Returns:

Type Description
int

A 64-bit unsigned integer fingerprint.

Source code in src/cortex/utils/hashing.py
def compute_fingerprint(message_class: type["Message"]) -> int:
    """
    Derive a stable 64-bit fingerprint for a message class.

    The fingerprint hashes the fully qualified class name together with
    the sorted dataclass field names/types, so processes only agree on a
    type's identity when its structure matches.

    Args:
        message_class: The message class to compute fingerprint for.

    Returns:
        A 64-bit unsigned integer fingerprint.
    """
    qualified_name = f"{message_class.__module__}.{message_class.__qualname__}"

    # Describe each dataclass field as "name:type" for structural hashing.
    descriptors: list[str] = []
    dataclass_fields = getattr(message_class, "__dataclass_fields__", None)
    if dataclass_fields is not None:
        for field_name, field_def in dataclass_fields.items():
            type_label = getattr(field_def.type, "__name__", str(field_def.type))
            descriptors.append(f"{field_name}:{type_label}")

    # Sorting makes the result independent of field declaration order.
    canonical = f"{qualified_name}|{','.join(sorted(descriptors))}"

    # First 8 bytes of SHA-256, read as a big-endian unsigned 64-bit int.
    digest = hashlib.sha256(canonical.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")

get_logger

get_logger(
    name: str, level: int = logging.INFO
) -> logging.Logger

Get a configured logger with colored output.

Parameters:

Name Type Description Default
name str

Logger name (e.g., "cortex.discovery")

required
level int

Logging level (default: INFO)

INFO

Returns:

Type Description
Logger

Configured logger instance

Source code in src/cortex/utils/logging.py
def get_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """
    Return a logger wired up with colored console output.

    Args:
        name: Logger name (e.g., "cortex.discovery")
        level: Logging level (default: INFO)

    Returns:
        Configured logger instance
    """
    configured = logging.getLogger(name)

    # A logger that already has handlers was set up earlier; reuse as-is.
    if not configured.handlers:
        configured.setLevel(level)
        # Keep records from bubbling to the root logger (double printing).
        configured.propagate = False

        console = logging.StreamHandler(sys.stdout)
        console.setLevel(level)
        console.setFormatter(
            ColoredFormatter(
                fmt="%(asctime)s | %(levelname)s | %(message)s",
                datefmt="%H:%M:%S",
            )
        )
        configured.addHandler(console)

    return configured

set_log_level

set_log_level(logger: Logger, level: str) -> None

Set the log level for a logger and its handlers.

Parameters:

Name Type Description Default
logger Logger

The logger to configure

required
level str

Log level name ("DEBUG", "INFO", "WARNING", "ERROR")

required
Source code in src/cortex/utils/logging.py
def set_log_level(logger: logging.Logger, level: str) -> None:
    """
    Apply a named log level to a logger and all of its handlers.

    Args:
        logger: The logger to configure
        level: Log level name ("DEBUG", "INFO", "WARNING", "ERROR")
    """
    # Unknown level names silently fall back to INFO.
    resolved = getattr(logging, level.upper(), logging.INFO)
    for target in (logger, *logger.handlers):
        target.setLevel(resolved)

run

run(
    coro: Coroutine[Any, Any, Any], *, debug: bool = False
) -> Any

Run a coroutine, preferring uvloop when available.

Drop-in replacement for asyncio.run(). On Unix with uvloop installed, this yields noticeably lower tail latency on high-rate small-message workloads.

Parameters:

Name Type Description Default
coro Coroutine[Any, Any, Any]

The top-level coroutine to run to completion.

required
debug bool

Pass through to the event loop's debug flag.

False

Returns:

Type Description
Any

Whatever coro returns.

Source code in src/cortex/utils/loop.py
def run(coro: Coroutine[Any, Any, Any], *, debug: bool = False) -> Any:
    """Run a coroutine on the fastest available event loop.

    Drop-in replacement for :func:`asyncio.run`. On Unix with ``uvloop``
    installed, this yields noticeably lower tail latency on high-rate
    small-message workloads.

    Args:
        coro: The top-level coroutine to run to completion.
        debug: Pass through to the event loop's ``debug`` flag.

    Returns:
        Whatever ``coro`` returns.
    """
    if not _uvloop_available:
        logger.info("Using asyncio event loop")
        return asyncio.run(coro, debug=debug)

    # Imported lazily so the module loads even without uvloop installed.
    import uvloop

    logger.info("Using uvloop event loop")
    return uvloop.run(coro, debug=debug)

deserialize

deserialize(data: bytes) -> tuple[Any, int]

Deserialize bytes to a value.

Returns:

Type Description
tuple[Any, int]

Tuple of (value, bytes_consumed)

Source code in src/cortex/utils/serialization.py
def deserialize(data: bytes) -> tuple[Any, int]:
    """
    Decode one serialized value from the front of ``data``.

    Returns:
        Tuple of (value, bytes_consumed)
    """
    view = _as_buffer_view(data)

    # The first byte is the type tag written by serialize().
    tag = DataType(struct.unpack(">B", view[0:1])[0])
    pos = 1

    if tag == DataType.NONE:
        return None, pos

    if tag == DataType.NUMPY:
        array, consumed = deserialize_numpy(view[pos:])
        return array, pos + consumed

    if tag == DataType.TORCH:
        # Torch payloads are handed a real bytes object, not a view.
        tensor, consumed = deserialize_torch(bytes(view[pos:]))
        return tensor, pos + consumed

    if tag in (DataType.BYTES, DataType.DICT, DataType.LIST, DataType.PRIMITIVE):
        # Length-prefixed payload: 4-byte big-endian size, then the body.
        (size,) = struct.unpack(">I", view[pos : pos + 4])
        pos += 4
        payload = view[pos : pos + size]
        if tag == DataType.BYTES:
            return bytes(payload), pos + size
        value = msgpack.unpackb(payload, raw=False, ext_hook=_msgpack_ext_hook)
        return value, pos + size

    raise ValueError(f"Unknown data type: {tag}")

serialize

serialize(value: Any) -> bytes

Serialize any supported value to bytes.

Supported types: None, int, float, str, bool; bytes; list and dict; numpy.ndarray; torch.Tensor.

Source code in src/cortex/utils/serialization.py
def serialize(value: Any) -> bytes:
    """
    Serialize any supported value to bytes.

    Supported types:
    - None, int, float, str, bool
    - bytes
    - list, dict
    - numpy.ndarray
    - torch.Tensor
    """
    # None needs no payload: just the tag byte.
    if value is None:
        return struct.pack(">B", DataType.NONE)

    if isinstance(value, np.ndarray):
        return struct.pack(">B", DataType.NUMPY) + serialize_numpy(value)

    if TORCH_AVAILABLE and isinstance(value, torch.Tensor):
        return struct.pack(">B", DataType.TORCH) + serialize_torch(value)

    # Raw bytes: tag + 4-byte big-endian length + payload.
    if isinstance(value, bytes):
        return struct.pack(">BI", DataType.BYTES, len(value)) + value

    if isinstance(value, (dict, list, tuple)):
        # Containers share the msgpack path; tuples are encoded as lists.
        tag = DataType.DICT if isinstance(value, dict) else DataType.LIST
        body = msgpack.packb(value, default=_msgpack_default, use_bin_type=True)
    else:
        # Primitives (int, float, str, bool) — msgpack handles them natively.
        tag = DataType.PRIMITIVE
        body = msgpack.packb(value, use_bin_type=True)
    return struct.pack(">BI", tag, len(body)) + body