# Copyright (C) MatrixEditor 2023-2024## This program is free software: you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## This program is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU General Public License for more details.## You should have received a copy of the GNU General Public License# along with this program. If not, see <https://www.gnu.org/licenses/>.fromioimportBytesIOfromtypesimportEllipsisTypefromtypingimportAny,Collection,List,Union,Iterable,Callablefromfunctoolsimportpartialfromcaterpillar.abcimport(_ContextLike,_StructLike,_ContextLambda,_Switch,getstruct,)fromcaterpillar.byteorderimportByteOrder,byteorderfromcaterpillar.optionsimportFlagfromcaterpillar.contextimportCTX_SEQ,CTX_STREAMfromcaterpillar._commonimportunpack_seq,pack_seq,WithoutContextVarfrom._baseimportField
class FieldMixin:
    """A simple mixin to support operators used to create :class:`Field` instances.

    Every operator below wraps ``self`` into a fresh :class:`Field` (using the
    byte order reported by ``byteorder(self)`` unless stated otherwise) and
    then forwards the operation to that field.
    """

    def __or__(self, flag: Flag) -> Field:
        """Creates a field *with* the given flag."""
        return Field(self, byteorder(self)) | flag

    def __xor__(self, flag: Flag) -> Field:
        """Creates a field *without* the given flag."""
        return Field(self, byteorder(self)) ^ flag

    def __matmul__(self, offset: Union[_ContextLambda, int]) -> Field:
        """Creates a field that should start at the given offset."""
        return Field(self, byteorder(self)) @ offset

    def __getitem__(self, dim: Union[_ContextLambda, int, EllipsisType]) -> Field:
        """Returns a sequenced field."""
        return Field(self, byteorder(self))[dim]

    def __rshift__(self, switch: Union[_Switch, dict]) -> Field:
        """Inserts switch options into the new field"""
        return Field(self, byteorder(self)) >> switch

    def __floordiv__(self, condition: Union[_ContextLambda, bool]) -> Field:
        """Returns a field with the given condition"""
        return Field(self, byteorder(self)) // condition

    def __set_byteorder__(self, order: ByteOrder) -> Field:
        """Returns a field with the given byteorder"""
        return Field(self, order=order)

    def __rsub__(self, bits: Union[_ContextLambda, int]) -> Field:
        """Returns a field with the given bit count"""
        return Field(self, byteorder(self), bits=bits)

    def __and__(self, other: _StructLike) -> "Chain":
        """Returns a chain in which *other* follows this struct.

        :param other: The struct (or chain) that comes after this one.
        :type other: _StructLike
        :return: A new chain that starts with this struct.
        :rtype: Chain
        """
        if isinstance(other, Chain):
            # ``self & chain``: this struct is the left operand and therefore
            # has to become the head. Delegating to ``other & self`` would
            # append it to the tail (and mutate ``other``), so a new chain is
            # assembled instead and the operand is left untouched.
            chain = Chain(self)
            chain._elements.extend(other._elements)
            return chain
        return Chain(self, other)
