# Copyright (C) MatrixEditor 2023-2024
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Union, Any, Type, Optional, Protocol, Iterable, runtime_checkable

try:
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.primitives.padding import PaddingContext
except ImportError:
    # 'cryptography' is optional; every name falls back to ``typing.Any`` so
    # that the annotations below can still be evaluated.
    Cipher = algorithms = modes = PaddingContext = Any

from caterpillar.abc import _StructLike, _ContextLike
from caterpillar.abc import _GreedyType, _ContextLambda
from caterpillar.exception import UnsupportedOperation
from caterpillar.exception import InvalidValueError
from caterpillar.context import CTX_STREAM, Context
from .common import Memory, Bytes
from ._mixin import get_args, get_kwargs


# Structural interface of a padding scheme: any object that exposes both a
# ``padder()`` and an ``unpadder()`` method satisfies it.
@runtime_checkable
class Padding(Protocol):
    # pylint: disable=missing-class-docstring
    def unpadder(self) -> PaddingContext:
        """Return the context object used to strip padding from data."""
        ...

    def padder(self) -> PaddingContext:
        """Return the context object used to add padding to data."""
        ...


# A constructor argument: either a plain value or a context lambda.
_ArgType = Union[_ContextLambda, Any]
# Alias of the context class, exposed under the name ``KwArgs``.
KwArgs = Context
class Encrypted(Memory):
    """Struct that is able to encrypt/decrypt blocks of memory.

    Data is padded (optionally) and encrypted when packing, and decrypted
    and unpadded (optionally) when unpacking. The algorithm, mode and padding
    may each be given either as a ready-made instance or as a class that is
    instantiated on every pack/unpack call using the matching ``*_args``.

    :param length: Length of the encrypted data.
    :type length: Union[int, _GreedyType, _ContextLambda]
    :param algorithm: Encryption algorithm.
    :type algorithm: Type[algorithms.CipherAlgorithm]
    :param mode: Encryption mode.
    :type mode: Union[Type[modes.Mode], modes.Mode]
    :param padding: Padding scheme for encryption.
    :type padding: Union[Padding, Type[Padding]], optional
    :param algo_args: Additional arguments for the encryption algorithm.
    :type algo_args: Optional[Iterable[_ArgType]], optional
    :param mode_args: Additional arguments for the encryption mode.
    :type mode_args: Optional[Iterable[_ArgType]], optional
    :param padding_args: Additional arguments for the padding scheme.
    :type padding_args: Optional[Iterable[_ArgType]], optional
    :param post: Post-processing structure.
    :raises UnsupportedOperation: if the 'cryptography' package is missing.
    """

    # REVISIT: this constructor looks ugly
    def __init__(
        self,
        length: Union[int, _GreedyType, _ContextLambda],
        algorithm: Type["algorithms.CipherAlgorithm"],
        mode: Union[Type["modes.Mode"], "modes.Mode"],
        padding: Optional[Union[Padding, Type[Padding]]] = None,
        algo_args: Optional[Iterable[_ArgType]] = None,
        mode_args: Optional[Iterable[_ArgType]] = None,
        padding_args: Optional[Iterable[_ArgType]] = None,
        post: Optional[_StructLike] = None,
    ) -> None:
        # The optional-import fallback at module level binds ``Cipher`` to
        # ``typing.Any`` (not ``None``) when 'cryptography' is unavailable,
        # so that sentinel has to be tested for the guard to ever trigger.
        if Cipher is None or Cipher is Any:
            raise UnsupportedOperation(
                (
                    "To use encryption with this framework, the module 'cryptography' "
                    "is required! You can install it via pip or use the packaging "
                    "extra 'crypto' that is available with this library."
                )
            )

        super().__init__(length)
        self._algo = algorithm
        self._algo_args = algo_args
        self._mode = mode
        self._mode_args = mode_args
        self._padding = padding
        self._padding_args = padding_args
        self.post = post

    def algorithm(self, context: _ContextLike) -> "algorithms.CipherAlgorithm":
        """
        Get the encryption algorithm instance.

        :param context: The current operation context.
        :type context: _ContextLike
        :return: An instance of the encryption algorithm.
        :rtype: algorithms.CipherAlgorithm
        """
        return self.get_instance(
            algorithms.CipherAlgorithm, self._algo, self._algo_args, context
        )

    def mode(self, context: _ContextLike) -> "modes.Mode":
        """
        Get the encryption mode instance.

        :param context: The current operation context.
        :type context: _ContextLike
        :return: An instance of the encryption mode.
        :rtype: modes.Mode
        """
        return self.get_instance(modes.Mode, self._mode, self._mode_args, context)

    def padding(self, context: _ContextLike) -> Padding:
        """
        Get the padding scheme instance.

        :param context: The current operation context.
        :type context: _ContextLike
        :return: An instance of the padding scheme, or the falsy value that
            was configured (e.g. ``None``) when no padding is used.
        :rtype: Padding
        """
        return self.get_instance(
            Padding, self._padding, self._padding_args, context
        )

    def get_instance(
        self, type_: type, field: Any, args: Any, context: _ContextLike
    ) -> Any:
        """
        Get an instance of a specified type.

        A falsy ``field`` is returned unchanged. A ``field`` that already is
        an instance of ``type_`` is returned as-is. Anything else (a class or
        another callable) is called with the resolved ``args``.

        :param type_: The desired type of the instance.
        :type type_: type
        :param field: The field or instance.
        :type field: Any
        :param args: Additional arguments for the instance. A ``dict`` is
            resolved with ``get_kwargs`` and passed as keyword arguments;
            any other value is resolved with ``get_args`` and passed
            positionally.
        :type args: Any
        :param context: The current operation context.
        :type context: _ContextLike
        :return: An instance of the specified type.
        :rtype: Any
        """
        if not field:
            return field

        # ``isinstance`` against a runtime-checkable Protocol only tests for
        # attribute presence, which a *class* object satisfies as well (its
        # methods are attributes of the class). Classes must therefore be
        # excluded explicitly, otherwise they would be returned
        # un-instantiated and their constructor arguments ignored.
        if not isinstance(field, type) and isinstance(field, type_):
            return field

        if isinstance(args, dict):
            args, kwargs = (), get_kwargs(args, context)
        else:
            args, kwargs = get_args(args, context), {}
        return field(*args, **kwargs)

    def pack_single(self, obj: Any, context: _ContextLike) -> None:
        """
        Pack a single element.

        The object is converted to ``bytes`` if necessary, padded when a
        padding scheme is configured, encrypted and then handed to the
        parent implementation.

        :param obj: The element to pack.
        :type obj: Any
        :param context: The current operation context.
        :type context: _ContextLike
        """
        cipher = Cipher(self.algorithm(context), self.mode(context))
        padding = self.padding(context)

        data = obj if isinstance(obj, bytes) else bytes(obj)
        if padding:
            padder = padding.padder()
            data = padder.update(data) + padder.finalize()

        encryptor = cipher.encryptor()
        super().pack_single(encryptor.update(data) + encryptor.finalize(), context)

    def unpack_single(self, context: _ContextLike) -> memoryview:
        """
        Unpack a single element.

        The raw value is read by the parent implementation, decrypted and,
        when a padding scheme is configured, unpadded.

        :param context: The current operation context.
        :type context: _ContextLike
        :return: The unpacked element as a memoryview.
        :rtype: memoryview
        """
        value = super().unpack_single(context)

        cipher = Cipher(self.algorithm(context), self.mode(context))
        decryptor = cipher.decryptor()
        data = decryptor.update(bytes(value)) + decryptor.finalize()

        padding = self.padding(context)
        if padding:
            unpadder = padding.unpadder()
            data = unpadder.update(data) + unpadder.finalize()
        return memoryview(data)
