Source code for icspacket.proto.iso_ses.tsdu

# This file is part of icspacket.
# Copyright (C) 2025-present  MatrixEditor @ github
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
TSDU (Transport Service Data Unit) Handling
-------------------------------------------

This module implements parsing and building logic for **TSDUs** as defined in
§8.1 of the X.225 specification.

A TSDU is the *unit of data* exchanged between the session layer and the
transport layer. It may contain:

- One SPDU mapped directly to a TSDU (Category 1)
- Multiple SPDUs concatenated (Category 0 + Category 2, or multiple Category 2s)
- In special cases, only a Category 0 SPDU

Concatenation rules are defined in §6.3.7:

- **Category 0** SPDUs may be mapped one-to-one onto a TSDU *or*
    concatenated with one or more Category 2 SPDUs.
- **Category 1** SPDUs are always mapped one-to-one to a TSDU (no
    concatenation).
- **Category 2** SPDUs are never mapped one-to-one — they appear only after
    a Category 0 SPDU in the same TSDU.

.. seealso::

    - §6.3.7 Concatenation rules
    - §8.1 TSDU structure
    - §8.2 SPDU structure
"""

from collections.abc import Iterator
from caterpillar.model import unpack

from icspacket.proto.iso_ses.spdu import LI, SPDU, SPDU_Category, SPDU_Codes, LI_Extended


[docs] class TSDU: """TSDU container for one or more concatenated SPDUs — X.225 §8.1 Usage: >>> tsdu = TSDU.from_octets(data) # Parse a raw TSDU from bytes >>> tsdu.build() # Encode back to raw bytes """ def __init__(self) -> None: self.__spdus = [] def __repr__(self) -> str: return f"TSDU(spdus={self.spdus})" def __len__(self) -> int: return len(self.spdus) def __iter__(self) -> Iterator[SPDU]: return iter(self.spdus)
[docs] def add_spdu(self, code: int, category: SPDU_Category | None = None) -> SPDU: """Create a new SPDU and append it to this TSDU. :param code: The SPDU code (see `SPDU_Codes`). :type code: int :param category: The SPDU category, defaults to False :type category: SPDU_Category, optional :return: The newly created SPDU object. :rtype: SPDU """ spdu = SPDU(code, category) self.spdus.append(spdu) return spdu
@property def spdus(self) -> list[SPDU]: """List of SPDUs contained in this TSDU. This property returns a direct reference to the internal list, so it can be iterated over or modified directly. """ return self.__spdus
[docs] @staticmethod def from_octets(octets: bytes): """Decode a raw TSDU from its octet representation. :param octets: The raw TSDU data as received from the transport layer. :type octets: bytes :raises ValueError: If a Category 0 SPDU has a non-zero length, or or if any SPDU length is invalid (extends beyond buffer). :return: A fully parsed TSDU object containing one or more SPDUs. :rtype: TSDU """ # fmt: off tsdu = TSDU() # --- Step 1: Detect Category 0 SPDU --- si = octets[0] # SPDU Identifier offset = 0 # Current parsing position in TSDU buffer index = 0 # SPDU sequence index within TSDU # Special case: Category 0 types (PLEASE_TOKENS, GIVE_TOKENS) if si in (SPDU_Codes.PLEASE_TOKENS_SPDU, SPDU_Codes.GIVE_TOKENS_SPDU): tsdu.spdus.append(SPDU(si, category=SPDU_Category.CATEGORY_0)) if octets[1] != 0: # length MUST be null raise ValueError("Invalid length of Category 0 SPDU") offset = 2 # Skip SI + LI index = 1 # --- Step 2: Parse remaining SPDUs --- while offset < len(octets): # Read LI for this SPDU (octets[offset+1:] = LI + rest) li = unpack(LI_Extended, octets[offset + 1 :]) if li > len(octets) or 2 + li > len(octets): raise ValueError( f"Invalid length of SPDU [{index}] at offset {offset}+1" ) # Parse SPDU without immediately decoding user information spdu = SPDU.from_octets(octets[offset:]) category = SPDU_Category.CATEGORY_1 if len(tsdu.spdus) and tsdu.spdus[0].category == SPDU_Category.CATEGORY_0: category = SPDU_Category.CATEGORY_2 spdu.category = category # Advance offset: # LI length (LI.octet_size) + LI value + SI (1 byte) # + any trailing user information bytes offset += li + 1 + LI.octet_size(li) + len(spdu.user_information) tsdu.spdus.append(spdu) # fmt: on return tsdu
[docs] def build(self) -> bytes: """Assemble the TSDU into its raw octet representation. :return: The encoded TSDU, produced by concatenating each SPDU's octet representation. :rtype: bytes """ return b"".join([spdu.build() for spdu in self.spdus])