diff --git a/.github/workflows/Test.yml b/.github/workflows/Test.yml index bf12fff7..ea65c9ba 100644 --- a/.github/workflows/Test.yml +++ b/.github/workflows/Test.yml @@ -2,7 +2,7 @@ name: Test-Publish on: push: - branches: ['master'] + branches: [ 'master' ] tags: - 'v*' # only publish when pushing version tags (e.g., v1.0.0) pull_request: @@ -22,13 +22,13 @@ jobs: # test for: # * oldest supported version # * latest available Python version - python-version: ['3.10', '3.14'] + python-version: [ '3.12', '3.14' ] # * Linux using ubuntu-latest # * Windows using windows-latest - os: ['ubuntu-latest', 'windows-latest'] + os: [ 'ubuntu-latest', 'windows-latest' ] # * OM stable - latest stable version # * OM nightly - latest nightly build - omc-version: ['stable', 'nightly'] + omc-version: [ 'stable', 'nightly' ] steps: - uses: actions/checkout@v6 @@ -98,8 +98,8 @@ jobs: needs: test strategy: matrix: - python-version: ['3.10'] - os: ['ubuntu-latest'] + python-version: [ '3.12' ] + os: [ 'ubuntu-latest' ] if: startsWith(github.ref, 'refs/tags/') steps: - uses: actions/checkout@v6 diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index 17678bb0..3f15c372 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -4,21 +4,26 @@ """ import logging +import numbers import os import pathlib -import platform from typing import Any, Optional +import warnings import numpy as np from OMPython.model_execution import ( - ModelExecutionCmd, + ModelExecutionConfig, ModelExecutionException, ) +from OMPython.om_session_abc import ( + OMPathABC, +) from OMPython.om_session_omc import ( OMCSessionLocal, ) from OMPython.modelica_system_abc import ( + LinearizationResult, ModelicaSystemError, ) from OMPython.modelica_system_omc import ( @@ -28,10 +33,15 @@ ModelicaDoEOMC, ) +from OMPython.compatibility_v400 import ( + depreciated_class, +) + # define logger using the current module name as ID logger = logging.getLogger(__name__) +@depreciated_class(msg="Please use class ModelicaSystemOMC instead!") class ModelicaSystem(ModelicaSystemOMC): """ Compatibility class. @@ -67,58 +77,262 @@ def __init__( def setCommandLineOptions(self, commandLineOptions: str): super().set_command_line_options(command_line_option=commandLineOptions) - def setContinuous( # type: ignore[override] + def simulate_cmd( # type: ignore[override] + self, + result_file: OMPathABC, + simflags: Optional[str] = None, + simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, + ) -> ModelExecutionConfig: + """ + Compatibility layer for OMPython v4.0.0 - keep simflags available and use ModelicaSystemCmd! + """ + + if simargs is None: + simargs = {} + + if simflags is not None: + simargs_extra = parse_simflags(simflags=simflags) + simargs = simargs | simargs_extra + + return super().simulate_cmd( + result_file=result_file, + simargs=simargs, + ) + + def simulate( # type: ignore[override] + self, + resultfile: Optional[str | os.PathLike] = None, + simflags: Optional[str] = None, + simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, + ) -> None: + """ + Compatibility layer for OMPython v4.0.0 - keep simflags available and use ModelicaSystemCmd! + """ + + if simargs is None: + simargs = {} + + if simflags is not None: + simargs_extra = parse_simflags(simflags=simflags) + simargs = simargs | simargs_extra + + return super().simulate( + resultfile=resultfile, + simargs=simargs, + ) + + def linearize( # type: ignore[override] self, - cvals: str | list[str] | dict[str, Any], + lintime: Optional[float] = None, + simflags: Optional[str] = None, + simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, + ) -> LinearizationResult: + """ + Compatibility layer for OMPython v4.0.0 - keep simflags available and use ModelicaSystemCmd! + """ + if simargs is None: + simargs = {} + + if simflags is not None: + simargs_extra = parse_simflags(simflags=simflags) + simargs = simargs | simargs_extra + + return super().linearize( + lintime=lintime, + simargs=simargs, + ) + + @staticmethod + def _set_compatibility_helper( + pkey: str, + args: Any, + kwargs: dict[str, Any], + ) -> dict[str, Any]: + input_args = [] + if len(args) == 1: + input_args.append(args[0]) + elif pkey in kwargs: + input_args.append(kwargs[pkey]) + + # the code below is based on _prepare_input_data2() + + def prepare_str(str_in: str) -> dict[str, str]: + str_in = str_in.replace(" ", "") + key_val_list: list[str] = str_in.split("=") + if len(key_val_list) != 2: + raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") + if len(key_val_list[0]) == 0: + raise ModelicaSystemError(f"Empty key: {str_in}") + + input_data_from_str: dict[str, str] = {str(key_val_list[0]): str(key_val_list[1])} + + return input_data_from_str + + input_data: dict[str, str] = {} + + if input_args is None: + return input_data + + for input_arg in input_args: + if isinstance(input_arg, str): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) + input_data = input_data | prepare_str(input_arg) + elif isinstance(input_arg, list): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) + + for item in input_arg: + if not isinstance(item, str): + raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(item)}!") + input_data = input_data | prepare_str(item) + elif isinstance(input_arg, dict): + input_arg_str: dict[str, str] = {} + for key, val in input_arg.items(): + if not isinstance(key, str) or len(key) == 0: + raise ModelicaSystemError(f"Invalid key for set*() functions: {repr(key)}") + input_arg_str[key] = str(val).replace(' ', '') + input_data = input_data | input_arg_str + else: + raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(input_arg)}!") + + return input_data + + def setContinuous( + self, + *args: Any, + **kwargs: dict[str, Any], ) -> bool: - if isinstance(cvals, dict): - return super().setContinuous(**cvals) - raise ModelicaSystemError("Only dict input supported for setContinuous()") + """ + Compatibility wrapper for setContinuous() from OMPython v4.0.0 + + Original definition: + + ``` + def setContinuous( + self, + cvals: str | list[str] | dict[str, Any], + ) -> bool: + ``` + """ + param = self._set_compatibility_helper(pkey='cvals', args=args, kwargs=kwargs) + return super().setContinuous(**param) - def setParameters( # type: ignore[override] + def setParameters( self, - pvals: str | list[str] | dict[str, Any], + *args: Any, + **kwargs: dict[str, Any], ) -> bool: - if isinstance(pvals, dict): - return super().setParameters(**pvals) - raise ModelicaSystemError("Only dict input supported for setParameters()") + """ + Compatibility wrapper for setParameters() from OMPython v4.0.0 - def setOptimizationOptions( # type: ignore[override] + Original definition: + + ``` + def setParameters( + self, + pvals: str | list[str] | dict[str, Any], + ) -> bool: + ``` + """ + param = self._set_compatibility_helper(pkey='pvals', args=args, kwargs=kwargs) + return super().setParameters(**param) + + def setOptimizationOptions( self, - optimizationOptions: str | list[str] | dict[str, Any], + *args: Any, + **kwargs: dict[str, Any], ) -> bool: - if isinstance(optimizationOptions, dict): - return super().setOptimizationOptions(**optimizationOptions) - raise ModelicaSystemError("Only dict input supported for setOptimizationOptions()") + """ + Compatibility wrapper for setOptimizationOptions() from OMPython v4.0.0 + + Original definition: + + ``` + def setOptimizationOptions( + self, + optimizationOptions: str | list[str] | dict[str, Any], + ) -> bool: + ``` + """ + param = self._set_compatibility_helper(pkey='optimizationOptions', args=args, kwargs=kwargs) + return super().setOptimizationOptions(**param) - def setInputs( # type: ignore[override] + def setInputs( self, - name: str | list[str] | dict[str, Any], + *args: Any, + **kwargs: dict[str, Any], ) -> bool: - if isinstance(name, dict): - return super().setInputs(**name) - raise ModelicaSystemError("Only dict input supported for setInputs()") + """ + Compatibility wrapper for setInputs() from OMPython v4.0.0 + + Original definition: + + ``` + def setInputs( + self, + name: str | list[str] | dict[str, Any], + ) -> bool: + ``` + """ + param = self._set_compatibility_helper(pkey='name', args=args, kwargs=kwargs) + return super().setInputs(**param) - def setSimulationOptions( # type: ignore[override] + def setSimulationOptions( self, - simOptions: str | list[str] | dict[str, Any], + *args: Any, + **kwargs: dict[str, Any], ) -> bool: - if isinstance(simOptions, dict): - return super().setSimulationOptions(**simOptions) - raise ModelicaSystemError("Only dict input supported for setSimulationOptions()") + """ + Compatibility wrapper for setSimulationOptions() from OMPython v4.0.0 + + Original definition: + + ``` + def setSimulationOptions( + self, + simOptions: str | list[str] | dict[str, Any], + ) -> bool: + ``` + """ + param = self._set_compatibility_helper(pkey='simOptions', args=args, kwargs=kwargs) + return super().setSimulationOptions(**param) - def setLinearizationOptions( # type: ignore[override] + def setLinearizationOptions( self, - linearizationOptions: str | list[str] | dict[str, Any], + *args: Any, + **kwargs: dict[str, Any], ) -> bool: - if isinstance(linearizationOptions, dict): - return super().setLinearizationOptions(**linearizationOptions) - raise ModelicaSystemError("Only dict input supported for setLinearizationOptions()") + """ + Compatibility wrapper for setLinearizationOptions() from OMPython v4.0.0 + + Original definition: + + ``` + def setLinearizationOptions( + self, + linearizationOptions: str | list[str] | dict[str, Any], + ) -> bool: + ``` + """ + param = self._set_compatibility_helper(pkey='linearizationOptions', args=args, kwargs=kwargs) + return super().setLinearizationOptions(**param) def getContinuous( self, names: Optional[str | list[str]] = None, ): + """ + Compatibility wrapper for getContinuous() from OMPython v4.0.0 + + If no model simulation was run (self._simulated == False), the return value should be converted to str. + """ retval = super().getContinuous(names=names) if self._simulated: return retval @@ -140,12 +354,17 @@ def getContinuous( retval3.append(str(val)) return retval3 - raise ModelExecutionException("Invalid data!") + raise ModelicaSystemError("Invalid data!") def getOutputs( self, names: Optional[str | list[str]] = None, ): + """ + Compatibility wrapper for getOutputs() from OMPython v4.0.0 + + If no model simulation was run (self._simulated == False), the return value should be converted to str. + """ retval = super().getOutputs(names=names) if self._simulated: return retval @@ -167,18 +386,25 @@ def getOutputs( retval3.append(str(val)) return retval3 - raise ModelExecutionException("Invalid data!") + raise ModelicaSystemError("Invalid data!") +@depreciated_class(msg="Please use class ModelicaDoEOMC instead!") class ModelicaSystemDoE(ModelicaDoEOMC): """ Compatibility class. """ -class ModelicaSystemCmd(ModelExecutionCmd): +@depreciated_class(msg="Please use class ModelExecutionConfig instead!") +class ModelicaSystemCmd(ModelExecutionConfig): """ - Compatibility class; in the new version it is renamed as ModelExecutionCmd. + Compatibility class; not much content. + + Missing definitions: + * get_exe() - see self.definition.cmd_model_executable + * get_cmd() - use self.get_cmd_args() or self.definition().get_cmd() + * run() - use self.definition().run() """ def __init__( @@ -194,30 +420,44 @@ def __init__( model_name=modelname, ) - def get_exe(self) -> pathlib.Path: - """Get the path to the compiled model executable.""" - - path_run = pathlib.Path(self._runpath) - if platform.system() == "Windows": - path_exe = path_run / f"{self._model_name}.exe" - else: - path_exe = path_run / self._model_name - if not path_exe.exists(): - raise ModelicaSystemError(f"Application file path not found: {path_exe}") - - return path_exe - - def get_cmd(self) -> list: - """Get a list with the path to the executable and all command line args. - - This can later be used as an argument for subprocess.run(). - """ - - cmdl = [self.get_exe().as_posix()] + self.get_cmd_args() - - return cmdl +def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | numbers.Number]]: + """ + Parse a simflag definition; this is deprecated! - def run(self): - cmd_definition = self.definition() - return cmd_definition.run() + The return data can be used as input for self.args_set(). + """ + warnings.warn( + message="The argument 'simflags' is depreciated and will be removed in future versions; " + "please use 'simargs' instead", + category=DeprecationWarning, + stacklevel=2, + ) + + simargs: dict[str, Optional[str | dict[str, Any] | numbers.Number]] = {} + + args = [s for s in simflags.split(' ') if s] + for arg in args: + if arg[0] != '-': + raise ModelExecutionException(f"Invalid simulation flag: {arg}") + arg = arg[1:] + parts = arg.split('=') + if len(parts) == 1: + simargs[parts[0]] = None + elif parts[0] == 'override': + override = '='.join(parts[1:]) + + override_dict = {} + for item in override.split(','): + kv = item.split('=') + if not 0 < len(kv) < 3: + raise ModelExecutionException(f"Invalid value for '-override': {override}") + if kv[0]: + try: + override_dict[kv[0]] = kv[1] + except (KeyError, IndexError) as ex: + raise ModelExecutionException(f"Invalid value for '-override': {override}") from ex + + simargs[parts[0]] = override_dict + + return simargs diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index c5511923..35e1271a 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -17,7 +17,6 @@ OMSessionException, ) from OMPython.om_session_omc import ( - DockerPopen, OMCSessionABC, OMCSessionDocker, OMCSessionDockerContainer, @@ -26,30 +25,28 @@ OMCSessionWSL, ) +from OMPython.compatibility_v400 import ( + depreciated_class, +) # define logger using the current module name as ID logger = logging.getLogger(__name__) +@depreciated_class(msg="Please use class OMSessionException instead!") class OMCSessionException(OMSessionException): """ Just a compatibility layer ... """ +@depreciated_class(msg="Please use OMCSession*.sendExpression(...) instead!") class OMCSessionCmd: """ Implementation of Open Modelica Compiler API functions. Depreciated! """ def __init__(self, session: OMSessionABC, readonly: bool = False): - warnings.warn( - message="The class OMCSessionCMD is depreciated and will be removed in future versions; " - "please use OMCSession*.sendExpression(...) instead!", - category=DeprecationWarning, - stacklevel=2, - ) - if not isinstance(session, OMSessionABC): raise OMCSessionException("Invalid OMC process definition!") self._session = session @@ -228,6 +225,7 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F return self._ask(question='getClassNames', opt=opt) +@depreciated_class(msg="Please use OMCSession* classes instead!") class OMCSessionZMQ(OMSessionABC): """ This class is a compatibility layer for the new schema using OMCSession* classes. @@ -242,11 +240,6 @@ def __init__( """ Initialisation for OMCSessionZMQ """ - warnings.warn(message="The class OMCSessionZMQ is depreciated and will be removed in future versions; " - "please use OMCProcess* classes instead!", - category=DeprecationWarning, - stacklevel=2) - if omc_process is None: omc_process = OMCSessionLocal(omhome=omhome, timeout=timeout) elif not isinstance(omc_process, OMCSessionABC): @@ -280,14 +273,22 @@ def omcpath_tempdir(self, tempdir_base: Optional[OMPathABC] = None) -> OMPathABC return self.omc_process.omcpath_tempdir(tempdir_base=tempdir_base) def execute(self, command: str): - return self.omc_process.execute(command=command) + warnings.warn( + message="This function is depreciated and will be removed in future versions; " + "please use sendExpression() instead", + category=DeprecationWarning, + stacklevel=2, + ) + return self.omc_process.sendExpression(expr=command, parsed=False) - def sendExpression(self, command: str, parsed: bool = True) -> Any: + def sendExpression(self, command: str, parsed: bool = True) -> Any: # pylint: disable=W0237 """ Send an expression to the OMC server and return the result. - The complete error handling of the OMC result is done within this method using '"getMessagesStringInternal()'. - Caller should only check for OMSessionException. + The complete error handling of the OMC result is done within this method using 'getMessagesStringInternal()'. + Caller should only check for OMCSessionException. + + Compatibility: 'command' was renamed to 'expr' """ return self.omc_process.sendExpression(expr=command, parsed=parsed) @@ -301,9 +302,36 @@ def set_workdir(self, workdir: OMPathABC) -> None: return self.omc_process.set_workdir(workdir=workdir) -DummyPopen = DockerPopen -OMCProcessLocal = OMCSessionLocal -OMCProcessPort = OMCSessionPort -OMCProcessDocker = OMCSessionDocker -OMCProcessDockerContainer = OMCSessionDockerContainer -OMCProcessWSL = OMCSessionWSL +@depreciated_class(msg="Please use class OMCSessionLocal instead!") +class OMCProcessLocal(OMCSessionLocal): + """ + Just a wrapper class; OMCProcessLocal => OMCSessionLocal + """ + + +@depreciated_class(msg="Please use class OMCSessionPort instead!") +class OMCProcessPort(OMCSessionPort): + """ + Just a wrapper class; OMCProcessPort => OMCSessionPort + """ + + +@depreciated_class(msg="Please use class OMCSessionDocker instead!") +class OMCProcessDocker(OMCSessionDocker): + """ + Just a wrapper class; OMCProcessDocker => OMCSessionDocker + """ + + +@depreciated_class(msg="Please use class OMCSessionDockerContainer instead!") +class OMCProcessDockerContainer(OMCSessionDockerContainer): + """ + Just a wrapper class; OMCProcessDockerContainer => OMCSessionDockerContainer + """ + + +@depreciated_class(msg="Please use class OMCSessionWSL instead!") +class OMCProcessWSL(OMCSessionWSL): + """ + Just a wrapper class; OMCProcessWSL => OMCSessionWSL + """ diff --git a/OMPython/__init__.py b/OMPython/__init__.py index 282923a7..848421c5 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -6,14 +6,14 @@ ``` import OMPython omc = OMPython.OMCSessionLocal() -omc.sendExpression("command") +omc.sendExpression("getVersion()") ``` """ from OMPython.model_execution import ( - ModelExecutionCmd, - ModelExecutionData, + ModelExecutionConfig, + ModelExecutionRun, ModelExecutionException, ) from OMPython.om_session_abc import ( @@ -58,15 +58,16 @@ ModelicaDoERunner, ) +# the imports below are compatibility functionality (OMPython v4.0.0) from OMPython.ModelicaSystem import ( ModelicaSystem, ModelicaSystemDoE, - ModelicaSystemCmd, + parse_simflags, ) from OMPython.OMCSession import ( OMCSessionCmd, - OMCSessionZMQ, OMCSessionException, + OMCSessionZMQ, OMCProcessLocal, OMCProcessPort, @@ -80,15 +81,14 @@ 'LinearizationResult', - 'ModelExecutionCmd', - 'ModelExecutionData', + 'ModelExecutionConfig', + 'ModelExecutionRun', 'ModelExecutionException', 'ModelicaDoEABC', 'ModelicaDoEOMC', 'ModelicaDoERunner', 'ModelicaSystemABC', - 'ModelicaSystemDoE', 'ModelicaSystemError', 'ModelicaSystemOMC', 'ModelicaSystemRunner', @@ -109,10 +109,10 @@ 'OMPathRunnerLocal', 'OMSessionRunner', - 'ModelicaSystemCmd', 'ModelicaSystem', + 'ModelicaSystemDoE', + 'parse_simflags', - 'OMCSessionABC', 'OMCSessionCmd', 'OMCSessionException', diff --git a/OMPython/compatibility_v400.py b/OMPython/compatibility_v400.py new file mode 100644 index 00000000..61fa27a8 --- /dev/null +++ b/OMPython/compatibility_v400.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +""" +Helper functions for compatibility with OMPython v4.0.0 +""" +import warnings +from typing import Optional + + +def depreciated_class(msg: Optional[str] = None): + """ + Decorator for depreciated / compatibility classes. + """ + + def depreciated(cls): + """ + Helper functions to do the decoration part. + """ + + class Wrapper(cls): + """ + Wrapper to define the depreciation message. + """ + + def __init__(self, *args, **kwargs): + message = f"The class {cls.__name__} is depreciated and will be removed in future versions!" + if msg is not None: + message += f" {msg}" + + warnings.warn( + message=message, + category=DeprecationWarning, + stacklevel=3, + ) + + super().__init__(*args, **kwargs) + + return Wrapper + + return depreciated diff --git a/OMPython/model_execution.py b/OMPython/model_execution.py index ebd4c011..d3b344d1 100644 --- a/OMPython/model_execution.py +++ b/OMPython/model_execution.py @@ -12,7 +12,6 @@ import re import subprocess from typing import Any, Optional -import warnings # define logger using the current module name as ID logger = logging.getLogger(__name__) @@ -27,14 +26,13 @@ class ModelExecutionException(Exception): @dataclasses.dataclass -class ModelExecutionData: +class ModelExecutionRun: """ - Data class to store the command line data for running a model executable in the OMC environment. + Data class to store the command line data for running a model executable. This definition is independent of the OMC + environment as only the executable is needed. - All data should be defined for the environment, where OMC is running (local, docker or WSL) - - To use this as a definition of an OMC simulation run, it has to be processed within - OMCProcess*.self_update(). This defines the attribute cmd_model_executable. + All data should be defined for the environment, where the executable was defined / is located. This is especially + important if OMPython and the executable are defined in different environments (docker or WSL). """ # cmd_path is the expected working directory cmd_path: str @@ -105,11 +103,12 @@ def run(self) -> int: return returncode -class ModelExecutionCmd: +class ModelExecutionConfig: """ - All information about a compiled model executable. This should include data about all structured parameters, i.e. - parameters which need a recompilation of the model. All non-structured parameters can be easily changed without - the need for recompilation. + This class collects all information about a compiled model executable. This includes data about all structured + parameters, i.e. parameters which need a recompilation of the model. All non-structured parameters can be easily + changed without the need for recompilation. The final result is an instance of class ModelExecutionRun - a + definition to run one simulation based on the compiled model executable. """ def __init__( @@ -261,7 +260,7 @@ def get_cmd_args(self) -> list[str]: return cmdl - def definition(self) -> ModelExecutionData: + def definition(self) -> ModelExecutionRun: """ Define all needed data to run the model executable. The data is stored in an OMCSessionRunData object. """ @@ -301,7 +300,7 @@ def definition(self) -> ModelExecutionData: if self._cmd_local: cmd_cwd_local = cmd_path.as_posix() - omc_run_data = ModelExecutionData( + omc_run_data = ModelExecutionRun( cmd_path=cmd_path.as_posix(), cmd_model_name=self._model_name, cmd_args=self.get_cmd_args(), @@ -314,45 +313,3 @@ def definition(self) -> ModelExecutionData: ) return omc_run_data - - @staticmethod - def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | numbers.Number]]: - """ - Parse a simflag definition; this is deprecated! - - The return data can be used as input for self.args_set(). - """ - warnings.warn( - message="The argument 'simflags' is depreciated and will be removed in future versions; " - "please use 'simargs' instead", - category=DeprecationWarning, - stacklevel=2, - ) - - simargs: dict[str, Optional[str | dict[str, Any] | numbers.Number]] = {} - - args = [s for s in simflags.split(' ') if s] - for arg in args: - if arg[0] != '-': - raise ModelExecutionException(f"Invalid simulation flag: {arg}") - arg = arg[1:] - parts = arg.split('=') - if len(parts) == 1: - simargs[parts[0]] = None - elif parts[0] == 'override': - override = '='.join(parts[1:]) - - override_dict = {} - for item in override.split(','): - kv = item.split('=') - if not 0 < len(kv) < 3: - raise ModelExecutionException(f"Invalid value for '-override': {override}") - if kv[0]: - try: - override_dict[kv[0]] = kv[1] - except (KeyError, IndexError) as ex: - raise ModelExecutionException(f"Invalid value for '-override': {override}") from ex - - simargs[parts[0]] = override_dict - - return simargs diff --git a/OMPython/modelica_doe_abc.py b/OMPython/modelica_doe_abc.py index e3ab8403..062f8833 100644 --- a/OMPython/modelica_doe_abc.py +++ b/OMPython/modelica_doe_abc.py @@ -13,7 +13,8 @@ from typing import Any, cast, Optional, Tuple from OMPython.model_execution import ( - ModelExecutionData, + ModelExecutionRun, + ModelExecutionException, ) from OMPython.om_session_abc import ( OMPathABC, @@ -138,7 +139,7 @@ def __init__( self._parameters = {} self._doe_def: Optional[dict[str, dict[str, Any]]] = None - self._doe_cmd: Optional[dict[str, ModelExecutionData]] = None + self._doe_cmd: Optional[dict[str, ModelExecutionRun]] = None def get_session(self) -> OMSessionABC: """ @@ -209,7 +210,7 @@ def prepare(self) -> int: } ) - self._mod.setParameters(sim_param_non_structural) + self._mod.setParameters(**sim_param_non_structural) mscmd = self._mod.simulate_cmd( result_file=resultfile, ) @@ -255,7 +256,7 @@ def get_doe_definition(self) -> Optional[dict[str, dict[str, Any]]]: """ return self._doe_def - def get_doe_command(self) -> Optional[dict[str, ModelExecutionData]]: + def get_doe_command(self) -> Optional[dict[str, ModelExecutionRun]]: """ Get the definitions of simulations commands to run for this DoE. """ @@ -310,8 +311,8 @@ def worker(worker_id, task_queue): returncode = cmd_definition.run() logger.info(f"[Worker {worker_id}] Simulation {resultpath.name} " f"finished with return code: {returncode}") - except ModelicaSystemError as ex: - logger.warning(f"Simulation error for {resultpath.name}: {ex}") + except ModelExecutionException as exc: + logger.warning(f"Simulation error for {resultpath.name}: {exc}") # Mark the task as done task_queue.task_done() diff --git a/OMPython/modelica_system_abc.py b/OMPython/modelica_system_abc.py index fcc31deb..41a67205 100644 --- a/OMPython/modelica_system_abc.py +++ b/OMPython/modelica_system_abc.py @@ -5,19 +5,20 @@ import abc import ast +import csv from dataclasses import dataclass import logging import numbers import os import re from typing import Any, Optional -import warnings import xml.etree.ElementTree as ET import numpy as np from OMPython.model_execution import ( - ModelExecutionCmd, + ModelExecutionConfig, + ModelExecutionException, ) from OMPython.om_session_abc import ( OMPathABC, @@ -189,7 +190,7 @@ def check_model_executable(self): Check if the model executable is working """ # check if the executable exists ... - om_cmd = ModelExecutionCmd( + om_cmd = ModelExecutionConfig( runpath=self.getWorkDirectory(), cmd_local=self._session.model_execution_local, cmd_windows=self._session.model_execution_windows, @@ -200,7 +201,10 @@ def check_model_executable(self): # ... by running it - output help for command help om_cmd.arg_set(key="help", val="help") cmd_definition = om_cmd.definition() - returncode = cmd_definition.run() + try: + returncode = cmd_definition.run() + except ModelExecutionException as exc: + raise ModelicaSystemError(f"Cannot execute model: {exc}") from exc if returncode != 0: raise ModelicaSystemError("Model executable not working!") @@ -213,6 +217,15 @@ def _xmlparse(self, xml_file: OMPathABC): root = tree.getroot() if root is None: raise ModelicaSystemError(f"Cannot read XML file: {xml_file}") + # check OM version - force the version used by the model executable + if 'generationTool' in root.attrib: + generation_tool_version = self._parse_om_version(version=root.attrib['generationTool']) + if self._version != generation_tool_version: + logger.warning(f"Mismatch in OpenModelica version: {self._version!r} (OMSession) " + f"vs. {generation_tool_version!r} (model executable) " + f"- using {generation_tool_version!r}!") + self._version = generation_tool_version + for attr in root.iter('DefaultExperiment'): for key in ("startTime", "stopTime", "stepSize", "tolerance", "solver", "outputFormat"): @@ -579,16 +592,24 @@ def _parse_om_version(version: str) -> tuple[int, int, int]: def _process_override_data( self, - om_cmd: ModelExecutionCmd, + om_cmd: ModelExecutionConfig, override_file: OMPathABC, override_var: dict[str, str], override_sim: dict[str, str], + variable_filter: Optional[str] = None, ) -> None: """ Define the override parameters. As the definition of simulation specific override parameter changes with OM 1.26.0, version specific code is needed. Please keep in mind, that this will fail if OMC is not used to run the model executable. + + Including also override of variable filter settings. """ + + # define variable filter if defined (override any original setting) + if variable_filter is not None: + om_cmd.arg_set(key="variableFilter", val=variable_filter) + if len(override_var) == 0 and len(override_sim) == 0: return @@ -617,9 +638,8 @@ def _process_override_data( def simulate_cmd( self, result_file: OMPathABC, - simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, - ) -> ModelExecutionCmd: + ) -> ModelExecutionConfig: """ This method prepares the simulates model according to the simulation options. It returns an instance of ModelicaSystemCmd which can be used to run the simulation. @@ -633,7 +653,6 @@ def simulate_cmd( Parameters ---------- result_file - simflags simargs Returns @@ -641,7 +660,7 @@ def simulate_cmd( An instance if ModelicaSystemCmd to run the requested simulation. """ - om_cmd = ModelExecutionCmd( + om_cmd = ModelExecutionConfig( runpath=self.getWorkDirectory(), cmd_local=self._session.model_execution_local, cmd_windows=self._session.model_execution_windows, @@ -653,10 +672,6 @@ def simulate_cmd( # always define the result file to use om_cmd.arg_set(key="r", val=result_file.as_posix()) - # allow runtime simulation flags from user input - if simflags is not None: - om_cmd.args_set(args=om_cmd.parse_simflags(simflags=simflags)) - if simargs: om_cmd.args_set(args=simargs) @@ -665,6 +680,7 @@ def simulate_cmd( override_file=result_file.parent / f"{result_file.stem}_override.txt", override_var=self._override_variables, override_sim=self._simulate_options_override, + variable_filter=self._variable_filter, ) if self._inputs: # if model has input quantities @@ -690,7 +706,6 @@ def simulate_cmd( def simulate( self, resultfile: Optional[str | os.PathLike] = None, - simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, ) -> None: """Simulate the model according to simulation options. @@ -699,16 +714,11 @@ def simulate( Args: resultfile: Path to a custom result file - simflags: String of extra command line flags for the model binary. - This argument is deprecated, use simargs instead. simargs: Dict with simulation runtime flags. Examples: mod.simulate() mod.simulate(resultfile="a.mat") - # set runtime simulation flags, deprecated - mod.simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") - # using simargs mod.simulate(simargs={"noEventEmit": None, "noRestart": None, "override": "override": {"e": 0.3, "g": 10}}) """ @@ -727,7 +737,6 @@ def simulate( om_cmd = self.simulate_cmd( result_file=self._result_file, - simflags=simflags, simargs=simargs, ) @@ -736,7 +745,10 @@ def simulate( self._result_file.unlink() # ... run simulation ... cmd_definition = om_cmd.definition() - returncode = cmd_definition.run() + try: + returncode = cmd_definition.run() + except ModelExecutionException as exc: + raise ModelicaSystemError(f"Cannot execute model: {exc}") from exc # and check returncode *AND* resultfile if returncode != 0 and self._result_file.is_file(): # check for an empty (=> 0B) result file which indicates a crash of the model executable @@ -752,49 +764,13 @@ def simulate( @staticmethod def _prepare_input_data( - input_args: Any, input_kwargs: dict[str, Any], ) -> dict[str, str]: """ Convert raw input to a structured dictionary {'key1': 'value1', 'key2': 'value2'}. """ - - def prepare_str(str_in: str) -> dict[str, str]: - str_in = str_in.replace(" ", "") - key_val_list: list[str] = str_in.split("=") - if len(key_val_list) != 2: - raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") - - input_data_from_str: dict[str, str] = {key_val_list[0]: key_val_list[1]} - - return input_data_from_str - input_data: dict[str, str] = {} - for input_arg in input_args: - if isinstance(input_arg, str): - warnings.warn(message="The definition of values to set should use a dictionary, " - "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " - "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", - category=DeprecationWarning, - stacklevel=3) - input_data = input_data | prepare_str(input_arg) - elif isinstance(input_arg, list): - warnings.warn(message="The definition of values to set should use a dictionary, " - "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " - "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", - category=DeprecationWarning, - stacklevel=3) - - for item in input_arg: - if not isinstance(item, str): - raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(item)}!") - input_data = input_data | prepare_str(item) - elif isinstance(input_arg, dict): - input_data = input_data | input_arg - else: - raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(input_arg)}!") - if len(input_kwargs): for key, val in input_kwargs.items(): # ensure all values are strings to align it on one type: dict[str, str] @@ -872,21 +848,17 @@ def isParameterChangeable( def setContinuous( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set continuous values. It can be called: - with a sequence of continuous name and assigning corresponding values as arguments as show in the example below: - usage - >>> setContinuous("Name=value") # depreciated - >>> setContinuous(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set continuous values. + usage: >>> setContinuous(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setContinuous(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -896,21 +868,17 @@ def setContinuous( def setParameters( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set parameter values. It can be called: - with a sequence of parameter name and assigning corresponding value as arguments as show in the example below: - usage - >>> setParameters("Name=value") # depreciated - >>> setParameters(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set parameter values + usage: >>> setParameters(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setParameters(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -920,22 +888,17 @@ def setParameters( def setSimulationOptions( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set simulation options. It can be called: - with a sequence of simulation options name and assigning corresponding values as arguments as show in the - example below: - usage - >>> setSimulationOptions("Name=value") # depreciated - >>> setSimulationOptions(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set simulation options. + usage: >>> setSimulationOptions(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setSimulationOptions(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -945,22 +908,17 @@ def setSimulationOptions( def setLinearizationOptions( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set linearization options. It can be called: - with a sequence of linearization options name and assigning corresponding value as arguments as show in the - example below - usage - >>> setLinearizationOptions("Name=value") # depreciated - >>> setLinearizationOptions(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set linearization options. + usage: >>> setLinearizationOptions(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setLinearizationOptions(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -970,22 +928,17 @@ def setLinearizationOptions( def setOptimizationOptions( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set optimization options. It can be called: - with a sequence of optimization options name and assigning corresponding values as arguments as show in the - example below: - usage - >>> setOptimizationOptions("Name=value") # depreciated - >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set optimization options. + usage: >>> setOptimizationOptions(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setOptimizationOptions(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -993,25 +946,69 @@ def setOptimizationOptions( datatype="optimization-option", overridedata=None) + def set_variable_filter( + self, + variable_filter: Optional[str] = None, + escape: bool = False, + ) -> None: + """ + This method is used to set variable filters. If escape is True, all regex special characters are escaped. + """ + if variable_filter is None: + self._variable_filter = None + return + + if escape: + variable_filter = re.escape(variable_filter) + + # Validate filter_val as a regular expression + try: + re.compile(variable_filter) + except re.error as exc: + raise ModelicaSystemError(f"Invalid variable_filter regular expression: {variable_filter!r} ({exc})") + + self._variable_filter = variable_filter + + @staticmethod + def toInputs(data: dict[str, list[float]]) -> dict[str, list[tuple[float, float]]]: + """ + Converts a dictionary of lists (from pandas DataFrame.to_dict(orient='list')) + into the OMPython setInputs input format. + + Example: mod.setInputs(**toInputs(pdf.to_dict(orient='list'))) + + Assumes the dictionary contains a key named 'time'. + """ + if "time" not in data: + raise ValueError("The provided data must contain a 'time' key.") + + time_series = data["time"] + + inputs = { + var_name: list(zip(time_series, values)) + for var_name, values in data.items() + if var_name != "time" + } + + return inputs + def setInputs( self, *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set input values. It can be called with a sequence of input name and assigning - corresponding values as arguments as show in the example below. Compared to other set*() methods this is a - special case as value could be a list of tuples - these are converted to a string in _prepare_input_data() - and restored here via ast.literal_eval(). + This method is used to set input values. - >>> setInputs("Name=value") # depreciated - >>> setInputs(["Name1=value1","Name2=value2"]) # depreciated + Compared to other set*() methods this is a special case as value could be a list of tuples - these are + converted to a string in _prepare_input_data() and restored here via ast.literal_eval(). + usage: >>> setInputs(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setInputs(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) for key, val in inputdata.items(): if key not in self._inputs: @@ -1021,32 +1018,80 @@ def setInputs( raise ModelicaSystemError(f"Invalid data in input for {repr(key)}: {repr(val)}") val_evaluated = ast.literal_eval(val) - if isinstance(val_evaluated, (int, float)): self._inputs[key] = [(float(self._simulate_options["startTime"]), float(val)), (float(self._simulate_options["stopTime"]), float(val))] elif isinstance(val_evaluated, list): - if not all([isinstance(item, tuple) for item in val_evaluated]): + if not all(isinstance(item, tuple) for item in val_evaluated): raise ModelicaSystemError("Value for setInput() must be in tuple format; " f"got {repr(val_evaluated)}") - if val_evaluated != sorted(val_evaluated, key=lambda x: x[0]): - raise ModelicaSystemError("Time value should be in increasing order; " - f"got {repr(val_evaluated)}") + val_evaluated_checked: list[tuple[float, float]] = [] for item in val_evaluated: - if item[0] < float(self._simulate_options["startTime"]): - raise ModelicaSystemError(f"Time value in {repr(item)} of {repr(val_evaluated)} is less " - "than the simulation start time") if len(item) != 2: raise ModelicaSystemError(f"Value {repr(item)} of {repr(val_evaluated)} " "is in incorrect format!") - self._inputs[key] = val_evaluated + try: + val_evaluated_checked.append((float(item[0]), float(item[1]))) + except (ValueError, TypeError) as exc: + raise ModelicaSystemError("All elements of the input for setInput() should be convertible to " + "type Tuple[float, float] - " + f"found [{repr(item[0])}, {repr(item[1])}] with types " + f"[{type(item[0])}, {type(item[1])}]!") from exc + + if item[0] < float(self._simulate_options["startTime"]): + raise ModelicaSystemError(f"Time value in {repr(item)} of {repr(val_evaluated)} is less " + "than the simulation start time") + + if val_evaluated_checked != sorted(val_evaluated_checked, key=lambda x: x[0]): + raise ModelicaSystemError("Time value should be in increasing order; " + f"got {repr(val_evaluated_checked)}") + + self._inputs[key] = val_evaluated_checked else: raise ModelicaSystemError(f"Data cannot be evaluated for {repr(key)}: {repr(val)}") return True + def setInputsCSV( + self, + csvfile: os.PathLike, + ) -> None: + """ + Read content from a CSV file and use it to define the time based input data. + """ + + # real type is 'dict[str, list[tuple[float, float]]]' - 'dict[str, Any]' is used to make setInputs() happy + inputs: dict[str, Any] = {} + try: + with open(csvfile, newline='') as csvfh: + dialect = csv.Sniffer().sniff(csvfh.read(1024)) + csvfh.seek(0) + reader = csv.DictReader(csvfh, dialect=dialect) + + keys: list[str] = [] + for idx, line in enumerate(reader): + if not keys: + keys = list(line.keys()) + for var in keys[1:]: + if var in inputs: + raise ModelicaSystemError(f"Error reading {csvfile}: duplicated column {var}!") + inputs[var] = [] + try: + # use key[0] as time; all other columns use the header as name + for var in keys[1:]: + inputs[var].append((float(line[keys[0]]), float(line[var]))) + except (ValueError, TypeError) as exc2: + raise ModelicaSystemError(f"Invalid value reading {csvfile} line {idx}/{var}: " + f"{line}!") from exc2 + + except IOError as exc1: + raise ModelicaSystemError(f"Error reading {csvfile}: {exc1}") from exc1 + + if inputs: + self.setInputs(**inputs) + def _createCSVData(self, csvfile: Optional[OMPathABC] = None) -> OMPathABC: """ Create a csv file with inputs for the simulation/optimization of the model. If csvfile is provided as argument, @@ -1105,7 +1150,6 @@ def _createCSVData(self, csvfile: Optional[OMPathABC] = None) -> OMPathABC: def linearize( self, lintime: Optional[float] = None, - simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, ) -> LinearizationResult: """Linearize the model according to linearization options. @@ -1114,8 +1158,6 @@ def linearize( Args: lintime: Override "stopTime" value. - simflags: String of extra command line flags for the model binary. - This argument is deprecated, use simargs instead. simargs: A dict with command line flags and possible options; example: "simargs={'csvInput': 'a.csv'}" Returns: @@ -1134,7 +1176,7 @@ def linearize( "use ModelicaSystemOMC() to build the model first" ) - om_cmd = ModelExecutionCmd( + om_cmd = ModelExecutionConfig( runpath=self.getWorkDirectory(), cmd_local=self._session.model_execution_local, cmd_windows=self._session.model_execution_windows, @@ -1148,6 +1190,7 @@ def linearize( override_file=self.getWorkDirectory() / f'{self._model_name}_override_linear.txt', override_var=self._override_variables, override_sim=self._linearization_options, + variable_filter=self._variable_filter, ) if self._inputs: @@ -1168,10 +1211,6 @@ def linearize( f"<= lintime <= {self._linearization_options['stopTime']}") om_cmd.arg_set(key="l", val=str(lintime)) - # allow runtime simulation flags from user input - if simflags is not None: - om_cmd.args_set(args=om_cmd.parse_simflags(simflags=simflags)) - if simargs: om_cmd.args_set(args=simargs) @@ -1180,7 +1219,10 @@ def linearize( linear_file.unlink(missing_ok=True) cmd_definition = om_cmd.definition() - returncode = cmd_definition.run() + try: + returncode = cmd_definition.run() + except ModelExecutionException as exc: + raise ModelicaSystemError(f"Cannot execute model: {exc}") from exc if returncode != 0: raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") if not linear_file.is_file(): diff --git a/OMPython/modelica_system_omc.py b/OMPython/modelica_system_omc.py index 34805e0f..4e0f14ce 100644 --- a/OMPython/modelica_system_omc.py +++ b/OMPython/modelica_system_omc.py @@ -126,7 +126,7 @@ def model( # set variables self._model_name = model_name # Model class name self._libraries = libraries # may be needed if model is derived from other model - self._variable_filter = variable_filter + self.set_variable_filter(variable_filter=variable_filter, escape=True) if self._libraries: self._loadLibrary(libraries=self._libraries) diff --git a/OMPython/om_session_abc.py b/OMPython/om_session_abc.py index 70e897d7..385ab3be 100644 --- a/OMPython/om_session_abc.py +++ b/OMPython/om_session_abc.py @@ -7,10 +7,8 @@ import abc import logging -import os import pathlib import platform -import sys from typing import Any, Optional import uuid @@ -26,151 +24,110 @@ class OMSessionException(Exception): """ -# due to the compatibility layer to Python < 3.12, the OM(C)Path classes must be hidden behind the following if -# conditions. This is also the reason for OMPathABC, a simple base class to be used in ModelicaSystem* classes. -# Reason: before Python 3.12, pathlib.PurePosixPath can not be derived from; therefore, OMPathABC is not possible -if sys.version_info < (3, 12): - class _OMPathCompatibility(pathlib.Path): +class OMPathABC(pathlib.PurePosixPath, metaclass=abc.ABCMeta): + """ + Implementation of a basic (PurePosix)Path object to be used within OMPython. The derived classes can use OMC as + backend and - thus - work on different configurations like docker or WSL. The connection to OMC is provided via + an instances of classes derived from BaseSession. + + PurePosixPath is selected as it covers all but Windows systems (Linux, docker, WSL). However, the code is + written such that possible Windows system are taken into account. Nevertheless, the overall functionality is + limited compared to standard pathlib.Path objects. + """ + + def __init__(self, *path, session: OMSessionABC) -> None: + super().__init__(*path) + self._session = session + + def get_session(self) -> OMSessionABC: + """ + Get session definition used for this instance of OMPath. + """ + return self._session + + def with_segments(self, *pathsegments) -> OMPathABC: """ - Compatibility class for OMPathABC in Python < 3.12. This allows to run all code which uses OMPathABC (mainly - ModelicaSystem) on these Python versions. There are remaining limitation as only local execution is possible. + Create a new OMCPath object with the given path segments. + + The original definition of Path is overridden to ensure the session data is set. """ + return type(self)(*pathsegments, session=self._session) - # modified copy of pathlib.Path.__new__() definition - def __new__(cls, *args, **kwargs): - logger.warning("Python < 3.12 - using a version of class OMCPath " - "based on pathlib.Path for local usage only.") + @abc.abstractmethod + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ - if cls is _OMPathCompatibility: - cls = _OMPathCompatibilityWindows if os.name == 'nt' else _OMPathCompatibilityPosix - self = cls._from_parts(args) - if not self._flavour.is_supported: - raise NotImplementedError(f"cannot instantiate {cls.__name__} on your system") - return self + @abc.abstractmethod + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """ + Check if the path is a directory. + """ - def size(self) -> int: - """ - Needed compatibility function to have the same interface as OMCPathReal - """ - return self.stat().st_size + @abc.abstractmethod + def is_absolute(self) -> bool: + """ + Check if the path is an absolute path. + """ - class _OMPathCompatibilityPosix(pathlib.PosixPath, _OMPathCompatibility): + @abc.abstractmethod + def read_text(self, encoding=None, errors=None, newline=None) -> str: """ - Compatibility class for OMCPath on Posix systems (Python < 3.12) + Read the content of the file represented by this path as text. """ - class _OMPathCompatibilityWindows(pathlib.WindowsPath, _OMPathCompatibility): + @abc.abstractmethod + def write_text(self, data: str, encoding=None, errors=None, newline=None) -> int: """ - Compatibility class for OMCPath on Windows systems (Python < 3.12) + Write text data to the file represented by this path. + """ + + @abc.abstractmethod + def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False) -> None: """ + Create a directory at the path represented by this class. - OMPathABC = _OMPathCompatibility - -else: - class OMPathABC(pathlib.PurePosixPath, metaclass=abc.ABCMeta): - """ - Implementation of a basic (PurePosix)Path object to be used within OMPython. The derived classes can use OMC as - backend and - thus - work on different configurations like docker or WSL. The connection to OMC is provided via - an instances of classes derived from BaseSession. - - PurePosixPath is selected as it covers all but Windows systems (Linux, docker, WSL). However, the code is - written such that possible Windows system are taken into account. Nevertheless, the overall functionality is - limited compared to standard pathlib.Path objects. - """ - - def __init__(self, *path, session: OMSessionABC) -> None: - super().__init__(*path) - self._session = session - - def get_session(self) -> OMSessionABC: - """ - Get session definition used for this instance of OMPath. - """ - return self._session - - def with_segments(self, *pathsegments) -> OMPathABC: - """ - Create a new OMCPath object with the given path segments. - - The original definition of Path is overridden to ensure the session data is set. - """ - return type(self)(*pathsegments, session=self._session) - - @abc.abstractmethod - def is_file(self) -> bool: - """ - Check if the path is a regular file. - """ - - @abc.abstractmethod - def is_dir(self) -> bool: - """ - Check if the path is a directory. - """ - - @abc.abstractmethod - def is_absolute(self) -> bool: - """ - Check if the path is an absolute path. - """ - - @abc.abstractmethod - def read_text(self) -> str: - """ - Read the content of the file represented by this path as text. - """ - - @abc.abstractmethod - def write_text(self, data: str) -> int: - """ - Write text data to the file represented by this path. - """ - - @abc.abstractmethod - def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: - """ - Create a directory at the path represented by this class. - - The argument parents with default value True exists to ensure compatibility with the fallback solution for - Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent - directories are also created. - """ - - @abc.abstractmethod - def cwd(self) -> OMPathABC: - """ - Returns the current working directory as an OMPathABC object. - """ - - @abc.abstractmethod - def unlink(self, missing_ok: bool = False) -> None: - """ - Unlink (delete) the file or directory represented by this path. - """ - - @abc.abstractmethod - def resolve(self, strict: bool = False) -> OMPathABC: - """ - Resolve the path to an absolute path. - """ - - def absolute(self) -> OMPathABC: - """ - Resolve the path to an absolute path. Just a wrapper for resolve(). - """ - return self.resolve() - - def exists(self) -> bool: - """ - Semi replacement for pathlib.Path.exists(). - """ - return self.is_file() or self.is_dir() - - @abc.abstractmethod - def size(self) -> int: - """ - Get the size of the file in bytes - this is an extra function and the best we can do using OMC. - """ + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + + @abc.abstractmethod + def cwd(self) -> OMPathABC: # pylint: disable=W0221 # is @classmethod in the original; see pathlib.PathBase + """ + Returns the current working directory as an OMPathABC object. + """ + + @abc.abstractmethod + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + + @abc.abstractmethod + def resolve(self, strict: bool = False) -> OMPathABC: + """ + Resolve the path to an absolute path. + """ + + def absolute(self) -> OMPathABC: + """ + Resolve the path to an absolute path. Just a wrapper for resolve(). + """ + return self.resolve() + + def exists(self) -> bool: + """ + Semi replacement for pathlib.Path.exists(). + """ + return self.is_file() or self.is_dir() + + @abc.abstractmethod + def size(self) -> int: + """ + Get the size of the file in bytes - this is an extra function and the best we can do using OMC. + """ class PostInitCaller(type): diff --git a/OMPython/om_session_omc.py b/OMPython/om_session_omc.py index 6626cd17..b2698181 100644 --- a/OMPython/om_session_omc.py +++ b/OMPython/om_session_omc.py @@ -21,7 +21,6 @@ import time from typing import Any, Optional, Tuple import uuid -import warnings import psutil import pyparsing @@ -39,151 +38,154 @@ # define logger using the current module name as ID logger = logging.getLogger(__name__) -# due to the compatibility layer to Python < 3.12, the OM(C)Path classes must be hidden behind the following if -# conditions. This is also the reason for OMPathABC, a simple base class to be used in ModelicaSystem* classes. -# Reason: before Python 3.12, pathlib.PurePosixPath can not be derived from; therefore, OMPathABC is not possible -if sys.version_info < (3, 12): - OMCPath = OMPathABC - -else: - class _OMCPath(OMPathABC): + + +class OMCPath(OMPathABC): + """ + Implementation of a OMPathABC using OMC as backend. The connection to OMC is provided via an instances of an + OMCSession* classes. + """ + + def is_file(self, *, follow_symlinks=True) -> bool: """ - Implementation of a OMPathABC using OMC as backend. The connection to OMC is provided via an instances of an - OMCSession* classes. + Check if the path is a regular file. """ + del follow_symlinks - def is_file(self) -> bool: - """ - Check if the path is a regular file. - """ - retval = self.get_session().sendExpression(expr=f'regularFileExists("{self.as_posix()}")') - if not isinstance(retval, bool): - raise OMSessionException(f"Invalid return value for is_file(): {retval} - expect bool") - return retval - - def is_dir(self) -> bool: - """ - Check if the path is a directory. - """ - retval = self.get_session().sendExpression(expr=f'directoryExists("{self.as_posix()}")') - if not isinstance(retval, bool): - raise OMSessionException(f"Invalid return value for is_dir(): {retval} - expect bool") - return retval - - def is_absolute(self) -> bool: - """ - Check if the path is an absolute path. Special handling to differentiate Windows and Posix definitions. - """ - if self._session.model_execution_windows and self._session.model_execution_local: - return pathlib.PureWindowsPath(self.as_posix()).is_absolute() - return pathlib.PurePosixPath(self.as_posix()).is_absolute() - - def read_text(self) -> str: - """ - Read the content of the file represented by this path as text. - """ - retval = self.get_session().sendExpression(expr=f'readFile("{self.as_posix()}")') - if not isinstance(retval, str): - raise OMSessionException(f"Invalid return value for read_text(): {retval} - expect str") - return retval - - def write_text(self, data: str) -> int: - """ - Write text data to the file represented by this path. - """ - if not isinstance(data, str): - raise TypeError(f"data must be str, not {data.__class__.__name__}") - - data_omc = self._session.escape_str(data) - self._session.sendExpression(expr=f'writeFile("{self.as_posix()}", "{data_omc}", false);') - - return len(data) - - def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: - """ - Create a directory at the path represented by this class. - - The argument parents with default value True exists to ensure compatibility with the fallback solution for - Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent - directories are also created. - """ - if self.is_dir() and not exist_ok: - raise FileExistsError(f"Directory {self.as_posix()} already exists!") - - if not self._session.sendExpression(expr=f'mkdir("{self.as_posix()}")'): - raise OMSessionException(f"Error on directory creation for {self.as_posix()}!") - - def cwd(self) -> OMPathABC: - """ - Returns the current working directory as an OMPathABC object. - """ - cwd_str = self._session.sendExpression(expr='cd()') - return type(self)(cwd_str, session=self._session) - - def unlink(self, missing_ok: bool = False) -> None: - """ - Unlink (delete) the file or directory represented by this path. - """ - res = self._session.sendExpression(expr=f'deleteFile("{self.as_posix()}")') - if not res and not missing_ok: - raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") - - def resolve(self, strict: bool = False) -> OMPathABC: - """ - Resolve the path to an absolute path. This is done based on available OMC functions. - """ - if strict and not (self.is_file() or self.is_dir()): - raise OMSessionException(f"Path {self.as_posix()} does not exist!") - - if self.is_file(): - pathstr_resolved = self._omc_resolve(self.parent.as_posix()) - omcpath_resolved = self._session.omcpath(pathstr_resolved) / self.name - elif self.is_dir(): - pathstr_resolved = self._omc_resolve(self.as_posix()) - omcpath_resolved = self._session.omcpath(pathstr_resolved) - else: - raise OMSessionException(f"Path {self.as_posix()} is neither a file nor a directory!") + retval = self.get_session().sendExpression(expr=f'regularFileExists("{self.as_posix()}")') + if not isinstance(retval, bool): + raise OMSessionException(f"Invalid return value for is_file(): {retval} - expect bool") + return retval - if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir(): - raise OMSessionException(f"OMCPath resolve failed for {self.as_posix()} - path does not exist!") + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """ + Check if the path is a directory. + """ + del follow_symlinks - return omcpath_resolved + retval = self.get_session().sendExpression(expr=f'directoryExists("{self.as_posix()}")') + if not isinstance(retval, bool): + raise OMSessionException(f"Invalid return value for is_dir(): {retval} - expect bool") + return retval - def _omc_resolve(self, pathstr: str) -> str: - """ - Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd - within OMC. - """ - expr = ('omcpath_cwd := cd(); ' - f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring - 'cd(omcpath_cwd)') + def is_absolute(self) -> bool: + """ + Check if the path is an absolute path. Special handling to differentiate Windows and Posix definitions. + """ + if self._session.model_execution_windows and self._session.model_execution_local: + return pathlib.PureWindowsPath(self.as_posix()).is_absolute() + return pathlib.PurePosixPath(self.as_posix()).is_absolute() - try: - retval = self.get_session().sendExpression(expr=expr, parsed=False) - if not isinstance(retval, str): - raise OMSessionException(f"Invalid return value for _omc_resolve(): {retval} - expect str") - result_parts = retval.split('\n') - pathstr_resolved = result_parts[1] - pathstr_resolved = pathstr_resolved[1:-1] # remove quotes - except OMSessionException as ex: - raise OMSessionException(f"OMCPath resolve failed for {pathstr}!") from ex + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + """ + del encoding, errors, newline + + retval = self.get_session().sendExpression(expr=f'readFile("{self.as_posix()}")') + if not isinstance(retval, str): + raise OMSessionException(f"Invalid return value for read_text(): {retval} - expect str") + return retval + + def write_text(self, data: str, encoding=None, errors=None, newline=None) -> int: + """ + Write text data to the file represented by this path. + """ + del encoding, errors, newline + + if not isinstance(data, str): + raise TypeError(f"data must be str, not {data.__class__.__name__}") - return pathstr_resolved + data_omc = self._session.escape_str(data) + self._session.sendExpression(expr=f'writeFile("{self.as_posix()}", "{data_omc}", false);') - def size(self) -> int: - """ - Get the size of the file in bytes - this is an extra function and the best we can do using OMC. - """ - if not self.is_file(): - raise OMSessionException(f"Path {self.as_posix()} is not a file!") + return len(data) - res = self._session.sendExpression(expr=f'stat("{self.as_posix()}")') - if res[0]: - return int(res[1]) + def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False) -> None: + """ + Create a directory at the path represented by this class. + + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + del mode - raise OMSessionException(f"Error reading file size for path {self.as_posix()}!") + if self.is_dir() and not exist_ok: + raise FileExistsError(f"Directory {self.as_posix()} already exists!") - OMCPath = _OMCPath + if not self._session.sendExpression(expr=f'mkdir("{self.as_posix()}")'): + raise OMSessionException(f"Error on directory creation for {self.as_posix()}!") + + def cwd(self) -> OMPathABC: # pylint: disable=W0221 # is @classmethod in the original; see pathlib.PathBase + """ + Returns the current working directory as an OMPathABC object. + """ + cwd_str = self._session.sendExpression(expr='cd()') + return type(self)(cwd_str, session=self._session) + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + res = self._session.sendExpression(expr=f'deleteFile("{self.as_posix()}")') + if not res and not missing_ok: + raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") + + def resolve(self, strict: bool = False) -> OMPathABC: + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + if strict and not (self.is_file() or self.is_dir()): + raise OMSessionException(f"Path {self.as_posix()} does not exist!") + + if self.is_file(): + pathstr_resolved = self._omc_resolve(self.parent.as_posix()) + omcpath_resolved = self._session.omcpath(pathstr_resolved) / self.name + elif self.is_dir(): + pathstr_resolved = self._omc_resolve(self.as_posix()) + omcpath_resolved = self._session.omcpath(pathstr_resolved) + else: + raise OMSessionException(f"Path {self.as_posix()} is neither a file nor a directory!") + + if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir(): + raise OMSessionException(f"OMCPath resolve failed for {self.as_posix()} - path does not exist!") + + return omcpath_resolved + + def _omc_resolve(self, pathstr: str) -> str: + """ + Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd + within OMC. + """ + expr = ('omcpath_cwd := cd(); ' + f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring + 'cd(omcpath_cwd)') + + try: + retval = self.get_session().sendExpression(expr=expr, parsed=False) + if not isinstance(retval, str): + raise OMSessionException(f"Invalid return value for _omc_resolve(): {retval} - expect str") + result_parts = retval.split('\n') + pathstr_resolved = result_parts[1] + pathstr_resolved = pathstr_resolved[1:-1] # remove quotes + except OMSessionException as ex: + raise OMSessionException(f"OMCPath resolve failed for {pathstr}!") from ex + + return pathstr_resolved + + def size(self) -> int: + """ + Get the size of the file in bytes - this is an extra function and the best we can do using OMC. + """ + if not self.is_file(): + raise OMSessionException(f"Path {self.as_posix()} is not a file!") + + res = self._session.sendExpression(expr=f'stat("{self.as_posix()}")') + if res[0]: + return int(res[1]) + + raise OMSessionException(f"Error reading file size for path {self.as_posix()}!") class OMCSessionABC(OMSessionABC, metaclass=abc.ABCMeta): @@ -377,16 +379,6 @@ def omcpath_tempdir(self, tempdir_base: Optional[OMPathABC] = None) -> OMPathABC return self._tempdir(tempdir_base=tempdir_base) - def execute(self, command: str): - warnings.warn( - message="This function is depreciated and will be removed in future versions; " - "please use sendExpression() instead", - category=DeprecationWarning, - stacklevel=2, - ) - - return self.sendExpression(command, parsed=False) - def sendExpression(self, expr: str, parsed: bool = True) -> Any: """ Send an expression to the OMC server and return the result. diff --git a/OMPython/om_session_runner.py b/OMPython/om_session_runner.py index fc8e5ac8..3bd3abeb 100644 --- a/OMPython/om_session_runner.py +++ b/OMPython/om_session_runner.py @@ -9,7 +9,6 @@ import logging import pathlib import subprocess -import sys import tempfile from typing import Any, Optional, Type @@ -22,275 +21,283 @@ # define logger using the current module name as ID logger = logging.getLogger(__name__) -# due to the compatibility layer to Python < 3.12, the OM(C)Path classes must be hidden behind the following if -# conditions. This is also the reason for OMPathABC, a simple base class to be used in ModelicaSystem* classes. -# Reason: before Python 3.12, pathlib.PurePosixPath can not be derived from; therefore, OMPathABC is not possible -if sys.version_info < (3, 12): - OMPathRunnerABC = OMPathABC - OMPathRunnerLocal = OMPathABC - OMPathRunnerBash = OMPathABC - -else: - class OMPathRunnerABC(OMPathABC, metaclass=abc.ABCMeta): - """ - Base function for OMPath definitions *without* OMC server - """ - - def _path(self) -> pathlib.Path: - return pathlib.Path(self.as_posix()) - - class _OMPathRunnerLocal(OMPathRunnerABC): - """ - Implementation of OMPathABC which does not use the session data at all. Thus, this implementation can run - locally without any usage of OMC. - - This class is based on OMPathABC and, therefore, on pathlib.PurePosixPath. This is working well, but it is not - the correct implementation on Windows systems. To get a valid Windows representation of the path, use the - conversion via pathlib.Path(.as_posix()). - """ - - def is_file(self) -> bool: - """ - Check if the path is a regular file. - """ - return self._path().is_file() - - def is_dir(self) -> bool: - """ - Check if the path is a directory. - """ - return self._path().is_dir() - - def is_absolute(self) -> bool: - """ - Check if the path is an absolute path. - """ - return self._path().is_absolute() - - def read_text(self) -> str: - """ - Read the content of the file represented by this path as text. - """ - return self._path().read_text(encoding='utf-8') - - def write_text(self, data: str): - """ - Write text data to the file represented by this path. - """ - if not isinstance(data, str): - raise TypeError(f"data must be str, not {data.__class__.__name__}") - - return self._path().write_text(data=data, encoding='utf-8') - - def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: - """ - Create a directory at the path represented by this class. - - The argument parents with default value True exists to ensure compatibility with the fallback solution for - Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent - directories are also created. - """ - self._path().mkdir(parents=parents, exist_ok=exist_ok) - - def cwd(self) -> OMPathABC: - """ - Returns the current working directory as an OMPathABC object. - """ - return type(self)(self._path().cwd().as_posix(), session=self._session) - - def unlink(self, missing_ok: bool = False) -> None: - """ - Unlink (delete) the file or directory represented by this path. - """ - self._path().unlink(missing_ok=missing_ok) - - def resolve(self, strict: bool = False) -> OMPathABC: - """ - Resolve the path to an absolute path. This is done based on available OMC functions. - """ - path_resolved = self._path().resolve(strict=strict) - return type(self)(path_resolved, session=self._session) - - def size(self) -> int: - """ - Get the size of the file in bytes - implementation based on pathlib.Path. - """ - if not self.is_file(): - raise OMSessionException(f"Path {self.as_posix()} is not a file!") - - path = self._path() - return path.stat().st_size - - class _OMPathRunnerBash(OMPathRunnerABC): - """ - Implementation of OMPathABC which does not use the session data at all. Thus, this implementation can run - locally without any usage of OMC. The special case of this class is the usage of POSIX bash to run all the - commands. Thus, it can be used in WSL or docker. - - This class is based on OMPathABC and, therefore, on pathlib.PurePosixPath. This is working well, but it is not - the correct implementation on Windows systems. To get a valid Windows representation of the path, use the - conversion via pathlib.Path(.as_posix()). - """ - - def is_file(self) -> bool: - """ - Check if the path is a regular file. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'test -f "{self.as_posix()}"'] - try: - subprocess.run(cmdl, check=True) - return True - except subprocess.CalledProcessError: - return False - - def is_dir(self) -> bool: - """ - Check if the path is a directory. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'test -d "{self.as_posix()}"'] +class OMPathRunnerABC(OMPathABC, metaclass=abc.ABCMeta): + """ + Base function for OMPath definitions *without* OMC server + """ - try: - subprocess.run(cmdl, check=True) - return True - except subprocess.CalledProcessError: - return False + def _path(self) -> pathlib.Path: + return pathlib.Path(self.as_posix()) - def is_absolute(self) -> bool: - """ - Check if the path is an absolute path. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'case "{self.as_posix()}" in /*) exit 0;; *) exit 1;; esac'] +class OMPathRunnerLocal(OMPathRunnerABC): + """ + Implementation of OMPathABC which does not use the session data at all. Thus, this implementation can run + locally without any usage of OMC. - try: - subprocess.check_call(cmdl) - return True - except subprocess.CalledProcessError: - return False - - def read_text(self) -> str: - """ - Read the content of the file represented by this path as text. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'cat "{self.as_posix()}"'] - - result = subprocess.run(cmdl, capture_output=True, check=True) - if result.returncode == 0: - return result.stdout.decode('utf-8') - raise FileNotFoundError(f"Cannot read file: {self.as_posix()}") - - def write_text(self, data: str) -> int: - """ - Write text data to the file represented by this path. - """ - if not isinstance(data, str): - raise TypeError(f"data must be str, not {data.__class__.__name__}") - - data_escape = self._session.escape_str(data) - - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'printf %s "{data_escape}" > "{self.as_posix()}"'] + This class is based on OMPathABC and, therefore, on pathlib.PurePosixPath. This is working well, but it is not + the correct implementation on Windows systems. To get a valid Windows representation of the path, use the + conversion via pathlib.Path(.as_posix()). + """ - try: - subprocess.run(cmdl, check=True) - return len(data) - except subprocess.CalledProcessError as exc: - raise IOError(f"Error writing data to file {self.as_posix()}!") from exc - - def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: - """ - Create a directory at the path represented by this class. - - The argument parents with default value True exists to ensure compatibility with the fallback solution for - Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent - directories are also created. - """ - - if self.is_file(): - raise OSError(f"The given path {self.as_posix()} exists and is a file!") - if self.is_dir() and not exist_ok: - raise OSError(f"The given path {self.as_posix()} exists and is a directory!") - if not parents and not self.parent.is_dir(): - raise FileNotFoundError(f"Parent directory of {self.as_posix()} does not exists!") - - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'mkdir -p "{self.as_posix()}"'] + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ + del follow_symlinks - try: - subprocess.run(cmdl, check=True) - except subprocess.CalledProcessError as exc: - raise OMSessionException(f"Error on directory creation for {self.as_posix()}!") from exc + return self._path().is_file() + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """ + Check if the path is a directory. + """ + del follow_symlinks + + return self._path().is_dir() + + def is_absolute(self) -> bool: + """ + Check if the path is an absolute path. + """ + return self._path().is_absolute() + + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + """ + del encoding, errors, newline + + return self._path().read_text(encoding='utf-8') + + def write_text(self, data: str, encoding=None, errors=None, newline=None): + """ + Write text data to the file represented by this path. + """ + del encoding, errors, newline + + if not isinstance(data, str): + raise TypeError(f"data must be str, not {data.__class__.__name__}") + + return self._path().write_text(data=data, encoding='utf-8') + + def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False) -> None: + """ + Create a directory at the path represented by this class. + + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + del mode + + self._path().mkdir(parents=parents, exist_ok=exist_ok) + + def cwd(self) -> OMPathABC: # pylint: disable=W0221 # is @classmethod in the original; see pathlib.PathBase + """ + Returns the current working directory as an OMPathABC object. + """ + return type(self)(self._path().cwd().as_posix(), session=self._session) + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + self._path().unlink(missing_ok=missing_ok) + + def resolve(self, strict: bool = False) -> OMPathABC: + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + path_resolved = self._path().resolve(strict=strict) + return type(self)(path_resolved, session=self._session) + + def size(self) -> int: + """ + Get the size of the file in bytes - implementation based on pathlib.Path. + """ + if not self.is_file(): + raise OMSessionException(f"Path {self.as_posix()} is not a file!") + + path = self._path() + return path.stat().st_size + + +class OMPathRunnerBash(OMPathRunnerABC): + """ + Implementation of OMPathABC which does not use the session data at all. Thus, this implementation can run + locally without any usage of OMC. The special case of this class is the usage of POSIX bash to run all the + commands. Thus, it can be used in WSL or docker. - def cwd(self) -> OMPathABC: - """ - Returns the current working directory as an OMPathABC object. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', 'pwd'] + This class is based on OMPathABC and, therefore, on pathlib.PurePosixPath. This is working well, but it is not + the correct implementation on Windows systems. To get a valid Windows representation of the path, use the + conversion via pathlib.Path(.as_posix()). + """ + + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ + del follow_symlinks + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'test -f "{self.as_posix()}"'] + + try: + subprocess.run(cmdl, check=True) + return True + except subprocess.CalledProcessError: + return False + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """ + Check if the path is a directory. + """ + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'test -d "{self.as_posix()}"'] + + try: + subprocess.run(cmdl, check=True) + return True + except subprocess.CalledProcessError: + return False + + def is_absolute(self) -> bool: + """ + Check if the path is an absolute path. + """ + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'case "{self.as_posix()}" in /*) exit 0;; *) exit 1;; esac'] + + try: + subprocess.check_call(cmdl) + return True + except subprocess.CalledProcessError: + return False + + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + """ + del encoding, errors, newline + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'cat "{self.as_posix()}"'] + + result = subprocess.run(cmdl, capture_output=True, check=True) + if result.returncode == 0: + return result.stdout.decode('utf-8') + raise FileNotFoundError(f"Cannot read file: {self.as_posix()}") + + def write_text(self, data: str, encoding=None, errors=None, newline=None) -> int: + """ + Write text data to the file represented by this path. + """ + del encoding, errors, newline + + if not isinstance(data, str): + raise TypeError(f"data must be str, not {data.__class__.__name__}") - result = subprocess.run(cmdl, capture_output=True, text=True, check=True) - if result.returncode == 0: - return type(self)(result.stdout.strip(), session=self._session) - raise OSError("Can not get current work directory ...") + data_escape = self._session.escape_str(data) - def unlink(self, missing_ok: bool = False) -> None: - """ - Unlink (delete) the file or directory represented by this path. - """ + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'printf %s "{data_escape}" > "{self.as_posix()}"'] - if not self.is_file(): - raise OSError(f"Can not unlink a directory: {self.as_posix()}!") + try: + subprocess.run(cmdl, check=True) + return len(data) + except subprocess.CalledProcessError as exc: + raise IOError(f"Error writing data to file {self.as_posix()}!") from exc - if not self.is_file(): - return + def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False) -> None: + """ + Create a directory at the path represented by this class. + + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + del mode + + if self.is_file(): + raise OSError(f"The given path {self.as_posix()} exists and is a file!") + if self.is_dir() and not exist_ok: + raise OSError(f"The given path {self.as_posix()} exists and is a directory!") + if not parents and not self.parent.is_dir(): + raise FileNotFoundError(f"Parent directory of {self.as_posix()} does not exists!") + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'mkdir -p "{self.as_posix()}"'] + + try: + subprocess.run(cmdl, check=True) + except subprocess.CalledProcessError as exc: + raise OMSessionException(f"Error on directory creation for {self.as_posix()}!") from exc + + def cwd(self) -> OMPathABC: # pylint: disable=W0221 # is @classmethod in the original; see pathlib.PathBase + """ + Returns the current working directory as an OMPathABC object. + """ + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', 'pwd'] + + result = subprocess.run(cmdl, capture_output=True, text=True, check=True) + if result.returncode == 0: + return type(self)(result.stdout.strip(), session=self._session) + raise OSError("Can not get current work directory ...") + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + + if not self.is_file(): + raise OSError(f"Can not unlink a directory: {self.as_posix()}!") + + if not self.is_file(): + return + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'rm "{self.as_posix()}"'] + + try: + subprocess.run(cmdl, check=True) + except subprocess.CalledProcessError as exc: + raise OSError(f"Cannot unlink file {self.as_posix()}: {exc}") from exc + + def resolve(self, strict: bool = False) -> OMPathABC: + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'readlink -f "{self.as_posix()}"'] + + result = subprocess.run(cmdl, capture_output=True, text=True, check=True) + if result.returncode == 0: + return type(self)(result.stdout.strip(), session=self._session) + raise FileNotFoundError(f"Cannot resolve path: {self.as_posix()}") + + def size(self) -> int: + """ + Get the size of the file in bytes - implementation based on pathlib.Path. + """ + if not self.is_file(): + raise OMSessionException(f"Path {self.as_posix()} is not a file!") - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'rm "{self.as_posix()}"'] + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'stat -c %s "{self.as_posix()}"'] + result = subprocess.run(cmdl, capture_output=True, text=True, check=True) + stdout = result.stdout.strip() + if result.returncode == 0: try: - subprocess.run(cmdl, check=True) - except subprocess.CalledProcessError as exc: - raise OSError(f"Cannot unlink file {self.as_posix()}: {exc}") from exc - - def resolve(self, strict: bool = False) -> OMPathABC: - """ - Resolve the path to an absolute path. This is done based on available OMC functions. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'readlink -f "{self.as_posix()}"'] - - result = subprocess.run(cmdl, capture_output=True, text=True, check=True) - if result.returncode == 0: - return type(self)(result.stdout.strip(), session=self._session) - raise FileNotFoundError(f"Cannot resolve path: {self.as_posix()}") - - def size(self) -> int: - """ - Get the size of the file in bytes - implementation based on pathlib.Path. - """ - if not self.is_file(): - raise OMSessionException(f"Path {self.as_posix()} is not a file!") - - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'stat -c %s "{self.as_posix()}"'] - - result = subprocess.run(cmdl, capture_output=True, text=True, check=True) - stdout = result.stdout.strip() - if result.returncode == 0: - try: - return int(stdout) - except ValueError as exc: - raise OSError(f"Invalid return value for file size ({self.as_posix()}): {stdout}") from exc - else: - raise OSError(f"Cannot get size for file {self.as_posix()}") - - OMPathRunnerLocal = _OMPathRunnerLocal - OMPathRunnerBash = _OMPathRunnerBash + return int(stdout) + except ValueError as exc: + raise OSError(f"Invalid return value for file size ({self.as_posix()}): {stdout}") from exc + else: + raise OSError(f"Cannot get size for file {self.as_posix()}") class OMSessionRunnerABC(OMSessionABC, metaclass=abc.ABCMeta): diff --git a/README.md b/README.md index a9cf3bdc..d65528fc 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,8 @@ OMPython is a Python interface that uses ZeroMQ to communicate with OpenModelica ## Dependencies -- Python 3.x supported -- PyZMQ is required + - Python >= 3.12 + - Additional packages: numpy, psutil, pyparsing and pyzmq ## Installation @@ -49,8 +49,8 @@ help(OMPython) ``` ```python -from OMPython import OMCSessionLocal -omc = OMCSessionLocal() +import OMPython +omc = OMPython.OMCSessionLocal() omc.sendExpression("getVersion()") ``` diff --git a/tests/test_ModelExecutionCmd.py b/tests/test_ModelExecutionCmd.py index db5aadeb..19111070 100644 --- a/tests/test_ModelExecutionCmd.py +++ b/tests/test_ModelExecutionCmd.py @@ -24,7 +24,7 @@ def mscmd_firstorder(model_firstorder): model_name="M", ) - mscmd = OMPython.ModelExecutionCmd( + mscmd = OMPython.ModelExecutionConfig( runpath=mod.getWorkDirectory(), cmd_local=mod.get_session().model_execution_local, cmd_windows=mod.get_session().model_execution_windows, @@ -38,17 +38,19 @@ def mscmd_firstorder(model_firstorder): def test_simflags(mscmd_firstorder): mscmd = mscmd_firstorder - mscmd.args_set({ + mscmd.args_set(args={ + "override": { + 'b': 2, + 'a': 4, + }, + "noRestart": None, "noEventEmit": None, - "override": {'b': 2} }) - with pytest.deprecated_call(): - mscmd.args_set(args=mscmd.parse_simflags(simflags="-noEventEmit -noRestart -override=a=1,x=3")) assert mscmd.get_cmd_args() == [ '-noEventEmit', '-noRestart', - '-override=a=1,b=2,x=3', + '-override=a=4,b=2', ] mscmd.args_set({ @@ -58,5 +60,5 @@ def test_simflags(mscmd_firstorder): assert mscmd.get_cmd_args() == [ '-noEventEmit', '-noRestart', - '-override=a=1,x=3', + '-override=a=4', ] diff --git a/tests/test_ModelicaSystemOMC.py b/tests/test_ModelicaSystemOMC.py index c63b92e1..a5e996c2 100644 --- a/tests/test_ModelicaSystemOMC.py +++ b/tests/test_ModelicaSystemOMC.py @@ -64,9 +64,8 @@ def test_setParameters(): model_name="BouncingBall", ) - # method 1 (test depreciated variants) - mod.setParameters("e=1.234") - mod.setParameters(["g=321.0"]) + mod.setParameters(e=1.234) + mod.setParameters(g=321.0) assert mod.getParameters("e") == ["1.234"] assert mod.getParameters("g") == ["321.0"] assert mod.getParameters() == { @@ -76,7 +75,6 @@ def test_setParameters(): with pytest.raises(KeyError): mod.getParameters("thisParameterDoesNotExist") - # method 2 (new style) pvals = {"e": 21.3, "g": 0.12} mod.setParameters(**pvals) assert mod.getParameters() == { @@ -208,6 +206,53 @@ def _run_getSolutions(mod): assert np.isclose(x, x_analytical, rtol=1e-4).all() +def test_variable_filter(model_firstorder): + mod = OMPython.ModelicaSystemOMC() + mod.model( + model_file=model_firstorder, + model_name="M", + ) + + a = -1 + tau = -1 / a + stopTime = 5 * tau + + simOptions = {"stopTime": stopTime, "stepSize": 0.1, "tolerance": 1e-8} + mod.setSimulationOptions(**simOptions) + mod.simulate() + sol_names1 = mod.getSolutions() + assert isinstance(sol_names1, tuple) + assert sol_names1 == ('a', 'der(x)', 'time', 'x') + + mod.set_variable_filter(variable_filter='x') + mod.setSimulationOptions(stopTime=2.0) + mod.simulate() + sol_names2 = mod.getSolutions() + assert isinstance(sol_names2, tuple) + assert sol_names2 == ('a', 'time', 'x') + + mod.set_variable_filter(variable_filter='der(x)') + mod.setSimulationOptions(stopTime=3.0) + mod.simulate() + sol_names3 = mod.getSolutions() + assert isinstance(sol_names3, tuple) + assert sol_names3 == ('a', 'time') + + mod.set_variable_filter(variable_filter='der(x)', escape=True) + mod.setSimulationOptions(stopTime=3.0) + mod.simulate() + sol_names4 = mod.getSolutions() + assert isinstance(sol_names4, tuple) + assert sol_names4 == ('a', 'der(x)', 'time') + + mod.set_variable_filter(variable_filter='a') + mod.setSimulationOptions(stopTime=2.0) + mod.simulate() + sol_names5 = mod.getSolutions() + assert isinstance(sol_names5, tuple) + assert sol_names5 == ('a', 'time') + + def test_getters(tmp_path): model_file = tmp_path / "M_getters.mo" model_file.write_text(""" @@ -439,6 +484,14 @@ def test_simulate_inputs(tmp_path): simOptions = {"stopTime": 1.0} mod.setSimulationOptions(**simOptions) + # check invalid inputs + # * 'None' cannot be converted to float + with pytest.raises(OMPython.ModelicaSystemError): + mod.setInputs(u1=[(0.0, None), (0.5, 1)]) + # * 'abc' cannot be converted to float + with pytest.raises(OMPython.ModelicaSystemError): + mod.setInputs(u1=[(0.0, 0.0), ("abc", 1)]) + # integrate zero (no setInputs call) - it should default to None -> 0 assert mod.getInputs() == { "u1": None, diff --git a/tests/test_ZMQ.py b/tests/test_ZMQ.py index 89a8387b..1ba62cb9 100644 --- a/tests/test_ZMQ.py +++ b/tests/test_ZMQ.py @@ -38,14 +38,12 @@ def test_Simulate(omcs, model_time_str): assert omcs.sendExpression('res.resultFile') -def test_execute(omcs): - with pytest.deprecated_call(): - assert omcs.execute('"HelloWorld!"') == '"HelloWorld!"\n' +def test_sendExpression(omcs): assert omcs.sendExpression('"HelloWorld!"', parsed=False) == '"HelloWorld!"\n' assert omcs.sendExpression('"HelloWorld!"', parsed=True) == 'HelloWorld!' -def test_omcprocessport_execute(omcs): +def test_sendExpression_port(omcs): port = omcs.get_port() omcs2 = OMPython.OMCSessionPort(omc_port=port) @@ -58,7 +56,7 @@ def test_omcprocessport_execute(omcs): del omcs2 -def test_omcprocessport_simulate(omcs, model_time_str): +def test_Simulate_port(omcs, model_time_str): port = omcs.get_port() omcs2 = OMPython.OMCSessionPort(omc_port=port) diff --git a/tests_v400/test_ModelicaSystem.py b/tests_v400/test_ModelicaSystem.py index c55e95fc..aa713af0 100644 --- a/tests_v400/test_ModelicaSystem.py +++ b/tests_v400/test_ModelicaSystem.py @@ -35,7 +35,7 @@ def test_setParameters(): mod = OMPython.ModelicaSystem(model_path + "BouncingBall.mo", "BouncingBall") # method 1 - mod.setParameters(pvals={"e": 1.234}) + mod.setParameters(pvals="e=1.234") mod.setParameters(pvals={"g": 321.0}) assert mod.getParameters("e") == ["1.234"] assert mod.getParameters("g") == ["321.0"] diff --git a/tests_v400/test_ModelicaSystemCmd.py b/tests_v400/test_ModelicaSystemCmd.py index 3544a1bd..0f4535fd 100644 --- a/tests_v400/test_ModelicaSystemCmd.py +++ b/tests_v400/test_ModelicaSystemCmd.py @@ -18,7 +18,11 @@ def model_firstorder(tmp_path): @pytest.fixture def mscmd_firstorder(model_firstorder): mod = OMPython.ModelicaSystem(fileName=model_firstorder.as_posix(), modelName="M") - mscmd = OMPython.ModelicaSystemCmd(runpath=mod.getWorkDirectory(), modelname=mod._model_name) + mscmd = OMPython.ModelExecutionConfig( + runpath=mod.getWorkDirectory(), + model_name=mod._model_name, + cmd_prefix=[], + ) return mscmd @@ -30,10 +34,9 @@ def test_simflags(mscmd_firstorder): "override": {'b': 2} }) with pytest.deprecated_call(): - mscmd.args_set(args=mscmd.parse_simflags(simflags="-noEventEmit -noRestart -override=a=1,x=3")) + mscmd.args_set(args=OMPython.parse_simflags(simflags="-noEventEmit -noRestart -override=a=1,x=3")) - assert mscmd.get_cmd() == [ - mscmd.get_exe().as_posix(), + assert mscmd.get_cmd_args() == [ '-noEventEmit', '-noRestart', '-override=a=1,b=2,x=3', @@ -43,8 +46,7 @@ def test_simflags(mscmd_firstorder): "override": {'b': None}, }) - assert mscmd.get_cmd() == [ - mscmd.get_exe().as_posix(), + assert mscmd.get_cmd_args() == [ '-noEventEmit', '-noRestart', '-override=a=1,x=3',