# Copyright (C) MatrixEditor 2023-2026
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pyright: reportPrivateUsage=false, reportExplicitAny=false, reportAny=false
from io import BytesIO
from collections.abc import Collection, Iterable
from functools import partial
from typing import Any, Callable, Generic
from typing_extensions import TypeVar, overload, override
from caterpillar.byteorder import byteorder
from caterpillar.options import Flag
from caterpillar.context import CTX_SEQ, CTX_STREAM
from caterpillar._common import unpack_seq, pack_seq, WithoutContextVar
from caterpillar.shared import PackMixin, UnpackMixin, getstruct
from caterpillar.abc import (
_ContextLambda,
_ContextLike,
_StructLike,
_LengthT,
_IT,
_OT,
_OptionLike,
_EndianLike,
_SwitchLambda,
_ArgType,
)
from ._base import Field
class ByteOrderMixin(Generic[_IT, _OT]):
def __set_byteorder__(self, order: _EndianLike) -> Field[_IT, _OT]:
"""Returns a field with the given byteorder"""
return Field(self, order=order) # pyright: ignore[reportArgumentType]
[docs]
class FieldMixin(ByteOrderMixin[_IT, _OT]):
"""A simple mixin to support operators used to create :class:`Field` instances."""
def __or__(self, flag: _OptionLike) -> Field[_IT, _OT]:
"""Creates a field *with* the given flag."""
# fmt: off
return Field(self, byteorder(self)) | flag # pyright: ignore[reportArgumentType]
def __xor__(self, flag: Flag) -> Field[_IT, _OT]:
"""Creates a field *without* the given flag."""
# fmt: off
return Field(self, byteorder(self)) ^ flag # pyright: ignore[reportArgumentType]
def __matmul__(self, offset: _ContextLambda[int] | int) -> Field[_IT, _OT]:
"""Creates a field that should start at the given offset."""
# fmt: off
return Field(self, byteorder(self)) @ offset # pyright: ignore[reportArgumentType]
def __getitem__(self, dim: _LengthT) -> Field[Collection[_IT], Collection[_OT]]:
"""Returns a sequenced field."""
# fmt: off
return Field(self, byteorder(self))[dim] # pyright: ignore[reportArgumentType]
def __rshift__(
self, switch: _SwitchLambda | dict[str, _StructLike]
) -> Field[_IT, _OT]:
"""Inserts switch options into the new field"""
# fmt: off
return Field(self, byteorder(self)) >> switch # pyright: ignore[reportArgumentType]
def __floordiv__(self, condition: _ContextLambda[bool] | bool) -> Field[_IT, _OT]:
"""Returns a field with the given condition"""
# fmt: off
return Field(self, byteorder(self)) // condition # pyright: ignore[reportArgumentType]
def __rsub__(self, bits: _ContextLambda[int] | int) -> Field[_IT, _OT]:
"""Returns a field with the given bit count"""
# fmt: off
return Field(self, byteorder(self), bits=bits) # pyright: ignore[reportArgumentType]
@overload
def __and__(
self, other: "_StructLike[_ChainHeadT, _ChainTailT]"
) -> "Chain[_IT, _ChainTailT]": ...
@overload
def __and__(
self, other: "Chain[_ChainHeadT, _ChainTailT]"
) -> "Chain[_ChainHeadT, _OT]": ...
def __and__(
self, other: "Chain[_ChainHeadT, _ChainTailT] | _StructLike[_IT, _OT]"
) -> "Chain":
"""Returns a chain with the next element added at the end"""
# fmt: off
if isinstance(other, Chain):
return other & self # pyright: ignore[reportOperatorIssue, reportUnknownVariableType]
return Chain(self, other) # pyright: ignore[reportArgumentType]
[docs]
class FieldStruct(FieldMixin[_IT, _OT], PackMixin[_IT], UnpackMixin[_OT]):
"""
A mix-in class combining the behavior of _StructLike with additional
functionality for packing and unpacking structured data.
"""
#!! Removed in 2.8.0
# __slots__: tuple[str, ...] = ("__byteorder__", "__bits__")
# __byteorder__: _EndianLike | None
# __bits__: int | _ContextLambda[int] | None
[docs]
def pack_single(self, obj: _IT, context: _ContextLike) -> None:
"""
Abstract method to pack a single element.
:param obj: The element to pack.
:type obj: Any
:param context: The current operation context.
:type context: _ContextLike
:raises NotImplementedError: This method must be implemented by subclasses.
"""
raise NotImplementedError
[docs]
def unpack_single(self, context: _ContextLike) -> _OT:
"""
Abstract method to unpack a single element.
:param context: The current operation context.
:type context: _ContextLike
:raises NotImplementedError: This method must be implemented by subclasses.
:return: The unpacked element.
"""
raise NotImplementedError
[docs]
def pack_seq(self, seq: Collection[_IT], context: _ContextLike) -> None:
"""
Pack a sequence of elements using the provided context.
:param seq: The sequence of elements to pack.
:type seq: Iterable
:param context: The current operation context.
:type context: _ContextLike
"""
pack_seq(seq, context, self.pack_single)
[docs]
def unpack_seq(self, context: _ContextLike) -> Collection[_OT]:
"""
Unpack a sequence of elements using the provided context.
:param context: The current operation context.
:type context: _ContextLike
:return: The list of unpacked elements.
"""
return unpack_seq(context, self.unpack_single)
[docs]
def __pack__(self, obj: _IT, context: _ContextLike) -> None:
"""
Pack data based on whether the field is sequential or not.
:param obj: The data to pack.
:type obj: Any
:param context: The current operation context.
:type context: _ContextLike
"""
# fmt: off
(self.pack_single if not context[CTX_SEQ] else self.pack_seq)(obj, context) # pyright: ignore[reportArgumentType]
[docs]
def __unpack__(self, context: _ContextLike) -> _OT:
"""
Unpack data based on whether the field is sequential or not.
:param context: The current operation context.
:type context: _ContextLike
:return: The unpacked data.
"""
# fmt: off
return (self.unpack_seq if context[CTX_SEQ] else self.unpack_single)(context) # pyright: ignore[reportReturnType]
[docs]
@override
def __repr__(self) -> str:
"""
String representation of the FieldStruct instance.
:return: A string representation.
"""
return f"<{self.__class__.__name__}>"
_ChainHeadT = TypeVar("_ChainHeadT", default=Any)
_ChainTailT = TypeVar("_ChainTailT", default=Any)
[docs]
class Chain(FieldStruct[_ChainHeadT, _ChainTailT]):
"""
Represents a chain of structures where each structure in the chain is linked
to the next one, forming a sequence.
:param initial: The initial structure in the chain.
:param structs: Additional structures to be added to the chain.
The chain allows packing and unpacking data through its elements in sequence.
.. note::
- Unpacking travels from the head to the tail.
- Packing travels from the tail to the head.
"""
__slots__: tuple[str, ...] = ("_elements",)
[docs]
def __init__(
self,
initial: _StructLike[_ChainHeadT, Any],
*structs: _StructLike[Any, Any],
tail: _StructLike[bytes, _ChainTailT] | None = None,
) -> None:
# fmt: off
# start -> next -> next -> next -> done | unpack
# Y
# done <- previous <- previous <- start | pack
self._elements: list[_StructLike] = [getstruct(initial) or initial] # pyright: ignore[reportAttributeAccessIssue]
self._elements += [x for x in map(lambda x: getstruct(x, x), structs) if x] # pyright: ignore[reportAttributeAccessIssue]
if tail:
self._elements.append(tail)
@property
def head(self) -> _StructLike[_ChainHeadT, Any]:
"""
Get the head of the chain, i.e., the first structure.
:return: The head of the chain.
:rtype: _StructLike
"""
return self._elements[0]
@property
def tail(self) -> _StructLike[Any, _ChainTailT]:
"""
Get the tail of the chain, i.e., the last structure.
:return: The tail of the chain.
:rtype: _StructLike
"""
return self._elements[-1]
[docs]
def __size__(self, context: _ContextLike) -> int:
"""
Calculate the size of the chain in bytes.
:param context: The context for the calculation.
:type context: _ContextLike
:return: The size of the chain.
:rtype: int
"""
return self.head.__size__(context)
[docs]
def __type__(self) -> type | str | None:
"""
Get the type of the tail structure in the chain.
:return: The type of the tail structure.
:rtype: type
"""
return self.tail.__type__()
[docs]
@override
def __and__( # pyright: ignore[reportIncompatibleMethodOverride]
self, other: _StructLike[_IT, _OT]
) -> "Chain[_ChainHeadT, _OT]":
"""
Concatenate another structure to the end of the chain.
:param other: The structure to concatenate.
:type other: _StructLike
:return: The updated chain.
:rtype: Chain
"""
# fmt: off
self._elements.append(getstruct(other) or other) # pyright: ignore[reportArgumentType]
return self # pyright: ignore[reportReturnType]
[docs]
def __rand__(self, other: _StructLike[_IT, _OT]) -> "Chain[_ChainHeadT, _OT]":
"""
Concatenate another structure to the beginning of the chain.
:param other: The structure to concatenate.
:type other: _StructLike
:return: The updated chain.
:rtype: Chain
"""
return self.__and__(other)
[docs]
@override
def unpack_single(self, context: _ContextLike) -> _ChainTailT:
"""
Unpack a single data instance from the chain.
:param context: The context for the unpacking operation.
:type context: _ContextLike
:return: A memory view representing the unpacked data.
:rtype: memoryview
"""
data = None
for i, struct in enumerate(self._elements):
stream = BytesIO(data) if i != 0 else context[CTX_STREAM]
with (
WithoutContextVar(context, CTX_STREAM, stream),
WithoutContextVar(context, CTX_SEQ, False),
):
data = struct.__unpack__(context)
return data # pyright: ignore[reportReturnType]
[docs]
@override
def pack_single(self, obj: _IT, context: _ContextLike) -> None:
"""
Pack a single data instance into the chain.
:param Any obj: The data to pack into the chain.
:param context: The context for the packing operation.
:type context: _ContextLike
"""
count = len(self._elements)
for i, struct in enumerate(reversed(self._elements)):
if i == count - 1:
# Last struct, use the provided context stream directly
struct.__pack__(obj, context)
else:
# Not the last struct, use a temporary BytesIO object
with (
BytesIO() as stream,
WithoutContextVar(context, CTX_STREAM, stream),
WithoutContextVar(context, CTX_SEQ, False),
):
struct.__pack__(obj, context)
obj = stream.getvalue()
class Operator(Generic[_IT, _OT]):
"""Defines a custom opearator (user-defined)
It operates _infix_ between two statements and takes them as
agruments. For instance, the following example will return
an array of structures:
.. code-block:: python
from caterpillar.fields import uint16, Operator
from caterpillar.model import struct
M = Operator(lambda a, b: a[b*2])
@struct
class Format:
f1: uint16 /M/ 3
This class reserves the `/` operator. It is also possible to
use this class as a decorator on callable objects:
.. code-block:: python
@Operator
def M(a, b):
return a[b*2]
:param func: The function to be applied.
:type func: Callable[[Any, Any], _StructLike]
"""
def __init__(self, func: Callable[[Any, Any], _StructLike[_IT, _OT]]) -> None:
self.func: Callable[[Any, Any], _StructLike[_IT, _OT]] = func
def __truediv__(self, arg2: object) -> _StructLike[_IT, _OT]:
return self.func(arg2) # pyright: ignore[reportCallIssue]
def __rtruediv__(self, arg1: object) -> "Operator[_IT, _OT]":
return Operator(partial(self.func, arg1))
def __call__(self, arg1: object, arg2: object) -> _StructLike[_IT, _OT]:
return self.func(arg1, arg2)
# utility methods
[docs]
def get_args(
args: _ArgType | list[_ArgType],
context: _ContextLike,
) -> list[Any]:
"""
Get arguments for an instance.
:param args: Input arguments.
:type args: Any
:param context: The current operation context.
:type context: _ContextLike
:return: A list of processed arguments.
:rtype: list
"""
args = list(args) if isinstance(args, Iterable) else [args]
for i, argument in enumerate(args):
if callable(argument):
args[i] = argument(context)
return args
[docs]
def get_kwargs(kwargs: dict[str, _ArgType], context: _ContextLike) -> dict[str, Any]:
"""
Process a dictionary of keyword arguments, replacing callable values with their
results.
:param kwargs: Dictionary of keyword arguments.
:type kwargs: dict
:param context: The current operation context.
:type context: _ContextLike
:return: A new dictionary with processed keyword arguments.
:rtype: dict
"""
for key, value in kwargs.items():
if callable(value):
kwargs[key] = value(context)
return kwargs