Source code for caterpillar.fields._mixin

# Copyright (C) MatrixEditor 2023-2024
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
import inspect

from io import BytesIO
from typing import Any, List, Union, Optional, Iterable, Callable
from abc import ABC, abstractmethod
from functools import partial

from caterpillar.abc import _ContextLike, _StructLike, _ContextLambda, _Switch
from caterpillar.abc import _ContainsStruct, getstruct
from caterpillar.byteorder import ByteOrder, byteorder
from caterpillar.options import Flag
from caterpillar.context import CTX_SEQ, CTX_STREAM
from caterpillar._common import unpack_seq, pack_seq, WithoutContextVar

from ._base import Field


[docs] class FieldMixin: """A simple mixin to support operators used to create :class:`Field` instances.""" def __or__(self, flag: Flag) -> Field: """Creates a field *with* the given flag.""" return Field(self, byteorder(self)) | flag def __xor__(self, flag: Flag) -> Field: """Creates a field *without* the given flag.""" return Field(self, byteorder(self)) ^ flag def __matmul__(self, offset: Union[_ContextLambda, int]) -> Field: """Creates a field that should start at the given offset.""" return Field(self, byteorder(self)) @ offset def __getitem__(self, dim: Union[_ContextLambda, int]) -> Field: """Returns a sequenced field.""" return Field(self, byteorder(self))[dim] def __rshift__(self, switch: Union[_Switch, dict]) -> Field: """Inserts switch options into the new field""" return Field(self, byteorder(self)) >> switch def __floordiv__(self, condition: Union[_ContextLambda, bool]) -> Field: """Returns a field with the given condition""" return Field(self, byteorder(self)) // condition def __set_byteorder__(self, order: ByteOrder) -> Field: """Returns a field with the given byteorder""" return Field(self, order=order) def __rsub__(self, bits: Union[_ContextLambda, int]) -> Field: """Returns a field with the given bit count""" return Field(self, byteorder(self), bits=bits) def __and__(self, other: _StructLike) -> "Chain": """Returns a chain with the next element added at the end""" if isinstance(other, Chain): return other & self return Chain(self, other)
[docs] class FieldStruct(FieldMixin): """ A mix-in class combining the behavior of _StructLike with additional functionality for packing and unpacking structured data. """ __slots__ = { "__byteorder__": ( """ An internal field used to measure the byte order of this struct. Note that this field will be used during processing only and not during parsing or building data. In addition, the actual byte order should be retrieved using the :class:`Field` instance within the context. """ ), "__bits__": "TBD", }
[docs] def pack_single(self, obj: Any, context: _ContextLike) -> None: """ Abstract method to pack a single element. :param obj: The element to pack. :type obj: Any :param context: The current operation context. :type context: _ContextLike :raises NotImplementedError: This method must be implemented by subclasses. """ raise NotImplementedError
[docs] def unpack_single(self, context: _ContextLike) -> Any: """ Abstract method to unpack a single element. :param context: The current operation context. :type context: _ContextLike :raises NotImplementedError: This method must be implemented by subclasses. :return: The unpacked element. """ raise NotImplementedError
[docs] def pack_seq(self, seq: Iterable, context: _ContextLike) -> None: """ Pack a sequence of elements using the provided context. :param seq: The sequence of elements to pack. :type seq: Iterable :param context: The current operation context. :type context: _ContextLike """ pack_seq(seq, context, self.pack_single)
[docs] def unpack_seq(self, context: _ContextLike) -> List[Any]: """ Unpack a sequence of elements using the provided context. :param context: The current operation context. :type context: _ContextLike :return: The list of unpacked elements. """ return unpack_seq(context, self.unpack_single)
[docs] def __pack__(self, obj: Any, context: _ContextLike) -> None: """ Pack data based on whether the field is sequential or not. :param obj: The data to pack. :type obj: Any :param context: The current operation context. :type context: _ContextLike """ (self.pack_single if not context[CTX_SEQ] else self.pack_seq)(obj, context)
[docs] def __unpack__(self, context: _ContextLike) -> Any: """ Unpack data based on whether the field is sequential or not. :param context: The current operation context. :type context: _ContextLike :return: The unpacked data. """ if context[CTX_SEQ]: return self.unpack_seq(context) return self.unpack_single(context)
[docs] def __repr__(self) -> str: """ String representation of the FieldStruct instance. :return: A string representation. """ return f"<{self.__class__.__name__}>"
[docs] class Chain(FieldStruct): """ Represents a chain of structures where each structure in the chain is linked to the next one, forming a sequence. :param initial: The initial structure in the chain. :param structs: Additional structures to be added to the chain. The chain allows packing and unpacking data through its elements in sequence. .. note:: - Unpacking travels from the head to the tail. - Packing travels from the tail to the head. """ __slots__ = ("_elements",)
[docs] def __init__(self, initial: _StructLike, *structs: _StructLike) -> None: # start -> next -> next -> next -> done | unpack # Y # done <- previous <- previous <- start | pack self._elements = [getstruct(initial, initial)] self._elements += list(map(lambda x: getstruct(x, x), structs))
@property def head(self) -> _StructLike: """ Get the head of the chain, i.e., the first structure. :return: The head of the chain. :rtype: _StructLike """ return self._elements[0] @property def tail(self) -> _StructLike: """ Get the tail of the chain, i.e., the last structure. :return: The tail of the chain. :rtype: _StructLike """ return self._elements[-1]
[docs] def __size__(self, context: _ContextLike) -> int: """ Calculate the size of the chain in bytes. :param context: The context for the calculation. :type context: _ContextLike :return: The size of the chain. :rtype: int """ return self.head.__size__(context)
[docs] def __type__(self) -> type: """ Get the type of the tail structure in the chain. :return: The type of the tail structure. :rtype: type """ return self.tail.__type__()
[docs] def __and__(self, other: _StructLike) -> "Chain": """ Concatenate another structure to the end of the chain. :param other: The structure to concatenate. :type other: _StructLike :return: The updated chain. :rtype: Chain """ self._elements.append(getstruct(other, other)) return self
[docs] def __rand__(self, other: _StructLike) -> "Chain": """ Concatenate another structure to the beginning of the chain. :param other: The structure to concatenate. :type other: _StructLike :return: The updated chain. :rtype: Chain """ return self.__and__(other)
[docs] def unpack_single(self, context: _ContextLike) -> memoryview: """ Unpack a single data instance from the chain. :param context: The context for the unpacking operation. :type context: _ContextLike :return: A memory view representing the unpacked data. :rtype: memoryview """ data = None for i, struct in enumerate(self._elements): stream = BytesIO(data) if i != 0 else context[CTX_STREAM] with ( WithoutContextVar(context, CTX_STREAM, stream), WithoutContextVar(context, CTX_SEQ, False), ): data = struct.__unpack__(context) return data
[docs] def pack_single(self, obj: Any, context: _ContextLike) -> None: """ Pack a single data instance into the chain. :param Any obj: The data to pack into the chain. :param context: The context for the packing operation. :type context: _ContextLike """ count = len(self._elements) for i, struct in enumerate(reversed(self._elements)): if i == count - 1: # Last struct, use the provided context stream directly struct.__pack__(obj, context) else: # Not the last struct, use a temporary BytesIO object with ( BytesIO() as stream, WithoutContextVar(context, CTX_STREAM, stream), WithoutContextVar(context, CTX_SEQ, False), ): struct.__pack__(obj, context) obj = stream.getvalue()
class Operator: """Defines a custom opearator (user-defined) It operates _infix_ between two statements and takes them as agruments. For instance, the following example will return an array of structures: .. code-block:: python from caterpillar.fields import uint16, _infix_ from caterpillar.model import struct M = _infix_(lambda a, b: a[b*2]) @struct class Format: f1: uint16 /M/ 3 This class reserves the `/` operator. It is also possible to use this class as a decorator on callable objects: .. code-block:: python @_infix_ def M(a, b): return a[b*2] :param func: The function to be applied. :type func: Callable[[Any, Any], _StructLike] """ def __init__(self, func: Callable[[Any, Any], _StructLike]) -> None: self.func = func def __truediv__(self, arg2) -> _StructLike: return self.func(arg2) def __rtruediv__(self, arg1) -> "_infix_": return Operator(partial(self.func, arg1)) def __call__(self, arg1, arg2) -> _StructLike: return self.func(arg1, arg2) # utility methods def get_args(args: Any, context: _ContextLike) -> List[Any]: """ Get arguments for an instance. :param args: Input arguments. :type args: Any :param context: The current operation context. :type context: _ContextLike :return: A list of processed arguments. :rtype: list """ args = list(args) if isinstance(args, Iterable) else [args] for i, argument in enumerate(args): if callable(argument): args[i] = argument(context) return args def get_kwargs(kwargs: dict, context: _ContextLike) -> dict: """ Process a dictionary of keyword arguments, replacing callable values with their results. :param kwargs: Dictionary of keyword arguments. :type kwargs: dict :param context: The current operation context. :type context: _ContextLike :return: A new dictionary with processed keyword arguments. :rtype: dict """ for key, value in kwargs.items(): if callable(value): kwargs[key] = value(context) return kwargs