class FieldStruct(FieldMixin):
    """
    Mix-in that adds struct-like packing and unpacking on top of the
    operators provided by :class:`FieldMixin`.

    Subclasses implement :meth:`pack_single` and :meth:`unpack_single`; the
    sequence variants and the ``__pack__``/``__unpack__`` entry points are
    derived from them.
    """

    __slots__ = {
        "__byteorder__": (
            " An internal field used to measure the byte order of this struct. "
            "Note that this field will be used during processing only and not "
            "during parsing or building data. In addition, the actual byte "
            "order should be retrieved using the :class:`Field` instance "
            "within the context. "
        ),
        "__bits__": "TBD",
    }

    def pack_single(self, obj: Any, context: _ContextLike) -> None:
        """
        Pack one element. Must be overridden.

        :param obj: The element to pack.
        :type obj: Any
        :param context: The current operation context.
        :type context: _ContextLike
        :raises NotImplementedError: Always, unless a subclass overrides it.
        """
        raise NotImplementedError

    def unpack_single(self, context: _ContextLike) -> Any:
        """
        Unpack one element. Must be overridden.

        :param context: The current operation context.
        :type context: _ContextLike
        :raises NotImplementedError: Always, unless a subclass overrides it.
        :return: The unpacked element.
        """
        raise NotImplementedError

    def pack_seq(self, seq: Collection, context: _ContextLike) -> None:
        """
        Pack every element of a sequence by handing :meth:`pack_single` to
        the module-level ``pack_seq`` helper.

        :param seq: The sequence of elements to pack.
        :type seq: Collection
        :param context: The current operation context.
        :type context: _ContextLike
        """
        element_packer = self.pack_single
        pack_seq(seq, context, element_packer)

    def unpack_seq(self, context: _ContextLike) -> List[Any]:
        """
        Unpack a sequence by handing :meth:`unpack_single` to the module-level
        ``unpack_seq`` helper.

        :param context: The current operation context.
        :type context: _ContextLike
        :return: The list of unpacked elements.
        """
        element_unpacker = self.unpack_single
        return unpack_seq(context, element_unpacker)

    def __pack__(self, obj: Any, context: _ContextLike) -> None:
        """
        Pack ``obj``, choosing the sequence or single-element path from the
        ``CTX_SEQ`` entry of the context.

        :param obj: The data to pack.
        :type obj: Any
        :param context: The current operation context.
        :type context: _ContextLike
        """
        if context[CTX_SEQ]:
            self.pack_seq(obj, context)
        else:
            self.pack_single(obj, context)

    def __unpack__(self, context: _ContextLike) -> Any:
        """
        Unpack data, choosing the sequence or single-element path from the
        ``CTX_SEQ`` entry of the context.

        :param context: The current operation context.
        :type context: _ContextLike
        :return: The unpacked data.
        """
        handler = self.unpack_seq if context[CTX_SEQ] else self.unpack_single
        return handler(context)

    def __repr__(self) -> str:
        """
        Short representation consisting of the class name in angle brackets.

        :return: A string representation.
        """
        cls_name = self.__class__.__name__
        return f"<{cls_name}>"
class Chain(FieldStruct):
    """
    Represents a chain of structures where each structure in the chain is
    linked to the next one, forming a sequence.

    :param initial: The initial structure in the chain.
    :param structs: Additional structures to be added to the chain.

    The chain allows packing and unpacking data through its elements in
    sequence.

    .. note::
        - Unpacking travels from the head to the tail.
        - Packing travels from the tail to the head.
    """

    __slots__ = ("_elements",)

    def __init__(self, initial: _StructLike, *structs: _StructLike) -> None:
        # start -> next -> next -> next -> done     | unpack
        # done <- previous <- previous <- start     | pack
        self._elements = [getstruct(initial, initial)]
        self._elements += [getstruct(struct, struct) for struct in structs]

    @property
    def head(self) -> _StructLike:
        """
        Get the head of the chain, i.e., the first structure.

        :return: The head of the chain.
        :rtype: _StructLike
        """
        return self._elements[0]

    @property
    def tail(self) -> _StructLike:
        """
        Get the tail of the chain, i.e., the last structure.

        :return: The tail of the chain.
        :rtype: _StructLike
        """
        return self._elements[-1]

    def __size__(self, context: _ContextLike) -> int:
        """
        Calculate the size of the chain, which is the size reported by its
        head (the only element that reads from the outer stream).

        :param context: The context for the calculation.
        :type context: _ContextLike
        :return: The size of the chain.
        :rtype: int
        """
        return self.head.__size__(context)

    def __type__(self) -> type:
        """
        Get the type of the tail structure in the chain.

        :return: The type of the tail structure.
        :rtype: type
        """
        return self.tail.__type__()

    def __and__(self, other: _StructLike) -> "Chain":
        """
        Concatenate another structure to the end of the chain (in place).

        :param other: The structure to concatenate.
        :type other: _StructLike
        :return: The updated chain (this very instance).
        :rtype: Chain
        """
        self._elements.append(getstruct(other, other))
        return self

    def __rand__(self, other: _StructLike) -> "Chain":
        """
        Concatenate another structure to the beginning of the chain (in place).

        Invoked for ``other & chain`` when ``other`` does not handle ``&``
        itself, so ``other`` is the left operand and must become the new head.

        :param other: The structure to concatenate.
        :type other: _StructLike
        :return: The updated chain (this very instance).
        :rtype: Chain
        """
        # Delegating to __and__ would append to the tail and reverse the
        # operand order; insert at the front instead.
        self._elements.insert(0, getstruct(other, other))
        return self

    def unpack_single(self, context: _ContextLike) -> memoryview:
        """
        Unpack a single data instance from the chain.

        The head reads from the stream stored in the context; every following
        element reads from a ``BytesIO`` wrapped around the value produced by
        its predecessor.

        :param context: The context for the unpacking operation.
        :type context: _ContextLike
        :return: The value produced by the tail of the chain (annotated as a
            memory view; the concrete type is whatever the tail returns).
        :rtype: memoryview
        """
        data = None
        for i, struct in enumerate(self._elements):
            stream = BytesIO(data) if i != 0 else context[CTX_STREAM]
            # Temporarily swap the stream and force non-sequence mode so each
            # element handles exactly one value.
            with (
                WithoutContextVar(context, CTX_STREAM, stream),
                WithoutContextVar(context, CTX_SEQ, False),
            ):
                data = struct.__unpack__(context)
        return data

    def pack_single(self, obj: Any, context: _ContextLike) -> None:
        """
        Pack a single data instance into the chain.

        Elements are visited from the tail to the head. Every element except
        the head writes into a temporary buffer whose content becomes the
        input of the next element; the head writes to the context stream.

        :param Any obj: The data to pack into the chain.
        :param context: The context for the packing operation.
        :type context: _ContextLike
        """
        count = len(self._elements)
        for i, struct in enumerate(reversed(self._elements)):
            if i == count - 1:
                # Last struct, use the provided context stream directly
                struct.__pack__(obj, context)
            else:
                # Not the last struct, use a temporary BytesIO object
                with (
                    BytesIO() as stream,
                    WithoutContextVar(context, CTX_STREAM, stream),
                    WithoutContextVar(context, CTX_SEQ, False),
                ):
                    struct.__pack__(obj, context)
                    # Must be read before the buffer is closed by the
                    # ``with`` statement.
                    obj = stream.getvalue()
