Skip to content

Commit

Permalink
Merge pull request #323 from andlaus/diag_variables
Browse files Browse the repository at this point in the history
Implement diagnostic variables
  • Loading branch information
andlaus committed Jul 31, 2024
2 parents 9d74fa2 + 2a4375f commit abf7fad
Show file tree
Hide file tree
Showing 16 changed files with 718 additions and 13 deletions.
9 changes: 9 additions & 0 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2037,6 +2037,9 @@ class SomersaultSID(IntEnum):
ecu_variant_patterns=[],
comparam_spec_ref=None,
prot_stack_snref=None,
diag_variables_raw=[],
variable_groups=NamedItemList(),
dyn_defined_spec=None,
)
somersault_diaglayer = DiagLayer(diag_layer_raw=somersault_diaglayer_raw)

Expand Down Expand Up @@ -2082,6 +2085,9 @@ class SomersaultSID(IntEnum):
ecu_variant_patterns=[],
comparam_spec_ref=None,
prot_stack_snref=None,
diag_variables_raw=[],
variable_groups=NamedItemList(),
dyn_defined_spec=None,
)
somersault_lazy_diaglayer = DiagLayer(diag_layer_raw=somersault_lazy_diaglayer_raw)

Expand Down Expand Up @@ -2306,6 +2312,9 @@ class SomersaultSID(IntEnum):
ecu_variant_patterns=[],
comparam_spec_ref=None,
prot_stack_snref=None,
diag_variables_raw=[],
variable_groups=NamedItemList(),
dyn_defined_spec=None,
)
somersault_assiduous_diaglayer = DiagLayer(diag_layer_raw=somersault_assiduous_diaglayer_raw)

Expand Down
122 changes: 122 additions & 0 deletions odxtools/commrelation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# SPDX-License-Identifier: MIT
import warnings
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
from xml.etree import ElementTree

from .description import Description
from .diagcomm import DiagComm
from .diagservice import DiagService
from .exceptions import OdxWarning, odxraise, odxrequire
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef, resolve_snref
from .parameters.parameter import Parameter
from .snrefcontext import SnRefContext


class CommRelationValueType(Enum):
CURRENT = "CURRENT"
STORED = "STORED"
STATIC = "STATIC"
SUBSTITUTED = "SUBSTITUTED"


@dataclass
class CommRelation:
description: Optional[Description]
relation_type: str
diag_comm_ref: Optional[OdxLinkRef]
diag_comm_snref: Optional[str]
in_param_if_snref: Optional[str]
#in_param_if_snpathref: Optional[str] # TODO
out_param_if_snref: Optional[str]
#out_param_if_snpathref: Optional[str] # TODO
value_type_raw: Optional[CommRelationValueType]

@property
def diag_comm(self) -> DiagComm:
return self._diag_comm

@property
def in_param_if(self) -> Optional[Parameter]:
return self._in_param_if

@property
def out_param_if(self) -> Optional[Parameter]:
return self._out_param_if

@property
def value_type(self) -> CommRelationValueType:
if self.value_type_raw is None:
return CommRelationValueType.CURRENT

return self.value_type_raw

@staticmethod
def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> "CommRelation":
description = Description.from_et(et_element.find("DESC"), doc_frags)
relation_type = odxrequire(et_element.findtext("RELATION-TYPE"))

diag_comm_ref = OdxLinkRef.from_et(et_element.find("DIAG-COMM-REF"), doc_frags)
diag_comm_snref = None
if (diag_comm_snref_elem := et_element.find("DIAG-COMM-SNREF")) is not None:
diag_comm_snref = odxrequire(diag_comm_snref_elem.get("SHORT-NAME"))

in_param_if_snref = None
if (in_param_if_snref_elem := et_element.find("IN-PARAM-IF-SNREF")) is not None:
in_param_if_snref = odxrequire(in_param_if_snref_elem.get("SHORT-NAME"))

if et_element.find("IN-PARAM-IF-SNPATHREF") is not None:
warnings.warn("SNPATHREFs are not supported by odxtools yet", OdxWarning, stacklevel=1)

out_param_if_snref = None
if (out_param_if_snref_elem := et_element.find("OUT-PARAM-IF-SNREF")) is not None:
out_param_if_snref = odxrequire(out_param_if_snref_elem.get("SHORT-NAME"))

if et_element.find("OUT-PARAM-IF-SNPATHREF") is not None:
warnings.warn("SNPATHREFs are not supported by odxtools yet", OdxWarning, stacklevel=1)

value_type_raw = None
if (value_type_str := et_element.get("VALUE-TYPE")) is not None:
try:
value_type_raw = CommRelationValueType(value_type_str)
except ValueError:
odxraise(f"Encountered unknown comm relation value type '{value_type_str}'")

return CommRelation(
description=description,
relation_type=relation_type,
diag_comm_ref=diag_comm_ref,
diag_comm_snref=diag_comm_snref,
in_param_if_snref=in_param_if_snref,
out_param_if_snref=out_param_if_snref,
value_type_raw=value_type_raw)

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
return {}

