# Copyright (C) MatrixEditor 2023-2026
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# pyright: reportPrivateUsage=false, reportAny=false, reportExplicitAny=false
from typing import Any, Protocol, runtime_checkable
from typing_extensions import override
from caterpillar.abc import (
_ContainsStruct,
_ContextLike,
_StructLike,
_LengthT,
)
from caterpillar.shared import getstruct, hasstruct
from caterpillar.fields._mixin import get_kwargs
from caterpillar.fields.common import Transformer, Bytes
@runtime_checkable
class _Compressor(Protocol):
"""
A protocol for compression algorithms, requiring two methods:
- compress: to compress data
- decompress: to decompress data
Any class implementing this protocol must define these methods.
"""
def compress(self, data: bytes, **kwds: Any) -> bytes:
"""
Compress the provided data.
:param data: The data to be compressed.
:type data: bytes
:param kwds: Additional keyword arguments specific to the compression method.
:return: The compressed data.
:rtype: bytes
"""
...
def decompress(self, data: bytes, **kwds: Any) -> bytes:
"""
Decompress the provided data.
:param data: The compressed data to be decompressed.
:type data: bytes
:param kwds: Additional keyword arguments specific to the decompression method.
:return: The decompressed data.
:rtype: bytes
"""
...
[docs]
class Compressed(Transformer[bytes, bytes, bytes, bytes]):
"""
A transformer class for handling compression and decompression of data.
This class takes a compression algorithm that implements the `_Compressor` protocol and
applies it to the data for compression or decompression operations. It allows for
additional customization through the `comp_kwargs` and `decomp_kwargs` parameters.
:param compressor: The compression algorithm that implements the _Compressor protocol.
:type compressor: _Compressor
:param struct: The struct to be compressed or decompressed, should implement _ContainsStruct or _StructLike.
:type struct: Union[_ContainsStruct, _StructLike]
:param comp_kwargs: Optional keyword arguments for the compression method.
:type comp_kwargs: Optional[dict]
:param decomp_kwargs: Optional keyword arguments for the decompression method.
:type decomp_kwargs: Optional[dict]
"""
def __init__(
self,
compressor: _Compressor,
struct: _StructLike[bytes, bytes] | _ContainsStruct[bytes, bytes],
comp_kwargs: dict[str, Any] | None = None,
decomp_kwargs: dict[str, Any] | None = None,
) -> None:
if hasstruct(struct):
struct = getstruct(struct)
super().__init__(struct)
self.compressor: _Compressor = compressor
self.comp_args: dict[str, Any] = comp_kwargs or {}
self.decomp_args: dict[str, Any] = decomp_kwargs or {}
[docs]
@override
def encode(self, obj: bytes, context: _ContextLike) -> bytes:
"""
Compress the input data using the provided compressor.
:param obj: The data to be compressed.
:type obj: bytes
:param context: Context information for compression (e.g., field-specific metadata).
:type context: _ContextLike
:return: The compressed data.
:rtype: bytes
"""
return self.compressor.compress(obj, **get_kwargs(self.comp_args, context))
[docs]
@override
def decode(self, parsed: bytes, context: _ContextLike) -> bytes:
"""
Decompress the input data using the provided compressor.
:param parsed: The compressed data to be decompressed.
:type parsed: bytes
:param context: Context information for decompression (e.g., field-specific metadata).
:type context: _ContextLike
:return: The decompressed data.
:rtype: bytes
"""
return self.compressor.decompress(
parsed, **get_kwargs(self.decomp_args, context)
)
_LengthTorStructT = _LengthT | _ContainsStruct[bytes, bytes] | _StructLike[bytes, bytes]
def compressed(
lib: _Compressor,
obj: _LengthTorStructT,
comp_kwargs: dict[str, Any] | None = None,
decomp_kwargs: dict[str, Any] | None = None,
) -> _StructLike[bytes, bytes]:
if callable(obj) or isinstance(obj, int) or obj is ...:
obj = Bytes(obj)
return Compressed(lib, obj, comp_kwargs, decomp_kwargs)
[docs]
def ZLibCompressed(
obj: _LengthTorStructT,
comp_kwargs: dict[str, Any] | None = None,
decomp_kwargs: dict[str, Any] | None = None,
) -> _StructLike[bytes, bytes]:
"""
Create a struct representing zlib compression.
"""
try:
import zlib
return compressed(zlib, obj, comp_kwargs, decomp_kwargs)
except ImportError:
raise NotImplementedError("Could not import zlib!")
[docs]
def Bz2Compressed(
obj: _LengthTorStructT,
comp_kwargs: dict[str, Any] | None = None,
decomp_kwargs: dict[str, Any] | None = None,
) -> _StructLike[bytes, bytes]:
"""
Create a struct representing bz2 compression.
"""
try:
import bz2
return compressed(bz2, obj, comp_kwargs, decomp_kwargs)
except ImportError:
raise NotImplementedError("Could not import bz2!")
[docs]
def LZMACompressed(
obj: _LengthTorStructT,
comp_kwargs: dict[str, Any] | None = None,
decomp_kwargs: dict[str, Any] | None = None,
) -> _StructLike[bytes, bytes]:
"""
Create a struct representing lzma compression.
"""
try:
import lzma
return compressed(lzma, obj, comp_kwargs, decomp_kwargs)
except ImportError:
raise NotImplementedError("Could not import lzma!")
[docs]
def LZOCompressed(
obj: _LengthTorStructT,
comp_kwargs: dict[str, Any] | None = None,
decomp_kwargs: dict[str, Any] | None = None,
) -> _StructLike[bytes, bytes]:
"""
Create a struct representing LZO compression.
"""
try:
# install package manually with pip install lzallright
import lzallright
return compressed(lzallright.LZOCompressor(), obj, comp_kwargs, decomp_kwargs)
except ImportError:
raise NotImplementedError("Could not import lzallright!")