Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some random cleanups #162

Merged
merged 10 commits into from
Jul 18, 2023
8 changes: 4 additions & 4 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,7 @@ class SomersaultSID(IntEnum):
sdgs=[],
),
CodedConstParameter(
short_name="odx_id",
short_name="id",
long_name=None,
semantic=None,
description=None,
Expand Down Expand Up @@ -1230,7 +1230,7 @@ class SomersaultSID(IntEnum):
sdgs=[],
),
CodedConstParameter(
short_name="odx_id",
short_name="id",
long_name=None,
semantic=None,
description=None,
Expand Down Expand Up @@ -1264,7 +1264,7 @@ class SomersaultSID(IntEnum):
sdgs=[],
),
CodedConstParameter(
short_name="odx_id",
short_name="id",
long_name=None,
semantic=None,
description=None,
Expand Down Expand Up @@ -1430,7 +1430,7 @@ class SomersaultSID(IntEnum):
sdgs=[],
),
CodedConstParameter(
short_name="odx_id",
short_name="id",
long_name=None,
semantic=None,
description=None,
Expand Down
3 changes: 2 additions & 1 deletion odxtools/compumethods/compuscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 MBition GmbH
from typing import NamedTuple, Optional, Union

from ..odxtypes import AtomicOdxType
from .compurationalcoeffs import CompuRationalCoeffs
from .limit import Limit

Expand Down Expand Up @@ -33,6 +34,6 @@ class CompuScale(NamedTuple):
description: Optional[str] = None
lower_limit: Optional[Limit] = None
upper_limit: Optional[Limit] = None
compu_inverse_value: Optional[Union[float, str, bytearray]] = None
compu_inverse_value: Optional[AtomicOdxType] = None
compu_const: Optional[Union[float, str]] = None
compu_rational_coeffs: Optional[CompuRationalCoeffs] = None
2 changes: 1 addition & 1 deletion odxtools/compumethods/limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def from_et(et_element, internal_type: DataType) -> Optional["Limit"]:
hex_text = et_element.text
if len(hex_text) % 2 == 1:
hex_text = "0" + hex_text
return Limit(bytearray.fromhex(hex_text), interval_type)
return Limit(bytes.fromhex(hex_text), interval_type)
else:
return Limit(internal_type.from_string(et_element.text), interval_type)

Expand Down
24 changes: 19 additions & 5 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2022 MBition GmbH
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast

from .compumethods import CompuMethod, create_any_compu_method_from_et
from .decodestate import DecodeState
Expand Down Expand Up @@ -198,14 +198,21 @@ def bit_length(self):
else:
return None

def convert_physical_to_internal(self, physical_value):
def convert_physical_to_internal(self, physical_value: Any) -> Any:
"""
Convert a physical representation of a parameter to its internal counterpart
"""
assert self.physical_type.base_data_type.isinstance(
physical_value
), f"Expected {self.physical_type.base_data_type.value}, got {type(physical_value)}"

return self.compu_method.convert_physical_to_internal(physical_value)

def convert_physical_to_bytes(self, physical_value, encode_state, bit_position):
def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeState,
bit_position: int) -> bytes:
"""
Convert a physical representation of a parameter to a string bytes that can be send over the wire
"""
if not self.is_valid_physical_value(physical_value):
raise EncodeError(f"The value {repr(physical_value)} of type {type(physical_value)}"
f" is not a valid." +
Expand All @@ -217,7 +224,14 @@ def convert_physical_to_bytes(self, physical_value, encode_state, bit_position):
return self.diag_coded_type.convert_internal_to_bytes(
internal_val, encode_state, bit_position=bit_position)

def convert_bytes_to_physical(self, decode_state, bit_position: int = 0):
def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[Any, int]:
"""
Convert the internal representation of a value into its physical value.

Returns a (physical_value, start_position_of_next_parameter) tuple.
"""
assert 0 <= bit_position and bit_position < 8

internal, next_byte_position = self.diag_coded_type.convert_bytes_to_internal(
Expand Down Expand Up @@ -261,7 +275,7 @@ class DiagnosticTroubleCode:
short_name: Optional[str]
text: Optional[str]
display_trouble_code: Optional[str]
level: Union[bytes, bytearray, None]
level: Union[bytes, None]
is_temporary_raw: Optional[bool]
sdgs: List[SpecialDataGroup]

Expand Down
4 changes: 2 additions & 2 deletions odxtools/decodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

class ParameterValuePair(NamedTuple):
parameter: "Parameter"
value: Union[str, int, bytes, bytearray, Dict]
value: Union[str, int, bytes, Dict]


class DecodeState(NamedTuple):
"""Utility class to be used while decoding a message."""

coded_message: Union[bytes, bytearray]
coded_message: bytes
"""bytes to be decoded"""
parameter_value_pairs: List[ParameterValuePair]
"""values of already decoded parameters"""
Expand Down
6 changes: 3 additions & 3 deletions odxtools/diagcodedtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def _minimal_byte_length_of(self, internal_value: Union[bytes, str]) -> int:

@abc.abstractmethod
def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeState,
bit_position: int) -> Union[bytes, bytearray]:
bit_position: int) -> bytes:
"""Encode the internal value.

Parameters
Expand All @@ -218,7 +218,7 @@ def convert_bytes_to_internal(self, decode_state: DecodeState, bit_position: int

Returns
-------
str or int or bytes or bytearray or dict
str or int or bytes or dict
the decoded parameter value
int
the next byte position after the extracted parameter
Expand Down Expand Up @@ -253,7 +253,7 @@ def __init__(
], f"A leading length info type cannot have the base data type {self.base_data_type}."

def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeState,
bit_position: int) -> Union[bytes, bytearray]:
bit_position: int) -> bytes:

byte_length = self._minimal_byte_length_of(internal_value)

Expand Down
4 changes: 2 additions & 2 deletions odxtools/diagdatadictionaryspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
self.tables):
obj._resolve_odxlinks(odxlinks)

if self.unit_spec:
if self.unit_spec is not None:
self.unit_spec._resolve_odxlinks(odxlinks)

def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:
Expand All @@ -168,7 +168,7 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:
self.tables):
obj._resolve_snrefs(diag_layer)

if self.unit_spec:
if self.unit_spec is not None:
self.unit_spec._resolve_snrefs(diag_layer)

@property
Expand Down
10 changes: 4 additions & 6 deletions odxtools/diaglayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ def _extend_prefix_tree(prefix_tree: PrefixTree, coded_prefix: bytes,
else:
cast(List[DiagService], sub_tree[-1]).append(service)

def _find_services_for_uds(self, message: Union[bytes, bytearray]) -> List[DiagService]:
def _find_services_for_uds(self, message: bytes) -> List[DiagService]:
prefix_tree = self._prefix_tree

# Find matching service(s) in prefix tree
Expand All @@ -907,8 +907,7 @@ def _find_services_for_uds(self, message: Union[bytes, bytearray]) -> List[DiagS
possible_services += cast(List[DiagService], prefix_tree[-1])
return possible_services

def _decode(self, message: Union[bytes, bytearray],
candidate_services: Iterable[DiagService]) -> List[Message]:
def _decode(self, message: bytes, candidate_services: Iterable[DiagService]) -> List[Message]:
decoded_messages: List[Message] = []

for service in candidate_services:
Expand All @@ -935,13 +934,12 @@ def _decode(self, message: Union[bytes, bytearray],

return decoded_messages

def decode(self, message: Union[bytes, bytearray]) -> List[Message]:
def decode(self, message: bytes) -> List[Message]:
candidate_services = self._find_services_for_uds(message)

return self._decode(message, candidate_services)

def decode_response(self, response: Union[bytes, bytearray],
request: Union[bytes, bytearray, Message]) -> Iterable[Message]:
def decode_response(self, response: bytes, request: Union[bytes, Message]) -> Iterable[Message]:
if isinstance(request, Message):
candidate_services = [request.service]
else:
Expand Down
25 changes: 13 additions & 12 deletions odxtools/ecu_variant_matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
# Copyright (c) 2023 MBition GmbH

from enum import Enum
from typing import ByteString, Dict, Generator, List, Optional, Union
from typing import Dict, Generator, List, Optional, Union

from odxtools.diaglayer import DiagLayer
from odxtools.diaglayertype import DiagLayerType
from odxtools.ecu_variant_patterns import MatchingParameter
from odxtools.exceptions import OdxError
from odxtools.service import DiagService
from odxtools.structures import Response
from .diaglayer import DiagLayer
from .diaglayertype import DiagLayerType
from .ecu_variant_patterns import MatchingParameter
from .exceptions import OdxError
from .odxtypes import ParameterValue
from .service import DiagService
from .structures import Response


class EcuVariantMatcher:
Expand Down Expand Up @@ -58,7 +59,7 @@ def encode_ident_request(diag_layer: DiagLayer, matching_param: MatchingParamete
def decode_ident_response(
diag_layer: DiagLayer,
matching_param: MatchingParameter,
response_bytes: Union[bytes, bytearray],
response_bytes: bytes,
) -> str:
"""Decode a binary response and extract the identification string according
to the snref or snpathref of the matching_param.
Expand All @@ -74,11 +75,11 @@ def decode_ident_response(
pos_neg_responses.extend(service.negative_responses)

for any_response in pos_neg_responses:
decoded_val = any_response.decode(response_bytes)
decoded_val: Optional[ParameterValue] = any_response.decode(response_bytes)
# disassemble snref / snpathref
path_ref = matching_param.out_param_if.split(".")
for ref in path_ref:
if ref in decoded_val:
if isinstance(decoded_val, dict) and ref in decoded_val:
decoded_val = decoded_val[ref]
else:
decoded_val = None
Expand All @@ -98,7 +99,7 @@ def __init__(self, ecu_variant_candidates: List[DiagLayer], use_cache: bool = Tr
assert ecu.variant_type == DiagLayerType.ECU_VARIANT

self.use_cache = use_cache
self.req_resp_cache: Dict[ByteString, bytes] = {}
self.req_resp_cache: Dict[bytes, bytes] = {}
self._recent_ident_response: Optional[bytes] = None

self._state = EcuVariantMatcher.State.PENDING
Expand Down Expand Up @@ -136,7 +137,7 @@ def request_loop(self) -> Generator[bytes, None, None]:
# no pattern has matched for any ecu variant
self._state = EcuVariantMatcher.State.NO_MATCH

def evaluate(self, resp_bytes: Union[bytes, bytearray]) -> None:
def evaluate(self, resp_bytes: bytes) -> None:
"""Update the matcher with the response to a requst.

Warning: Use this method EXACTLY once within the loop body of the request loop.
Expand Down
3 changes: 2 additions & 1 deletion odxtools/encodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, NamedTuple, Optional, Union

from .odxlink import OdxLinkId
from .odxtypes import AtomicOdxType


class EncodeState(NamedTuple):
Expand All @@ -23,7 +24,7 @@ class EncodeState(NamedTuple):
"""payload that is constructed so far"""
parameter_values: Dict[str, Any]
"""a mapping from short name to value for each parameter"""
triggering_request: Optional[Union[bytes, bytearray]] = None
triggering_request: Optional[bytes] = None
"""If encoding a response: request that triggered the response"""
length_keys: Dict[OdxLinkId, int] = {}
"""Mapping from IDs to bit lengths (specified by LengthKeyParameters)"""
Expand Down
4 changes: 2 additions & 2 deletions odxtools/endofpdufield.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .decodestate import DecodeState
from .encodestate import EncodeState
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef
from .odxtypes import odxstr_to_bool
from .odxtypes import ParameterValueDict, odxstr_to_bool
from .specialdata import create_sdgs_from_et
from .structures import BasicStructure
from .utils import create_description_from_et
Expand Down Expand Up @@ -108,7 +108,7 @@ def convert_physical_to_internal(self, physical_value):

def convert_physical_to_bytes(
self,
physical_value: Union[dict, List[dict]],
physical_value: ParameterValueDict,
encode_state: EncodeState,
bit_position: int = 0,
):
Expand Down
5 changes: 2 additions & 3 deletions odxtools/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
class Message:
"""A CAN message with its interpretation."""

def __init__(self, *, coded_message: Union[bytes, bytearray], service, structure,
param_dict: dict):
def __init__(self, *, coded_message: bytes, service, structure, param_dict: dict):
"""
Parameters
----------
coded_message : bytes or bytearray
coded_message : bytes
service : DiagService
structure : Request or Response
param_dict : dict
Expand Down
16 changes: 14 additions & 2 deletions odxtools/odxlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,23 @@ def resolve(self, ref: OdxLinkRef, expected_type: Optional[Type[T]] = None) -> A
if obj is not None:
if expected_type is not None:
assert isinstance(obj, expected_type)
return obj

return obj

raise KeyError(f"ODXLINK reference {ref} could not be resolved for any "
f"of the document fragments {ref.ref_docs}")

def resolve_lenient(self, ref: OdxLinkRef) -> Optional[Any]:
@overload
def resolve_lenient(self, ref: OdxLinkRef, expected_type: None = None) -> Any:
...

@overload
def resolve_lenient(self, ref: OdxLinkRef, expected_type: Type[T]) -> Optional[T]:
...

def resolve_lenient(self,
ref: OdxLinkRef,
expected_type: Optional[Type[T]] = None) -> Optional[Any]:
"""
Resolve a reference to an object

Expand All @@ -236,6 +245,9 @@ def resolve_lenient(self, ref: OdxLinkRef) -> Optional[Any]:

obj = doc_frag_db.get(odx_id)
if obj is not None:
if expected_type is not None:
assert isinstance(obj, expected_type)

return obj

return None
Expand Down
Loading