def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
if self.diag_comm_ref is not None:
self._diag_comm = odxlinks.resolve(self.diag_comm_ref, DiagComm)

def _resolve_snrefs(self, context: SnRefContext) -> None:
diag_layer = odxrequire(context.diag_layer)

if self.diag_comm_snref is not None:
self._diag_comm = resolve_snref(self.diag_comm_snref, diag_layer.diag_comms, DiagComm)

service = self.diag_comm
if not isinstance(service, DiagService):
odxraise(f"DIAG-VARIABLE references non-service {type(service).__name__} "
f"diagnostic communication")

self._in_param_if = None
if self.in_param_if_snref is not None:
self._in_param_if = resolve_snref(self.in_param_if_snref,
odxrequire(service.request).parameters, Parameter)

self._out_param_if = None
if self.out_param_if_snref is not None:
self._out_param_if = resolve_snref(self.out_param_if_snref,
odxrequire(service.positive_responses[0]).parameters,
Parameter)
64 changes: 61 additions & 3 deletions odxtools/diaglayerraw.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from .diagdatadictionaryspec import DiagDataDictionarySpec
from .diaglayertype import DiagLayerType
from .diagservice import DiagService
from .diagvariable import DiagVariable
from .dyndefinedspec import DynDefinedSpec
from .ecuvariantpattern import EcuVariantPattern
from .element import IdentifiableElement
from .exceptions import odxassert, odxraise, odxrequire
Expand All @@ -29,6 +31,7 @@
from .specialdatagroup import SpecialDataGroup
from .statechart import StateChart
from .utils import dataclass_fields_asdict
from .variablegroup import VariableGroup


@dataclass
Expand Down Expand Up @@ -63,11 +66,15 @@ class DiagLayerRaw(IdentifiableElement):
ecu_variant_patterns: List[EcuVariantPattern]
comparam_spec_ref: Optional[OdxLinkRef]
prot_stack_snref: Optional[str]
# diag_variables: List[DiagVariable] # TODO
# diag_variable_groups: List[DiagVariableGroup] # TODO
# dyn_defined_spec: Optional[DynDefinedSpec] # TODO
diag_variables_raw: List[Union[DiagVariable, OdxLinkRef]]
variable_groups: NamedItemList[VariableGroup]
dyn_defined_spec: Optional[DynDefinedSpec]
# base_variant_patterns: List[EcuVariantPattern] # TODO

@property
def diag_variables(self) -> NamedItemList[DiagVariable]:
return self._diag_variables

@staticmethod
def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> "DiagLayerRaw":
try:
Expand Down Expand Up @@ -178,6 +185,29 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) ->
if (prot_stack_snref_elem := et_element.find("PROT-STACK-SNREF")) is not None:
prot_stack_snref = odxrequire(prot_stack_snref_elem.get("SHORT-NAME"))

diag_variables_raw: List[Union[DiagVariable, OdxLinkRef]] = []
if (dv_elems := et_element.find("DIAG-VARIABLES")) is not None:
for dv_proxy_elem in dv_elems:
dv_proxy: Union[OdxLinkRef, DiagVariable]
if dv_proxy_elem.tag == "DIAG-VARIABLE-REF":
dv_proxy = OdxLinkRef.from_et(dv_proxy_elem, doc_frags)
elif dv_proxy_elem.tag == "DIAG-VARIABLE":
dv_proxy = DiagVariable.from_et(dv_proxy_elem, doc_frags)
else:
odxraise("DIAG-VARIABLES tags may only contain "
"DIAG-VARIABLE and DIAG-VARIABLE-REF subtags")

diag_variables_raw.append(dv_proxy)

variable_groups = NamedItemList([
VariableGroup.from_et(vg_elem, doc_frags)
for vg_elem in et_element.iterfind("VARIABLE-GROUPS/VARIABLE-GROUP")
])

dyn_defined_spec = None
if (dds_elem := et_element.find("DYN-DEFINED-SPEC")) is not None:
dyn_defined_spec = DynDefinedSpec.from_et(dds_elem, doc_frags)

# Create DiagLayer
return DiagLayerRaw(
variant_type=variant_type,
Expand All @@ -199,6 +229,9 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) ->
ecu_variant_patterns=ecu_variant_patterns,
comparam_spec_ref=comparam_spec_ref,
prot_stack_snref=prot_stack_snref,
diag_variables_raw=diag_variables_raw,
variable_groups=variable_groups,
dyn_defined_spec=dyn_defined_spec,
**kwargs)

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
Expand Down Expand Up @@ -236,6 +269,11 @@ def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
odxlinks.update(parent_ref._build_odxlinks())
for comparam in self.comparams:
odxlinks.update(comparam._build_odxlinks())
for dv_proxy in self.diag_variables_raw:
if not isinstance(dv_proxy, OdxLinkRef):
odxlinks.update(dv_proxy._build_odxlinks())
if self.dyn_defined_spec is not None:
odxlinks.update(self.dyn_defined_spec._build_odxlinks())

