Skip to content

Commit

Permalink
improve detection of overlapping parameters during encoding
Browse files Browse the repository at this point in the history
this code should now be bullet-proof: instead of complaing only if at
least of the bits which are supposed to be set by a parameter in the
coded message is already set, used bits are now explicitly tracked via
a bit mask in the `EncodeState`. As a result, if e.g. an all-zero
integer parameter overrides another all-zero integer parameer we will
now get an overlapping objects warning as we are supposed to.

Signed-off-by: Andreas Lauser <[email protected]>
Signed-off-by: Alexander Walz <[email protected]>
  • Loading branch information
andlaus committed Apr 26, 2024
1 parent 3e5ce68 commit 4f17220
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 53 deletions.
4 changes: 3 additions & 1 deletion odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ def encode_into_pdu(self, physical_value: Optional[ParameterValue],
# position directly after the structure and let
# EncodeState add the padding as needed.
encode_state.cursor_byte_position = encode_state.origin_byte_position + self.byte_size
encode_state.emplace_bytes(b'', "<PADDING>")
# Padding bytes needed. these count as "used".
encode_state.coded_message += b"\x00" * (self.byte_size - actual_len)
encode_state.used_mask += b"\xff" * (self.byte_size - actual_len)

# encode the length- and table keys. This cannot be done above
# because we allow these to be defined implicitly (i.e. they
Expand Down
4 changes: 2 additions & 2 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ def encode_into_pdu(self, physical_value: ParameterValue, encode_state: EncodeSt
f"The value {repr(physical_value)} of type {type(physical_value).__name__}"
f" is not a valid.")

internal_val = self.convert_physical_to_internal(physical_value)
self.diag_coded_type.encode_into_pdu(internal_val, encode_state)
internal_value = self.convert_physical_to_internal(physical_value)
self.diag_coded_type.encode_into_pdu(internal_value, encode_state)

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""
Expand Down
124 changes: 88 additions & 36 deletions odxtools/encodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class EncodeState:
#: payload that has been constructed so far
coded_message: bytearray

#: the bits of the payload that are used
used_mask: bytearray = field(default_factory=bytearray)

#: The absolute position in bytes from the beginning of the PDU to
#: which relative positions refer to, e.g., the beginning of the
#: structure.
Expand Down Expand Up @@ -53,45 +56,60 @@ class EncodeState:
#: (needed for MinMaxLengthType, EndOfPduField, etc.)
is_end_of_pdu: bool = True

def __post_init__(self) -> None:
# if a coded message has been specified, but no used_mask, we
# assume that all of the bits of the coded message are
# currently used.
if len(self.coded_message) > len(self.used_mask):
self.used_mask += b'\xff' * (len(self.coded_message) - len(self.used_mask))
if len(self.coded_message) < len(self.used_mask):
odxraise(f"The specified bit mask 0x{self.used_mask.hex()} for used bits "
f"is not suitable for representing the coded_message "
f"0x{self.coded_message.hex()}")
self.used_mask = self.used_mask[:len(self.coded_message)]

def emplace_atomic_value(
self,
*,
internal_value: AtomicOdxType,
bit_length: int,
base_data_type: DataType,
is_highlow_byte_order: bool,
used_mask: Optional[bytes],
) -> None:
"""Convert the internal_value to bytes and emplace this into the PDU"""

raw_value: AtomicOdxType

# Check that bytes and strings actually fit into the bit length
if base_data_type == DataType.A_BYTEFIELD:
if isinstance(internal_value, bytearray):
internal_value = bytes(internal_value)
if not isinstance(internal_value, bytes):
odxraise()
if 8 * len(internal_value) > bit_length:
raise EncodeError(f"The bytefield {internal_value.hex()} is too large "
f"({len(internal_value)} bytes)."
f" The maximum length is {bit_length//8}.")
if base_data_type == DataType.A_ASCIISTRING:
raw_value = internal_value
elif base_data_type == DataType.A_ASCIISTRING:
if not isinstance(internal_value, str):
odxraise()

# The spec says ASCII, meaning only byte values 0-127.
# But in practice, vendors use iso-8859-1, aka latin-1
# reason being iso-8859-1 never fails since it has a valid
# character mapping for every possible byte sequence.
internal_value = internal_value.encode("iso-8859-1")
raw_value = internal_value.encode("iso-8859-1")

if 8 * len(internal_value) > bit_length:
if 8 * len(raw_value) > bit_length:
raise EncodeError(f"The string {repr(internal_value)} is too large."
f" The maximum number of characters is {bit_length//8}.")
elif base_data_type == DataType.A_UTF8STRING:
if not isinstance(internal_value, str):
odxraise()

internal_value = internal_value.encode("utf-8")
raw_value = internal_value.encode("utf-8")

if 8 * len(internal_value) > bit_length:
if 8 * len(raw_value) > bit_length:
raise EncodeError(f"The string {repr(internal_value)} is too large."
f" The maximum number of bytes is {bit_length//8}.")

Expand All @@ -100,11 +118,13 @@ def emplace_atomic_value(
odxraise()

text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le"
internal_value = internal_value.encode(text_encoding)
raw_value = internal_value.encode(text_encoding)

if 8 * len(internal_value) > bit_length:
if 8 * len(raw_value) > bit_length:
raise EncodeError(f"The string {repr(internal_value)} is too large."
f" The maximum number of characters is {bit_length//16}.")
else:
raw_value = internal_value

# If the bit length is zero, return empty bytes
if bit_length == 0:
Expand All @@ -125,46 +145,78 @@ def emplace_atomic_value(
left_pad = f"p{padding}" if padding > 0 else ""

# actually encode the value
coded = bitstruct.pack(f"{left_pad}{char}{bit_length}", internal_value)

# apply byte order for numeric objects
coded = bitstruct.pack(f"{left_pad}{char}{bit_length}", raw_value)

# create the raw mask of used bits for numeric objects
used_mask_raw = used_mask
if base_data_type in [DataType.A_INT32, DataType.A_UINT32
] and (self.cursor_bit_position != 0 or
(self.cursor_bit_position + bit_length) % 8 != 0):
if used_mask is None:
tmp = (1 << bit_length) - 1
else:
tmp = int.from_bytes(used_mask, "big")
tmp <<= self.cursor_bit_position

used_mask_raw = tmp.to_bytes((self.cursor_bit_position + bit_length + 7) // 8, "big")

# apply byte order to numeric objects
if not is_highlow_byte_order and base_data_type in [
DataType.A_INT32,
DataType.A_UINT32,
DataType.A_FLOAT32,
DataType.A_FLOAT64,
DataType.A_INT32, DataType.A_UINT32, DataType.A_FLOAT32, DataType.A_FLOAT64
]:
coded = coded[::-1]

self.emplace_bytes(coded)
if used_mask_raw is not None:
used_mask_raw = used_mask_raw[::-1]

self.cursor_bit_position = 0
self.emplace_bytes(coded, obj_used_mask=used_mask_raw)

def emplace_bytes(self,
new_data: bytes,
obj_name: Optional[str] = None,
obj_used_mask: Optional[bytes] = None) -> None:
if self.cursor_bit_position != 0:
odxraise("EncodeState.emplace_bytes can only be called "
"for a bit position of 0!", RuntimeError)

def emplace_bytes(self, new_data: bytes, obj_name: Optional[str] = None) -> None:
pos = self.cursor_byte_position

# Make blob longer if necessary
min_length = pos + len(new_data)
if len(self.coded_message) < min_length:
self.coded_message.extend([0] * (min_length - len(self.coded_message)))

for i in range(len(new_data)):
# insert new byte. this is pretty hacky: it will fail if
# the value to be inserted is bitwise "disjoint" from the
# value which is already in the PDU...
if self.coded_message[pos + i] & new_data[i] != 0:
if obj_name is not None:
warnings.warn(
f"'{obj_name}' overlaps with another object (bits to be set are already set)",
OdxWarning,
stacklevel=1,
)
else:
pad = b'\x00' * (min_length - len(self.coded_message))
self.coded_message += pad
self.used_mask += pad

if obj_used_mask is None:
# Happy path for when no obj_used_mask has been
# specified. In this case we assume that all bits of the
# new data to be emplaced are used.
n = len(new_data)

if self.used_mask[pos:pos + n] != b'\x00' * n:
warnings.warn(
f"Overlapping objects detected in between bytes {pos} and "
f"{pos+n}",
OdxWarning,
stacklevel=1,
)
self.coded_message[pos:pos + n] = new_data
self.used_mask[pos:pos + n] = b'\xff' * n
else:
# insert data the hard way, i.e. we have to look at each
# individual byte to determine if it has already been used
# somewhere else (it would be nice if bytearrays supported
# bitwise operations!)
for i in range(len(new_data)):
if self.used_mask[pos + i] & obj_used_mask[i] != 0:
warnings.warn(
"Object overlap (bits to be set are already set)",
"Overlapping objects detected",
OdxWarning,
stacklevel=1,
)

self.coded_message[pos + i] |= new_data[i]
self.coded_message[pos + i] |= new_data[i]
self.used_mask[pos + i] |= obj_used_mask[i]

self.cursor_byte_position += len(new_data)
self.cursor_bit_position = 0
2 changes: 2 additions & 0 deletions odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta

encode_state.emplace_atomic_value(
internal_value=byte_length,
used_mask=None,
bit_length=self.bit_length,
base_data_type=DataType.A_UINT32,
is_highlow_byte_order=self.is_highlow_byte_order,
)

encode_state.emplace_atomic_value(
internal_value=internal_value,
used_mask=None,
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
Expand Down
1 change: 1 addition & 0 deletions odxtools/minmaxlengthtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta
orig_cursor = encode_state.cursor_byte_position
encode_state.emplace_atomic_value(
internal_value=internal_value,
used_mask=None,
bit_length=8 * data_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
Expand Down
2 changes: 1 addition & 1 deletion odxtools/parameterinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def parameter_info(param_list: Iterable[Parameter], quoted_names: bool = False)
of.write(f"{q}{param.short_name}{q}: <matches request>\n")
continue
elif isinstance(param, NrcConstParameter):
of.write(f"{q}{param.short_name}{q}: NRC_const; choices = {param.coded_values}\n")
of.write(f"{q}{param.short_name}{q}: const; choices = {param.coded_values}\n")
continue
elif isinstance(param, ReservedParameter):
of.write(f"{q}{param.short_name}{q}: <reserved>\n")
Expand Down
1 change: 1 addition & 0 deletions odxtools/paramlengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta

encode_state.emplace_atomic_value(
internal_value=internal_value,
used_mask=None,
bit_length=bit_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
Expand Down
45 changes: 40 additions & 5 deletions odxtools/standardlengthtype.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Optional
from typing import Literal, Optional

from typing_extensions import override

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
from .encodestate import EncodeState
from .exceptions import odxassert, odxraise
from .exceptions import odxassert, odxraise, odxrequire
from .odxtypes import AtomicOdxType, DataType


Expand All @@ -22,6 +22,10 @@ class StandardLengthType(DiagCodedType):
def dct_type(self) -> DctType:
return "STANDARD-LENGTH-TYPE"

@property
def is_condensed(self) -> bool:
return self.is_condensed_raw is True

def __post_init__(self) -> None:
if self.bit_mask is not None:
maskable_types = (DataType.A_UINT32, DataType.A_INT32, DataType.A_BYTEFIELD)
Expand All @@ -30,10 +34,41 @@ def __post_init__(self) -> None:
'Can not apply a bit_mask on a value of type {self.base_data_type}',
)

def __get_raw_mask(self, internal_value: AtomicOdxType) -> Optional[bytes]:
"""Returns a byte field where all bits that are used by the
DiagCoded type are set and all unused ones are not set.
If `None` is returned, all bits are used.
"""
if self.bit_mask is None:
return None

if self.is_condensed:
odxraise("Condensed bit masks are not yet supported", NotImplementedError)
return

endianness: Literal["little", "big"] = "big"
if not self.is_highlow_byte_order and self.base_data_type in [
DataType.A_INT32, DataType.A_UINT32, DataType.A_FLOAT32, DataType.A_FLOAT64
]:
# TODO (?): Technically, little endian A_UNICODE2STRING
# objects require a byte swap for each 16 bit letter, and
# thus also for the mask. I somehow doubt that this has
# been anticipated by the standard, though...
endianness = "little"

sz: int
if isinstance(internal_value, (bytes, bytearray)):
sz = len(internal_value)
else:
sz = (odxrequire(self.get_static_bit_length()) + 7) // 8

return self.bit_mask.to_bytes(sz, endianness)

def __apply_mask(self, internal_value: AtomicOdxType) -> AtomicOdxType:
if self.bit_mask is None:
return internal_value
if self.is_condensed_raw is True:
if self.is_condensed:
odxraise("Serialization of condensed bit mask is not supported", NotImplementedError)
return
if isinstance(internal_value, int):
Expand All @@ -53,10 +88,10 @@ def get_static_bit_length(self) -> Optional[int]:
def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeState) -> None:
encode_state.emplace_atomic_value(
internal_value=self.__apply_mask(internal_value),
used_mask=self.__get_raw_mask(internal_value),
bit_length=self.bit_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)
is_highlow_byte_order=self.is_highlow_byte_order)

@override
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
Expand Down
18 changes: 10 additions & 8 deletions tests/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def test_encode_overlapping(self) -> None:
parameters=NamedItemList([param1, param2, param3]),
byte_size=None,
)
self.assertEqual(req.encode(), bytearray([0x12, 0x34, 0x56]))
self.assertEqual(req.encode().hex(), "123456")
self.assertEqual(req.get_static_bit_length(), 24)

def _create_request(self, parameters: List[Parameter]) -> Request:
Expand All @@ -336,12 +336,12 @@ def _create_request(self, parameters: List[Parameter]) -> Request:

def test_bit_mask(self) -> None:
inner_dct = StandardLengthType(
bit_mask=0x0ff0,
bit_mask=0x3fc,
base_data_type=DataType.A_UINT32,
base_type_encoding=None,
is_highlow_byte_order_raw=None,
is_condensed_raw=None,
bit_length=16)
bit_length=14)
outer_dct = StandardLengthType(
bit_mask=0xf00f,
base_data_type=DataType.A_UINT32,
Expand Down Expand Up @@ -395,7 +395,7 @@ def test_bit_mask(self) -> None:
long_name=None,
description=None,
byte_position=0,
bit_position=None,
bit_position=2,
dop_ref=OdxLinkRef.from_id(inner_dop.odx_id),
dop_snref=None,
semantic=None,
Expand All @@ -421,11 +421,13 @@ def test_bit_mask(self) -> None:

req = self._create_request([inner_param, outer_param])

self.assertEqual(req.encode(inner_param=0x1111, outer_param=0x2222).hex(), "2112")
# the bit shifts here stem from the fact that we placed the
# inner parameter at bit position 2...
self.assertEqual(req.encode(inner_param=0x1234 >> 2, outer_param=0x4568).hex(), "4238")
self.assertEqual(
req.decode(bytes.fromhex('1234')), {
"inner_param": 0x0230,
"outer_param": 0x1004
req.decode(bytes.fromhex('abcd')), {
"inner_param": (0xbc << 2),
"outer_param": 0xa00d
})


Expand Down

0 comments on commit 4f17220

Please sign in to comment.