diff --git a/canopen/objectdictionary/__init__.py b/canopen/objectdictionary/__init__.py index c8de5aa4..9d9002b1 100644 --- a/canopen/objectdictionary/__init__.py +++ b/canopen/objectdictionary/__init__.py @@ -210,6 +210,8 @@ def __init__(self, name: str, index: int): self.storage_location: Optional[str] = None self.subindices: dict[int, ODVariable] = {} self.names: dict[str, ODVariable] = {} + #: Key-Value pairs not defined by the standard + self.custom_options: dict[str, str] = {} def __repr__(self) -> str: return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>" @@ -271,12 +273,17 @@ def __init__(self, name: str, index: int): self.storage_location: Optional[str] = None self.subindices: dict[int, ODVariable] = {} self.names: dict[str, ODVariable] = {} + #: Key-Value pairs not defined by the standard + self.custom_options: dict[str, str] = {} def __repr__(self) -> str: return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>" def __getitem__(self, subindex: Union[int, str]) -> ODVariable: - var = self.names.get(subindex) or self.subindices.get(subindex) + var = ( + self.names.get(subindex) # type: ignore[arg-type] + or self.subindices.get(subindex) # type: ignore[arg-type] + ) if var is not None: # This subindex is defined pass @@ -291,6 +298,7 @@ def __getitem__(self, subindex: Union[int, str]) -> ODVariable: "bit_definitions", "storage_location"): if attr in template.__dict__: var.__dict__[attr] = template.__dict__[attr] + var.custom_options = template.custom_options else: raise KeyError(f"Could not find subindex {pretty_index(None, subindex)}") return var @@ -381,6 +389,8 @@ def __init__(self, name: str, index: int, subindex: int = 0): self.storage_location: Optional[str] = None #: Can this variable be mapped to a PDO self.pdo_mappable = False + #: Key-Value pairs not defined by the standard + self.custom_options: dict[str, str] = {} def __repr__(self) -> str: subindex = self.subindex if isinstance(self.parent, (ODRecord, ODArray)) else None diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index d47a3019..af6fa3f8 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -22,6 +22,7 @@ logger = logging.getLogger(__name__) + def import_eds(source, node_id): eds = RawConfigParser(inline_comment_prefixes=(';',)) eds.optionxform = str @@ -133,20 +134,22 @@ def import_eds(source, node_id): od.add_object(var) elif object_type == objectcodes.ARRAY and eds.has_option(section, "CompactSubObj"): arr = ODArray(name, index) - last_subindex = ODVariable( - "Number of entries", index, 0) + last_subindex = ODVariable("Number of entries", index, 0) last_subindex.data_type = datatypes.UNSIGNED8 arr.add_member(last_subindex) arr.add_member(build_variable(eds, section, node_id, object_type, index, 1)) arr.storage_location = storage_location + arr.custom_options = _get_custom_options(eds, section) od.add_object(arr) elif object_type == objectcodes.ARRAY: arr = ODArray(name, index) arr.storage_location = storage_location + arr.custom_options = _get_custom_options(eds, section) od.add_object(arr) elif object_type == objectcodes.RECORD: record = ODRecord(name, index) record.storage_location = storage_location + record.custom_options = _get_custom_options(eds, section) od.add_object(record) continue @@ -258,6 +261,27 @@ def _revert_variable(var_type, value): return f"0x{value:02X}" +_STANDARD_OPTIONS = { + "ObjectType", "ParameterName", "DataType", "AccessType", + "PDOMapping", "LowLimit", "HighLimit", "DefaultValue", + "ParameterValue", "Factor", "Description", "Unit", + "StorageLocation", "CompactSubObj", + # CiA 306 fields parsed explicitly: + "SubNumber", + # ObjFlags and Denotation are intentionally absent: they are not yet + # parsed by this codebase, so they flow through custom_options and + # survive round-trips. Proper first-class support is tracked in #654. +} + + +def _get_custom_options(eds: RawConfigParser, section: str) -> dict[str, str]: + custom_options = {} + for option, value in eds.items(section): + if option not in _STANDARD_OPTIONS: + custom_options[option] = value + return custom_options + + def build_variable( eds: RawConfigParser, section: str, @@ -350,6 +374,8 @@ def build_variable( var.unit = eds.get(section, "Unit") except ValueError: pass + + var.custom_options = _get_custom_options(eds, section) return var @@ -425,12 +451,19 @@ def export_variable(var, eds): if getattr(var, 'unit', '') != '': eds.set(section, "Unit", var.unit) + for option, value in var.custom_options.items(): + if option not in _STANDARD_OPTIONS: + eds.set(section, option, str(value)) + def export_record(var, eds): section = f"{var.index:04X}" export_common(var, eds, section) eds.set(section, "SubNumber", f"0x{len(var.subindices):X}") ot = objectcodes.RECORD if isinstance(var, ODRecord) else objectcodes.ARRAY eds.set(section, "ObjectType", f"0x{ot:X}") + for option, value in var.custom_options.items(): + if option not in _STANDARD_OPTIONS: + eds.set(section, option, str(value)) for i in var: export_variable(var[i], eds) diff --git a/test/sample.eds b/test/sample.eds index ad00a12e..3e5e3bbc 100644 --- a/test/sample.eds +++ b/test/sample.eds @@ -1018,6 +1018,36 @@ Factor=ERROR Description= Unit= +[3061] +ParameterName=Object with custom options +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=0 +Category=Motor +Offset=100 + +[3062] +ParameterName=Record with custom options +SubNumber=0x2 +ObjectType=0x9 +RecordTag=vendor_specific + +[3062sub0] +ParameterName=Highest subindex +ObjectType=0x7 +DataType=0x0005 +AccessType=ro +DefaultValue=0x01 +PDOMapping=0 + +[3062sub1] +ParameterName=Value +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=0 + [3063] ParameterName=DOMAIN object ObjectType=0x2 diff --git a/test/test_eds.py b/test/test_eds.py index 7a19ffeb..7a1e359b 100644 --- a/test/test_eds.py +++ b/test/test_eds.py @@ -251,6 +251,54 @@ def test_roundtrip_domain_objects(self): self.assertTrue(od2[0x3063].is_domain) self.assertTrue(od2[0x3064][1].is_domain) + def test_reading_custom_options(self): + """Custom options (unknown EDS keys) are collected in custom_options dict.""" + var = self.od[0x3061] + self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertEqual(var.custom_options, {'Category': 'Motor', 'Offset': '100'}) + + def test_custom_options_standard_keys_excluded(self): + """Standard CiA 306 keys must NOT appear in custom_options.""" + var = self.od[0x3061] + for key in ('ParameterName', 'ObjectType', 'DataType', 'AccessType', 'PDOMapping'): + self.assertNotIn(key, var.custom_options, + f"Standard key {key!r} must not be in custom_options") + + def test_custom_options_empty_for_standard_object(self): + """Objects without extra keys must have an empty custom_options dict.""" + var = self.od['Producer heartbeat time'] + self.assertEqual(var.custom_options, {}) + + def test_custom_options_record(self): + """custom_options is read for ODRecord container objects too.""" + record = self.od[0x3062] + self.assertIsInstance(record, canopen.objectdictionary.ODRecord) + self.assertEqual(record.custom_options, {'RecordTag': 'vendor_specific'}) + # sub-entries without extra keys have empty custom_options + self.assertEqual(record[1].custom_options, {}) + + def test_roundtrip_custom_options(self): + """custom_options survive an EDS export/import round-trip.""" + import io + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3061].custom_options, {'Category': 'Motor', 'Offset': '100'}) + self.assertEqual(od2[0x3062].custom_options, {'RecordTag': 'vendor_specific'}) + + def test_roundtrip_custom_options_not_duplicated_as_standard(self): + """After round-trip the re-imported object must not contain standard keys.""" + import io + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + for key in ('ParameterName', 'ObjectType', 'DataType', 'AccessType', 'PDOMapping'): + self.assertNotIn(key, od2[0x3061].custom_options) + def test_comments(self): self.assertEqual(self.od.comments,