_KeyType=Union[str,bytes,int,_ContextLambda]classKeyCipher(Bytes):key:Union[str,bytes,int]"""The key that should be applied. It will be converted automatically to bytes if not given. """key_length:int"""Internal attribute to keep track of the key's length"""__slots__="key","key_length","is_lazy"def__init__(self,key:_KeyType,length:Union[_ContextLambda,int,None]=None)->None:super().__init__(lengthor...)self.key=self.is_lazy=self.key_length=Noneself.set_key(key)defset_key(self,key:_KeyType,context:_ContextLike=None)->None:ifcallable(key)andcontextisNone:# context lambda indicates the key will be computed at runtimeself.key=keyself.key_length=-1self.is_lazy=Truereturnmatchkey:casestr():self.key=key.encode()caseint():self.key=bytes([key])casebytes():self.key=keycase_:raiseInvalidValueError(f"Expected a valid key type, got {key!r}",context)self.key_length=len(self.key)self.is_lazy=Falsedefprocess(self,obj:bytes,context:_ContextLike)->bytes:length=len(obj)data=bytearray(length)key=self.keyifself.is_lazy:self.set_key(key(context),context)self._do_process(obj,data)returnbytes(data)def_do_process(self,src:bytes,dest:bytearray):raiseNotImplementedErrordefpack_single(self,obj:bytes,context:_ContextLike)->None:context[CTX_STREAM].write(self.process(obj,context))defunpack_single(self,context:_ContextLike)->bytes:obj:bytes=super().unpack_single(context)returnself.process(obj,context)classXor(KeyCipher):__slots__=()def_do_process(self,src:bytes,dest:bytearray):fori,einenumerate(src):dest[i]=e^self.key[i%self.key_length]classOr(KeyCipher):__slots__=()def_do_process(self,src:bytes,dest:bytearray):fori,einenumerate(src):dest[i]=e|self.key[i%self.key_length]classAnd(KeyCipher):__slots__=()def_do_process(self,src:bytes,dest:bytearray):fori,einenumerate(src):dest[i]=e&self.key[i%self.key_length]