class Operator:
    """Defines a custom operator (user-defined)

    It operates _infix_ between two statements and takes them as
    arguments. For instance, the following example will return
    an array of structures:

    .. code-block:: python

        from caterpillar.fields import uint16, _infix_
        from caterpillar.model import struct

        M = _infix_(lambda a, b: a[b*2])

        @struct
        class Format:
            f1: uint16 /M/ 3

    This class reserves the `/` operator. It is also possible to
    use this class as a decorator on callable objects:

    .. code-block:: python

        @_infix_
        def M(a, b):
            return a[b*2]

    :param func: The function to be applied.
    :type func: Callable[[Any, Any], _StructLike]
    """

    def __init__(self, func: Callable[[Any, Any], _StructLike]) -> None:
        self.func = func

    def __truediv__(self, arg2) -> _StructLike:
        # Second half of ``a /op/ b``: ``func`` already carries the left
        # operand (bound by ``__rtruediv__``), so only ``arg2`` is supplied.
        return self.func(arg2)

    def __rtruediv__(self, arg1) -> "Operator":
        # First half of ``a /op/ b``: bind the left operand and return a new
        # operator that waits for the right one.
        return Operator(partial(self.func, arg1))

    def __call__(self, arg1, arg2) -> _StructLike:
        return self.func(arg1, arg2)


# utility methods
def get_args(args: Any, context: _ContextLike) -> List[Any]:
    """
    Get arguments for an instance.

    Callable entries are replaced by the result of calling them with the
    context; all other entries are kept as they are. A non-iterable input is
    treated as a single argument.

    :param args: Input arguments.
    :type args: Any
    :param context: The current operation context.
    :type context: _ContextLike
    :return: A list of processed arguments.
    :rtype: list
    """
    items = list(args) if isinstance(args, Iterable) else [args]
    return [item(context) if callable(item) else item for item in items]


def get_kwargs(kwargs: dict, context: _ContextLike) -> dict:
    """
    Process a dictionary of keyword arguments, replacing callable values with
    their results.

    The input dictionary is left untouched, so context lambdas stored in it
    are evaluated again on every call instead of being overwritten by the
    result of the first one.

    :param kwargs: Dictionary of keyword arguments.
    :type kwargs: dict
    :param context: The current operation context.
    :type context: _ContextLike
    :return: A new dictionary with processed keyword arguments.
    :rtype: dict
    """
    return {
        key: value(context) if callable(value) else value
        for key, value in kwargs.items()
    }