# Copyright (C) MatrixEditor 2023-2024
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Protocol, Union, Callable, Optional
from caterpillar.abc import (
_ContextLike,
_StructLike,
_ContainsStruct,
getstruct,
hasstruct,
)
from ._mixin import get_kwargs
from .common import Transformer, Bytes
class _Compressor(Protocol):
def compress(self, data: bytes, **kwds) -> bytes:
"""Data should get compressed here"""
def decompress(self, data: bytes, **kwds) -> bytes:
"""Data should get decompressed here"""
[docs]
class Compressed(Transformer):
"""
A class representing a compressed transformer.
:param compressor: The compression algorithm.
:type compressor: _Compressor
:param length: The length of the transformer.
:type length: Union[_ContextLambda, Any]
"""
def __init__(
self,
compressor: _Compressor,
struct: Union[_ContainsStruct, _StructLike],
comp_kwargs: Optional[dict] = None,
decomp_kwargs: Optional[dict] = None,
) -> None:
"""
Initialize a Compressed instance.
:param compressor: The compression algorithm.
:type compressor: _Compressor
:param length: The length of the transformer.
:type length: Union[_ContextLambda, Any]
"""
if hasstruct(struct):
struct = getstruct(struct)
super().__init__(struct)
self.compressor = compressor
self.comp_args = comp_kwargs or {}
self.decomp_args = decomp_kwargs or {}
[docs]
def encode(self, obj: bytes, context: _ContextLike) -> bytes:
"""
Compress the input data.
:param obj: The input data to be compressed.
:type obj: bytes
:param context: The context information.
:type context: _ContextLike
:return: The compressed data.
:rtype: bytes
"""
return self.compressor.compress(obj, **get_kwargs(self.comp_args, context))
[docs]
def decode(self, parsed: bytes, context: _ContextLike) -> bytes:
"""
Decompress the input data.
:param parsed: The compressed data to be decompressed.
:type parsed: bytes
:param context: The context information.
:type context: _ContextLike
:return: The decompressed data.
:rtype: bytes
"""
return self.compressor.decompress(
parsed, **get_kwargs(self.decomp_args, context)
)
_LengthOrStruct = Union[_ContainsStruct, _StructLike, Callable, int]
def compressed(
lib: _Compressor,
obj: _LengthOrStruct,
comp_kwargs: Optional[dict] = None,
decomp_kwargs: Optional[dict] = None,
) -> _StructLike:
if callable(obj) or isinstance(obj, int) or obj is ...:
obj = Bytes(obj)
return Compressed(lib, obj, comp_kwargs, decomp_kwargs)
try:
import zlib
[docs]
def ZLibCompressed(
obj: _LengthOrStruct,
comp_kwargs: Optional[dict] = None,
decomp_kwargs: Optional[dict] = None,
):
"""
Create a struct representing zlib compression.
"""
return compressed(zlib, obj, comp_kwargs, decomp_kwargs)
except ImportError:
ZLibCompressed = None
try:
import bz2
[docs]
def Bz2Compressed(
obj: _LengthOrStruct,
comp_kwargs: Optional[dict] = None,
decomp_kwargs: Optional[dict] = None,
):
"""
Create a struct representing bz2 compression.
"""
return compressed(bz2, obj, comp_kwargs, decomp_kwargs)
except ImportError:
Bz2Compressed = None
try:
import lzma
[docs]
def LZMACompressed(
obj: _LengthOrStruct,
comp_kwargs: Optional[dict] = None,
decomp_kwargs: Optional[dict] = None,
):
"""
Create a struct representing lzma compression.
"""
return compressed(lzma, obj, comp_kwargs, decomp_kwargs)
except ImportError:
LZMACompressed = None
try:
# install package manuall with pip install lzallright
import lzallright
[docs]
def LZOCompressed(
obj: _LengthOrStruct,
comp_kwargs: Optional[dict] = None,
decomp_kwargs: Optional[dict] = None,
):
"""
Create a struct representing LZO compression.
"""
return compressed(lzallright.LZOCompressor(), obj, comp_kwargs, decomp_kwargs)
except ImportError:
LZOCompressed = None