diff --git a/.github/workflows/Test.yml b/.github/workflows/Test.yml index bf12fff7..8ad1beee 100644 --- a/.github/workflows/Test.yml +++ b/.github/workflows/Test.yml @@ -2,7 +2,7 @@ name: Test-Publish on: push: - branches: ['master'] + branches: [ 'master' ] tags: - 'v*' # only publish when pushing version tags (e.g., v1.0.0) pull_request: @@ -22,13 +22,13 @@ jobs: # test for: # * oldest supported version # * latest available Python version - python-version: ['3.10', '3.14'] + python-version: [ '3.12', '3.14' ] # * Linux using ubuntu-latest # * Windows using windows-latest - os: ['ubuntu-latest', 'windows-latest'] + os: [ 'ubuntu-latest', 'windows-latest' ] # * OM stable - latest stable version # * OM nightly - latest nightly build - omc-version: ['stable', 'nightly'] + omc-version: [ 'stable', 'nightly' ] steps: - uses: actions/checkout@v6 @@ -82,24 +82,14 @@ jobs: click-to-expand: true report-title: 'Test Report' - - name: Run pytest based on v4.0.0 compatibility layer - uses: pavelzw/pytest-action@v2 - with: - verbose: true - emoji: true - job-summary: true - custom-arguments: '-v ${{ inputs.pytest_args }} ./tests_v400' - click-to-expand: true - report-title: 'Test Report (v4.0.0 compatibility layer)' - Publish: name: Publish to PyPI runs-on: ${{ matrix.os }} needs: test strategy: matrix: - python-version: ['3.10'] - os: ['ubuntu-latest'] + python-version: [ '3.12' ] + os: [ 'ubuntu-latest' ] if: startsWith(github.ref, 'refs/tags/') steps: - uses: actions/checkout@v6 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dd477775..46e1d477 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,7 +33,7 @@ repos: hooks: - id: mypy args: [] - exclude: 'test|test_v400' + exclude: 'test' additional_dependencies: - pyparsing - types-psutil diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py deleted file mode 100644 index 17678bb0..00000000 --- a/OMPython/ModelicaSystem.py +++ /dev/null @@ -1,223 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Definition of main class to run Modelica simulations - ModelicaSystem. -""" - -import logging -import os -import pathlib -import platform -from typing import Any, Optional - -import numpy as np - -from OMPython.model_execution import ( - ModelExecutionCmd, - ModelExecutionException, -) -from OMPython.om_session_omc import ( - OMCSessionLocal, -) -from OMPython.modelica_system_abc import ( - ModelicaSystemError, -) -from OMPython.modelica_system_omc import ( - ModelicaSystemOMC, -) -from OMPython.modelica_doe_omc import ( - ModelicaDoEOMC, -) - -# define logger using the current module name as ID -logger = logging.getLogger(__name__) - - -class ModelicaSystem(ModelicaSystemOMC): - """ - Compatibility class. - """ - - def __init__( - self, - fileName: Optional[str | os.PathLike | pathlib.Path] = None, - modelName: Optional[str] = None, - lmodel: Optional[list[str | tuple[str, str]]] = None, - commandLineOptions: Optional[list[str]] = None, - variableFilter: Optional[str] = None, - customBuildDirectory: Optional[str | os.PathLike] = None, - omhome: Optional[str] = None, - omc_process: Optional[OMCSessionLocal] = None, - build: bool = True, - ) -> None: - super().__init__( - command_line_options=commandLineOptions, - work_directory=customBuildDirectory, - omhome=omhome, - session=omc_process, - ) - self.model( - model_name=modelName, - model_file=fileName, - libraries=lmodel, - variable_filter=variableFilter, - build=build, - ) - self._getconn = self._session - - def setCommandLineOptions(self, commandLineOptions: str): - super().set_command_line_options(command_line_option=commandLineOptions) - - def setContinuous( # type: ignore[override] - self, - cvals: str | list[str] | dict[str, Any], - ) -> bool: - if isinstance(cvals, dict): - return super().setContinuous(**cvals) - raise ModelicaSystemError("Only dict input supported for setContinuous()") - - def setParameters( # type: ignore[override] - self, - pvals: str | list[str] | dict[str, Any], - ) -> bool: - if isinstance(pvals, dict): - return super().setParameters(**pvals) - raise ModelicaSystemError("Only dict input supported for setParameters()") - - def setOptimizationOptions( # type: ignore[override] - self, - optimizationOptions: str | list[str] | dict[str, Any], - ) -> bool: - if isinstance(optimizationOptions, dict): - return super().setOptimizationOptions(**optimizationOptions) - raise ModelicaSystemError("Only dict input supported for setOptimizationOptions()") - - def setInputs( # type: ignore[override] - self, - name: str | list[str] | dict[str, Any], - ) -> bool: - if isinstance(name, dict): - return super().setInputs(**name) - raise ModelicaSystemError("Only dict input supported for setInputs()") - - def setSimulationOptions( # type: ignore[override] - self, - simOptions: str | list[str] | dict[str, Any], - ) -> bool: - if isinstance(simOptions, dict): - return super().setSimulationOptions(**simOptions) - raise ModelicaSystemError("Only dict input supported for setSimulationOptions()") - - def setLinearizationOptions( # type: ignore[override] - self, - linearizationOptions: str | list[str] | dict[str, Any], - ) -> bool: - if isinstance(linearizationOptions, dict): - return super().setLinearizationOptions(**linearizationOptions) - raise ModelicaSystemError("Only dict input supported for setLinearizationOptions()") - - def getContinuous( - self, - names: Optional[str | list[str]] = None, - ): - retval = super().getContinuous(names=names) - if self._simulated: - return retval - - if isinstance(retval, dict): - retval2: dict = {} - for key, val in retval.items(): - if np.isnan(val): - retval2[key] = None - else: - retval2[key] = str(val) - return retval2 - if isinstance(retval, list): - retval3: list[str | None] = [] - for val in retval: - if np.isnan(val): - retval3.append(None) - else: - retval3.append(str(val)) - return retval3 - - raise ModelExecutionException("Invalid data!") - - def getOutputs( - self, - names: Optional[str | list[str]] = None, - ): - retval = super().getOutputs(names=names) - if self._simulated: - return retval - - if isinstance(retval, dict): - retval2: dict = {} - for key, val in retval.items(): - if np.isnan(val): - retval2[key] = None - else: - retval2[key] = str(val) - return retval2 - if isinstance(retval, list): - retval3: list[str | None] = [] - for val in retval: - if np.isnan(val): - retval3.append(None) - else: - retval3.append(str(val)) - return retval3 - - raise ModelExecutionException("Invalid data!") - - -class ModelicaSystemDoE(ModelicaDoEOMC): - """ - Compatibility class. - """ - - -class ModelicaSystemCmd(ModelExecutionCmd): - """ - Compatibility class; in the new version it is renamed as ModelExecutionCmd. - """ - - def __init__( - self, - runpath: pathlib.Path, - modelname: str, - timeout: Optional[float] = None, - ) -> None: - super().__init__( - runpath=runpath, - timeout=timeout, - cmd_prefix=[], - model_name=modelname, - ) - - def get_exe(self) -> pathlib.Path: - """Get the path to the compiled model executable.""" - - path_run = pathlib.Path(self._runpath) - if platform.system() == "Windows": - path_exe = path_run / f"{self._model_name}.exe" - else: - path_exe = path_run / self._model_name - - if not path_exe.exists(): - raise ModelicaSystemError(f"Application file path not found: {path_exe}") - - return path_exe - - def get_cmd(self) -> list: - """Get a list with the path to the executable and all command line args. - - This can later be used as an argument for subprocess.run(). - """ - - cmdl = [self.get_exe().as_posix()] + self.get_cmd_args() - - return cmdl - - def run(self): - cmd_definition = self.definition() - return cmd_definition.run() diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py deleted file mode 100644 index c5511923..00000000 --- a/OMPython/OMCSession.py +++ /dev/null @@ -1,309 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Definition of an OMC session. -""" - -from __future__ import annotations - -import logging -from typing import Any, Optional -import warnings - -import pyparsing - -from OMPython.om_session_abc import ( - OMPathABC, - OMSessionABC, - OMSessionException, -) -from OMPython.om_session_omc import ( - DockerPopen, - OMCSessionABC, - OMCSessionDocker, - OMCSessionDockerContainer, - OMCSessionLocal, - OMCSessionPort, - OMCSessionWSL, -) - - -# define logger using the current module name as ID -logger = logging.getLogger(__name__) - - -class OMCSessionException(OMSessionException): - """ - Just a compatibility layer ... - """ - - -class OMCSessionCmd: - """ - Implementation of Open Modelica Compiler API functions. Depreciated! - """ - - def __init__(self, session: OMSessionABC, readonly: bool = False): - warnings.warn( - message="The class OMCSessionCMD is depreciated and will be removed in future versions; " - "please use OMCSession*.sendExpression(...) instead!", - category=DeprecationWarning, - stacklevel=2, - ) - - if not isinstance(session, OMSessionABC): - raise OMCSessionException("Invalid OMC process definition!") - self._session = session - self._readonly = readonly - self._omc_cache: dict[tuple[str, bool], Any] = {} - - def _ask(self, question: str, opt: Optional[list[str]] = None, parsed: bool = True): - - if opt is None: - expression = question - elif isinstance(opt, list): - expression = f"{question}({','.join([str(x) for x in opt])})" - else: - raise OMSessionException(f"Invalid definition of options for {repr(question)}: {repr(opt)}") - - p = (expression, parsed) - - if self._readonly and question != 'getErrorString': - # can use cache if readonly - if p in self._omc_cache: - return self._omc_cache[p] - - try: - res = self._session.sendExpression(expression, parsed=parsed) - except OMSessionException as ex: - raise OMSessionException(f"OMC _ask() failed: {expression} (parsed={parsed})") from ex - - # save response - self._omc_cache[p] = res - - return res - - # TODO: Open Modelica Compiler API functions. Would be nice to generate these. - def loadFile(self, filename): - return self._ask(question='loadFile', opt=[f'"{filename}"']) - - def loadModel(self, className): - return self._ask(question='loadModel', opt=[className]) - - def isModel(self, className): - return self._ask(question='isModel', opt=[className]) - - def isPackage(self, className): - return self._ask(question='isPackage', opt=[className]) - - def isPrimitive(self, className): - return self._ask(question='isPrimitive', opt=[className]) - - def isConnector(self, className): - return self._ask(question='isConnector', opt=[className]) - - def isRecord(self, className): - return self._ask(question='isRecord', opt=[className]) - - def isBlock(self, className): - return self._ask(question='isBlock', opt=[className]) - - def isType(self, className): - return self._ask(question='isType', opt=[className]) - - def isFunction(self, className): - return self._ask(question='isFunction', opt=[className]) - - def isClass(self, className): - return self._ask(question='isClass', opt=[className]) - - def isParameter(self, className): - return self._ask(question='isParameter', opt=[className]) - - def isConstant(self, className): - return self._ask(question='isConstant', opt=[className]) - - def isProtected(self, className): - return self._ask(question='isProtected', opt=[className]) - - def getPackages(self, className="AllLoadedClasses"): - return self._ask(question='getPackages', opt=[className]) - - def getClassRestriction(self, className): - return self._ask(question='getClassRestriction', opt=[className]) - - def getDerivedClassModifierNames(self, className): - return self._ask(question='getDerivedClassModifierNames', opt=[className]) - - def getDerivedClassModifierValue(self, className, modifierName): - return self._ask(question='getDerivedClassModifierValue', opt=[className, modifierName]) - - def typeNameStrings(self, className): - return self._ask(question='typeNameStrings', opt=[className]) - - def getComponents(self, className): - return self._ask(question='getComponents', opt=[className]) - - def getClassComment(self, className): - try: - return self._ask(question='getClassComment', opt=[className]) - except pyparsing.ParseException as ex: - logger.warning("Method 'getClassComment(%s)' failed; OMTypedParser error: %s", - className, ex.msg) - return 'No description available' - - def getNthComponent(self, className, comp_id): - """ returns with (type, name, description) """ - return self._ask(question='getNthComponent', opt=[className, comp_id]) - - def getNthComponentAnnotation(self, className, comp_id): - return self._ask(question='getNthComponentAnnotation', opt=[className, comp_id]) - - def getImportCount(self, className): - return self._ask(question='getImportCount', opt=[className]) - - def getNthImport(self, className, importNumber): - # [Path, id, kind] - return self._ask(question='getNthImport', opt=[className, importNumber]) - - def getInheritanceCount(self, className): - return self._ask(question='getInheritanceCount', opt=[className]) - - def getNthInheritedClass(self, className, inheritanceDepth): - return self._ask(question='getNthInheritedClass', opt=[className, inheritanceDepth]) - - def getParameterNames(self, className): - try: - return self._ask(question='getParameterNames', opt=[className]) - except KeyError as ex: - logger.warning('OMPython error: %s', ex) - # FIXME: OMC returns with a different structure for empty parameter set - return [] - - def getParameterValue(self, className, parameterName): - try: - return self._ask(question='getParameterValue', opt=[className, parameterName]) - except pyparsing.ParseException as ex: - logger.warning("Method 'getParameterValue(%s, %s)' failed; OMTypedParser error: %s", - className, parameterName, ex.msg) - return "" - - def getComponentModifierNames(self, className, componentName): - return self._ask(question='getComponentModifierNames', opt=[className, componentName]) - - def getComponentModifierValue(self, className, componentName): - return self._ask(question='getComponentModifierValue', opt=[className, componentName]) - - def getExtendsModifierNames(self, className, componentName): - return self._ask(question='getExtendsModifierNames', opt=[className, componentName]) - - def getExtendsModifierValue(self, className, extendsName, modifierName): - return self._ask(question='getExtendsModifierValue', opt=[className, extendsName, modifierName]) - - def getNthComponentModification(self, className, comp_id): - # FIXME: OMPython exception Results KeyError exception - - # get {$Code(....)} field - # \{\$Code\((\S*\s*)*\)\} - value = self._ask(question='getNthComponentModification', opt=[className, comp_id], parsed=False) - value = value.replace("{$Code(", "") - return value[:-3] - # return self.re_Code.findall(value) - - # function getClassNames - # input TypeName class_ = $Code(AllLoadedClasses); - # input Boolean recursive = false; - # input Boolean qualified = false; - # input Boolean sort = false; - # input Boolean builtin = false "List also builtin classes if true"; - # input Boolean showProtected = false "List also protected classes if true"; - # output TypeName classNames[:]; - # end getClassNames; - def getClassNames(self, className=None, recursive=False, qualified=False, sort=False, builtin=False, - showProtected=False): - opt = [className] if className else [] + [f'recursive={str(recursive).lower()}', - f'qualified={str(qualified).lower()}', - f'sort={str(sort).lower()}', - f'builtin={str(builtin).lower()}', - f'showProtected={str(showProtected).lower()}'] - return self._ask(question='getClassNames', opt=opt) - - -class OMCSessionZMQ(OMSessionABC): - """ - This class is a compatibility layer for the new schema using OMCSession* classes. - """ - - def __init__( - self, - timeout: Optional[float] = None, - omhome: Optional[str] = None, - omc_process: Optional[OMCSessionABC] = None, - ) -> None: - """ - Initialisation for OMCSessionZMQ - """ - warnings.warn(message="The class OMCSessionZMQ is depreciated and will be removed in future versions; " - "please use OMCProcess* classes instead!", - category=DeprecationWarning, - stacklevel=2) - - if omc_process is None: - omc_process = OMCSessionLocal(omhome=omhome, timeout=timeout) - elif not isinstance(omc_process, OMCSessionABC): - raise OMSessionException("Invalid definition of the OMC process!") - self.omc_process = omc_process - - super().__init__(timeout=timeout) - - def __del__(self): - if hasattr(self, 'omc_process'): - del self.omc_process - - @staticmethod - def escape_str(value: str) -> str: - """ - Escape a string such that it can be used as string within OMC expressions, i.e. escape all double quotes. - """ - return OMCSessionABC.escape_str(value=value) - - def omcpath(self, *path) -> OMPathABC: - """ - Create an OMCPath object based on the given path segments and the current OMC process definition. - """ - return self.omc_process.omcpath(*path) - - def omcpath_tempdir(self, tempdir_base: Optional[OMPathABC] = None) -> OMPathABC: - """ - Get a temporary directory using OMC. It is our own implementation as non-local usage relies on OMC to run all - filesystem related access. - """ - return self.omc_process.omcpath_tempdir(tempdir_base=tempdir_base) - - def execute(self, command: str): - return self.omc_process.execute(command=command) - - def sendExpression(self, command: str, parsed: bool = True) -> Any: - """ - Send an expression to the OMC server and return the result. - - The complete error handling of the OMC result is done within this method using '"getMessagesStringInternal()'. - Caller should only check for OMSessionException. - """ - return self.omc_process.sendExpression(expr=command, parsed=parsed) - - def get_version(self) -> str: - return self.omc_process.get_version() - - def model_execution_prefix(self, cwd: Optional[OMPathABC] = None) -> list[str]: - return self.omc_process.model_execution_prefix(cwd=cwd) - - def set_workdir(self, workdir: OMPathABC) -> None: - return self.omc_process.set_workdir(workdir=workdir) - - -DummyPopen = DockerPopen -OMCProcessLocal = OMCSessionLocal -OMCProcessPort = OMCSessionPort -OMCProcessDocker = OMCSessionDocker -OMCProcessDockerContainer = OMCSessionDockerContainer -OMCProcessWSL = OMCSessionWSL diff --git a/OMPython/OMTypedParser.py b/OMPython/OMTypedParser.py index 9fe810e0..06912221 100644 --- a/OMPython/OMTypedParser.py +++ b/OMPython/OMTypedParser.py @@ -161,6 +161,3 @@ def om_parser_typed(string) -> Any: if len(res) == 0: return None return res[0] - - -parseString = om_parser_typed diff --git a/OMPython/__init__.py b/OMPython/__init__.py index 282923a7..f54a6eda 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -6,14 +6,14 @@ ``` import OMPython omc = OMPython.OMCSessionLocal() -omc.sendExpression("command") +omc.sendExpression("getVersion()") ``` """ from OMPython.model_execution import ( - ModelExecutionCmd, - ModelExecutionData, + ModelExecutionConfig, + ModelExecutionRun, ModelExecutionException, ) from OMPython.om_session_abc import ( @@ -58,37 +58,20 @@ ModelicaDoERunner, ) -from OMPython.ModelicaSystem import ( - ModelicaSystem, - ModelicaSystemDoE, - ModelicaSystemCmd, -) -from OMPython.OMCSession import ( - OMCSessionCmd, - OMCSessionZMQ, - OMCSessionException, - - OMCProcessLocal, - OMCProcessPort, - OMCProcessDocker, - OMCProcessDockerContainer, -) - # global names imported if import 'from OMPython import *' is used __all__ = [ 'doe_get_solutions', 'LinearizationResult', - 'ModelExecutionCmd', - 'ModelExecutionData', + 'ModelExecutionConfig', + 'ModelExecutionRun', 'ModelExecutionException', 'ModelicaDoEABC', 'ModelicaDoEOMC', 'ModelicaDoERunner', 'ModelicaSystemABC', - 'ModelicaSystemDoE', 'ModelicaSystemError', 'ModelicaSystemOMC', 'ModelicaSystemRunner', @@ -108,19 +91,4 @@ 'OMPathRunnerBash', 'OMPathRunnerLocal', 'OMSessionRunner', - - 'ModelicaSystemCmd', - 'ModelicaSystem', - - 'OMCSessionABC', - 'OMCSessionCmd', - - 'OMCSessionException', - - 'OMCSessionZMQ', - - 'OMCProcessLocal', - 'OMCProcessPort', - 'OMCProcessDocker', - 'OMCProcessDockerContainer', ] diff --git a/OMPython/model_execution.py b/OMPython/model_execution.py index ebd4c011..d3b344d1 100644 --- a/OMPython/model_execution.py +++ b/OMPython/model_execution.py @@ -12,7 +12,6 @@ import re import subprocess from typing import Any, Optional -import warnings # define logger using the current module name as ID logger = logging.getLogger(__name__) @@ -27,14 +26,13 @@ class ModelExecutionException(Exception): @dataclasses.dataclass -class ModelExecutionData: +class ModelExecutionRun: """ - Data class to store the command line data for running a model executable in the OMC environment. + Data class to store the command line data for running a model executable. This definition is independent of the OMC + environment as only the executable is needed. - All data should be defined for the environment, where OMC is running (local, docker or WSL) - - To use this as a definition of an OMC simulation run, it has to be processed within - OMCProcess*.self_update(). This defines the attribute cmd_model_executable. + All data should be defined for the environment, where the executable was defined / is located. This is especially + important if OMPython and the executable are defined in different environments (docker or WSL). """ # cmd_path is the expected working directory cmd_path: str @@ -105,11 +103,12 @@ def run(self) -> int: return returncode -class ModelExecutionCmd: +class ModelExecutionConfig: """ - All information about a compiled model executable. This should include data about all structured parameters, i.e. - parameters which need a recompilation of the model. All non-structured parameters can be easily changed without - the need for recompilation. + This class collects all information about a compiled model executable. This includes data about all structured + parameters, i.e. parameters which need a recompilation of the model. All non-structured parameters can be easily + changed without the need for recompilation. The final result is an instance of class ModelExecutionRun - a + definition to run one simulation based on the compiled model executable. """ def __init__( @@ -261,7 +260,7 @@ def get_cmd_args(self) -> list[str]: return cmdl - def definition(self) -> ModelExecutionData: + def definition(self) -> ModelExecutionRun: """ Define all needed data to run the model executable. The data is stored in an OMCSessionRunData object. """ @@ -301,7 +300,7 @@ def definition(self) -> ModelExecutionData: if self._cmd_local: cmd_cwd_local = cmd_path.as_posix() - omc_run_data = ModelExecutionData( + omc_run_data = ModelExecutionRun( cmd_path=cmd_path.as_posix(), cmd_model_name=self._model_name, cmd_args=self.get_cmd_args(), @@ -314,45 +313,3 @@ def definition(self) -> ModelExecutionData: ) return omc_run_data - - @staticmethod - def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | numbers.Number]]: - """ - Parse a simflag definition; this is deprecated! - - The return data can be used as input for self.args_set(). - """ - warnings.warn( - message="The argument 'simflags' is depreciated and will be removed in future versions; " - "please use 'simargs' instead", - category=DeprecationWarning, - stacklevel=2, - ) - - simargs: dict[str, Optional[str | dict[str, Any] | numbers.Number]] = {} - - args = [s for s in simflags.split(' ') if s] - for arg in args: - if arg[0] != '-': - raise ModelExecutionException(f"Invalid simulation flag: {arg}") - arg = arg[1:] - parts = arg.split('=') - if len(parts) == 1: - simargs[parts[0]] = None - elif parts[0] == 'override': - override = '='.join(parts[1:]) - - override_dict = {} - for item in override.split(','): - kv = item.split('=') - if not 0 < len(kv) < 3: - raise ModelExecutionException(f"Invalid value for '-override': {override}") - if kv[0]: - try: - override_dict[kv[0]] = kv[1] - except (KeyError, IndexError) as ex: - raise ModelExecutionException(f"Invalid value for '-override': {override}") from ex - - simargs[parts[0]] = override_dict - - return simargs diff --git a/OMPython/modelica_doe_abc.py b/OMPython/modelica_doe_abc.py index e3ab8403..062f8833 100644 --- a/OMPython/modelica_doe_abc.py +++ b/OMPython/modelica_doe_abc.py @@ -13,7 +13,8 @@ from typing import Any, cast, Optional, Tuple from OMPython.model_execution import ( - ModelExecutionData, + ModelExecutionRun, + ModelExecutionException, ) from OMPython.om_session_abc import ( OMPathABC, @@ -138,7 +139,7 @@ def __init__( self._parameters = {} self._doe_def: Optional[dict[str, dict[str, Any]]] = None - self._doe_cmd: Optional[dict[str, ModelExecutionData]] = None + self._doe_cmd: Optional[dict[str, ModelExecutionRun]] = None def get_session(self) -> OMSessionABC: """ @@ -209,7 +210,7 @@ def prepare(self) -> int: } ) - self._mod.setParameters(sim_param_non_structural) + self._mod.setParameters(**sim_param_non_structural) mscmd = self._mod.simulate_cmd( result_file=resultfile, ) @@ -255,7 +256,7 @@ def get_doe_definition(self) -> Optional[dict[str, dict[str, Any]]]: """ return self._doe_def - def get_doe_command(self) -> Optional[dict[str, ModelExecutionData]]: + def get_doe_command(self) -> Optional[dict[str, ModelExecutionRun]]: """ Get the definitions of simulations commands to run for this DoE. """ @@ -310,8 +311,8 @@ def worker(worker_id, task_queue): returncode = cmd_definition.run() logger.info(f"[Worker {worker_id}] Simulation {resultpath.name} " f"finished with return code: {returncode}") - except ModelicaSystemError as ex: - logger.warning(f"Simulation error for {resultpath.name}: {ex}") + except ModelExecutionException as exc: + logger.warning(f"Simulation error for {resultpath.name}: {exc}") # Mark the task as done task_queue.task_done() diff --git a/OMPython/modelica_system_abc.py b/OMPython/modelica_system_abc.py index fcc31deb..41a67205 100644 --- a/OMPython/modelica_system_abc.py +++ b/OMPython/modelica_system_abc.py @@ -5,19 +5,20 @@ import abc import ast +import csv from dataclasses import dataclass import logging import numbers import os import re from typing import Any, Optional -import warnings import xml.etree.ElementTree as ET import numpy as np from OMPython.model_execution import ( - ModelExecutionCmd, + ModelExecutionConfig, + ModelExecutionException, ) from OMPython.om_session_abc import ( OMPathABC, @@ -189,7 +190,7 @@ def check_model_executable(self): Check if the model executable is working """ # check if the executable exists ... - om_cmd = ModelExecutionCmd( + om_cmd = ModelExecutionConfig( runpath=self.getWorkDirectory(), cmd_local=self._session.model_execution_local, cmd_windows=self._session.model_execution_windows, @@ -200,7 +201,10 @@ def check_model_executable(self): # ... by running it - output help for command help om_cmd.arg_set(key="help", val="help") cmd_definition = om_cmd.definition() - returncode = cmd_definition.run() + try: + returncode = cmd_definition.run() + except ModelExecutionException as exc: + raise ModelicaSystemError(f"Cannot execute model: {exc}") from exc if returncode != 0: raise ModelicaSystemError("Model executable not working!") @@ -213,6 +217,15 @@ def _xmlparse(self, xml_file: OMPathABC): root = tree.getroot() if root is None: raise ModelicaSystemError(f"Cannot read XML file: {xml_file}") + # check OM version - force the version used by the model executable + if 'generationTool' in root.attrib: + generation_tool_version = self._parse_om_version(version=root.attrib['generationTool']) + if self._version != generation_tool_version: + logger.warning(f"Mismatch in OpenModelica version: {self._version!r} (OMSession) " + f"vs. {generation_tool_version!r} (model executable) " + f"- using {generation_tool_version!r}!") + self._version = generation_tool_version + for attr in root.iter('DefaultExperiment'): for key in ("startTime", "stopTime", "stepSize", "tolerance", "solver", "outputFormat"): @@ -579,16 +592,24 @@ def _parse_om_version(version: str) -> tuple[int, int, int]: def _process_override_data( self, - om_cmd: ModelExecutionCmd, + om_cmd: ModelExecutionConfig, override_file: OMPathABC, override_var: dict[str, str], override_sim: dict[str, str], + variable_filter: Optional[str] = None, ) -> None: """ Define the override parameters. As the definition of simulation specific override parameter changes with OM 1.26.0, version specific code is needed. Please keep in mind, that this will fail if OMC is not used to run the model executable. + + Including also override of variable filter settings. """ + + # define variable filter if defined (override any original setting) + if variable_filter is not None: + om_cmd.arg_set(key="variableFilter", val=variable_filter) + if len(override_var) == 0 and len(override_sim) == 0: return @@ -617,9 +638,8 @@ def _process_override_data( def simulate_cmd( self, result_file: OMPathABC, - simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, - ) -> ModelExecutionCmd: + ) -> ModelExecutionConfig: """ This method prepares the simulates model according to the simulation options. It returns an instance of ModelicaSystemCmd which can be used to run the simulation. @@ -633,7 +653,6 @@ def simulate_cmd( Parameters ---------- result_file - simflags simargs Returns @@ -641,7 +660,7 @@ def simulate_cmd( An instance if ModelicaSystemCmd to run the requested simulation. """ - om_cmd = ModelExecutionCmd( + om_cmd = ModelExecutionConfig( runpath=self.getWorkDirectory(), cmd_local=self._session.model_execution_local, cmd_windows=self._session.model_execution_windows, @@ -653,10 +672,6 @@ def simulate_cmd( # always define the result file to use om_cmd.arg_set(key="r", val=result_file.as_posix()) - # allow runtime simulation flags from user input - if simflags is not None: - om_cmd.args_set(args=om_cmd.parse_simflags(simflags=simflags)) - if simargs: om_cmd.args_set(args=simargs) @@ -665,6 +680,7 @@ def simulate_cmd( override_file=result_file.parent / f"{result_file.stem}_override.txt", override_var=self._override_variables, override_sim=self._simulate_options_override, + variable_filter=self._variable_filter, ) if self._inputs: # if model has input quantities @@ -690,7 +706,6 @@ def simulate_cmd( def simulate( self, resultfile: Optional[str | os.PathLike] = None, - simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, ) -> None: """Simulate the model according to simulation options. @@ -699,16 +714,11 @@ def simulate( Args: resultfile: Path to a custom result file - simflags: String of extra command line flags for the model binary. - This argument is deprecated, use simargs instead. simargs: Dict with simulation runtime flags. Examples: mod.simulate() mod.simulate(resultfile="a.mat") - # set runtime simulation flags, deprecated - mod.simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") - # using simargs mod.simulate(simargs={"noEventEmit": None, "noRestart": None, "override": "override": {"e": 0.3, "g": 10}}) """ @@ -727,7 +737,6 @@ def simulate( om_cmd = self.simulate_cmd( result_file=self._result_file, - simflags=simflags, simargs=simargs, ) @@ -736,7 +745,10 @@ def simulate( self._result_file.unlink() # ... run simulation ... cmd_definition = om_cmd.definition() - returncode = cmd_definition.run() + try: + returncode = cmd_definition.run() + except ModelExecutionException as exc: + raise ModelicaSystemError(f"Cannot execute model: {exc}") from exc # and check returncode *AND* resultfile if returncode != 0 and self._result_file.is_file(): # check for an empty (=> 0B) result file which indicates a crash of the model executable @@ -752,49 +764,13 @@ def simulate( @staticmethod def _prepare_input_data( - input_args: Any, input_kwargs: dict[str, Any], ) -> dict[str, str]: """ Convert raw input to a structured dictionary {'key1': 'value1', 'key2': 'value2'}. """ - - def prepare_str(str_in: str) -> dict[str, str]: - str_in = str_in.replace(" ", "") - key_val_list: list[str] = str_in.split("=") - if len(key_val_list) != 2: - raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") - - input_data_from_str: dict[str, str] = {key_val_list[0]: key_val_list[1]} - - return input_data_from_str - input_data: dict[str, str] = {} - for input_arg in input_args: - if isinstance(input_arg, str): - warnings.warn(message="The definition of values to set should use a dictionary, " - "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " - "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", - category=DeprecationWarning, - stacklevel=3) - input_data = input_data | prepare_str(input_arg) - elif isinstance(input_arg, list): - warnings.warn(message="The definition of values to set should use a dictionary, " - "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " - "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", - category=DeprecationWarning, - stacklevel=3) - - for item in input_arg: - if not isinstance(item, str): - raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(item)}!") - input_data = input_data | prepare_str(item) - elif isinstance(input_arg, dict): - input_data = input_data | input_arg - else: - raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(input_arg)}!") - if len(input_kwargs): for key, val in input_kwargs.items(): # ensure all values are strings to align it on one type: dict[str, str] @@ -872,21 +848,17 @@ def isParameterChangeable( def setContinuous( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set continuous values. It can be called: - with a sequence of continuous name and assigning corresponding values as arguments as show in the example below: - usage - >>> setContinuous("Name=value") # depreciated - >>> setContinuous(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set continuous values. + usage: >>> setContinuous(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setContinuous(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -896,21 +868,17 @@ def setContinuous( def setParameters( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set parameter values. It can be called: - with a sequence of parameter name and assigning corresponding value as arguments as show in the example below: - usage - >>> setParameters("Name=value") # depreciated - >>> setParameters(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set parameter values + usage: >>> setParameters(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setParameters(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -920,22 +888,17 @@ def setParameters( def setSimulationOptions( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set simulation options. It can be called: - with a sequence of simulation options name and assigning corresponding values as arguments as show in the - example below: - usage - >>> setSimulationOptions("Name=value") # depreciated - >>> setSimulationOptions(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set simulation options. + usage: >>> setSimulationOptions(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setSimulationOptions(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -945,22 +908,17 @@ def setSimulationOptions( def setLinearizationOptions( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set linearization options. It can be called: - with a sequence of linearization options name and assigning corresponding value as arguments as show in the - example below - usage - >>> setLinearizationOptions("Name=value") # depreciated - >>> setLinearizationOptions(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set linearization options. + usage: >>> setLinearizationOptions(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setLinearizationOptions(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -970,22 +928,17 @@ def setLinearizationOptions( def setOptimizationOptions( self, - *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set optimization options. It can be called: - with a sequence of optimization options name and assigning corresponding values as arguments as show in the - example below: - usage - >>> setOptimizationOptions("Name=value") # depreciated - >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) # depreciated + This method is used to set optimization options. + usage: >>> setOptimizationOptions(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setOptimizationOptions(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) return self._set_method_helper( inputdata=inputdata, @@ -993,25 +946,69 @@ def setOptimizationOptions( datatype="optimization-option", overridedata=None) + def set_variable_filter( + self, + variable_filter: Optional[str] = None, + escape: bool = False, + ) -> None: + """ + This method is used to set variable filters. If escape is True, all regex special characters are escaped. + """ + if variable_filter is None: + self._variable_filter = None + return + + if escape: + variable_filter = re.escape(variable_filter) + + # Validate filter_val as a regular expression + try: + re.compile(variable_filter) + except re.error as exc: + raise ModelicaSystemError(f"Invalid variable_filter regular expression: {variable_filter!r} ({exc})") + + self._variable_filter = variable_filter + + @staticmethod + def toInputs(data: dict[str, list[float]]) -> dict[str, list[tuple[float, float]]]: + """ + Converts a dictionary of lists (from pandas DataFrame.to_dict(orient='list')) + into the OMPython setInputs input format. + + Example: mod.setInputs(**toInputs(pdf.to_dict(orient='list'))) + + Assumes the dictionary contains a key named 'time'. + """ + if "time" not in data: + raise ValueError("The provided data must contain a 'time' key.") + + time_series = data["time"] + + inputs = { + var_name: list(zip(time_series, values)) + for var_name, values in data.items() + if var_name != "time" + } + + return inputs + def setInputs( self, *args: Any, **kwargs: dict[str, Any], ) -> bool: """ - This method is used to set input values. It can be called with a sequence of input name and assigning - corresponding values as arguments as show in the example below. Compared to other set*() methods this is a - special case as value could be a list of tuples - these are converted to a string in _prepare_input_data() - and restored here via ast.literal_eval(). + This method is used to set input values. - >>> setInputs("Name=value") # depreciated - >>> setInputs(["Name1=value1","Name2=value2"]) # depreciated + Compared to other set*() methods this is a special case as value could be a list of tuples - these are + converted to a string in _prepare_input_data() and restored here via ast.literal_eval(). + usage: >>> setInputs(Name1="value1", Name2="value2") >>> param = {"Name1": "value1", "Name2": "value2"} >>> setInputs(**param) """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + inputdata = self._prepare_input_data(input_kwargs=kwargs) for key, val in inputdata.items(): if key not in self._inputs: @@ -1021,32 +1018,80 @@ def setInputs( raise ModelicaSystemError(f"Invalid data in input for {repr(key)}: {repr(val)}") val_evaluated = ast.literal_eval(val) - if isinstance(val_evaluated, (int, float)): self._inputs[key] = [(float(self._simulate_options["startTime"]), float(val)), (float(self._simulate_options["stopTime"]), float(val))] elif isinstance(val_evaluated, list): - if not all([isinstance(item, tuple) for item in val_evaluated]): + if not all(isinstance(item, tuple) for item in val_evaluated): raise ModelicaSystemError("Value for setInput() must be in tuple format; " f"got {repr(val_evaluated)}") - if val_evaluated != sorted(val_evaluated, key=lambda x: x[0]): - raise ModelicaSystemError("Time value should be in increasing order; " - f"got {repr(val_evaluated)}") + val_evaluated_checked: list[tuple[float, float]] = [] for item in val_evaluated: - if item[0] < float(self._simulate_options["startTime"]): - raise ModelicaSystemError(f"Time value in {repr(item)} of {repr(val_evaluated)} is less " - "than the simulation start time") if len(item) != 2: raise ModelicaSystemError(f"Value {repr(item)} of {repr(val_evaluated)} " "is in incorrect format!") - self._inputs[key] = val_evaluated + try: + val_evaluated_checked.append((float(item[0]), float(item[1]))) + except (ValueError, TypeError) as exc: + raise ModelicaSystemError("All elements of the input for setInput() should be convertible to " + "type Tuple[float, float] - " + f"found [{repr(item[0])}, {repr(item[1])}] with types " + f"[{type(item[0])}, {type(item[1])}]!") from exc + + if item[0] < float(self._simulate_options["startTime"]): + raise ModelicaSystemError(f"Time value in {repr(item)} of {repr(val_evaluated)} is less " + "than the simulation start time") + + if val_evaluated_checked != sorted(val_evaluated_checked, key=lambda x: x[0]): + raise ModelicaSystemError("Time value should be in increasing order; " + f"got {repr(val_evaluated_checked)}") + + self._inputs[key] = val_evaluated_checked else: raise ModelicaSystemError(f"Data cannot be evaluated for {repr(key)}: {repr(val)}") return True + def setInputsCSV( + self, + csvfile: os.PathLike, + ) -> None: + """ + Read content from a CSV file and use it to define the time based input data. + """ + + # real type is 'dict[str, list[tuple[float, float]]]' - 'dict[str, Any]' is used to make setInputs() happy + inputs: dict[str, Any] = {} + try: + with open(csvfile, newline='') as csvfh: + dialect = csv.Sniffer().sniff(csvfh.read(1024)) + csvfh.seek(0) + reader = csv.DictReader(csvfh, dialect=dialect) + + keys: list[str] = [] + for idx, line in enumerate(reader): + if not keys: + keys = list(line.keys()) + for var in keys[1:]: + if var in inputs: + raise ModelicaSystemError(f"Error reading {csvfile}: duplicated column {var}!") + inputs[var] = [] + try: + # use key[0] as time; all other columns use the header as name + for var in keys[1:]: + inputs[var].append((float(line[keys[0]]), float(line[var]))) + except (ValueError, TypeError) as exc2: + raise ModelicaSystemError(f"Invalid value reading {csvfile} line {idx}/{var}: " + f"{line}!") from exc2 + + except IOError as exc1: + raise ModelicaSystemError(f"Error reading {csvfile}: {exc1}") from exc1 + + if inputs: + self.setInputs(**inputs) + def _createCSVData(self, csvfile: Optional[OMPathABC] = None) -> OMPathABC: """ Create a csv file with inputs for the simulation/optimization of the model. If csvfile is provided as argument, @@ -1105,7 +1150,6 @@ def _createCSVData(self, csvfile: Optional[OMPathABC] = None) -> OMPathABC: def linearize( self, lintime: Optional[float] = None, - simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, ) -> LinearizationResult: """Linearize the model according to linearization options. @@ -1114,8 +1158,6 @@ def linearize( Args: lintime: Override "stopTime" value. - simflags: String of extra command line flags for the model binary. - This argument is deprecated, use simargs instead. simargs: A dict with command line flags and possible options; example: "simargs={'csvInput': 'a.csv'}" Returns: @@ -1134,7 +1176,7 @@ def linearize( "use ModelicaSystemOMC() to build the model first" ) - om_cmd = ModelExecutionCmd( + om_cmd = ModelExecutionConfig( runpath=self.getWorkDirectory(), cmd_local=self._session.model_execution_local, cmd_windows=self._session.model_execution_windows, @@ -1148,6 +1190,7 @@ def linearize( override_file=self.getWorkDirectory() / f'{self._model_name}_override_linear.txt', override_var=self._override_variables, override_sim=self._linearization_options, + variable_filter=self._variable_filter, ) if self._inputs: @@ -1168,10 +1211,6 @@ def linearize( f"<= lintime <= {self._linearization_options['stopTime']}") om_cmd.arg_set(key="l", val=str(lintime)) - # allow runtime simulation flags from user input - if simflags is not None: - om_cmd.args_set(args=om_cmd.parse_simflags(simflags=simflags)) - if simargs: om_cmd.args_set(args=simargs) @@ -1180,7 +1219,10 @@ def linearize( linear_file.unlink(missing_ok=True) cmd_definition = om_cmd.definition() - returncode = cmd_definition.run() + try: + returncode = cmd_definition.run() + except ModelExecutionException as exc: + raise ModelicaSystemError(f"Cannot execute model: {exc}") from exc if returncode != 0: raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") if not linear_file.is_file(): diff --git a/OMPython/modelica_system_omc.py b/OMPython/modelica_system_omc.py index 34805e0f..4e0f14ce 100644 --- a/OMPython/modelica_system_omc.py +++ b/OMPython/modelica_system_omc.py @@ -126,7 +126,7 @@ def model( # set variables self._model_name = model_name # Model class name self._libraries = libraries # may be needed if model is derived from other model - self._variable_filter = variable_filter + self.set_variable_filter(variable_filter=variable_filter, escape=True) if self._libraries: self._loadLibrary(libraries=self._libraries) diff --git a/OMPython/om_session_abc.py b/OMPython/om_session_abc.py index 70e897d7..385ab3be 100644 --- a/OMPython/om_session_abc.py +++ b/OMPython/om_session_abc.py @@ -7,10 +7,8 @@ import abc import logging -import os import pathlib import platform -import sys from typing import Any, Optional import uuid @@ -26,151 +24,110 @@ class OMSessionException(Exception): """ -# due to the compatibility layer to Python < 3.12, the OM(C)Path classes must be hidden behind the following if -# conditions. This is also the reason for OMPathABC, a simple base class to be used in ModelicaSystem* classes. -# Reason: before Python 3.12, pathlib.PurePosixPath can not be derived from; therefore, OMPathABC is not possible -if sys.version_info < (3, 12): - class _OMPathCompatibility(pathlib.Path): +class OMPathABC(pathlib.PurePosixPath, metaclass=abc.ABCMeta): + """ + Implementation of a basic (PurePosix)Path object to be used within OMPython. The derived classes can use OMC as + backend and - thus - work on different configurations like docker or WSL. The connection to OMC is provided via + an instances of classes derived from BaseSession. + + PurePosixPath is selected as it covers all but Windows systems (Linux, docker, WSL). However, the code is + written such that possible Windows system are taken into account. Nevertheless, the overall functionality is + limited compared to standard pathlib.Path objects. + """ + + def __init__(self, *path, session: OMSessionABC) -> None: + super().__init__(*path) + self._session = session + + def get_session(self) -> OMSessionABC: + """ + Get session definition used for this instance of OMPath. + """ + return self._session + + def with_segments(self, *pathsegments) -> OMPathABC: """ - Compatibility class for OMPathABC in Python < 3.12. This allows to run all code which uses OMPathABC (mainly - ModelicaSystem) on these Python versions. There are remaining limitation as only local execution is possible. + Create a new OMCPath object with the given path segments. + + The original definition of Path is overridden to ensure the session data is set. """ + return type(self)(*pathsegments, session=self._session) - # modified copy of pathlib.Path.__new__() definition - def __new__(cls, *args, **kwargs): - logger.warning("Python < 3.12 - using a version of class OMCPath " - "based on pathlib.Path for local usage only.") + @abc.abstractmethod + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ - if cls is _OMPathCompatibility: - cls = _OMPathCompatibilityWindows if os.name == 'nt' else _OMPathCompatibilityPosix - self = cls._from_parts(args) - if not self._flavour.is_supported: - raise NotImplementedError(f"cannot instantiate {cls.__name__} on your system") - return self + @abc.abstractmethod + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """ + Check if the path is a directory. + """ - def size(self) -> int: - """ - Needed compatibility function to have the same interface as OMCPathReal - """ - return self.stat().st_size + @abc.abstractmethod + def is_absolute(self) -> bool: + """ + Check if the path is an absolute path. + """ - class _OMPathCompatibilityPosix(pathlib.PosixPath, _OMPathCompatibility): + @abc.abstractmethod + def read_text(self, encoding=None, errors=None, newline=None) -> str: """ - Compatibility class for OMCPath on Posix systems (Python < 3.12) + Read the content of the file represented by this path as text. """ - class _OMPathCompatibilityWindows(pathlib.WindowsPath, _OMPathCompatibility): + @abc.abstractmethod + def write_text(self, data: str, encoding=None, errors=None, newline=None) -> int: """ - Compatibility class for OMCPath on Windows systems (Python < 3.12) + Write text data to the file represented by this path. + """ + + @abc.abstractmethod + def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False) -> None: """ + Create a directory at the path represented by this class. - OMPathABC = _OMPathCompatibility - -else: - class OMPathABC(pathlib.PurePosixPath, metaclass=abc.ABCMeta): - """ - Implementation of a basic (PurePosix)Path object to be used within OMPython. The derived classes can use OMC as - backend and - thus - work on different configurations like docker or WSL. The connection to OMC is provided via - an instances of classes derived from BaseSession. - - PurePosixPath is selected as it covers all but Windows systems (Linux, docker, WSL). However, the code is - written such that possible Windows system are taken into account. Nevertheless, the overall functionality is - limited compared to standard pathlib.Path objects. - """ - - def __init__(self, *path, session: OMSessionABC) -> None: - super().__init__(*path) - self._session = session - - def get_session(self) -> OMSessionABC: - """ - Get session definition used for this instance of OMPath. - """ - return self._session - - def with_segments(self, *pathsegments) -> OMPathABC: - """ - Create a new OMCPath object with the given path segments. - - The original definition of Path is overridden to ensure the session data is set. - """ - return type(self)(*pathsegments, session=self._session) - - @abc.abstractmethod - def is_file(self) -> bool: - """ - Check if the path is a regular file. - """ - - @abc.abstractmethod - def is_dir(self) -> bool: - """ - Check if the path is a directory. - """ - - @abc.abstractmethod - def is_absolute(self) -> bool: - """ - Check if the path is an absolute path. - """ - - @abc.abstractmethod - def read_text(self) -> str: - """ - Read the content of the file represented by this path as text. - """ - - @abc.abstractmethod - def write_text(self, data: str) -> int: - """ - Write text data to the file represented by this path. - """ - - @abc.abstractmethod - def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: - """ - Create a directory at the path represented by this class. - - The argument parents with default value True exists to ensure compatibility with the fallback solution for - Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent - directories are also created. - """ - - @abc.abstractmethod - def cwd(self) -> OMPathABC: - """ - Returns the current working directory as an OMPathABC object. - """ - - @abc.abstractmethod - def unlink(self, missing_ok: bool = False) -> None: - """ - Unlink (delete) the file or directory represented by this path. - """ - - @abc.abstractmethod - def resolve(self, strict: bool = False) -> OMPathABC: - """ - Resolve the path to an absolute path. - """ - - def absolute(self) -> OMPathABC: - """ - Resolve the path to an absolute path. Just a wrapper for resolve(). - """ - return self.resolve() - - def exists(self) -> bool: - """ - Semi replacement for pathlib.Path.exists(). - """ - return self.is_file() or self.is_dir() - - @abc.abstractmethod - def size(self) -> int: - """ - Get the size of the file in bytes - this is an extra function and the best we can do using OMC. - """ + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + + @abc.abstractmethod + def cwd(self) -> OMPathABC: # pylint: disable=W0221 # is @classmethod in the original; see pathlib.PathBase + """ + Returns the current working directory as an OMPathABC object. + """ + + @abc.abstractmethod + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + + @abc.abstractmethod + def resolve(self, strict: bool = False) -> OMPathABC: + """ + Resolve the path to an absolute path. + """ + + def absolute(self) -> OMPathABC: + """ + Resolve the path to an absolute path. Just a wrapper for resolve(). + """ + return self.resolve() + + def exists(self) -> bool: + """ + Semi replacement for pathlib.Path.exists(). + """ + return self.is_file() or self.is_dir() + + @abc.abstractmethod + def size(self) -> int: + """ + Get the size of the file in bytes - this is an extra function and the best we can do using OMC. + """ class PostInitCaller(type): diff --git a/OMPython/om_session_omc.py b/OMPython/om_session_omc.py index 6626cd17..b2698181 100644 --- a/OMPython/om_session_omc.py +++ b/OMPython/om_session_omc.py @@ -21,7 +21,6 @@ import time from typing import Any, Optional, Tuple import uuid -import warnings import psutil import pyparsing @@ -39,151 +38,154 @@ # define logger using the current module name as ID logger = logging.getLogger(__name__) -# due to the compatibility layer to Python < 3.12, the OM(C)Path classes must be hidden behind the following if -# conditions. This is also the reason for OMPathABC, a simple base class to be used in ModelicaSystem* classes. -# Reason: before Python 3.12, pathlib.PurePosixPath can not be derived from; therefore, OMPathABC is not possible -if sys.version_info < (3, 12): - OMCPath = OMPathABC - -else: - class _OMCPath(OMPathABC): + + +class OMCPath(OMPathABC): + """ + Implementation of a OMPathABC using OMC as backend. The connection to OMC is provided via an instances of an + OMCSession* classes. + """ + + def is_file(self, *, follow_symlinks=True) -> bool: """ - Implementation of a OMPathABC using OMC as backend. The connection to OMC is provided via an instances of an - OMCSession* classes. + Check if the path is a regular file. """ + del follow_symlinks - def is_file(self) -> bool: - """ - Check if the path is a regular file. - """ - retval = self.get_session().sendExpression(expr=f'regularFileExists("{self.as_posix()}")') - if not isinstance(retval, bool): - raise OMSessionException(f"Invalid return value for is_file(): {retval} - expect bool") - return retval - - def is_dir(self) -> bool: - """ - Check if the path is a directory. - """ - retval = self.get_session().sendExpression(expr=f'directoryExists("{self.as_posix()}")') - if not isinstance(retval, bool): - raise OMSessionException(f"Invalid return value for is_dir(): {retval} - expect bool") - return retval - - def is_absolute(self) -> bool: - """ - Check if the path is an absolute path. Special handling to differentiate Windows and Posix definitions. - """ - if self._session.model_execution_windows and self._session.model_execution_local: - return pathlib.PureWindowsPath(self.as_posix()).is_absolute() - return pathlib.PurePosixPath(self.as_posix()).is_absolute() - - def read_text(self) -> str: - """ - Read the content of the file represented by this path as text. - """ - retval = self.get_session().sendExpression(expr=f'readFile("{self.as_posix()}")') - if not isinstance(retval, str): - raise OMSessionException(f"Invalid return value for read_text(): {retval} - expect str") - return retval - - def write_text(self, data: str) -> int: - """ - Write text data to the file represented by this path. - """ - if not isinstance(data, str): - raise TypeError(f"data must be str, not {data.__class__.__name__}") - - data_omc = self._session.escape_str(data) - self._session.sendExpression(expr=f'writeFile("{self.as_posix()}", "{data_omc}", false);') - - return len(data) - - def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: - """ - Create a directory at the path represented by this class. - - The argument parents with default value True exists to ensure compatibility with the fallback solution for - Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent - directories are also created. - """ - if self.is_dir() and not exist_ok: - raise FileExistsError(f"Directory {self.as_posix()} already exists!") - - if not self._session.sendExpression(expr=f'mkdir("{self.as_posix()}")'): - raise OMSessionException(f"Error on directory creation for {self.as_posix()}!") - - def cwd(self) -> OMPathABC: - """ - Returns the current working directory as an OMPathABC object. - """ - cwd_str = self._session.sendExpression(expr='cd()') - return type(self)(cwd_str, session=self._session) - - def unlink(self, missing_ok: bool = False) -> None: - """ - Unlink (delete) the file or directory represented by this path. - """ - res = self._session.sendExpression(expr=f'deleteFile("{self.as_posix()}")') - if not res and not missing_ok: - raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") - - def resolve(self, strict: bool = False) -> OMPathABC: - """ - Resolve the path to an absolute path. This is done based on available OMC functions. - """ - if strict and not (self.is_file() or self.is_dir()): - raise OMSessionException(f"Path {self.as_posix()} does not exist!") - - if self.is_file(): - pathstr_resolved = self._omc_resolve(self.parent.as_posix()) - omcpath_resolved = self._session.omcpath(pathstr_resolved) / self.name - elif self.is_dir(): - pathstr_resolved = self._omc_resolve(self.as_posix()) - omcpath_resolved = self._session.omcpath(pathstr_resolved) - else: - raise OMSessionException(f"Path {self.as_posix()} is neither a file nor a directory!") + retval = self.get_session().sendExpression(expr=f'regularFileExists("{self.as_posix()}")') + if not isinstance(retval, bool): + raise OMSessionException(f"Invalid return value for is_file(): {retval} - expect bool") + return retval - if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir(): - raise OMSessionException(f"OMCPath resolve failed for {self.as_posix()} - path does not exist!") + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """ + Check if the path is a directory. + """ + del follow_symlinks - return omcpath_resolved + retval = self.get_session().sendExpression(expr=f'directoryExists("{self.as_posix()}")') + if not isinstance(retval, bool): + raise OMSessionException(f"Invalid return value for is_dir(): {retval} - expect bool") + return retval - def _omc_resolve(self, pathstr: str) -> str: - """ - Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd - within OMC. - """ - expr = ('omcpath_cwd := cd(); ' - f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring - 'cd(omcpath_cwd)') + def is_absolute(self) -> bool: + """ + Check if the path is an absolute path. Special handling to differentiate Windows and Posix definitions. + """ + if self._session.model_execution_windows and self._session.model_execution_local: + return pathlib.PureWindowsPath(self.as_posix()).is_absolute() + return pathlib.PurePosixPath(self.as_posix()).is_absolute() - try: - retval = self.get_session().sendExpression(expr=expr, parsed=False) - if not isinstance(retval, str): - raise OMSessionException(f"Invalid return value for _omc_resolve(): {retval} - expect str") - result_parts = retval.split('\n') - pathstr_resolved = result_parts[1] - pathstr_resolved = pathstr_resolved[1:-1] # remove quotes - except OMSessionException as ex: - raise OMSessionException(f"OMCPath resolve failed for {pathstr}!") from ex + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + """ + del encoding, errors, newline + + retval = self.get_session().sendExpression(expr=f'readFile("{self.as_posix()}")') + if not isinstance(retval, str): + raise OMSessionException(f"Invalid return value for read_text(): {retval} - expect str") + return retval + + def write_text(self, data: str, encoding=None, errors=None, newline=None) -> int: + """ + Write text data to the file represented by this path. + """ + del encoding, errors, newline + + if not isinstance(data, str): + raise TypeError(f"data must be str, not {data.__class__.__name__}") - return pathstr_resolved + data_omc = self._session.escape_str(data) + self._session.sendExpression(expr=f'writeFile("{self.as_posix()}", "{data_omc}", false);') - def size(self) -> int: - """ - Get the size of the file in bytes - this is an extra function and the best we can do using OMC. - """ - if not self.is_file(): - raise OMSessionException(f"Path {self.as_posix()} is not a file!") + return len(data) - res = self._session.sendExpression(expr=f'stat("{self.as_posix()}")') - if res[0]: - return int(res[1]) + def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False) -> None: + """ + Create a directory at the path represented by this class. + + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + del mode - raise OMSessionException(f"Error reading file size for path {self.as_posix()}!") + if self.is_dir() and not exist_ok: + raise FileExistsError(f"Directory {self.as_posix()} already exists!") - OMCPath = _OMCPath + if not self._session.sendExpression(expr=f'mkdir("{self.as_posix()}")'): + raise OMSessionException(f"Error on directory creation for {self.as_posix()}!") + + def cwd(self) -> OMPathABC: # pylint: disable=W0221 # is @classmethod in the original; see pathlib.PathBase + """ + Returns the current working directory as an OMPathABC object. + """ + cwd_str = self._session.sendExpression(expr='cd()') + return type(self)(cwd_str, session=self._session) + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + res = self._session.sendExpression(expr=f'deleteFile("{self.as_posix()}")') + if not res and not missing_ok: + raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") + + def resolve(self, strict: bool = False) -> OMPathABC: + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + if strict and not (self.is_file() or self.is_dir()): + raise OMSessionException(f"Path {self.as_posix()} does not exist!") + + if self.is_file(): + pathstr_resolved = self._omc_resolve(self.parent.as_posix()) + omcpath_resolved = self._session.omcpath(pathstr_resolved) / self.name + elif self.is_dir(): + pathstr_resolved = self._omc_resolve(self.as_posix()) + omcpath_resolved = self._session.omcpath(pathstr_resolved) + else: + raise OMSessionException(f"Path {self.as_posix()} is neither a file nor a directory!") + + if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir(): + raise OMSessionException(f"OMCPath resolve failed for {self.as_posix()} - path does not exist!") + + return omcpath_resolved + + def _omc_resolve(self, pathstr: str) -> str: + """ + Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd + within OMC. + """ + expr = ('omcpath_cwd := cd(); ' + f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring + 'cd(omcpath_cwd)') + + try: + retval = self.get_session().sendExpression(expr=expr, parsed=False) + if not isinstance(retval, str): + raise OMSessionException(f"Invalid return value for _omc_resolve(): {retval} - expect str") + result_parts = retval.split('\n') + pathstr_resolved = result_parts[1] + pathstr_resolved = pathstr_resolved[1:-1] # remove quotes + except OMSessionException as ex: + raise OMSessionException(f"OMCPath resolve failed for {pathstr}!") from ex + + return pathstr_resolved + + def size(self) -> int: + """ + Get the size of the file in bytes - this is an extra function and the best we can do using OMC. + """ + if not self.is_file(): + raise OMSessionException(f"Path {self.as_posix()} is not a file!") + + res = self._session.sendExpression(expr=f'stat("{self.as_posix()}")') + if res[0]: + return int(res[1]) + + raise OMSessionException(f"Error reading file size for path {self.as_posix()}!") class OMCSessionABC(OMSessionABC, metaclass=abc.ABCMeta): @@ -377,16 +379,6 @@ def omcpath_tempdir(self, tempdir_base: Optional[OMPathABC] = None) -> OMPathABC return self._tempdir(tempdir_base=tempdir_base) - def execute(self, command: str): - warnings.warn( - message="This function is depreciated and will be removed in future versions; " - "please use sendExpression() instead", - category=DeprecationWarning, - stacklevel=2, - ) - - return self.sendExpression(command, parsed=False) - def sendExpression(self, expr: str, parsed: bool = True) -> Any: """ Send an expression to the OMC server and return the result. diff --git a/OMPython/om_session_runner.py b/OMPython/om_session_runner.py index fc8e5ac8..3bd3abeb 100644 --- a/OMPython/om_session_runner.py +++ b/OMPython/om_session_runner.py @@ -9,7 +9,6 @@ import logging import pathlib import subprocess -import sys import tempfile from typing import Any, Optional, Type @@ -22,275 +21,283 @@ # define logger using the current module name as ID logger = logging.getLogger(__name__) -# due to the compatibility layer to Python < 3.12, the OM(C)Path classes must be hidden behind the following if -# conditions. This is also the reason for OMPathABC, a simple base class to be used in ModelicaSystem* classes. -# Reason: before Python 3.12, pathlib.PurePosixPath can not be derived from; therefore, OMPathABC is not possible -if sys.version_info < (3, 12): - OMPathRunnerABC = OMPathABC - OMPathRunnerLocal = OMPathABC - OMPathRunnerBash = OMPathABC - -else: - class OMPathRunnerABC(OMPathABC, metaclass=abc.ABCMeta): - """ - Base function for OMPath definitions *without* OMC server - """ - - def _path(self) -> pathlib.Path: - return pathlib.Path(self.as_posix()) - - class _OMPathRunnerLocal(OMPathRunnerABC): - """ - Implementation of OMPathABC which does not use the session data at all. Thus, this implementation can run - locally without any usage of OMC. - - This class is based on OMPathABC and, therefore, on pathlib.PurePosixPath. This is working well, but it is not - the correct implementation on Windows systems. To get a valid Windows representation of the path, use the - conversion via pathlib.Path(.as_posix()). - """ - - def is_file(self) -> bool: - """ - Check if the path is a regular file. - """ - return self._path().is_file() - - def is_dir(self) -> bool: - """ - Check if the path is a directory. - """ - return self._path().is_dir() - - def is_absolute(self) -> bool: - """ - Check if the path is an absolute path. - """ - return self._path().is_absolute() - - def read_text(self) -> str: - """ - Read the content of the file represented by this path as text. - """ - return self._path().read_text(encoding='utf-8') - - def write_text(self, data: str): - """ - Write text data to the file represented by this path. - """ - if not isinstance(data, str): - raise TypeError(f"data must be str, not {data.__class__.__name__}") - - return self._path().write_text(data=data, encoding='utf-8') - - def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: - """ - Create a directory at the path represented by this class. - - The argument parents with default value True exists to ensure compatibility with the fallback solution for - Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent - directories are also created. - """ - self._path().mkdir(parents=parents, exist_ok=exist_ok) - - def cwd(self) -> OMPathABC: - """ - Returns the current working directory as an OMPathABC object. - """ - return type(self)(self._path().cwd().as_posix(), session=self._session) - - def unlink(self, missing_ok: bool = False) -> None: - """ - Unlink (delete) the file or directory represented by this path. - """ - self._path().unlink(missing_ok=missing_ok) - - def resolve(self, strict: bool = False) -> OMPathABC: - """ - Resolve the path to an absolute path. This is done based on available OMC functions. - """ - path_resolved = self._path().resolve(strict=strict) - return type(self)(path_resolved, session=self._session) - - def size(self) -> int: - """ - Get the size of the file in bytes - implementation based on pathlib.Path. - """ - if not self.is_file(): - raise OMSessionException(f"Path {self.as_posix()} is not a file!") - - path = self._path() - return path.stat().st_size - - class _OMPathRunnerBash(OMPathRunnerABC): - """ - Implementation of OMPathABC which does not use the session data at all. Thus, this implementation can run - locally without any usage of OMC. The special case of this class is the usage of POSIX bash to run all the - commands. Thus, it can be used in WSL or docker. - - This class is based on OMPathABC and, therefore, on pathlib.PurePosixPath. This is working well, but it is not - the correct implementation on Windows systems. To get a valid Windows representation of the path, use the - conversion via pathlib.Path(.as_posix()). - """ - - def is_file(self) -> bool: - """ - Check if the path is a regular file. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'test -f "{self.as_posix()}"'] - try: - subprocess.run(cmdl, check=True) - return True - except subprocess.CalledProcessError: - return False - - def is_dir(self) -> bool: - """ - Check if the path is a directory. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'test -d "{self.as_posix()}"'] +class OMPathRunnerABC(OMPathABC, metaclass=abc.ABCMeta): + """ + Base function for OMPath definitions *without* OMC server + """ - try: - subprocess.run(cmdl, check=True) - return True - except subprocess.CalledProcessError: - return False + def _path(self) -> pathlib.Path: + return pathlib.Path(self.as_posix()) - def is_absolute(self) -> bool: - """ - Check if the path is an absolute path. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'case "{self.as_posix()}" in /*) exit 0;; *) exit 1;; esac'] +class OMPathRunnerLocal(OMPathRunnerABC): + """ + Implementation of OMPathABC which does not use the session data at all. Thus, this implementation can run + locally without any usage of OMC. - try: - subprocess.check_call(cmdl) - return True - except subprocess.CalledProcessError: - return False - - def read_text(self) -> str: - """ - Read the content of the file represented by this path as text. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'cat "{self.as_posix()}"'] - - result = subprocess.run(cmdl, capture_output=True, check=True) - if result.returncode == 0: - return result.stdout.decode('utf-8') - raise FileNotFoundError(f"Cannot read file: {self.as_posix()}") - - def write_text(self, data: str) -> int: - """ - Write text data to the file represented by this path. - """ - if not isinstance(data, str): - raise TypeError(f"data must be str, not {data.__class__.__name__}") - - data_escape = self._session.escape_str(data) - - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'printf %s "{data_escape}" > "{self.as_posix()}"'] + This class is based on OMPathABC and, therefore, on pathlib.PurePosixPath. This is working well, but it is not + the correct implementation on Windows systems. To get a valid Windows representation of the path, use the + conversion via pathlib.Path(.as_posix()). + """ - try: - subprocess.run(cmdl, check=True) - return len(data) - except subprocess.CalledProcessError as exc: - raise IOError(f"Error writing data to file {self.as_posix()}!") from exc - - def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: - """ - Create a directory at the path represented by this class. - - The argument parents with default value True exists to ensure compatibility with the fallback solution for - Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent - directories are also created. - """ - - if self.is_file(): - raise OSError(f"The given path {self.as_posix()} exists and is a file!") - if self.is_dir() and not exist_ok: - raise OSError(f"The given path {self.as_posix()} exists and is a directory!") - if not parents and not self.parent.is_dir(): - raise FileNotFoundError(f"Parent directory of {self.as_posix()} does not exists!") - - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'mkdir -p "{self.as_posix()}"'] + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ + del follow_symlinks - try: - subprocess.run(cmdl, check=True) - except subprocess.CalledProcessError as exc: - raise OMSessionException(f"Error on directory creation for {self.as_posix()}!") from exc + return self._path().is_file() + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """ + Check if the path is a directory. + """ + del follow_symlinks + + return self._path().is_dir() + + def is_absolute(self) -> bool: + """ + Check if the path is an absolute path. + """ + return self._path().is_absolute() + + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + """ + del encoding, errors, newline + + return self._path().read_text(encoding='utf-8') + + def write_text(self, data: str, encoding=None, errors=None, newline=None): + """ + Write text data to the file represented by this path. + """ + del encoding, errors, newline + + if not isinstance(data, str): + raise TypeError(f"data must be str, not {data.__class__.__name__}") + + return self._path().write_text(data=data, encoding='utf-8') + + def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False) -> None: + """ + Create a directory at the path represented by this class. + + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + del mode + + self._path().mkdir(parents=parents, exist_ok=exist_ok) + + def cwd(self) -> OMPathABC: # pylint: disable=W0221 # is @classmethod in the original; see pathlib.PathBase + """ + Returns the current working directory as an OMPathABC object. + """ + return type(self)(self._path().cwd().as_posix(), session=self._session) + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + self._path().unlink(missing_ok=missing_ok) + + def resolve(self, strict: bool = False) -> OMPathABC: + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + path_resolved = self._path().resolve(strict=strict) + return type(self)(path_resolved, session=self._session) + + def size(self) -> int: + """ + Get the size of the file in bytes - implementation based on pathlib.Path. + """ + if not self.is_file(): + raise OMSessionException(f"Path {self.as_posix()} is not a file!") + + path = self._path() + return path.stat().st_size + + +class OMPathRunnerBash(OMPathRunnerABC): + """ + Implementation of OMPathABC which does not use the session data at all. Thus, this implementation can run + locally without any usage of OMC. The special case of this class is the usage of POSIX bash to run all the + commands. Thus, it can be used in WSL or docker. - def cwd(self) -> OMPathABC: - """ - Returns the current working directory as an OMPathABC object. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', 'pwd'] + This class is based on OMPathABC and, therefore, on pathlib.PurePosixPath. This is working well, but it is not + the correct implementation on Windows systems. To get a valid Windows representation of the path, use the + conversion via pathlib.Path(.as_posix()). + """ + + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ + del follow_symlinks + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'test -f "{self.as_posix()}"'] + + try: + subprocess.run(cmdl, check=True) + return True + except subprocess.CalledProcessError: + return False + + def is_dir(self, *, follow_symlinks: bool = True) -> bool: + """ + Check if the path is a directory. + """ + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'test -d "{self.as_posix()}"'] + + try: + subprocess.run(cmdl, check=True) + return True + except subprocess.CalledProcessError: + return False + + def is_absolute(self) -> bool: + """ + Check if the path is an absolute path. + """ + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'case "{self.as_posix()}" in /*) exit 0;; *) exit 1;; esac'] + + try: + subprocess.check_call(cmdl) + return True + except subprocess.CalledProcessError: + return False + + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + """ + del encoding, errors, newline + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'cat "{self.as_posix()}"'] + + result = subprocess.run(cmdl, capture_output=True, check=True) + if result.returncode == 0: + return result.stdout.decode('utf-8') + raise FileNotFoundError(f"Cannot read file: {self.as_posix()}") + + def write_text(self, data: str, encoding=None, errors=None, newline=None) -> int: + """ + Write text data to the file represented by this path. + """ + del encoding, errors, newline + + if not isinstance(data, str): + raise TypeError(f"data must be str, not {data.__class__.__name__}") - result = subprocess.run(cmdl, capture_output=True, text=True, check=True) - if result.returncode == 0: - return type(self)(result.stdout.strip(), session=self._session) - raise OSError("Can not get current work directory ...") + data_escape = self._session.escape_str(data) - def unlink(self, missing_ok: bool = False) -> None: - """ - Unlink (delete) the file or directory represented by this path. - """ + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'printf %s "{data_escape}" > "{self.as_posix()}"'] - if not self.is_file(): - raise OSError(f"Can not unlink a directory: {self.as_posix()}!") + try: + subprocess.run(cmdl, check=True) + return len(data) + except subprocess.CalledProcessError as exc: + raise IOError(f"Error writing data to file {self.as_posix()}!") from exc - if not self.is_file(): - return + def mkdir(self, mode=0o777, parents: bool = False, exist_ok: bool = False) -> None: + """ + Create a directory at the path represented by this class. + + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + del mode + + if self.is_file(): + raise OSError(f"The given path {self.as_posix()} exists and is a file!") + if self.is_dir() and not exist_ok: + raise OSError(f"The given path {self.as_posix()} exists and is a directory!") + if not parents and not self.parent.is_dir(): + raise FileNotFoundError(f"Parent directory of {self.as_posix()} does not exists!") + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'mkdir -p "{self.as_posix()}"'] + + try: + subprocess.run(cmdl, check=True) + except subprocess.CalledProcessError as exc: + raise OMSessionException(f"Error on directory creation for {self.as_posix()}!") from exc + + def cwd(self) -> OMPathABC: # pylint: disable=W0221 # is @classmethod in the original; see pathlib.PathBase + """ + Returns the current working directory as an OMPathABC object. + """ + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', 'pwd'] + + result = subprocess.run(cmdl, capture_output=True, text=True, check=True) + if result.returncode == 0: + return type(self)(result.stdout.strip(), session=self._session) + raise OSError("Can not get current work directory ...") + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + + if not self.is_file(): + raise OSError(f"Can not unlink a directory: {self.as_posix()}!") + + if not self.is_file(): + return + + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'rm "{self.as_posix()}"'] + + try: + subprocess.run(cmdl, check=True) + except subprocess.CalledProcessError as exc: + raise OSError(f"Cannot unlink file {self.as_posix()}: {exc}") from exc + + def resolve(self, strict: bool = False) -> OMPathABC: + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'readlink -f "{self.as_posix()}"'] + + result = subprocess.run(cmdl, capture_output=True, text=True, check=True) + if result.returncode == 0: + return type(self)(result.stdout.strip(), session=self._session) + raise FileNotFoundError(f"Cannot resolve path: {self.as_posix()}") + + def size(self) -> int: + """ + Get the size of the file in bytes - implementation based on pathlib.Path. + """ + if not self.is_file(): + raise OMSessionException(f"Path {self.as_posix()} is not a file!") - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'rm "{self.as_posix()}"'] + cmdl = self.get_session().get_cmd_prefix() + cmdl += ['bash', '-c', f'stat -c %s "{self.as_posix()}"'] + result = subprocess.run(cmdl, capture_output=True, text=True, check=True) + stdout = result.stdout.strip() + if result.returncode == 0: try: - subprocess.run(cmdl, check=True) - except subprocess.CalledProcessError as exc: - raise OSError(f"Cannot unlink file {self.as_posix()}: {exc}") from exc - - def resolve(self, strict: bool = False) -> OMPathABC: - """ - Resolve the path to an absolute path. This is done based on available OMC functions. - """ - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'readlink -f "{self.as_posix()}"'] - - result = subprocess.run(cmdl, capture_output=True, text=True, check=True) - if result.returncode == 0: - return type(self)(result.stdout.strip(), session=self._session) - raise FileNotFoundError(f"Cannot resolve path: {self.as_posix()}") - - def size(self) -> int: - """ - Get the size of the file in bytes - implementation based on pathlib.Path. - """ - if not self.is_file(): - raise OMSessionException(f"Path {self.as_posix()} is not a file!") - - cmdl = self.get_session().get_cmd_prefix() - cmdl += ['bash', '-c', f'stat -c %s "{self.as_posix()}"'] - - result = subprocess.run(cmdl, capture_output=True, text=True, check=True) - stdout = result.stdout.strip() - if result.returncode == 0: - try: - return int(stdout) - except ValueError as exc: - raise OSError(f"Invalid return value for file size ({self.as_posix()}): {stdout}") from exc - else: - raise OSError(f"Cannot get size for file {self.as_posix()}") - - OMPathRunnerLocal = _OMPathRunnerLocal - OMPathRunnerBash = _OMPathRunnerBash + return int(stdout) + except ValueError as exc: + raise OSError(f"Invalid return value for file size ({self.as_posix()}): {stdout}") from exc + else: + raise OSError(f"Cannot get size for file {self.as_posix()}") class OMSessionRunnerABC(OMSessionABC, metaclass=abc.ABCMeta): diff --git a/README.md b/README.md index a9cf3bdc..d65528fc 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,8 @@ OMPython is a Python interface that uses ZeroMQ to communicate with OpenModelica ## Dependencies -- Python 3.x supported -- PyZMQ is required + - Python >= 3.12 + - Additional packages: numpy, psutil, pyparsing and pyzmq ## Installation @@ -49,8 +49,8 @@ help(OMPython) ``` ```python -from OMPython import OMCSessionLocal -omc = OMCSessionLocal() +import OMPython +omc = OMPython.OMCSessionLocal() omc.sendExpression("getVersion()") ``` diff --git a/tests/test_ModelExecutionCmd.py b/tests/test_ModelExecutionCmd.py index db5aadeb..19111070 100644 --- a/tests/test_ModelExecutionCmd.py +++ b/tests/test_ModelExecutionCmd.py @@ -24,7 +24,7 @@ def mscmd_firstorder(model_firstorder): model_name="M", ) - mscmd = OMPython.ModelExecutionCmd( + mscmd = OMPython.ModelExecutionConfig( runpath=mod.getWorkDirectory(), cmd_local=mod.get_session().model_execution_local, cmd_windows=mod.get_session().model_execution_windows, @@ -38,17 +38,19 @@ def mscmd_firstorder(model_firstorder): def test_simflags(mscmd_firstorder): mscmd = mscmd_firstorder - mscmd.args_set({ + mscmd.args_set(args={ + "override": { + 'b': 2, + 'a': 4, + }, + "noRestart": None, "noEventEmit": None, - "override": {'b': 2} }) - with pytest.deprecated_call(): - mscmd.args_set(args=mscmd.parse_simflags(simflags="-noEventEmit -noRestart -override=a=1,x=3")) assert mscmd.get_cmd_args() == [ '-noEventEmit', '-noRestart', - '-override=a=1,b=2,x=3', + '-override=a=4,b=2', ] mscmd.args_set({ @@ -58,5 +60,5 @@ def test_simflags(mscmd_firstorder): assert mscmd.get_cmd_args() == [ '-noEventEmit', '-noRestart', - '-override=a=1,x=3', + '-override=a=4', ] diff --git a/tests/test_ModelicaSystemOMC.py b/tests/test_ModelicaSystemOMC.py index c63b92e1..a5e996c2 100644 --- a/tests/test_ModelicaSystemOMC.py +++ b/tests/test_ModelicaSystemOMC.py @@ -64,9 +64,8 @@ def test_setParameters(): model_name="BouncingBall", ) - # method 1 (test depreciated variants) - mod.setParameters("e=1.234") - mod.setParameters(["g=321.0"]) + mod.setParameters(e=1.234) + mod.setParameters(g=321.0) assert mod.getParameters("e") == ["1.234"] assert mod.getParameters("g") == ["321.0"] assert mod.getParameters() == { @@ -76,7 +75,6 @@ def test_setParameters(): with pytest.raises(KeyError): mod.getParameters("thisParameterDoesNotExist") - # method 2 (new style) pvals = {"e": 21.3, "g": 0.12} mod.setParameters(**pvals) assert mod.getParameters() == { @@ -208,6 +206,53 @@ def _run_getSolutions(mod): assert np.isclose(x, x_analytical, rtol=1e-4).all() +def test_variable_filter(model_firstorder): + mod = OMPython.ModelicaSystemOMC() + mod.model( + model_file=model_firstorder, + model_name="M", + ) + + a = -1 + tau = -1 / a + stopTime = 5 * tau + + simOptions = {"stopTime": stopTime, "stepSize": 0.1, "tolerance": 1e-8} + mod.setSimulationOptions(**simOptions) + mod.simulate() + sol_names1 = mod.getSolutions() + assert isinstance(sol_names1, tuple) + assert sol_names1 == ('a', 'der(x)', 'time', 'x') + + mod.set_variable_filter(variable_filter='x') + mod.setSimulationOptions(stopTime=2.0) + mod.simulate() + sol_names2 = mod.getSolutions() + assert isinstance(sol_names2, tuple) + assert sol_names2 == ('a', 'time', 'x') + + mod.set_variable_filter(variable_filter='der(x)') + mod.setSimulationOptions(stopTime=3.0) + mod.simulate() + sol_names3 = mod.getSolutions() + assert isinstance(sol_names3, tuple) + assert sol_names3 == ('a', 'time') + + mod.set_variable_filter(variable_filter='der(x)', escape=True) + mod.setSimulationOptions(stopTime=3.0) + mod.simulate() + sol_names4 = mod.getSolutions() + assert isinstance(sol_names4, tuple) + assert sol_names4 == ('a', 'der(x)', 'time') + + mod.set_variable_filter(variable_filter='a') + mod.setSimulationOptions(stopTime=2.0) + mod.simulate() + sol_names5 = mod.getSolutions() + assert isinstance(sol_names5, tuple) + assert sol_names5 == ('a', 'time') + + def test_getters(tmp_path): model_file = tmp_path / "M_getters.mo" model_file.write_text(""" @@ -439,6 +484,14 @@ def test_simulate_inputs(tmp_path): simOptions = {"stopTime": 1.0} mod.setSimulationOptions(**simOptions) + # check invalid inputs + # * 'None' cannot be converted to float + with pytest.raises(OMPython.ModelicaSystemError): + mod.setInputs(u1=[(0.0, None), (0.5, 1)]) + # * 'abc' cannot be converted to float + with pytest.raises(OMPython.ModelicaSystemError): + mod.setInputs(u1=[(0.0, 0.0), ("abc", 1)]) + # integrate zero (no setInputs call) - it should default to None -> 0 assert mod.getInputs() == { "u1": None, diff --git a/tests/test_OMCPath.py b/tests/test_OMCPath.py index e15c75ff..dcbb2573 100644 --- a/tests/test_OMCPath.py +++ b/tests/test_OMCPath.py @@ -15,14 +15,6 @@ ) -# TODO: based on compatibility layer -def test_OMCPath_OMCSessionZMQ(): - om = OMPython.OMCSessionZMQ() - - _run_OMPath_checks(om) - _run_OMPath_write_file(om) - - def test_OMCPath_OMCSessionLocal(): oms = OMPython.OMCSessionLocal() diff --git a/tests/test_OMSessionCmd.py b/tests/test_OMSessionCmd.py deleted file mode 100644 index 7dbb9705..00000000 --- a/tests/test_OMSessionCmd.py +++ /dev/null @@ -1,20 +0,0 @@ -import OMPython - - -def test_isPackage(): - omcs = OMPython.OMCSessionLocal() - omccmd = OMPython.OMCSessionCmd(session=omcs) - assert not omccmd.isPackage('Modelica') - - -def test_isPackage2(): - mod = OMPython.ModelicaSystemOMC() - mod.model( - model_name="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", - libraries=["Modelica"], - ) - omccmd = OMPython.OMCSessionCmd(session=mod.get_session()) - assert omccmd.isPackage('Modelica') - - -# TODO: add more checks ... diff --git a/tests/test_ZMQ.py b/tests/test_ZMQ.py index 89a8387b..1ba62cb9 100644 --- a/tests/test_ZMQ.py +++ b/tests/test_ZMQ.py @@ -38,14 +38,12 @@ def test_Simulate(omcs, model_time_str): assert omcs.sendExpression('res.resultFile') -def test_execute(omcs): - with pytest.deprecated_call(): - assert omcs.execute('"HelloWorld!"') == '"HelloWorld!"\n' +def test_sendExpression(omcs): assert omcs.sendExpression('"HelloWorld!"', parsed=False) == '"HelloWorld!"\n' assert omcs.sendExpression('"HelloWorld!"', parsed=True) == 'HelloWorld!' -def test_omcprocessport_execute(omcs): +def test_sendExpression_port(omcs): port = omcs.get_port() omcs2 = OMPython.OMCSessionPort(omc_port=port) @@ -58,7 +56,7 @@ def test_omcprocessport_execute(omcs): del omcs2 -def test_omcprocessport_simulate(omcs, model_time_str): +def test_Simulate_port(omcs, model_time_str): port = omcs.get_port() omcs2 = OMPython.OMCSessionPort(omc_port=port) diff --git a/tests_v400/__init__.py b/tests_v400/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests_v400/test_ArrayDimension.py b/tests_v400/test_ArrayDimension.py deleted file mode 100644 index 13b3c11b..00000000 --- a/tests_v400/test_ArrayDimension.py +++ /dev/null @@ -1,19 +0,0 @@ -import OMPython - - -def test_ArrayDimension(tmp_path): - omc = OMPython.OMCSessionZMQ() - - omc.sendExpression(f'cd("{tmp_path.as_posix()}")') - - omc.sendExpression('loadString("model A Integer x[5+1,1+6]; end A;")') - omc.sendExpression("getErrorString()") - - result = omc.sendExpression("getComponents(A)") - assert result[0][-1] == (6, 7), "array dimension does not match" - - omc.sendExpression('loadString("model A Integer y = 5; Integer x[y+1,1+9]; end A;")') - omc.sendExpression("getErrorString()") - - result = omc.sendExpression("getComponents(A)") - assert result[-1][-1] == ('y+1', 10), "array dimension does not match" diff --git a/tests_v400/test_FMIExport.py b/tests_v400/test_FMIExport.py deleted file mode 100644 index f47b87ae..00000000 --- a/tests_v400/test_FMIExport.py +++ /dev/null @@ -1,24 +0,0 @@ -import OMPython -import shutil -import os - - -def test_CauerLowPassAnalog(): - mod = OMPython.ModelicaSystem(modelName="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", - lmodel=["Modelica"]) - tmp = mod.getWorkDirectory() - try: - fmu = mod.convertMo2Fmu(fileNamePrefix="CauerLowPassAnalog") - assert os.path.exists(fmu) - finally: - shutil.rmtree(tmp, ignore_errors=True) - - -def test_DrumBoiler(): - mod = OMPython.ModelicaSystem(modelName="Modelica.Fluid.Examples.DrumBoiler.DrumBoiler", lmodel=["Modelica"]) - tmp = mod.getWorkDirectory() - try: - fmu = mod.convertMo2Fmu(fileNamePrefix="DrumBoiler") - assert os.path.exists(fmu) - finally: - shutil.rmtree(tmp, ignore_errors=True) diff --git a/tests_v400/test_ModelicaSystem.py b/tests_v400/test_ModelicaSystem.py deleted file mode 100644 index c55e95fc..00000000 --- a/tests_v400/test_ModelicaSystem.py +++ /dev/null @@ -1,411 +0,0 @@ -import OMPython -import os -import pathlib -import pytest -import tempfile -import numpy as np - - -@pytest.fixture -def model_firstorder(tmp_path): - mod = tmp_path / "M.mo" - mod.write_text("""model M - Real x(start = 1, fixed = true); - parameter Real a = -1; -equation - der(x) = x*a; -end M; -""") - return mod - - -def test_ModelicaSystem_loop(model_firstorder): - def worker(): - filePath = model_firstorder.as_posix() - m = OMPython.ModelicaSystem(filePath, "M") - m.simulate() - m.convertMo2Fmu(fmuType="me") - for _ in range(10): - worker() - - -def test_setParameters(): - omc = OMPython.OMCSessionZMQ() - model_path = omc.sendExpression("getInstallationDirectoryPath()") + "/share/doc/omc/testmodels/" - mod = OMPython.ModelicaSystem(model_path + "BouncingBall.mo", "BouncingBall") - - # method 1 - mod.setParameters(pvals={"e": 1.234}) - mod.setParameters(pvals={"g": 321.0}) - assert mod.getParameters("e") == ["1.234"] - assert mod.getParameters("g") == ["321.0"] - assert mod.getParameters() == { - "e": "1.234", - "g": "321.0", - } - with pytest.raises(KeyError): - mod.getParameters("thisParameterDoesNotExist") - - # method 2 - mod.setParameters(pvals={"e": 21.3, "g": 0.12}) - assert mod.getParameters() == { - "e": "21.3", - "g": "0.12", - } - assert mod.getParameters(["e", "g"]) == ["21.3", "0.12"] - assert mod.getParameters(["g", "e"]) == ["0.12", "21.3"] - with pytest.raises(KeyError): - mod.getParameters(["g", "thisParameterDoesNotExist"]) - - -def test_setSimulationOptions(): - omc = OMPython.OMCSessionZMQ() - model_path = omc.sendExpression("getInstallationDirectoryPath()") + "/share/doc/omc/testmodels/" - mod = OMPython.ModelicaSystem(fileName=model_path + "BouncingBall.mo", modelName="BouncingBall") - - # method 1 - mod.setSimulationOptions(simOptions={"stopTime": 1.234}) - mod.setSimulationOptions(simOptions={"tolerance": 1.1e-08}) - assert mod.getSimulationOptions("stopTime") == ["1.234"] - assert mod.getSimulationOptions("tolerance") == ["1.1e-08"] - assert mod.getSimulationOptions(["tolerance", "stopTime"]) == ["1.1e-08", "1.234"] - d = mod.getSimulationOptions() - assert isinstance(d, dict) - assert d["stopTime"] == "1.234" - assert d["tolerance"] == "1.1e-08" - with pytest.raises(KeyError): - mod.getSimulationOptions("thisOptionDoesNotExist") - - # method 2 - mod.setSimulationOptions(simOptions={"stopTime": 2.1, "tolerance": "1.2e-08"}) - d = mod.getSimulationOptions() - assert d["stopTime"] == "2.1" - assert d["tolerance"] == "1.2e-08" - - -def test_relative_path(model_firstorder): - cwd = pathlib.Path.cwd() - (fd, name) = tempfile.mkstemp(prefix='tmpOMPython.tests', dir=cwd, text=True) - try: - with os.fdopen(fd, 'w') as f: - f.write(model_firstorder.read_text()) - - model_file = pathlib.Path(name).relative_to(cwd) - model_relative = str(model_file) - assert "/" not in model_relative - - mod = OMPython.ModelicaSystem(fileName=model_relative, modelName="M") - assert float(mod.getParameters("a")[0]) == -1 - finally: - model_file.unlink() # clean up the temporary file - - -def test_customBuildDirectory(tmp_path, model_firstorder): - filePath = model_firstorder.as_posix() - tmpdir = tmp_path / "tmpdir1" - tmpdir.mkdir() - m = OMPython.ModelicaSystem(filePath, "M", customBuildDirectory=tmpdir) - assert pathlib.Path(m.getWorkDirectory().resolve()) == tmpdir.resolve() - result_file = tmpdir / "a.mat" - assert not result_file.exists() - m.simulate(resultfile="a.mat") - assert result_file.is_file() - - -def test_getSolutions(model_firstorder): - filePath = model_firstorder.as_posix() - mod = OMPython.ModelicaSystem(filePath, "M") - x0 = 1 - a = -1 - tau = -1 / a - stopTime = 5*tau - mod.setSimulationOptions(simOptions={"stopTime": stopTime, "stepSize": 0.1, "tolerance": 1e-8}) - mod.simulate() - - x = mod.getSolutions("x") - t, x2 = mod.getSolutions(["time", "x"]) - assert (x2 == x).all() - sol_names = mod.getSolutions() - assert isinstance(sol_names, tuple) - assert "time" in sol_names - assert "x" in sol_names - assert "der(x)" in sol_names - with pytest.raises(OMPython.ModelicaSystemError): - mod.getSolutions("thisVariableDoesNotExist") - assert np.isclose(t[0], 0), "time does not start at 0" - assert np.isclose(t[-1], stopTime), "time does not end at stopTime" - x_analytical = x0 * np.exp(a*t) - assert np.isclose(x, x_analytical, rtol=1e-4).all() - - -def test_getters(tmp_path): - model_file = tmp_path / "M_getters.mo" - model_file.write_text(""" -model M_getters -Real x(start = 1, fixed = true); -output Real y "the derivative"; -parameter Real a = -0.5; -parameter Real b = 0.1; -equation -der(x) = x*a + b; -y = der(x); -end M_getters; -""") - mod = OMPython.ModelicaSystem(fileName=model_file.as_posix(), modelName="M_getters") - - q = mod.getQuantities() - assert isinstance(q, list) - assert sorted(q, key=lambda d: d["name"]) == sorted([ - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'local', - 'changeable': 'true', - 'description': None, - 'max': None, - 'min': None, - 'name': 'x', - 'start': '1.0', - 'unit': None, - 'variability': 'continuous', - }, - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'local', - 'changeable': 'false', - 'description': None, - 'max': None, - 'min': None, - 'name': 'der(x)', - 'start': None, - 'unit': None, - 'variability': 'continuous', - }, - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'output', - 'changeable': 'false', - 'description': 'the derivative', - 'max': None, - 'min': None, - 'name': 'y', - 'start': '-0.4', - 'unit': None, - 'variability': 'continuous', - }, - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'parameter', - 'changeable': 'true', - 'description': None, - 'max': None, - 'min': None, - 'name': 'a', - 'start': '-0.5', - 'unit': None, - 'variability': 'parameter', - }, - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'parameter', - 'changeable': 'true', - 'description': None, - 'max': None, - 'min': None, - 'name': 'b', - 'start': '0.1', - 'unit': None, - 'variability': 'parameter', - } - ], key=lambda d: d["name"]) - - assert mod.getQuantities("y") == [ - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'output', - 'changeable': 'false', - 'description': 'the derivative', - 'max': None, - 'min': None, - 'name': 'y', - 'start': '-0.4', - 'unit': None, - 'variability': 'continuous', - } - ] - - assert mod.getQuantities(["y", "x"]) == [ - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'output', - 'changeable': 'false', - 'description': 'the derivative', - 'max': None, - 'min': None, - 'name': 'y', - 'start': '-0.4', - 'unit': None, - 'variability': 'continuous', - }, - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'local', - 'changeable': 'true', - 'description': None, - 'max': None, - 'min': None, - 'name': 'x', - 'start': '1.0', - 'unit': None, - 'variability': 'continuous', - }, - ] - - with pytest.raises(KeyError): - mod.getQuantities("thisQuantityDoesNotExist") - - assert mod.getInputs() == {} - with pytest.raises(KeyError): - mod.getInputs("thisInputDoesNotExist") - # getOutputs before simulate() - assert mod.getOutputs() == {'y': '-0.4'} - assert mod.getOutputs("y") == ["-0.4"] - assert mod.getOutputs(["y", "y"]) == ["-0.4", "-0.4"] - with pytest.raises(KeyError): - mod.getOutputs("thisOutputDoesNotExist") - - # getContinuous before simulate(): - assert mod.getContinuous() == { - 'x': '1.0', - 'der(x)': None, - 'y': '-0.4' - } - assert mod.getContinuous("y") == ['-0.4'] - assert mod.getContinuous(["y", "x"]) == ['-0.4', '1.0'] - with pytest.raises(KeyError): - mod.getContinuous("a") # a is a parameter - - stopTime = 1.0 - a = -0.5 - b = 0.1 - x0 = 1.0 - x_analytical = -b/a + (x0 + b/a) * np.exp(a * stopTime) - dx_analytical = (x0 + b/a) * a * np.exp(a * stopTime) - mod.setSimulationOptions(simOptions={"stopTime": stopTime}) - mod.simulate() - - # getOutputs after simulate() - d = mod.getOutputs() - assert d.keys() == {"y"} - assert np.isclose(d["y"], dx_analytical, 1e-4) - assert mod.getOutputs("y") == [d["y"]] - assert mod.getOutputs(["y", "y"]) == [d["y"], d["y"]] - with pytest.raises(KeyError): - mod.getOutputs("thisOutputDoesNotExist") - - # getContinuous after simulate() should return values at end of simulation: - with pytest.raises(KeyError): - mod.getContinuous("a") # a is a parameter - with pytest.raises(KeyError): - mod.getContinuous(["x", "a", "y"]) # a is a parameter - d = mod.getContinuous() - assert d.keys() == {"x", "der(x)", "y"} - assert np.isclose(d["x"], x_analytical, 1e-4) - assert np.isclose(d["der(x)"], dx_analytical, 1e-4) - assert np.isclose(d["y"], dx_analytical, 1e-4) - assert mod.getContinuous("x") == [d["x"]] - assert mod.getContinuous(["y", "x"]) == [d["y"], d["x"]] - - with pytest.raises(KeyError): - mod.getContinuous("a") # a is a parameter - - with pytest.raises(OMPython.ModelicaSystemError): - mod.setSimulationOptions(simOptions={"thisOptionDoesNotExist": 3}) - - -def test_simulate_inputs(tmp_path): - model_file = tmp_path / "M_input.mo" - model_file.write_text(""" -model M_input -Real x(start=0, fixed=true); -input Real u1; -input Real u2; -output Real y; -equation -der(x) = u1 + u2; -y = x; -end M_input; -""") - mod = OMPython.ModelicaSystem(fileName=model_file.as_posix(), modelName="M_input") - - mod.setSimulationOptions(simOptions={"stopTime": 1.0}) - - # integrate zero (no setInputs call) - it should default to None -> 0 - assert mod.getInputs() == { - "u1": None, - "u2": None, - } - mod.simulate() - y = mod.getSolutions("y")[0] - assert np.isclose(y[-1], 0.0) - - # integrate a constant - mod.setInputs(name={"u1": 2.5}) - assert mod.getInputs() == { - "u1": [ - (0.0, 2.5), - (1.0, 2.5), - ], - # u2 is set due to the call to simulate() above - "u2": [ - (0.0, 0.0), - (1.0, 0.0), - ], - } - mod.simulate() - y = mod.getSolutions("y")[0] - assert np.isclose(y[-1], 2.5) - - # now let's integrate the sum of two ramps - mod.setInputs(name={"u1": [(0.0, 0.0), (0.5, 2), (1.0, 0)]}) - assert mod.getInputs("u1") == [[ - (0.0, 0.0), - (0.5, 2.0), - (1.0, 0.0), - ]] - mod.simulate() - y = mod.getSolutions("y")[0] - assert np.isclose(y[-1], 1.0) - - # let's try some edge cases - # unmatched startTime - with pytest.raises(OMPython.ModelicaSystemError): - mod.setInputs(name={"u1": [(-0.5, 0.0), (1.0, 1)]}) - mod.simulate() - # unmatched stopTime - with pytest.raises(OMPython.ModelicaSystemError): - mod.setInputs(name={"u1": [(0.0, 0.0), (0.5, 1)]}) - mod.simulate() - - # Let's use both inputs, but each one with different number of - # samples. This has an effect when generating the csv file. - mod.setInputs(name={"u1": [(0.0, 0), (1.0, 1)], - "u2": [(0.0, 0), (0.25, 0.5), (0.5, 1.0), (1.0, 0)]}) - csv_file = mod._createCSVData() - assert pathlib.Path(csv_file).read_text() == """time,u1,u2,end -0.0,0.0,0.0,0 -0.25,0.25,0.5,0 -0.5,0.5,1.0,0 -1.0,1.0,0.0,0 -""" - - mod.simulate() - y = mod.getSolutions("y")[0] - assert np.isclose(y[-1], 1.0) diff --git a/tests_v400/test_ModelicaSystemCmd.py b/tests_v400/test_ModelicaSystemCmd.py deleted file mode 100644 index 3544a1bd..00000000 --- a/tests_v400/test_ModelicaSystemCmd.py +++ /dev/null @@ -1,51 +0,0 @@ -import OMPython -import pytest - - -@pytest.fixture -def model_firstorder(tmp_path): - mod = tmp_path / "M.mo" - mod.write_text("""model M - Real x(start = 1, fixed = true); - parameter Real a = -1; -equation - der(x) = x*a; -end M; -""") - return mod - - -@pytest.fixture -def mscmd_firstorder(model_firstorder): - mod = OMPython.ModelicaSystem(fileName=model_firstorder.as_posix(), modelName="M") - mscmd = OMPython.ModelicaSystemCmd(runpath=mod.getWorkDirectory(), modelname=mod._model_name) - return mscmd - - -def test_simflags(mscmd_firstorder): - mscmd = mscmd_firstorder - - mscmd.args_set({ - "noEventEmit": None, - "override": {'b': 2} - }) - with pytest.deprecated_call(): - mscmd.args_set(args=mscmd.parse_simflags(simflags="-noEventEmit -noRestart -override=a=1,x=3")) - - assert mscmd.get_cmd() == [ - mscmd.get_exe().as_posix(), - '-noEventEmit', - '-noRestart', - '-override=a=1,b=2,x=3', - ] - - mscmd.args_set({ - "override": {'b': None}, - }) - - assert mscmd.get_cmd() == [ - mscmd.get_exe().as_posix(), - '-noEventEmit', - '-noRestart', - '-override=a=1,x=3', - ] diff --git a/tests_v400/test_OMParser.py b/tests_v400/test_OMParser.py deleted file mode 100644 index 875604e5..00000000 --- a/tests_v400/test_OMParser.py +++ /dev/null @@ -1,43 +0,0 @@ -from OMPython import OMParser - -typeCheck = OMParser.typeCheck - - -def test_newline_behaviour(): - pass - - -def test_boolean(): - assert typeCheck('TRUE') is True - assert typeCheck('True') is True - assert typeCheck('true') is True - assert typeCheck('FALSE') is False - assert typeCheck('False') is False - assert typeCheck('false') is False - - -def test_int(): - assert typeCheck('2') == 2 - assert type(typeCheck('1')) == int - assert type(typeCheck('123123123123123123232323')) == int - assert type(typeCheck('9223372036854775808')) == int - - -def test_float(): - assert type(typeCheck('1.2e3')) == float - - -# def test_dict(): -# assert type(typeCheck('{"a": "b"}')) == dict - - -def test_ident(): - assert typeCheck('blabla2') == "blabla2" - - -def test_str(): - pass - - -def test_UnStringable(): - pass diff --git a/tests_v400/test_OMSessionCmd.py b/tests_v400/test_OMSessionCmd.py deleted file mode 100644 index 1588fac8..00000000 --- a/tests_v400/test_OMSessionCmd.py +++ /dev/null @@ -1,17 +0,0 @@ -import OMPython - - -def test_isPackage(): - omczmq = OMPython.OMCSessionZMQ() - omccmd = OMPython.OMCSessionCmd(session=omczmq) - assert not omccmd.isPackage('Modelica') - - -def test_isPackage2(): - mod = OMPython.ModelicaSystem(modelName="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", - lmodel=["Modelica"]) - omccmd = OMPython.OMCSessionCmd(session=mod._getconn) - assert omccmd.isPackage('Modelica') - - -# TODO: add more checks ... diff --git a/tests_v400/test_ZMQ.py b/tests_v400/test_ZMQ.py deleted file mode 100644 index 30bf78e7..00000000 --- a/tests_v400/test_ZMQ.py +++ /dev/null @@ -1,70 +0,0 @@ -import OMPython -import pathlib -import os -import pytest - - -@pytest.fixture -def model_time_str(): - return """model M - Real r = time; -end M; -""" - - -@pytest.fixture -def om(tmp_path): - origDir = pathlib.Path.cwd() - os.chdir(tmp_path) - om = OMPython.OMCSessionZMQ() - os.chdir(origDir) - return om - - -def testHelloWorld(om): - assert om.sendExpression('"HelloWorld!"') == "HelloWorld!" - - -def test_Translate(om, model_time_str): - assert om.sendExpression(model_time_str) == ("M",) - assert om.sendExpression('translateModel(M)') is True - - -def test_Simulate(om, model_time_str): - assert om.sendExpression(f'loadString("{model_time_str}")') is True - om.sendExpression('res:=simulate(M, stopTime=2.0)') - assert om.sendExpression('res.resultFile') - - -def test_execute(om): - with pytest.deprecated_call(): - assert om.execute('"HelloWorld!"') == '"HelloWorld!"\n' - assert om.sendExpression('"HelloWorld!"', parsed=False) == '"HelloWorld!"\n' - assert om.sendExpression('"HelloWorld!"', parsed=True) == 'HelloWorld!' - - -def test_omcprocessport_execute(om): - port = om.omc_process.get_port() - omcp = OMPython.OMCProcessPort(omc_port=port) - - # run 1 - om1 = OMPython.OMCSessionZMQ(omc_process=omcp) - assert om1.sendExpression('"HelloWorld!"', parsed=False) == '"HelloWorld!"\n' - - # run 2 - om2 = OMPython.OMCSessionZMQ(omc_process=omcp) - assert om2.sendExpression('"HelloWorld!"', parsed=False) == '"HelloWorld!"\n' - - del om1 - del om2 - - -def test_omcprocessport_simulate(om, model_time_str): - port = om.omc_process.get_port() - omcp = OMPython.OMCProcessPort(omc_port=port) - - om = OMPython.OMCSessionZMQ(omc_process=omcp) - assert om.sendExpression(f'loadString("{model_time_str}")') is True - om.sendExpression('res:=simulate(M, stopTime=2.0)') - assert om.sendExpression('res.resultFile') != "" - del om diff --git a/tests_v400/test_docker.py b/tests_v400/test_docker.py deleted file mode 100644 index 8d68f11f..00000000 --- a/tests_v400/test_docker.py +++ /dev/null @@ -1,32 +0,0 @@ -import sys -import pytest -import OMPython - -skip_on_windows = pytest.mark.skipif( - sys.platform.startswith("win"), - reason="OpenModelica Docker image is Linux-only; skipping on Windows.", -) - - -@skip_on_windows -def test_docker(): - omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") - om = OMPython.OMCSessionZMQ(omc_process=omcp) - assert om.sendExpression("getVersion()") == "OpenModelica 1.25.0" - - omcpInner = OMPython.OMCProcessDockerContainer(dockerContainer=omcp.get_docker_container_id()) - omInner = OMPython.OMCSessionZMQ(omc_process=omcpInner) - assert omInner.sendExpression("getVersion()") == "OpenModelica 1.25.0" - - omcp2 = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal", port=11111) - om2 = OMPython.OMCSessionZMQ(omc_process=omcp2) - assert om2.sendExpression("getVersion()") == "OpenModelica 1.25.0" - - del omcp2 - del om2 - - del omcpInner - del omInner - - del omcp - del om diff --git a/tests_v400/test_linearization.py b/tests_v400/test_linearization.py deleted file mode 100644 index bccbc40b..00000000 --- a/tests_v400/test_linearization.py +++ /dev/null @@ -1,102 +0,0 @@ -import OMPython -import pytest -import numpy as np - - -@pytest.fixture -def model_linearTest(tmp_path): - mod = tmp_path / "M.mo" - mod.write_text(""" -model linearTest - Real x1(start=1); - Real x2(start=-2); - Real x3(start=3); - Real x4(start=-5); - parameter Real a=3,b=2,c=5,d=7,e=1,f=4; -equation - a*x1 = b*x2 -der(x1); - der(x2) + c*x3 + d*x1 = x4; - f*x4 - e*x3 - der(x3) = x1; - der(x4) = x1 + x2 + der(x3) + x4; -end linearTest; -""") - return mod - - -def test_example(model_linearTest): - mod = OMPython.ModelicaSystem(model_linearTest, "linearTest") - [A, B, C, D] = mod.linearize() - expected_matrixA = [[-3, 2, 0, 0], [-7, 0, -5, 1], [-1, 0, -1, 4], [0, 1, -1, 5]] - assert A == expected_matrixA, f"Matrix does not match the expected value. Got: {A}, Expected: {expected_matrixA}" - assert B == [], f"Matrix does not match the expected value. Got: {B}, Expected: {[]}" - assert C == [], f"Matrix does not match the expected value. Got: {C}, Expected: {[]}" - assert D == [], f"Matrix does not match the expected value. Got: {D}, Expected: {[]}" - assert mod.getLinearInputs() == [] - assert mod.getLinearOutputs() == [] - assert mod.getLinearStates() == ["x1", "x2", "x3", "x4"] - - -def test_getters(tmp_path): - model_file = tmp_path / "pendulum.mo" - model_file.write_text(""" -model Pendulum -Real phi(start=Modelica.Constants.pi, fixed=true); -Real omega(start=0, fixed=true); -input Real u1; -input Real u2; -output Real y1; -output Real y2; -parameter Real l = 1.2; -parameter Real g = 9.81; -equation -der(phi) = omega + u2; -der(omega) = -g/l * sin(phi); -y1 = y2 + 0.5*omega; -y2 = phi + u1; -end Pendulum; -""") - mod = OMPython.ModelicaSystem(fileName=model_file.as_posix(), modelName="Pendulum", lmodel=["Modelica"]) - - d = mod.getLinearizationOptions() - assert isinstance(d, dict) - assert "startTime" in d - assert "stopTime" in d - assert mod.getLinearizationOptions(["stopTime", "startTime"]) == [d["stopTime"], d["startTime"]] - mod.setLinearizationOptions(linearizationOptions={"stopTime": 0.02}) - assert mod.getLinearizationOptions("stopTime") == ["0.02"] - - mod.setInputs(name={"u1": 10, "u2": 0}) - [A, B, C, D] = mod.linearize() - param_g = float(mod.getParameters("g")[0]) - param_l = float(mod.getParameters("l")[0]) - assert mod.getLinearInputs() == ["u1", "u2"] - assert mod.getLinearStates() == ["omega", "phi"] - assert mod.getLinearOutputs() == ["y1", "y2"] - assert np.isclose(A, [[0, param_g/param_l], [1, 0]]).all() - assert np.isclose(B, [[0, 0], [0, 1]]).all() - assert np.isclose(C, [[0.5, 1], [0, 1]]).all() - assert np.isclose(D, [[1, 0], [1, 0]]).all() - - # test LinearizationResult - result = mod.linearize() - assert result[0] == A - assert result[1] == B - assert result[2] == C - assert result[3] == D - with pytest.raises(KeyError): - result[4] - - A2, B2, C2, D2 = result - assert A2 == A - assert B2 == B - assert C2 == C - assert D2 == D - - assert result.n == 2 - assert result.m == 2 - assert result.p == 2 - assert np.isclose(result.x0, [0, np.pi]).all() - assert np.isclose(result.u0, [10, 0]).all() - assert result.stateVars == ["omega", "phi"] - assert result.inputVars == ["u1", "u2"] - assert result.outputVars == ["y1", "y2"] diff --git a/tests_v400/test_optimization.py b/tests_v400/test_optimization.py deleted file mode 100644 index b4164397..00000000 --- a/tests_v400/test_optimization.py +++ /dev/null @@ -1,67 +0,0 @@ -import OMPython -import numpy as np - - -def test_optimization_example(tmp_path): - model_file = tmp_path / "BangBang2021.mo" - model_file.write_text(""" -model BangBang2021 "Model to verify that optimization gives bang-bang optimal control" -parameter Real m = 1; -parameter Real p = 1 "needed for final constraints"; - -Real a; -Real v(start = 0, fixed = true); -Real pos(start = 0, fixed = true); -Real pow(min = -30, max = 30) = f * v annotation(isConstraint = true); - -input Real f(min = -10, max = 10); - -Real costPos(nominal = 1) = -pos "minimize -pos(tf)" annotation(isMayer=true); - -Real conSpeed(min = 0, max = 0) = p * v " 0<= p*v(tf) <=0" annotation(isFinalConstraint = true); - -equation - -der(pos) = v; -der(v) = a; -f = m * a; - -annotation(experiment(StartTime = 0, StopTime = 1, Tolerance = 1e-07, Interval = 0.01), -__OpenModelica_simulationFlags(s="optimization", optimizerNP="1"), -__OpenModelica_commandLineOptions="+g=Optimica"); - -end BangBang2021; -""") - - mod = OMPython.ModelicaSystem(fileName=model_file.as_posix(), modelName="BangBang2021") - - mod.setOptimizationOptions(optimizationOptions={"numberOfIntervals": 16, - "stopTime": 1, - "stepSize": 0.001, - "tolerance": 1e-8}) - - # test the getter - assert mod.getOptimizationOptions()["stopTime"] == "1" - assert mod.getOptimizationOptions("stopTime") == ["1"] - assert mod.getOptimizationOptions(["tolerance", "stopTime"]) == ["1e-08", "1"] - - r = mod.optimize() - # it is necessary to specify resultfile, otherwise it wouldn't find it. - time, f, v = mod.getSolutions(["time", "f", "v"], resultfile=r["resultFile"]) - assert np.isclose(f[0], 10) - assert np.isclose(f[-1], -10) - - def f_fcn(time, v): - if time < 0.3: - return 10 - if time <= 0.5: - return 30 / v - if time < 0.7: - return -30 / v - return -10 - f_expected = [f_fcn(t, v) for t, v in zip(time, v)] - - # The sharp edge at time=0.5 probably won't match, let's leave that out. - matches = np.isclose(f, f_expected, 1e-3) - assert matches[:498].all() - assert matches[502:].all() diff --git a/tests_v400/test_typedParser.py b/tests_v400/test_typedParser.py deleted file mode 100644 index 60daedec..00000000 --- a/tests_v400/test_typedParser.py +++ /dev/null @@ -1,53 +0,0 @@ -from OMPython import OMTypedParser - -typeCheck = OMTypedParser.parseString - - -def test_newline_behaviour(): - pass - - -def test_boolean(): - assert typeCheck('true') is True - assert typeCheck('false') is False - - -def test_int(): - assert typeCheck('2') == 2 - assert type(typeCheck('1')) == int - assert type(typeCheck('123123123123123123232323')) == int - assert type(typeCheck('9223372036854775808')) == int - - -def test_float(): - assert type(typeCheck('1.2e3')) == float - - -def test_ident(): - assert typeCheck('blabla2') == "blabla2" - - -def test_empty(): - assert typeCheck('') is None - - -def test_str(): - pass - - -def test_UnStringable(): - pass - - -def test_everything(): - # this test used to be in OMTypedParser.py's main() - testdata = """ - (1.0,{{1,true,3},{"4\\" -",5.9,6,NONE ( )},record ABC - startTime = ErrorLevel.warning, - 'stop*Time' = SOME(1.0) -end ABC;}) - """ - expected = (1.0, ((1, True, 3), ('4"\n', 5.9, 6, None), {"'stop*Time'": 1.0, 'startTime': 'ErrorLevel.warning'})) - results = typeCheck(testdata) - assert results == expected