return odxlinks

Expand Down Expand Up @@ -281,6 +319,19 @@ def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
for comparam in self.comparams:
comparam._resolve_odxlinks(odxlinks)

self._diag_variables: NamedItemList[DiagVariable] = NamedItemList()
for dv_proxy in self.diag_variables_raw:
if isinstance(dv_proxy, OdxLinkRef):
dv = odxlinks.resolve(dv_proxy, DiagVariable)
else:
dv_proxy._resolve_odxlinks(odxlinks)
dv = dv_proxy

self._diag_variables.append(dv)

if self.dyn_defined_spec is not None:
self.dyn_defined_spec._resolve_odxlinks(odxlinks)

def _resolve_snrefs(self, context: SnRefContext) -> None:
self._prot_stack: Optional[ProtStack] = None
if self.prot_stack_snref is not None:
Expand Down Expand Up @@ -322,6 +373,13 @@ def _resolve_snrefs(self, context: SnRefContext) -> None:
for comparam in self.comparams:
comparam._resolve_snrefs(context)

for dv_proxy in self.diag_variables_raw:
if not isinstance(dv_proxy, OdxLinkRef):
dv_proxy._resolve_snrefs(context)

if self.dyn_defined_spec is not None:
self.dyn_defined_spec._resolve_snrefs(context)

@property
def comparam_spec(self) -> Optional[Union[ComparamSpec, ComparamSubset]]:
return self._comparam_spec
Expand Down
104 changes: 104 additions & 0 deletions odxtools/diagvariable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from xml.etree import ElementTree

from .admindata import AdminData
from .commrelation import CommRelation
from .element import IdentifiableElement
from .exceptions import odxrequire
from .nameditemlist import NamedItemList
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef
from .odxtypes import odxstr_to_bool
from .snrefcontext import SnRefContext
from .specialdatagroup import SpecialDataGroup
from .swvariable import SwVariable
from .utils import dataclass_fields_asdict
from .variablegroup import VariableGroup


@dataclass
class DiagVariable(IdentifiableElement):
"""Representation of a diagnostic variable
"""

admin_data: Optional[AdminData]
variable_group_ref: OdxLinkRef
sw_variables: List[SwVariable]
comm_relations: List[CommRelation]
#snref_to_tablerow: Optional[SnrefToTableRow] # TODO
sdgs: List[SpecialDataGroup]
is_read_before_write_raw: Optional[bool]

@property
def variable_group(self) -> VariableGroup:
return self._variable_group

@property
def is_read_before_write(self) -> bool:
return self.is_read_before_write_raw is True

@staticmethod
def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> "DiagVariable":
kwargs = dataclass_fields_asdict(IdentifiableElement.from_et(et_element, doc_frags))

admin_data = AdminData.from_et(et_element.find("ADMIN-DATA"), doc_frags)
variable_group_ref = odxrequire(
OdxLinkRef.from_et(et_element.find("VARIABLE-GROUP-REF"), doc_frags))
sw_variables = NamedItemList([
SwVariable.from_et(swv_elem, doc_frags)
for swv_elem in et_element.iterfind("SW-VARIABLES/SW-VARIABLE")
])
comm_relations = [
CommRelation.from_et(cr_elem, doc_frags)
for cr_elem in et_element.iterfind("COMM-RELATIONS/COMM-RELATION")
]
sdgs = [
SpecialDataGroup.from_et(sdge, doc_frags) for sdge in et_element.iterfind("SDGS/SDG")
]
is_read_before_write_raw = odxstr_to_bool(et_element.get("IS-READ-BEFORE-WRITE"))

return DiagVariable(
admin_data=admin_data,
variable_group_ref=variable_group_ref,
sw_variables=sw_variables,
comm_relations=comm_relations,
sdgs=sdgs,
is_read_before_write_raw=is_read_before_write_raw,
**kwargs)

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
result = {self.odx_id: self}

if self.admin_data is not None:
result.update(self.admin_data._build_odxlinks())

for sdg in self.sdgs:
result.update(sdg._build_odxlinks())

for cr in self.comm_relations:
result.update(cr._build_odxlinks())

return result

def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
self._variable_group = odxlinks.resolve(self.variable_group_ref, VariableGroup)

if self.admin_data is not None:
self.admin_data._resolve_odxlinks(odxlinks)

for sdg in self.sdgs:
sdg._resolve_odxlinks(odxlinks)

for cr in self.comm_relations:
cr._resolve_odxlinks(odxlinks)

def _resolve_snrefs(self, context: SnRefContext) -> None:
if self.admin_data is not None:
self.admin_data._resolve_snrefs(context)

for sdg in self.sdgs:
sdg._resolve_snrefs(context)

for cr in self.comm_relations:
cr._resolve_snrefs(context)
Loading

0 comments on commit abf7fad

Please sign in to comment.