diff --git a/examples/sdk_resampling_example.py b/examples/sdk_resampling_example.py index 2753077af..386b6d642 100644 --- a/examples/sdk_resampling_example.py +++ b/examples/sdk_resampling_example.py @@ -13,10 +13,11 @@ from frequenz.sdk.actor import ChannelRegistry from frequenz.sdk.actor.data_sourcing import DataSourcingActor -from frequenz.sdk.data_ingestion.resampling.component_metrics_resampling_actor import ( +from frequenz.sdk.actor.resampling import ( + ComponentMetricId, + ComponentMetricRequest, ComponentMetricsResamplingActor, ) -from frequenz.sdk.data_pipeline import ComponentMetricId, ComponentMetricRequest from frequenz.sdk.microgrid import ComponentCategory, microgrid_api HOST = "microgrid.sandbox.api.frequenz.io" diff --git a/minimum-requirements-ci.txt b/minimum-requirements-ci.txt index 68f25267d..7be4150b0 100644 --- a/minimum-requirements-ci.txt +++ b/minimum-requirements-ci.txt @@ -1,6 +1,6 @@ # CI must ensure these dependency versions are supported frequenz-api-microgrid==0.11.0 -frequenz-channels==0.10.0 +frequenz-channels@git+https://github.com/frequenz-floss/frequenz-channels-python@v0.x.x google-api-python-client==2.15 grpcio==1.47 grpcio-tools==1.47 @@ -9,7 +9,6 @@ pandas==1.3.5 protobuf==3.20.2 pyarrow==6.0.0 pydantic==1.9.0 -pytz==2021.3 sympy==1.10.1 toml==0.10 tqdm==4.38.0 diff --git a/noxfile.py b/noxfile.py index b0717d726..397743af0 100644 --- a/noxfile.py +++ b/noxfile.py @@ -53,8 +53,9 @@ def formatting(session: nox.Session, install_deps: bool = True) -> None: if install_deps: session.install(*FMT_DEPS) - session.run("black", "--check", *source_file_paths(session)) - session.run("isort", "--check", *source_file_paths(session)) + paths = source_file_paths(session) + session.run("black", "--check", *paths) + session.run("isort", "--check", *paths) @nox.session @@ -65,13 +66,6 @@ def mypy(session: nox.Session, install_deps: bool = True) -> None: # fast local tests with `nox -R -e mypy`. session.install("-e", ".", "mypy", *PYTEST_DEPS) - # Since we use other packages in the frequenz namespace, we need to run the - # checks for frequenz.sdk from the installed package instead of the src - # directory. - mypy_paths = [ - path for path in source_file_paths(session) if not path.startswith("src") - ] - mypy_cmd = [ "mypy", "--ignore-missing-imports", @@ -83,9 +77,21 @@ def mypy(session: nox.Session, install_deps: bool = True) -> None: "--strict", ] + # If we have session arguments, we just use those... + if session.posargs: + session.run(*mypy_cmd, *session.posargs) + return + # Runs on the installed package session.run(*mypy_cmd, "-p", "frequenz.sdk") + # Runs on the rest of the source folders + # Since we use other packages in the frequenz namespace, we need to run the + # checks for frequenz.sdk from the installed package instead of the src + # directory. + mypy_paths = [ + path for path in source_file_paths(session) if not path.startswith("src") + ] session.run(*mypy_cmd, *mypy_paths) @@ -97,10 +103,11 @@ def pylint(session: nox.Session, install_deps: bool = True) -> None: # fast local tests with `nox -R -e pylint`. session.install("-e", ".", "pylint", *PYTEST_DEPS) + paths = source_file_paths(session) session.run( "pylint", "--extension-pkg-whitelist=pydantic", - *source_file_paths(session), + *paths, ) @@ -110,12 +117,14 @@ def docstrings(session: nox.Session, install_deps: bool = True) -> None: if install_deps: session.install(*DOCSTRING_DEPS, "toml") - session.run("pydocstyle", *source_file_paths(session)) + paths = source_file_paths(session) + session.run("pydocstyle", *paths) # Darglint checks that function argument and return values are documented. # This is needed only for the `src` dir, so we exclude the other top level - # dirs that contain code. - darglint_paths = filter( + # dirs that contain code, unless some paths were specified by argument, in + # which case we use those untouched. + darglint_paths = session.posargs or filter( lambda path: not (path.startswith("tests") or path.startswith("benchmarks")), source_file_paths(session), ) @@ -158,4 +167,5 @@ def _pytest_impl( "--cov=frequenz.sdk", "--cov-report=term", f"--cov-report=html:.htmlcov-{max_or_min_deps}", + *session.posargs, ) diff --git a/pyproject.toml b/pyproject.toml index 1a9f0303d..5b5f32df3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ classifiers = [ requires-python = ">= 3.8, < 4" dependencies = [ "frequenz-api-microgrid >= 0.11.0, < 0.12.0", - "frequenz-channels >= 0.10.0, < 0.11.0", + "frequenz-channels @ git+https://github.com/frequenz-floss/frequenz-channels-python@v0.x.x", "google-api-python-client >= 2.15, < 3", "grpcio >= 1.47, < 2", "grpcio-tools >= 1.47, < 2", @@ -36,7 +36,6 @@ dependencies = [ "protobuf >= 3.20, < 4", "pyarrow >= 6.0.0, < 6.1", "pydantic >= 1.9", - "pytz >= 2021.3", "sympy >= 1.10.1, < 2", "toml >= 0.10", "tqdm >= 4.38.0, < 5", diff --git a/src/frequenz/sdk/actor/data_sourcing/__init__.py b/src/frequenz/sdk/actor/data_sourcing/__init__.py index 938c8ddba..bab7f0a99 100644 --- a/src/frequenz/sdk/actor/data_sourcing/__init__.py +++ b/src/frequenz/sdk/actor/data_sourcing/__init__.py @@ -9,7 +9,10 @@ """ from .data_sourcing import DataSourcingActor +from .types import ComponentMetricId, ComponentMetricRequest __all__ = [ "DataSourcingActor", + "ComponentMetricId", + "ComponentMetricRequest", ] diff --git a/src/frequenz/sdk/actor/data_sourcing/data_sourcing.py b/src/frequenz/sdk/actor/data_sourcing/data_sourcing.py index f0f823885..6905366e5 100644 --- a/src/frequenz/sdk/actor/data_sourcing/data_sourcing.py +++ b/src/frequenz/sdk/actor/data_sourcing/data_sourcing.py @@ -9,11 +9,9 @@ from frequenz.channels import Receiver -from frequenz.sdk.actor import actor -from frequenz.sdk.actor.channel_registry import ChannelRegistry -from frequenz.sdk.data_pipeline import ComponentMetricRequest - +from .. import ChannelRegistry, actor from .microgrid_api_source import MicrogridApiSource +from .types import ComponentMetricRequest @actor diff --git a/src/frequenz/sdk/actor/data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/actor/data_sourcing/microgrid_api_source.py index c3b1027f3..1cb4b22a0 100644 --- a/src/frequenz/sdk/actor/data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/actor/data_sourcing/microgrid_api_source.py @@ -13,9 +13,7 @@ from frequenz.channels import Receiver, Sender -from frequenz.sdk.actor import ChannelRegistry -from frequenz.sdk.data_pipeline import ComponentMetricId, ComponentMetricRequest, Sample -from frequenz.sdk.microgrid import ( +from ...microgrid import ( BatteryData, ComponentCategory, EVChargerData, @@ -23,6 +21,9 @@ MeterData, microgrid_api, ) +from ...timeseries import Sample +from .. import ChannelRegistry +from .types import ComponentMetricId, ComponentMetricRequest _MeterDataMethods: Dict[ComponentMetricId, Callable[[MeterData], float]] = { ComponentMetricId.ACTIVE_POWER: lambda msg: msg.active_power, diff --git a/src/frequenz/sdk/data_pipeline/types.py b/src/frequenz/sdk/actor/data_sourcing/types.py similarity index 85% rename from src/frequenz/sdk/data_pipeline/types.py rename to src/frequenz/sdk/actor/data_sourcing/types.py index 1b1e1deed..3e80c7052 100644 --- a/src/frequenz/sdk/data_pipeline/types.py +++ b/src/frequenz/sdk/actor/data_sourcing/types.py @@ -73,16 +73,3 @@ def get_channel_name(self) -> str: A string denoting a channel name. """ return f"{self.component_id}::{self.metric_id.name}::{self.start_time}::{self.namespace}" - - -@dataclass(frozen=True) -class Sample: - """A measurement taken at a particular point in time. - - The `value` could be `None` if a component is malfunctioning or data is - lacking for another reason, but a sample still needs to be sent to have a - coherent view on a group of component metrics for a particular timestamp. - """ - - timestamp: datetime - value: Optional[float] = None diff --git a/src/frequenz/sdk/data_ingestion/resampling/component_metrics_resampling_actor.py b/src/frequenz/sdk/actor/resampling.py similarity index 94% rename from src/frequenz/sdk/data_ingestion/resampling/component_metrics_resampling_actor.py rename to src/frequenz/sdk/actor/resampling.py index 4136f70e5..1eab25575 100644 --- a/src/frequenz/sdk/data_ingestion/resampling/component_metrics_resampling_actor.py +++ b/src/frequenz/sdk/actor/resampling.py @@ -15,14 +15,19 @@ from frequenz.channels import MergeNamed, Receiver, Select, Sender, Timer -from ...actor import ChannelRegistry, actor -from ...data_pipeline import ComponentMetricRequest, Sample -from .component_metric_group_resampler import ComponentMetricGroupResampler -from .component_metric_resampler import ResamplingFunction +from ..timeseries import GroupResampler, ResamplingFunction, Sample +from . import ChannelRegistry, actor, data_sourcing logger = logging.Logger(__name__) +# Re-export the types from the data_sourcing actor as we use the same requests, +# we are only forwarding them for now. +ComponentMetricId = data_sourcing.ComponentMetricId + +ComponentMetricRequest = data_sourcing.ComponentMetricRequest + + # pylint: disable=unused-argument def average(samples: Sequence[Sample], resampling_period_s: float) -> float: """Calculate average of the provided values. @@ -147,7 +152,7 @@ async def run() -> None: self._max_data_age_in_periods: float = max_data_age_in_periods self._resampling_function: ResamplingFunction = resampling_function - self._resampler = ComponentMetricGroupResampler( + self._resampler = GroupResampler( resampling_period_s=resampling_period_s, max_data_age_in_periods=max_data_age_in_periods, initial_resampling_function=resampling_function, @@ -209,10 +214,12 @@ async def run(self) -> None: component_data_receiver=MergeNamed(**self._input_receivers), ) while await select.ready(): - if _ := select.resampling_timer: + if msg := select.resampling_timer: + assert msg.inner is not None, "The timer should never be 'closed'" + timestamp = msg.inner awaitables = [ self._output_senders[channel_name].send(sample) - for channel_name, sample in self._resampler.resample() + for channel_name, sample in self._resampler.resample(timestamp) ] await asyncio.gather(*awaitables) if msg := select.component_data_receiver: diff --git a/src/frequenz/sdk/data_handling/time_series.py b/src/frequenz/sdk/data_handling/time_series.py index 659702b08..7859785e0 100644 --- a/src/frequenz/sdk/data_handling/time_series.py +++ b/src/frequenz/sdk/data_handling/time_series.py @@ -9,14 +9,12 @@ """ from __future__ import annotations -import datetime import enum import logging from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone from typing import Collection, Dict, Generic, Optional, Set, TypeVar -import pytz - from .formula import Formula Key = TypeVar("Key") @@ -104,13 +102,13 @@ class Status(enum.Enum): UNKNOWN = "unknown" ERROR = "error" - timestamp: datetime.datetime + timestamp: datetime value: Optional[Value] = None status: Status = Status.VALID broken_component_ids: Set[int] = field(default_factory=set) @staticmethod - def create_error(timestamp: datetime.datetime) -> TimeSeriesEntry[Value]: + def create_error(timestamp: datetime) -> TimeSeriesEntry[Value]: """Create a `TimeSeriesEntry` that contains an error. This can happen when the value would be NaN, e.g. @@ -128,7 +126,7 @@ def create_error(timestamp: datetime.datetime) -> TimeSeriesEntry[Value]: @staticmethod def create_unknown( - timestamp: datetime.datetime, broken_component_ids: Optional[Set[int]] = None + timestamp: datetime, broken_component_ids: Optional[Set[int]] = None ) -> TimeSeriesEntry[Value]: """Create a `TimeSeriesEntry` that contains an unknown value. @@ -174,11 +172,11 @@ class LatestEntryCache(Generic[Key, Value]): def __init__(self) -> None: """Initialize the class.""" - self._latest_timestamp = pytz.utc.localize(datetime.datetime.min) + self._latest_timestamp = datetime.min.replace(tzinfo=timezone.utc) self._entries: Dict[Key, TimeSeriesEntry[Value]] = {} @property - def latest_timestamp(self) -> datetime.datetime: + def latest_timestamp(self) -> datetime: """Get the most recently observed timestamp across all keys in the cache. Returns: @@ -209,7 +207,7 @@ def __contains__(self, key: Key) -> bool: def get( self, key: Key, - timedelta_tolerance: datetime.timedelta = datetime.timedelta.max, + timedelta_tolerance: timedelta = timedelta.max, default: Optional[TimeSeriesEntry[Value]] = None, ) -> CacheEntryLookupResult[Value]: """Get the cached entry for the specified key, if any. @@ -232,7 +230,7 @@ def get( retrieved from the cache has a timestamp greater than the latest saved timestamp across all cache keys. """ - if timedelta_tolerance < datetime.timedelta(0): + if timedelta_tolerance < timedelta(0): raise ValueError( f"timedelta_tolerance cannot be less than 0, but " f"{timedelta_tolerance} was provided" @@ -320,7 +318,7 @@ def reset(self) -> None: slightly more efficient. """ self.clear() - self._latest_timestamp = pytz.utc.localize(datetime.datetime.min) + self._latest_timestamp = datetime.min.replace(tzinfo=timezone.utc) def reset_latest_timestamp(self) -> bool: """Reset the `latest_timestamp` property to the lowest possible value. @@ -342,7 +340,7 @@ def reset_latest_timestamp(self) -> bool: self._latest_timestamp = max( map(lambda x: x.timestamp, self._entries.values()), - default=pytz.utc.localize(datetime.datetime.min), + default=datetime.min.replace(tzinfo=timezone.utc), ) if self._latest_timestamp > previous: @@ -423,7 +421,7 @@ def evaluate( cache: LatestEntryCache[str, Value], formula_name: str = "", symbol_to_symbol_mapping: Optional[Dict[str, SymbolMapping]] = None, - timedelta_tolerance: datetime.timedelta = datetime.timedelta.max, + timedelta_tolerance: timedelta = timedelta.max, default_entry: Optional[TimeSeriesEntry[Value]] = None, ) -> Optional[TimeSeriesEntry[Value]]: """Evaluate the formula using time-series values from the provided cache. @@ -463,7 +461,7 @@ def evaluate( `timedelta_tolerance` """ kwargs: Dict[str, Optional[Value]] = {} - timestamp = pytz.utc.localize(datetime.datetime.min) + timestamp = datetime.min.replace(tzinfo=timezone.utc) symbol_to_symbol_mapping = symbol_to_symbol_mapping or {} formula_broken_component_ids: Set[int] = set() diff --git a/src/frequenz/sdk/data_ingestion/formula_calculator.py b/src/frequenz/sdk/data_ingestion/formula_calculator.py index 138b1e03b..54c595c20 100644 --- a/src/frequenz/sdk/data_ingestion/formula_calculator.py +++ b/src/frequenz/sdk/data_ingestion/formula_calculator.py @@ -7,13 +7,12 @@ License MIT """ -import datetime as dt import logging from dataclasses import dataclass +from datetime import datetime, timedelta, timezone from itertools import chain from typing import Any, Dict, List, Optional, Set, Tuple -import pytz import sympy from ..data_handling.time_series import ( @@ -834,11 +833,9 @@ def compute( cache=self.symbol_values, formula_name=formula_name, symbol_to_symbol_mapping=self.symbol_mappings, - timedelta_tolerance=dt.timedelta( - seconds=self.component_data_timeout_sec - ), + timedelta_tolerance=timedelta(seconds=self.component_data_timeout_sec), default_entry=TimeSeriesEntry[Any]( - timestamp=pytz.utc.localize(dt.datetime.now()), value=0.0 + timestamp=datetime.now(timezone.utc), value=0.0 ), ) if res is not None: diff --git a/src/frequenz/sdk/data_ingestion/load_historic_data.py b/src/frequenz/sdk/data_ingestion/load_historic_data.py index 9ac09d726..0180a34ef 100644 --- a/src/frequenz/sdk/data_ingestion/load_historic_data.py +++ b/src/frequenz/sdk/data_ingestion/load_historic_data.py @@ -13,17 +13,16 @@ MIT """ -import datetime as dt import glob import itertools import logging import os from dataclasses import dataclass +from datetime import datetime, timezone from typing import Any, Callable, List, Optional import pandas as pd import pyarrow.parquet as pq -import pytz from tqdm import tqdm logger = logging.Logger(__name__) @@ -151,8 +150,8 @@ def gen_date_dirs(data_dir: str, dates: pd.DatetimeIndex) -> List[str]: def crop_df_list_by_time( df_list: List[pd.DataFrame], - start_time: dt.datetime, - end_time: dt.datetime, + start_time: datetime, + end_time: datetime, ) -> pd.DataFrame: """Concat and crop read data by the specified start and end time. @@ -169,7 +168,7 @@ def crop_df_list_by_time( specified start and end times. """ df0 = pd.concat(df_list).reset_index(drop=True) - df0["ts"] = pd.to_datetime(df0["ts"]).dt.tz_localize(pytz.UTC) + df0["ts"] = pd.to_datetime(df0["ts"]).tz_localize(timezone.utc) df0 = df0.loc[((df0["ts"] >= start_time) & (df0["ts"] <= end_time))].reset_index( drop=True ) @@ -256,15 +255,15 @@ def get_file_timestamps(self, filenames: List[str]) -> pd.Series: for file in filenames ], format=self.file_time_format, - ).tz_localize(pytz.UTC) + ).tz_localize(timezone.utc) return timestamps def gen_datafile_list( self, data_dir: str, dates: pd.DatetimeIndex, - start_time: dt.datetime, - end_time: dt.datetime, + start_time: datetime, + end_time: datetime, ) -> List[str]: """Generate the list of historic parquet files to read. @@ -352,8 +351,8 @@ def load_parquet_files( def read( self, load_hd_settings: LoadHistoricDataSettings, - start_time: dt.datetime, - end_time: dt.datetime, + start_time: datetime, + end_time: datetime, ) -> pd.DataFrame: """Read historical data. @@ -373,9 +372,9 @@ def read( "component_id=" + str(load_hd_settings.component_info.component_id), ) if start_time.tzinfo is None: - start_time = start_time.replace(tzinfo=pytz.UTC) + start_time = start_time.replace(tzinfo=timezone.utc) if end_time.tzinfo is None: - end_time = end_time.replace(tzinfo=pytz.UTC) + end_time = end_time.replace(tzinfo=timezone.utc) logger.info( "reading historic data from component id=%s within the time interval: %s to %s", load_hd_settings.component_info.component_id, diff --git a/src/frequenz/sdk/data_ingestion/microgrid_data.py b/src/frequenz/sdk/data_ingestion/microgrid_data.py index bc903dcef..df2f10157 100644 --- a/src/frequenz/sdk/data_ingestion/microgrid_data.py +++ b/src/frequenz/sdk/data_ingestion/microgrid_data.py @@ -11,11 +11,10 @@ MIT """ import asyncio -import datetime as dt import logging +from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Set -import pytz from frequenz.channels import Broadcast, Merge, Receiver, Select, Sender from ..actor.decorator import actor @@ -99,7 +98,7 @@ async def resend_formulas(self) -> None: await asyncio.sleep(self._wait_for_data_sec) tasks: List["asyncio.Task[bool]"] = [] while True: - start_time = dt.datetime.now(tz=pytz.UTC) + start_time = datetime.now(timezone.utc) # For every formula that was updated at least once, send that formula. for name, formula_result in self.formula_calculator.results.items(): if name not in self._outputs: @@ -114,7 +113,7 @@ async def resend_formulas(self) -> None: tasks.append(task) await asyncio.gather(*tasks, return_exceptions=True) - diff: float = (dt.datetime.now(pytz.UTC) - start_time).total_seconds() + diff: float = (datetime.now(timezone.utc) - start_time).total_seconds() if diff >= self._formula_update_interval_sec: logger.error( "Sending results of formulas took too long: %f, " diff --git a/src/frequenz/sdk/data_ingestion/resampling/component_metric_group_resampler.py b/src/frequenz/sdk/data_ingestion/resampling/component_metric_group_resampler.py deleted file mode 100644 index 70b419d56..000000000 --- a/src/frequenz/sdk/data_ingestion/resampling/component_metric_group_resampler.py +++ /dev/null @@ -1,108 +0,0 @@ -""" -ComponentMetricGroupResampler class that delegates resampling to individual resamplers. - -Copyright -Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -License -MIT -""" -import logging -from datetime import datetime -from typing import Dict, Generator, Tuple - -import pytz - -from ...data_pipeline import Sample -from .component_metric_resampler import ComponentMetricResampler, ResamplingFunction - -logger = logging.Logger(__name__) - - -class ComponentMetricGroupResampler: - """Class that delegates resampling to individual component metric resamplers.""" - - def __init__( - self, - *, - resampling_period_s: float, - initial_resampling_function: ResamplingFunction, - max_data_age_in_periods: float = 3.0, - ) -> None: - """Initialize the ComponentMetricGroupResampler. - - Args: - resampling_period_s: value describing how often resampling should be - performed, in seconds - initial_resampling_function: function to be applied to a sequence of - samples within a resampling period to produce a single output sample - max_data_age_in_periods: max age that samples shouldn't exceed in order - to be used in the resampling function - """ - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._initial_resampling_function: ResamplingFunction = ( - initial_resampling_function - ) - self._resamplers: Dict[str, ComponentMetricResampler] = {} - - def add_time_series(self, time_series_id: str) -> None: - """Create a new resampler for a specific time series. - - If resampler already exists for the provided `time_series_id`, it will be used - without creating a new one. - - Args: - time_series_id: time series id - """ - if time_series_id in self._resamplers: - return - - self._resamplers[time_series_id] = ComponentMetricResampler( - resampling_period_s=self._resampling_period_s, - max_data_age_in_periods=self._max_data_age_in_periods, - resampling_function=self._initial_resampling_function, - ) - - def remove_timeseries(self, time_series_id: str) -> None: - """Remove a resampler for a specific time series. - - Args: - time_series_id: time series id, for which to remove the resampler - - Raises: - KeyError: if resampler for the provided timer_series_id doesn't exist - """ - try: - del self._resamplers[time_series_id] - except KeyError as err: - raise KeyError( - f"No resampler for time series {time_series_id} found!" - ) from err - - def add_sample(self, time_series_id: str, sample: Sample) -> None: - """Add a sample for a specific time series. - - Args: - time_series_id: time series id, which the sample should be added to - sample: sample to be added - - Raises: - KeyError: if resampler for the provided timer_series_id doesn't exist - """ - try: - self._resamplers[time_series_id].add_sample(sample) - except KeyError as err: - raise KeyError( - f"No resampler for time series {time_series_id} found!" - ) from err - - def resample(self) -> Generator[Tuple[str, Sample], None, None]: - """Resample samples for all time series. - - Yields: - iterator of time series ids and their newly resampled samples - """ - now = datetime.now(tz=pytz.UTC) - for time_series_id, resampler in self._resamplers.items(): - yield time_series_id, Sample(timestamp=now, value=resampler.resample()) diff --git a/src/frequenz/sdk/data_ingestion/resampling/component_metric_resampler.py b/src/frequenz/sdk/data_ingestion/resampling/component_metric_resampler.py deleted file mode 100644 index b00f1fc92..000000000 --- a/src/frequenz/sdk/data_ingestion/resampling/component_metric_resampler.py +++ /dev/null @@ -1,97 +0,0 @@ -""" -ComponentMetricResampler class for resampling individual metrics. - -Copyright -Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -License -MIT -""" -import logging -from collections import deque -from datetime import datetime, timedelta -from typing import Callable, Deque, Optional, Sequence - -import pytz - -from ...data_pipeline import Sample - -logger = logging.Logger(__name__) - - -ResamplingFunction = Callable[[Sequence[Sample], float], float] - - -class ComponentMetricResampler: - """Resampler for a single metric of a specific component, e.g. 123_active_power.""" - - def __init__( - self, - resampling_period_s: float, - max_data_age_in_periods: float, - resampling_function: ResamplingFunction, - ) -> None: - """Initialize the ComponentMetricResampler. - - Args: - resampling_period_s: value describing how often resampling should be - performed, in seconds - max_data_age_in_periods: max age that samples shouldn't exceed in order - to be used in the resampling function - resampling_function: function to be applied to a sequence of samples within - a resampling period to produce a single output sample - """ - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._buffer: Deque[Sample] = deque() - self._resampling_function: ResamplingFunction = resampling_function - - def add_sample(self, sample: Sample) -> None: - """Add a new sample. - - Args: - sample: sample to be added to the buffer - """ - self._buffer.append(sample) - - def remove_outdated_samples(self, threshold: datetime) -> None: - """Remove samples that are older than the provided time threshold. - - It is assumed that items in the buffer are in a sorted order (ascending order - by timestamp). - - The removal works by traversing the buffer starting from the oldest sample - (smallest timestamp) and comparing sample's timestamp with the threshold. - If the sample's threshold is smaller than `threshold`, it means that the - sample is outdated and it is removed from the buffer. This continues until - the first sample that is with timestamp greater or equal to `threshold` is - encountered, then buffer is considered up to date. - - Args: - threshold: samples whose timestamp is older than the threshold are - considered outdated and should be remove from the buffer - """ - while self._buffer: - sample: Sample = self._buffer[0] - if sample.timestamp >= threshold: - return - - self._buffer.popleft() - - def resample(self) -> Optional[float]: - """Resample samples from the buffer and produce a single sample. - - Returns: - Samples resampled into a single sample or `None` if the - `resampling_function` cannot produce a valid Sample. - """ - # It might be better to provide `now` from the outside so that all - # individual resamplers use the same `now` - now = datetime.now(tz=pytz.UTC) - threshold = now - timedelta( - seconds=self._max_data_age_in_periods * self._resampling_period_s - ) - self.remove_outdated_samples(threshold=threshold) - if len(self._buffer) == 0: - return None - return self._resampling_function(self._buffer, self._resampling_period_s) diff --git a/src/frequenz/sdk/data_pipeline/__init__.py b/src/frequenz/sdk/data_pipeline/__init__.py deleted file mode 100644 index d36709c58..000000000 --- a/src/frequenz/sdk/data_pipeline/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -"""Utils for the Data Pipeline. - -Copyright -Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -License -MIT -""" - -from .types import ComponentMetricId, ComponentMetricRequest, Sample - -__all__ = [ - "ComponentMetricRequest", - "ComponentMetricId", - "Sample", -] diff --git a/src/frequenz/sdk/power_distribution/power_distributor.py b/src/frequenz/sdk/power_distribution/power_distributor.py index 64418006d..2501d531d 100644 --- a/src/frequenz/sdk/power_distribution/power_distributor.py +++ b/src/frequenz/sdk/power_distribution/power_distributor.py @@ -9,9 +9,9 @@ MIT """ import asyncio -import datetime as dt import logging from asyncio.tasks import ALL_COMPLETED +from datetime import datetime, timezone from typing import ( # pylint: disable=unused-import Any, Dict, @@ -24,7 +24,6 @@ ) import grpc -import pytz from frequenz.channels import BidirectionalHandle, Peekable, Receiver from google.protobuf.empty_pb2 import Empty # pylint: disable=no-name-in-module @@ -528,7 +527,7 @@ def _is_component_data_valid( ) return False - now = dt.datetime.now(tz=pytz.UTC) + now = datetime.now(timezone.utc) time_delta = now - component_data.timestamp if time_delta.total_seconds() > self.component_data_timeout_sec: _logger.warning( diff --git a/src/frequenz/sdk/power_distribution/utils.py b/src/frequenz/sdk/power_distribution/utils.py index 9fa757b2d..f662f4dab 100644 --- a/src/frequenz/sdk/power_distribution/utils.py +++ b/src/frequenz/sdk/power_distribution/utils.py @@ -7,7 +7,7 @@ MIT """ from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from typing import Dict, NamedTuple, Optional, Set @@ -100,7 +100,7 @@ def mark_as_broken(self, component_id: int) -> None: Args: component_id: component id """ - self._broken[component_id] = datetime.now() + self._broken[component_id] = datetime.now(timezone.utc) def update_retry(self, timeout_sec: float) -> None: """Change how long the component should be marked as broken. @@ -121,7 +121,9 @@ def is_broken(self, component_id: int) -> bool: """ if component_id in self._broken: last_broken = self._broken[component_id] - if (datetime.now() - last_broken).total_seconds() < self._timeout_sec: + if ( + datetime.now(timezone.utc) - last_broken + ).total_seconds() < self._timeout_sec: return True del self._broken[component_id] diff --git a/src/frequenz/sdk/timeseries/__init__.py b/src/frequenz/sdk/timeseries/__init__.py new file mode 100644 index 000000000..a52321752 --- /dev/null +++ b/src/frequenz/sdk/timeseries/__init__.py @@ -0,0 +1,24 @@ +""" +Handling of timeseries streams. + +A timeseries is a stream (normally an async iterator) of +[samples][frequenz.sdk.timeseries.Sample]. + +This module provides tools to operate on timeseries. + +Copyright +Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +License +MIT +""" + +from ._resampler import GroupResampler, Resampler, ResamplingFunction +from .sample import Sample + +__all__ = [ + "GroupResampler", + "Resampler", + "ResamplingFunction", + "Sample", +] diff --git a/src/frequenz/sdk/timeseries/_resampler.py b/src/frequenz/sdk/timeseries/_resampler.py new file mode 100644 index 000000000..4770a28f6 --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampler.py @@ -0,0 +1,231 @@ +""" +Timeseries resampler. + +Copyright +Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +License +MIT +""" + +import logging +from collections import deque +from datetime import datetime, timedelta, timezone +from typing import Callable, Deque, Dict, Generator, Optional, Sequence, Tuple + +from .sample import Sample + +logger = logging.Logger(__name__) + + +ResamplingFunction = Callable[[Sequence[Sample], float], float] +"""Resampling function type. + +A resampling function produces a new sample based on a list of pre-existing +samples. It can do "upsampling" when there data rate of the `input_samples` +period is smaller than the `resampling_period_s`, or "downsampling" if it is +bigger. + +In general a resampling window is the same as the `resampling_period_s`, and +this function might receive input samples from multiple windows in the past to +enable extrapolation, but no samples from the future (so the timestamp of the +new sample that is going to be produced will always be bigger than the biggest +timestamp in the input data). + +Args: + input_samples (Sequence[Sample]): the sequence of pre-existing samples. + resampling_period_s (float): the period in seconds (i.e. how ofter a new sample is + produced. + +Returns: + new_sample (float): The value of new sample produced after the resampling. +""" + + +class Resampler: + """Ingests samples and produces resampled data for one timeseries. + + Samples are stored in an internal ring buffer. All collected samples that + are newer than `resampling_period_s * max_data_age_in_periods` seconds will + be passed to the provided + [ResamplingFunction][frequenz.sdk.timeseries.ResamplingFunction]. + """ + + def __init__( + self, + resampling_period_s: float, + max_data_age_in_periods: float, + resampling_function: ResamplingFunction, + ) -> None: + """Initialize the ComponentMetricResampler. + + Args: + resampling_period_s: value describing how often resampling should be + performed, in seconds + max_data_age_in_periods: max age that samples shouldn't exceed in order + to be used in the resampling function + resampling_function: function to be applied to a sequence of samples within + a resampling period to produce a single output sample + """ + self._resampling_period_s = resampling_period_s + self._max_data_age_in_periods: float = max_data_age_in_periods + self._buffer: Deque[Sample] = deque() + self._resampling_function: ResamplingFunction = resampling_function + + def add_sample(self, sample: Sample) -> None: + """Add a new sample. + + Args: + sample: sample to be added to the buffer + """ + self._buffer.append(sample) + + def _remove_outdated_samples(self, threshold: datetime) -> None: + """Remove samples that are older than the provided time threshold. + + It is assumed that items in the buffer are in a sorted order (ascending order + by timestamp). + + The removal works by traversing the buffer starting from the oldest sample + (smallest timestamp) and comparing sample's timestamp with the threshold. + If the sample's threshold is smaller than `threshold`, it means that the + sample is outdated and it is removed from the buffer. This continues until + the first sample that is with timestamp greater or equal to `threshold` is + encountered, then buffer is considered up to date. + + Args: + threshold: samples whose timestamp is older than the threshold are + considered outdated and should be remove from the buffer + """ + while self._buffer: + sample: Sample = self._buffer[0] + if sample.timestamp >= threshold: + return + + self._buffer.popleft() + + def resample(self, timestamp: Optional[datetime] = None) -> Optional[float]: + """Resample samples from the buffer and produce a single sample. + + Args: + timestamp: the timestamp to use to as the current resampling + timestamp when calculating which stored past samples are + relevant to pass to the resampling function. If `None`, the + current datetime (in UTC) will be used. + + Returns: + Samples resampled into a single sample or `None` if the + `resampling_function` cannot produce a valid Sample. + """ + if timestamp is None: + timestamp = datetime.now(timezone.utc) + threshold = timestamp - timedelta( + seconds=self._max_data_age_in_periods * self._resampling_period_s + ) + self._remove_outdated_samples(threshold=threshold) + if len(self._buffer) == 0: + return None + return self._resampling_function(self._buffer, self._resampling_period_s) + + +class GroupResampler: + """Ingests samples and produces resampled data for a group of timeseries. + + Like the [Resampler][frequenz.sdk.timeseries.Resampler] but handles a group + of timeseries. + """ + + def __init__( + self, + *, + resampling_period_s: float, + initial_resampling_function: ResamplingFunction, + max_data_age_in_periods: float = 3.0, + ) -> None: + """Initialize the ComponentMetricGroupResampler. + + Args: + resampling_period_s: value describing how often resampling should be + performed, in seconds + initial_resampling_function: function to be applied to a sequence of + samples within a resampling period to produce a single output sample + max_data_age_in_periods: max age that samples shouldn't exceed in order + to be used in the resampling function + """ + self._resampling_period_s = resampling_period_s + self._max_data_age_in_periods: float = max_data_age_in_periods + self._initial_resampling_function: ResamplingFunction = ( + initial_resampling_function + ) + self._resamplers: Dict[str, Resampler] = {} + + def add_time_series(self, time_series_id: str) -> None: + """Create a new resampler for a specific time series. + + If resampler already exists for the provided `time_series_id`, it will be used + without creating a new one. + + Args: + time_series_id: time series id + """ + if time_series_id in self._resamplers: + return + + self._resamplers[time_series_id] = Resampler( + resampling_period_s=self._resampling_period_s, + max_data_age_in_periods=self._max_data_age_in_periods, + resampling_function=self._initial_resampling_function, + ) + + def remove_timeseries(self, time_series_id: str) -> None: + """Remove a resampler for a specific time series. + + Args: + time_series_id: time series id, for which to remove the resampler + + Raises: + KeyError: if resampler for the provided timer_series_id doesn't exist + """ + try: + del self._resamplers[time_series_id] + except KeyError as err: + raise KeyError( + f"No resampler for time series {time_series_id} found!" + ) from err + + def add_sample(self, time_series_id: str, sample: Sample) -> None: + """Add a sample for a specific time series. + + Args: + time_series_id: time series id, which the sample should be added to + sample: sample to be added + + Raises: + KeyError: if resampler for the provided timer_series_id doesn't exist + """ + try: + self._resamplers[time_series_id].add_sample(sample) + except KeyError as err: + raise KeyError( + f"No resampler for time series {time_series_id} found!" + ) from err + + def resample( + self, timestamp: Optional[datetime] = None + ) -> Generator[Tuple[str, Sample], None, None]: + """Resample samples for all time series. + + Args: + timestamp: the timestamp to use to emit the new samples (and to + consider stored samples relevant for resampling. If `None`, + the current datetime (in UTC) will be used. + + Yields: + iterator of time series ids and their newly resampled samples + """ + if timestamp is None: + timestamp = datetime.now(timezone.utc) + for time_series_id, resampler in self._resamplers.items(): + yield time_series_id, Sample( + timestamp=timestamp, value=resampler.resample(timestamp) + ) diff --git a/src/frequenz/sdk/timeseries/sample.py b/src/frequenz/sdk/timeseries/sample.py new file mode 100644 index 000000000..0dbefd81c --- /dev/null +++ b/src/frequenz/sdk/timeseries/sample.py @@ -0,0 +1,25 @@ +"""Common types for the Data Pipeline. + +Copyright +Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +License +MIT +""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + + +@dataclass(frozen=True) +class Sample: + """A measurement taken at a particular point in time. + + The `value` could be `None` if a component is malfunctioning or data is + lacking for another reason, but a sample still needs to be sent to have a + coherent view on a group of component metrics for a particular timestamp. + """ + + timestamp: datetime + value: Optional[float] = None diff --git a/tests/actor/test_component_metrics_resampling_actor.py b/tests/actor/test_component_metrics_resampling_actor.py deleted file mode 100644 index 3ccdcb52f..000000000 --- a/tests/actor/test_component_metrics_resampling_actor.py +++ /dev/null @@ -1,142 +0,0 @@ -"""Frequenz Python SDK resampling example. - -Copyright -Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -License -MIT -""" -import time -from typing import Iterator - -import grpc -from frequenz.api.microgrid import microgrid_pb2 -from frequenz.api.microgrid.battery_pb2 import Battery -from frequenz.api.microgrid.battery_pb2 import Data as BatteryData -from frequenz.api.microgrid.common_pb2 import MetricAggregation -from frequenz.api.microgrid.microgrid_pb2 import ComponentData, ComponentIdParam -from frequenz.channels import Broadcast -from google.protobuf.timestamp_pb2 import Timestamp # pylint: disable=no-name-in-module -from pytest_mock import MockerFixture - -from frequenz.sdk.actor import ChannelRegistry -from frequenz.sdk.actor.data_sourcing import DataSourcingActor -from frequenz.sdk.data_ingestion.resampling.component_metrics_resampling_actor import ( - ComponentMetricsResamplingActor, -) -from frequenz.sdk.data_pipeline import ComponentMetricId, ComponentMetricRequest -from frequenz.sdk.microgrid import microgrid_api -from tests.test_microgrid import mock_api - - -async def test_component_metrics_resampling_actor(mocker: MockerFixture) -> None: - """Run main functions that initializes and creates everything.""" - - servicer = mock_api.MockMicrogridServicer() - - # pylint: disable=unused-argument - def get_component_data( - request: ComponentIdParam, context: grpc.ServicerContext - ) -> Iterator[ComponentData]: - """Return an iterator for mock ComponentData.""" - # pylint: disable=stop-iteration-return - - def next_msg(value: float) -> ComponentData: - timestamp = Timestamp() - timestamp.GetCurrentTime() - return ComponentData( - id=request.id, - ts=timestamp, - battery=Battery( - data=BatteryData( - soc=MetricAggregation(avg=value), - ) - ), - ) - - for value in [3, 6, 9]: - yield next_msg(value=value) - time.sleep(0.1) - - mocker.patch.object(servicer, "GetComponentData", get_component_data) - - server = mock_api.MockGrpcServer(servicer, port=57899) - await server.start() - - servicer.add_component(1, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_GRID) - servicer.add_component( - 3, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_JUNCTION - ) - servicer.add_component(4, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_METER) - servicer.add_component(7, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_METER) - servicer.add_component( - 8, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_INVERTER - ) - servicer.add_component( - 9, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_BATTERY - ) - - servicer.add_connection(1, 3) - servicer.add_connection(3, 4) - servicer.add_connection(3, 7) - servicer.add_connection(7, 8) - servicer.add_connection(8, 9) - - await microgrid_api.initialize("[::1]", 57899) - - channel_registry = ChannelRegistry(name="Microgrid Channel Registry") - - data_source_request_channel = Broadcast[ComponentMetricRequest]( - "Data Source Request Channel" - ) - data_source_request_sender = data_source_request_channel.get_sender() - data_source_request_receiver = data_source_request_channel.get_receiver() - - resampling_actor_request_channel = Broadcast[ComponentMetricRequest]( - "Resampling Actor Request Channel" - ) - resampling_actor_request_sender = resampling_actor_request_channel.get_sender() - resampling_actor_request_receiver = resampling_actor_request_channel.get_receiver() - - DataSourcingActor( - request_receiver=data_source_request_receiver, registry=channel_registry - ) - - ComponentMetricsResamplingActor( - channel_registry=channel_registry, - subscription_sender=data_source_request_sender, - subscription_receiver=resampling_actor_request_receiver, - resampling_period_s=0.1, - ) - - subscription_request = ComponentMetricRequest( - namespace="Resampling", - component_id=9, - metric_id=ComponentMetricId.SOC, - start_time=None, - ) - - await resampling_actor_request_sender.send(subscription_request) - - index = 0 - expected_sample_values = [ - 3.0, - 4.5, - 6.0, - 7.5, - 9.0, - None, - None, - None, - ] - - async for sample in channel_registry.get_receiver( - subscription_request.get_channel_name() - ): - assert sample.value == expected_sample_values[index] - index += 1 - if index >= len(expected_sample_values): - break - - await server.stop(0.1) - microgrid_api._MICROGRID_API = None # pylint: disable=protected-access diff --git a/tests/actor/test_data_sourcing.py b/tests/actor/test_data_sourcing.py index 9544e622a..cae326427 100644 --- a/tests/actor/test_data_sourcing.py +++ b/tests/actor/test_data_sourcing.py @@ -12,8 +12,11 @@ from frequenz.channels import Broadcast from frequenz.sdk.actor import ChannelRegistry -from frequenz.sdk.actor.data_sourcing import DataSourcingActor -from frequenz.sdk.data_pipeline import ComponentMetricId, ComponentMetricRequest +from frequenz.sdk.actor.data_sourcing import ( + ComponentMetricId, + ComponentMetricRequest, + DataSourcingActor, +) from frequenz.sdk.microgrid import microgrid_api from tests.test_microgrid import mock_api diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py new file mode 100644 index 000000000..bca277736 --- /dev/null +++ b/tests/actor/test_resampling.py @@ -0,0 +1,99 @@ +"""Frequenz Python SDK resampling example. + +Copyright +Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +License +MIT +""" +import dataclasses +from datetime import datetime, timedelta, timezone + +import time_machine +from frequenz.channels import Broadcast + +from frequenz.sdk.actor import ChannelRegistry +from frequenz.sdk.actor.resampling import ( + ComponentMetricId, + ComponentMetricRequest, + ComponentMetricsResamplingActor, +) +from frequenz.sdk.timeseries import Sample + + +def _now(*, shift: float = 0.0) -> datetime: + return datetime.now(timezone.utc) + timedelta(seconds=shift) + + +@time_machine.travel(0) +async def test_component_metrics_resampling_actor() -> None: + """Run main functions that initializes and creates everything.""" + + channel_registry = ChannelRegistry(name="test") + data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req") + data_source_req_recv = data_source_req_chan.get_receiver() + resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req") + resampling_req_sender = resampling_req_chan.get_sender() + + resampling_actor = ComponentMetricsResamplingActor( + channel_registry=channel_registry, + subscription_sender=data_source_req_chan.get_sender(), + subscription_receiver=resampling_req_chan.get_receiver(), + resampling_period_s=0.2, + max_data_age_in_periods=2, + ) + + subs_req = ComponentMetricRequest( + namespace="Resampling", + component_id=9, + metric_id=ComponentMetricId.SOC, + start_time=None, + ) + + await resampling_req_sender.send(subs_req) + data_source_req = await data_source_req_recv.receive() + assert data_source_req is not None + assert data_source_req == dataclasses.replace(subs_req, namespace="Source") + + timeseries_receiver = channel_registry.get_receiver(subs_req.get_channel_name()) + timeseries_sender = channel_registry.get_sender(data_source_req.get_channel_name()) + + new_sample = await timeseries_receiver.receive() # At ~0.2s (timer) + assert new_sample is not None + assert new_sample.value is None + + sample = Sample(_now(shift=0.1), 3) # ts = ~0.3s + await timeseries_sender.send(sample) + new_sample = await timeseries_receiver.receive() # At ~0.4s (timer) + assert new_sample is not None + assert new_sample.value == 3 + assert new_sample.timestamp >= sample.timestamp + + sample = Sample(_now(shift=0.05), 4) # ts = ~0.45s + await timeseries_sender.send(sample) + new_sample = await timeseries_receiver.receive() # At ~0.6s (timer) + assert new_sample is not None + assert new_sample.value == 3.5 # avg(3, 4) + assert new_sample.timestamp >= sample.timestamp + + await timeseries_sender.send(Sample(_now(shift=0.05), 8)) # ts = ~0.65s + await timeseries_sender.send(Sample(_now(shift=0.1), 1)) # ts = ~0.7s + sample = Sample(_now(shift=0.15), 9) # ts = ~0.75s + await timeseries_sender.send(sample) + new_sample = await timeseries_receiver.receive() # At ~0.8s (timer) + assert new_sample is not None + assert new_sample.value == 5.5 # avg(4, 8, 1, 9) + assert new_sample.timestamp >= sample.timestamp + + # No more samples sent + new_sample = await timeseries_receiver.receive() # At ~1.0s (timer) + assert new_sample is not None + assert new_sample.value == 6 # avg(8, 1, 9) + assert new_sample.timestamp >= sample.timestamp + + # No more samples sent + new_sample = await timeseries_receiver.receive() # At ~1.2s (timer) + assert new_sample is not None + assert new_sample.value is None + + await resampling_actor._stop() # type: ignore # pylint: disable=no-member, protected-access diff --git a/tests/data_ingestion/test_group_timeseries_resampler.py b/tests/data_ingestion/test_group_timeseries_resampler.py deleted file mode 100644 index bdf16725e..000000000 --- a/tests/data_ingestion/test_group_timeseries_resampler.py +++ /dev/null @@ -1,81 +0,0 @@ -""" -Tests for the `ComponentMetricGroupResampler` - -Copyright -Copyright © 2021 Frequenz Energy-as-a-Service GmbH - -License -MIT -""" -from datetime import datetime, timedelta -from typing import Sequence - -import pytz -import time_machine - -from frequenz.sdk.data_ingestion.resampling.component_metric_group_resampler import ( - ComponentMetricGroupResampler, -) -from frequenz.sdk.data_pipeline import Sample - - -# pylint: disable=unused-argument -def resampling_function_sum( - samples: Sequence[Sample], resampling_period_s: float -) -> float: - """Calculate sum of the provided values. - - Args: - samples: sequences of samples to apply the average to - resampling_period_s: value describing how often resampling should be performed, - in seconds - - Returns: - sum of all the sample values - - Raises: - AssertionError if there are no provided samples - """ - assert len(samples) > 0, "Sum cannot be given an empty list of samples" - return sum(sample.value for sample in samples if sample.value is not None) - - -@time_machine.travel(datetime.now()) -def test_component_metric_group_resampler() -> None: - """Test if resampling is properly delegated to component metric resamplers.""" - resampler = ComponentMetricGroupResampler( - resampling_period_s=0.2, - max_data_age_in_periods=5.0, - initial_resampling_function=resampling_function_sum, - ) - - time_series_id_1 = "123_active_power" - time_series_id_2 = "99_active_power" - - resampler.add_time_series(time_series_id=time_series_id_1) - resampler.add_time_series(time_series_id=time_series_id_2) - - timestamp = datetime.now(tz=pytz.UTC) - - value1 = 5.0 - value2 = 15.0 - value3 = 100.0 - value4 = 999.0 - - sample1 = Sample(timestamp - timedelta(seconds=0.5), value=value1) - sample2 = Sample(timestamp - timedelta(seconds=0.7), value=value2) - sample3 = Sample(timestamp - timedelta(seconds=5.05), value=value3) - sample4 = Sample(timestamp - timedelta(seconds=0.99), value=value4) - - resampler.add_sample(time_series_id=time_series_id_1, sample=sample1) - resampler.add_sample(time_series_id=time_series_id_1, sample=sample2) - resampler.add_sample(time_series_id=time_series_id_2, sample=sample3) - resampler.add_sample(time_series_id=time_series_id_2, sample=sample4) - - resampled_samples = dict(resampler.resample()) - - assert resampled_samples[time_series_id_1].timestamp >= timestamp - assert resampled_samples[time_series_id_1].value == sum([value1, value2]) - - assert resampled_samples[time_series_id_2].timestamp >= timestamp - assert resampled_samples[time_series_id_2].value == value4 diff --git a/tests/data_ingestion/test_timeseries_resampler.py b/tests/data_ingestion/test_timeseries_resampler.py deleted file mode 100644 index b5694775c..000000000 --- a/tests/data_ingestion/test_timeseries_resampler.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -Tests for the `TimeSeriesResampler` - -Copyright -Copyright © 2021 Frequenz Energy-as-a-Service GmbH - -License -MIT -""" -from datetime import datetime, timedelta -from typing import Sequence - -import pytz -import time_machine - -from frequenz.sdk.data_ingestion.resampling.component_metric_resampler import ( - ComponentMetricResampler, -) -from frequenz.sdk.data_pipeline import Sample - - -# pylint: disable=unused-argument -def resampling_function_sum( - samples: Sequence[Sample], resampling_period_s: float -) -> float: - """Calculate sum of the provided values. - - Args: - samples: sequences of samples to apply the average to - resampling_period_s: value describing how often resampling should be performed, - in seconds - - Returns: - sum of all the sample values - - Raises: - AssertionError if there are no provided samples - """ - assert len(samples) > 0, "Avg function cannot be given an empty list of samples" - return sum(sample.value for sample in samples if sample.value is not None) - - -@time_machine.travel(datetime.now()) -def test_component_metric_resampler_remove_outdated_samples() -> None: - """Test if outdated samples are being properly removed.""" - resampler = ComponentMetricResampler( - resampling_period_s=0.2, - max_data_age_in_periods=1.0, - resampling_function=resampling_function_sum, - ) - - timestamp = datetime.now(tz=pytz.UTC) - sample1 = Sample(timestamp, value=5.0) - sample2 = Sample(timestamp + timedelta(seconds=1), value=12.0) - resampler.add_sample(sample1) - resampler.add_sample(sample2) - - resampler.remove_outdated_samples(threshold=timestamp + timedelta(seconds=0.5)) - assert list(resampler._buffer) == [sample2] # pylint: disable=protected-access - - resampler.remove_outdated_samples(threshold=timestamp + timedelta(seconds=1.01)) - assert len(resampler._buffer) == 0 # pylint: disable=protected-access - - -@time_machine.travel(datetime.now()) -def test_component_metric_resampler_resample() -> None: - """Test if resampling function works as expected.""" - resampler = ComponentMetricResampler( - resampling_period_s=0.2, - max_data_age_in_periods=5.0, - resampling_function=resampling_function_sum, - ) - - timestamp = datetime.now(tz=pytz.UTC) - timedelta(seconds=0.5) - - value1 = 5.0 - value2 = 15.0 - - sample1 = Sample(timestamp, value=value1) - sample2 = Sample(timestamp, value=value2) - - resampler.add_sample(sample1) - resampler.add_sample(sample2) - - value = resampler.resample() - assert value is not None - assert value == sum([value1, value2]) - - -@time_machine.travel(datetime.now()) -def test_component_metric_resampler_resample_with_outdated_samples() -> None: - """Test that resampling function doesn't take outdated samples into account.""" - resampler = ComponentMetricResampler( - resampling_period_s=0.2, - max_data_age_in_periods=5.0, - resampling_function=resampling_function_sum, - ) - - timestamp = datetime.now(tz=pytz.UTC) - - value3 = 100.0 - value1 = 5.0 - value2 = 15.0 - - sample3 = Sample(timestamp - timedelta(seconds=1.01), value=value3) - sample1 = Sample(timestamp - timedelta(seconds=0.5), value=value1) - sample2 = Sample(timestamp - timedelta(seconds=0.7), value=value2) - - resampler.add_sample(sample3) - resampler.add_sample(sample1) - resampler.add_sample(sample2) - - value = resampler.resample() - assert value is not None - assert value == sum([value1, value2]) diff --git a/tests/power_distribution/test_distribution_algorithm.py b/tests/power_distribution/test_distribution_algorithm.py index ad2152ae6..d90829310 100644 --- a/tests/power_distribution/test_distribution_algorithm.py +++ b/tests/power_distribution/test_distribution_algorithm.py @@ -8,7 +8,7 @@ MIT """ from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone from typing import Dict, List, Optional import frequenz.api.microgrid.microgrid_pb2 as microgrid_pb @@ -72,7 +72,7 @@ def create_battery_msg( # pylint: disable=too-many-arguments capacity: Metric, soc: Metric, power: Bound, - timestamp: datetime = datetime.utcnow(), + timestamp: datetime = datetime.now(timezone.utc), ) -> microgrid_pb.ComponentData: """Create protobuf battery components with given arguments. @@ -111,7 +111,7 @@ def create_battery_msg( # pylint: disable=too-many-arguments def create_inverter_msg( component_id: int, power: Bound, - timestamp: datetime = datetime.utcnow(), + timestamp: datetime = datetime.now(timezone.utc), ) -> microgrid_pb.ComponentData: """Create protobuf inverter components with given arguments. diff --git a/tests/power_distribution/test_power_distributor.py b/tests/power_distribution/test_power_distributor.py index 7a7d0251c..5e29e2bd3 100644 --- a/tests/power_distribution/test_power_distributor.py +++ b/tests/power_distribution/test_power_distributor.py @@ -9,7 +9,7 @@ import asyncio import re from dataclasses import dataclass -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from functools import partial from typing import Dict, Set, Tuple, TypeVar, Union from unittest import IsolatedAsyncioTestCase, mock @@ -605,7 +605,7 @@ async def test_power_distributor_stale_battery_message( capacity=Metric(98000), soc=Metric(40, Bound(20, 80)), power=Bound(-1000, 1000), - timestamp=datetime.utcnow() - timedelta(seconds=62), + timestamp=datetime.now(timezone.utc) - timedelta(seconds=62), ) else: bat = create_battery_msg( @@ -672,7 +672,7 @@ async def test_power_distributor_stale_all_components_message( capacity=Metric(98000), soc=Metric(40, Bound(20, 80)), power=Bound(-1000, 1000), - timestamp=datetime.utcnow() - timedelta(seconds=62), + timestamp=datetime.now(timezone.utc) - timedelta(seconds=62), ) else: bat = create_battery_msg( @@ -689,7 +689,7 @@ async def test_power_distributor_stale_all_components_message( inv = create_inverter_msg( key_id, power=Bound(-500, 500), - timestamp=datetime.utcnow() - timedelta(seconds=62), + timestamp=datetime.now(timezone.utc) - timedelta(seconds=62), ) else: inv = create_inverter_msg( diff --git a/tests/test_data_handling/test_formula.py b/tests/test_data_handling/test_formula.py index a51cd077d..49caaee78 100644 --- a/tests/test_data_handling/test_formula.py +++ b/tests/test_data_handling/test_formula.py @@ -6,12 +6,11 @@ License MIT """ -import datetime as dt +from datetime import datetime, timedelta, timezone from typing import Dict import pandas as pd import pytest -import pytz import sympy from frequenz.sdk.data_handling import TimeSeriesEntry, formula @@ -101,9 +100,9 @@ def test_formula_with_broken_meter() -> None: """Test a microgrid formula with a broken meter.""" cache = ts.LatestEntryCache[str, float]() - ts1 = dt.datetime.fromisoformat("2021-03-11T10:49:00+00:00") - outdated_ts = ts1 - dt.timedelta(minutes=5) - timedelta_tolerance = dt.timedelta(minutes=3) + ts1 = datetime.fromisoformat("2021-03-11T10:49:00+00:00") + outdated_ts = ts1 - timedelta(minutes=5) + timedelta_tolerance = timedelta(minutes=3) symbols = [ "meter_1_active_power", @@ -166,9 +165,9 @@ def test_formula_with_broken_battery() -> None: """Test a microgrid formula with a broken battery.""" cache = ts.LatestEntryCache[str, float]() - ts1 = dt.datetime.fromisoformat("2021-03-11T10:49:00+00:00") - outdated_ts = ts1 - dt.timedelta(minutes=5) - timedelta_tolerance = dt.timedelta(minutes=3) + ts1 = datetime.fromisoformat("2021-03-11T10:49:00+00:00") + outdated_ts = ts1 - timedelta(minutes=5) + timedelta_tolerance = timedelta(minutes=3) symbols = [ "battery_1_soc", @@ -227,7 +226,7 @@ def test_formula_with_broken_battery() -> None: symbol_to_symbol_mapping=symbol_to_symbol_mapping, timedelta_tolerance=timedelta_tolerance, default_entry=TimeSeriesEntry[float]( - timestamp=pytz.utc.localize(dt.datetime.now()), value=0.0 + timestamp=datetime.now(timezone.utc), value=0.0 ), ) diff --git a/tests/test_data_handling/test_handle_historic_data.py b/tests/test_data_handling/test_handle_historic_data.py index c6a1ab8d9..93ff07b4c 100644 --- a/tests/test_data_handling/test_handle_historic_data.py +++ b/tests/test_data_handling/test_handle_historic_data.py @@ -7,7 +7,7 @@ MIT """ -import datetime as dt +from datetime import datetime, timedelta, timezone import numpy as np import pandas as pd @@ -36,8 +36,8 @@ def test_handle_historic_data_import() -> None: def mock_load_hd_read( self: LoadHistoricData, # pylint: disable=unused-argument load_hd_settings: LoadHistoricDataSettings, - start_time: dt.datetime, - end_time: dt.datetime, + start_time: datetime, + end_time: datetime, ) -> pd.DataFrame: """Mock historic data loading function.""" timestamps = pd.date_range( @@ -95,9 +95,9 @@ def test_load_compute_formula(mocker: MockerFixture) -> None: hd_handler_settings = HandleHistDataSettings( hd_loaders, hd_formulas, symbol_mappings ) - end_time = dt.datetime.now() + end_time = datetime.now(timezone.utc) mocker.patch.object(LoadHistoricData, "read", mock_load_hd_read) hd_handler = HandleHistData(messstellen_id, hd_handler_settings) - df_hdh = hd_handler.compute(end_time - dt.timedelta(days=2), end_time) + df_hdh = hd_handler.compute(end_time - timedelta(days=2), end_time) assert "client_load" in df_hdh.columns assert (df_hdh["client_load"] == 5).all() diff --git a/tests/test_data_handling/test_time_series.py b/tests/test_data_handling/test_time_series.py index 57d279755..1a658a525 100644 --- a/tests/test_data_handling/test_time_series.py +++ b/tests/test_data_handling/test_time_series.py @@ -7,9 +7,7 @@ MIT """ -import datetime as dt - -import pytz +from datetime import datetime, timedelta, timezone import frequenz.sdk.data_handling.time_series as ts @@ -20,7 +18,7 @@ def test_LatestEntryCache() -> None: cache1 = ts.LatestEntryCache[int, float]() - assert cache1.latest_timestamp == pytz.utc.localize(dt.datetime.min) + assert cache1.latest_timestamp == datetime.min.replace(tzinfo=timezone.utc) assert 1 not in cache1 assert 2 not in cache1 assert 3 not in cache1 @@ -36,7 +34,7 @@ def test_LatestEntryCache() -> None: # popping entries that do not exist returns `None`, or whatever # alternative default we specify custom_default = ts.TimeSeriesEntry( - timestamp=dt.datetime.fromisoformat("2019-01-01T00:00:00+01:00"), + timestamp=datetime.fromisoformat("2019-01-01T00:00:00+01:00"), value=123.45, ) assert cache1.pop(1).entry is None @@ -51,7 +49,7 @@ def test_LatestEntryCache() -> None: # if we add a time series entry for a new key, then it does not # matter what its timestamp is, the entry will always be added # to the cache (`update` returns `True`) - ts1a = dt.datetime.fromisoformat("2021-03-11T10:49:00+00:00") + ts1a = datetime.fromisoformat("2021-03-11T10:49:00+00:00") e1a = ts.TimeSeriesEntry(timestamp=ts1a, value=1.0) assert cache1.update(1, e1a) is True @@ -67,7 +65,7 @@ def test_LatestEntryCache() -> None: # the same as that entry's timestamp, and so even if we set # zero timedelta tolerance, we still get it back assert cache1.get(1).entry == e1a - assert cache1.get(1, dt.timedelta(0)).entry == e1a + assert cache1.get(1, timedelta(0)).entry == e1a assert cache1.get(2).entry is None assert cache1.get(2).entry is None @@ -79,7 +77,7 @@ def test_LatestEntryCache() -> None: # timedeltas are still measured relative to the most recent # of all entry timestamps, but the entry will still be added # to the cache - ts2a = dt.datetime.fromisoformat("2021-03-11T10:48:59+00:00") + ts2a = datetime.fromisoformat("2021-03-11T10:48:59+00:00") e2a = ts.TimeSeriesEntry(timestamp=ts2a, value=2.0) assert cache1.update(2, e2a) is True @@ -92,12 +90,12 @@ def test_LatestEntryCache() -> None: assert list(cache1.keys()) == [1, 2] assert cache1.get(1).entry == e1a - assert cache1.get(1, dt.timedelta(0)).entry == e1a # still most recent timestamp + assert cache1.get(1, timedelta(0)).entry == e1a # still most recent timestamp assert cache1.get(2).entry == e2a - assert cache1.get(2, dt.timedelta(seconds=1)).entry == e2a # within tolerance - assert cache1.get(2, dt.timedelta(seconds=0.999)).entry is None # outside tolerance - assert cache1.get(2, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(2, timedelta(seconds=1)).entry == e2a # within tolerance + assert cache1.get(2, timedelta(seconds=0.999)).entry is None # outside tolerance + assert cache1.get(2, timedelta(0)).entry is None # outside tolerance assert cache1.get(3).entry is None assert cache1.get(1001).entry is None @@ -118,24 +116,24 @@ def test_LatestEntryCache() -> None: assert list(cache1.keys()) == [1, 2, 3] assert cache1.get(1).entry == e1a - assert cache1.get(1, dt.timedelta(0)).entry == e1a # still most recent timestamp + assert cache1.get(1, timedelta(0)).entry == e1a # still most recent timestamp assert cache1.get(2).entry == e2a - assert cache1.get(2, dt.timedelta(seconds=1)).entry == e2a # within tolerance - assert cache1.get(2, dt.timedelta(seconds=0.999)).entry is None # outside tolerance - assert cache1.get(2, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(2, timedelta(seconds=1)).entry == e2a # within tolerance + assert cache1.get(2, timedelta(seconds=0.999)).entry is None # outside tolerance + assert cache1.get(2, timedelta(0)).entry is None # outside tolerance assert cache1.get(3).entry == e3a - assert cache1.get(3, dt.timedelta(0)).entry == e3a # same as most recent timestamp + assert cache1.get(3, timedelta(0)).entry == e3a # same as most recent timestamp assert cache1.get(1001).entry is None # if we add an entry for an existing key, but whose timestamp is # less than that of the cached entry, the existing cache entry is # kept and the outdated one discarded (`update` returns `False`) - ts1old = e1a.timestamp - dt.timedelta(seconds=0.01) - ts2old = e2a.timestamp - dt.timedelta(seconds=0.01) - ts3old = e3a.timestamp - dt.timedelta(seconds=0.001) + ts1old = e1a.timestamp - timedelta(seconds=0.01) + ts2old = e2a.timestamp - timedelta(seconds=0.01) + ts3old = e3a.timestamp - timedelta(seconds=0.001) assert cache1.update(1, ts.TimeSeriesEntry(ts1old, 0.1)) is False assert cache1.update(2, ts.TimeSeriesEntry(ts2old, 0.2)) is False @@ -144,15 +142,15 @@ def test_LatestEntryCache() -> None: assert cache1.latest_timestamp == ts1a assert cache1.get(1).entry == e1a - assert cache1.get(1, dt.timedelta(0)).entry == e1a # still most recent timestamp + assert cache1.get(1, timedelta(0)).entry == e1a # still most recent timestamp assert cache1.get(2).entry == e2a - assert cache1.get(2, dt.timedelta(seconds=1)).entry == e2a # within tolerance - assert cache1.get(2, dt.timedelta(seconds=0.999)).entry is None # outside tolerance - assert cache1.get(2, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(2, timedelta(seconds=1)).entry == e2a # within tolerance + assert cache1.get(2, timedelta(seconds=0.999)).entry is None # outside tolerance + assert cache1.get(2, timedelta(0)).entry is None # outside tolerance assert cache1.get(3).entry == e3a - assert cache1.get(3, dt.timedelta(0)).entry == e3a # same as most recent timestamp + assert cache1.get(3, timedelta(0)).entry == e3a # same as most recent timestamp assert cache1.get(1001).entry is None @@ -166,80 +164,80 @@ def test_LatestEntryCache() -> None: assert cache1.latest_timestamp == ts1a assert cache1.get(1).entry == e1a - assert cache1.get(1, dt.timedelta(0)).entry == e1a # still most recent timestamp + assert cache1.get(1, timedelta(0)).entry == e1a # still most recent timestamp assert cache1.get(2).entry == e2a - assert cache1.get(2, dt.timedelta(seconds=1)).entry == e2a # within tolerance - assert cache1.get(2, dt.timedelta(seconds=0.999)).entry is None # outside tolerance - assert cache1.get(2, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(2, timedelta(seconds=1)).entry == e2a # within tolerance + assert cache1.get(2, timedelta(seconds=0.999)).entry is None # outside tolerance + assert cache1.get(2, timedelta(0)).entry is None # outside tolerance assert cache1.get(3).entry == e3a - assert cache1.get(3, dt.timedelta(0)).entry == e3a # same as most recent timestamp + assert cache1.get(3, timedelta(0)).entry == e3a # same as most recent timestamp assert cache1.get(1001).entry is None # if entries with newer timestamps are reported, then the update # will take place - ts1b = dt.datetime.fromisoformat("2021-03-11T10:49:00.250+00:00") + ts1b = datetime.fromisoformat("2021-03-11T10:49:00.250+00:00") e1b = ts.TimeSeriesEntry(ts1b, 101.0) assert cache1.update(1, e1b) is True assert cache1.latest_timestamp == ts1b assert cache1.get(1).entry == e1b - assert cache1.get(1, dt.timedelta(0)).entry == e1b # new most recent timestamp + assert cache1.get(1, timedelta(0)).entry == e1b # new most recent timestamp assert cache1.get(2).entry == e2a - assert cache1.get(2, dt.timedelta(seconds=1.25)).entry == e2a # within tolerance - assert cache1.get(2, dt.timedelta(seconds=1.249)).entry is None # outside tolerance - assert cache1.get(2, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(2, timedelta(seconds=1.25)).entry == e2a # within tolerance + assert cache1.get(2, timedelta(seconds=1.249)).entry is None # outside tolerance + assert cache1.get(2, timedelta(0)).entry is None # outside tolerance assert cache1.get(3).entry == e3a - assert cache1.get(3, dt.timedelta(seconds=0.25)).entry == e3a # within tolerance - assert cache1.get(3, dt.timedelta(seconds=0.249)).entry is None # outside tolerance - assert cache1.get(3, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(3, timedelta(seconds=0.25)).entry == e3a # within tolerance + assert cache1.get(3, timedelta(seconds=0.249)).entry is None # outside tolerance + assert cache1.get(3, timedelta(0)).entry is None # outside tolerance assert cache1.get(1001).entry is None - ts2b = dt.datetime.fromisoformat("2021-03-11T10:49:00.500+00:00") + ts2b = datetime.fromisoformat("2021-03-11T10:49:00.500+00:00") e2b = ts.TimeSeriesEntry(ts2b, 102.0) assert cache1.update(2, e2b) is True assert cache1.latest_timestamp == ts2b assert cache1.get(1).entry == e1b - assert cache1.get(1, dt.timedelta(seconds=0.25)).entry == e1b # within tolerance - assert cache1.get(1, dt.timedelta(seconds=0.249)).entry is None # outside tolerance - assert cache1.get(1, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(1, timedelta(seconds=0.25)).entry == e1b # within tolerance + assert cache1.get(1, timedelta(seconds=0.249)).entry is None # outside tolerance + assert cache1.get(1, timedelta(0)).entry is None # outside tolerance assert cache1.get(2).entry == e2b - assert cache1.get(2, dt.timedelta(0)).entry == e2b # new most recent timestamp + assert cache1.get(2, timedelta(0)).entry == e2b # new most recent timestamp assert cache1.get(3).entry == e3a - assert cache1.get(3, dt.timedelta(seconds=0.5)).entry == e3a # within tolerance - assert cache1.get(3, dt.timedelta(seconds=0.499)).entry is None # outside tolerance - assert cache1.get(3, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(3, timedelta(seconds=0.5)).entry == e3a # within tolerance + assert cache1.get(3, timedelta(seconds=0.499)).entry is None # outside tolerance + assert cache1.get(3, timedelta(0)).entry is None # outside tolerance assert cache1.get(1001).entry is None - ts3b = dt.datetime.fromisoformat("2021-03-11T10:49:00.502+00:00") + ts3b = datetime.fromisoformat("2021-03-11T10:49:00.502+00:00") e3b = ts.TimeSeriesEntry(ts3b, 103.0) assert cache1.update(3, e3b) is True assert cache1.latest_timestamp == ts3b assert cache1.get(1).entry == e1b - assert cache1.get(1, dt.timedelta(seconds=0.252)).entry == e1b # within tolerance - assert cache1.get(1, dt.timedelta(seconds=0.251)).entry is None # outside tolerance - assert cache1.get(1, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(1, timedelta(seconds=0.252)).entry == e1b # within tolerance + assert cache1.get(1, timedelta(seconds=0.251)).entry is None # outside tolerance + assert cache1.get(1, timedelta(0)).entry is None # outside tolerance assert cache1.get(2).entry == e2b - assert cache1.get(2, dt.timedelta(seconds=0.002)).entry == e2b # within tolerance - assert cache1.get(2, dt.timedelta(seconds=0.001)).entry is None # outside tolerance - assert cache1.get(2, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(2, timedelta(seconds=0.002)).entry == e2b # within tolerance + assert cache1.get(2, timedelta(seconds=0.001)).entry is None # outside tolerance + assert cache1.get(2, timedelta(0)).entry is None # outside tolerance assert cache1.get(3).entry == e3b - assert cache1.get(3, dt.timedelta(0)).entry == e3b # new most recent timestamp + assert cache1.get(3, timedelta(0)).entry == e3b # new most recent timestamp assert cache1.get(1001).entry is None @@ -247,23 +245,19 @@ def test_LatestEntryCache() -> None: # both when the key does not exist and when the timestamp is outside the # timedelta tolerance assert cache1.get(1).entry == e1b - assert cache1.get(1, dt.timedelta(seconds=0.252)).entry == e1b # within tolerance - assert ( - cache1.get(1, dt.timedelta(seconds=0.251), default=e1a).entry == e1a - ) # outside - assert cache1.get(1, dt.timedelta(0), default=e2a).entry == e2a # outside + assert cache1.get(1, timedelta(seconds=0.252)).entry == e1b # within tolerance + assert cache1.get(1, timedelta(seconds=0.251), default=e1a).entry == e1a # outside + assert cache1.get(1, timedelta(0), default=e2a).entry == e2a # outside assert cache1.get(2).entry == e2b - assert cache1.get(2, dt.timedelta(seconds=0.002)).entry == e2b # within tolerance - assert ( - cache1.get(2, dt.timedelta(seconds=0.001), default=e1b).entry == e1b - ) # outside - assert cache1.get(2, dt.timedelta(0), default=e2a).entry == e2a # outside + assert cache1.get(2, timedelta(seconds=0.002)).entry == e2b # within tolerance + assert cache1.get(2, timedelta(seconds=0.001), default=e1b).entry == e1b # outside + assert cache1.get(2, timedelta(0), default=e2a).entry == e2a # outside assert cache1.get(3).entry == e3b - assert cache1.get(3, dt.timedelta(0)).entry == e3b # new most recent timestamp + assert cache1.get(3, timedelta(0)).entry == e3b # new most recent timestamp assert ( - cache1.get(3, dt.timedelta(0), default=e2b).entry == e3b + cache1.get(3, timedelta(0), default=e2b).entry == e3b ) # never outside tolerance assert cache1.get(1001).entry is None @@ -282,14 +276,14 @@ def test_LatestEntryCache() -> None: assert list(cache1.keys()) == [1, 2] assert cache1.get(1).entry == e1b - assert cache1.get(1, dt.timedelta(seconds=0.252)).entry == e1b # within tolerance - assert cache1.get(1, dt.timedelta(seconds=0.251)).entry is None # outside tolerance - assert cache1.get(1, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(1, timedelta(seconds=0.252)).entry == e1b # within tolerance + assert cache1.get(1, timedelta(seconds=0.251)).entry is None # outside tolerance + assert cache1.get(1, timedelta(0)).entry is None # outside tolerance assert cache1.get(2).entry == e2b - assert cache1.get(2, dt.timedelta(seconds=0.002)).entry == e2b # within tolerance - assert cache1.get(2, dt.timedelta(seconds=0.001)).entry is None # outside tolerance - assert cache1.get(2, dt.timedelta(0)).entry is None # outside tolerance + assert cache1.get(2, timedelta(seconds=0.002)).entry == e2b # within tolerance + assert cache1.get(2, timedelta(seconds=0.001)).entry is None # outside tolerance + assert cache1.get(2, timedelta(0)).entry is None # outside tolerance assert cache1.get(3).entry is None assert cache1.get(1001).entry is None @@ -327,7 +321,7 @@ def test_LatestEntryCache() -> None: # ... but if we call the `reset_latest_timestamp` method then with no # cached entries it will be reset to the starting value `datetime.min` assert cache1.reset_latest_timestamp() is True - assert cache1.latest_timestamp == pytz.utc.localize(dt.datetime.min) + assert cache1.latest_timestamp == datetime.min.replace(tzinfo=timezone.utc) # if we `remove` the last entry in the cache it similarly does not # reset the `latest_timestamp` value @@ -350,7 +344,7 @@ def test_LatestEntryCache() -> None: # ... but if we manually request a reset, then again, with no entries in # the cache, it will be reset to its default start value `datetime.min` assert cache1.reset_latest_timestamp() is True - assert cache1.latest_timestamp == pytz.utc.localize(dt.datetime.min) + assert cache1.latest_timestamp == datetime.min.replace(tzinfo=timezone.utc) # if we want to both clear out all entries and reset `latest_timestamp` all # at the same time then we can do this with the `reset` method @@ -365,7 +359,7 @@ def test_LatestEntryCache() -> None: cache1.reset() assert len(cache1) == 0 assert len(cache1.keys()) == 0 - assert cache1.latest_timestamp == pytz.utc.localize(dt.datetime.min) + assert cache1.latest_timestamp == datetime.min.replace(tzinfo=timezone.utc) def test_TimeSeriesFormula() -> None: @@ -378,13 +372,13 @@ def test_TimeSeriesFormula() -> None: # if we have an entry only for only one of the required symbols, # then we still cannot evaluate the formula - ts1a = dt.datetime.fromisoformat("2021-03-11T10:49:00+00:00") + ts1a = datetime.fromisoformat("2021-03-11T10:49:00+00:00") xa = ts.TimeSeriesEntry(ts1a, 9) assert cache1.update("x", xa) is True assert f1.evaluate(cache1) is None # missing data # a second entry for the same symbol still results in None - ts1b = dt.datetime.fromisoformat("2021-03-11T10:49:00.200+00:00") + ts1b = datetime.fromisoformat("2021-03-11T10:49:00.200+00:00") xb = ts.TimeSeriesEntry(ts1b, 7) assert cache1.update("x", xb) is True assert f1.evaluate(cache1) is None # missing data @@ -395,47 +389,44 @@ def test_TimeSeriesFormula() -> None: # timestamp among all cache entries): if a result is generated, # its timestamp will be equal to the most recent among all the # symbols used in the formula - ts2a = dt.datetime.fromisoformat("2021-03-11T10:49:00+00:00") + ts2a = datetime.fromisoformat("2021-03-11T10:49:00+00:00") ya = ts.TimeSeriesEntry(ts2a, 5) assert cache1.update("y", ya) is True expected_ya = ts.TimeSeriesEntry(xb.timestamp, 12) assert f1.evaluate(cache1) == expected_ya assert ( - f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.200)) - == expected_ya + f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.200)) == expected_ya ) - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.199)) is None - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.199)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) is None # a more recent entry will result in a change to both timestamp and # and value of the formula result, and changes the tolerance limits - ts2b = dt.datetime.fromisoformat("2021-03-11T10:49:00.500+00:00") + ts2b = datetime.fromisoformat("2021-03-11T10:49:00.500+00:00") yb = ts.TimeSeriesEntry(ts2b, -3) assert cache1.update("y", yb) is True expected_yb = ts.TimeSeriesEntry(yb.timestamp, 4) assert f1.evaluate(cache1) == expected_yb assert ( - f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.300)) - == expected_yb + f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.300)) == expected_yb ) - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.299)) is None - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.299)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) is None # a still more recent observation for a symbol name not in the formula # will not change the result of the formula itself but affects tolerance - ts3a = dt.datetime.fromisoformat("2021-03-11T10:49:00.750+00:00") + ts3a = datetime.fromisoformat("2021-03-11T10:49:00.750+00:00") za = ts.TimeSeriesEntry(ts3a, 999) assert cache1.update("z", za) is True assert f1.evaluate(cache1) == expected_yb assert ( - f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.550)) - == expected_yb + f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.550)) == expected_yb ) - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.549)) is None - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.549)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) is None # if the timestamps of all observations are exactly equal then we will # get the same result all the way down to timedelta_tolerance == 0 @@ -445,18 +436,17 @@ def test_TimeSeriesFormula() -> None: expected_xc = ts.TimeSeriesEntry(xc.timestamp, 5) assert f1.evaluate(cache1) == expected_xc assert ( - f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.250)) - == expected_xc + f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.250)) == expected_xc ) - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.249)) is None - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.249)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) is None yc = ts.TimeSeriesEntry(ts3a, 6) assert cache1.update("y", yc) is True expected_yc = ts.TimeSeriesEntry(yc.timestamp, 14) assert f1.evaluate(cache1) == expected_yc - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) == expected_yc + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) == expected_yc # some alternative formulas involving more variables and operations f2 = ts.TimeSeriesFormula[float]("x4 - (x5 + x6)") @@ -467,7 +457,7 @@ def test_TimeSeriesFormula() -> None: cache2 = ts.LatestEntryCache[str, float]() - ts4a = dt.datetime.fromisoformat("2021-03-11T12:42:00+00:00") + ts4a = datetime.fromisoformat("2021-03-11T12:42:00+00:00") x4a = ts.TimeSeriesEntry(ts4a, 7.5) assert cache2.update("x4", x4a) is True @@ -475,7 +465,7 @@ def test_TimeSeriesFormula() -> None: assert f3.evaluate(cache2) is None assert f4.evaluate(cache2) is None - ts5a = dt.datetime.fromisoformat("2021-03-11T12:42:00.100+00:00") + ts5a = datetime.fromisoformat("2021-03-11T12:42:00.100+00:00") x5a = ts.TimeSeriesEntry(ts5a, 4.5) assert cache2.update("x5", x5a) is True @@ -483,95 +473,94 @@ def test_TimeSeriesFormula() -> None: assert f3.evaluate(cache2) is None assert f4.evaluate(cache2) is None - ts6a = dt.datetime.fromisoformat("2021-03-11T12:42:00.200+00:00") + ts6a = datetime.fromisoformat("2021-03-11T12:42:00.200+00:00") x6a = ts.TimeSeriesEntry(ts6a, 2.25) assert cache2.update("x6", x6a) is True expected_456 = ts.TimeSeriesEntry(x6a.timestamp, 0.75) assert f2.evaluate(cache2) == expected_456 assert ( - f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.200)) + f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.200)) == expected_456 ) - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.199)) is None - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.199)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None assert f3.evaluate(cache2) is None assert f4.evaluate(cache2) is None - ts7a = dt.datetime.fromisoformat("2021-03-11T12:42:00.300+00:00") + ts7a = datetime.fromisoformat("2021-03-11T12:42:00.300+00:00") x7a = ts.TimeSeriesEntry(timestamp=ts7a, value=3.0) assert cache2.update("x7", x7a) is True assert f2.evaluate(cache2) == expected_456 assert ( - f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.300)) + f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.300)) == expected_456 ) - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.299)) is None - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.299)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None expected_567 = ts.TimeSeriesEntry(x7a.timestamp, 20.25) assert f3.evaluate(cache2) == expected_567 assert ( - f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.200)) + f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.200)) == expected_567 ) - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.199)) is None - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.199)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None assert f4.evaluate(cache2) is None - ts8a = dt.datetime.fromisoformat("2021-03-11T12:42:00.400+00:00") + ts8a = datetime.fromisoformat("2021-03-11T12:42:00.400+00:00") x8a = ts.TimeSeriesEntry(timestamp=ts8a, value=12.0) assert cache2.update("x8", x8a) is True assert f2.evaluate(cache2) == expected_456 assert ( - f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.400)) + f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.400)) == expected_456 ) - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.399)) is None - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.399)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None assert f3.evaluate(cache2) == expected_567 assert ( - f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.300)) + f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.300)) == expected_567 ) - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.299)) is None - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.299)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None expected_78 = ts.TimeSeriesEntry(x8a.timestamp, 0.25) assert f4.evaluate(cache2) == expected_78 assert ( - f4.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.100)) - == expected_78 + f4.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.100)) == expected_78 ) - assert f4.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.099)) is None - assert f4.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f4.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.099)) is None + assert f4.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None # since f4 involves division, let's test a case where the denominator # is zero ... - ts8z = dt.datetime.fromisoformat("2021-03-11T12:42:00.500+00:00") + ts8z = datetime.fromisoformat("2021-03-11T12:42:00.500+00:00") x8z = ts.TimeSeriesEntry(timestamp=ts8z, value=0.0) assert cache2.update("x8", x8z) is True assert f2.evaluate(cache2) == expected_456 assert ( - f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.500)) + f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.500)) == expected_456 ) - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.499)) is None - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.499)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None assert f3.evaluate(cache2) == expected_567 assert ( - f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.400)) + f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.400)) == expected_567 ) - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.399)) is None - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.399)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None # cases where the formula would be calculated result in zero-division # errors being raised, otherwise we get `None` as is normal when at @@ -581,12 +570,12 @@ def test_TimeSeriesFormula() -> None: assert f4_result.status == ts.TimeSeriesEntry.Status.ERROR f4_result_outdated = f4.evaluate( - cache2, timedelta_tolerance=dt.timedelta(seconds=0.200) + cache2, timedelta_tolerance=timedelta(seconds=0.200) ) assert f4_result_outdated is not None assert f4_result_outdated.status == ts.TimeSeriesEntry.Status.ERROR - assert f4.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.199)) is None - assert f4.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f4.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.199)) is None + assert f4.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None def test_TimeSeriesFormula_ComplexPower() -> None: @@ -604,13 +593,13 @@ def test_TimeSeriesFormula_ComplexPower() -> None: # the formula needs observations for 2 different time series: # with data only for one, a `None` result will be generated - ts1a = dt.datetime.fromisoformat("2021-03-11T10:49:00+00:00") + ts1a = datetime.fromisoformat("2021-03-11T10:49:00+00:00") xa = ts.TimeSeriesEntry(ts1a, S(-5.0)) assert cache1.update("x", xa) is True assert f1.evaluate(cache1) is None # missing data # a second observation for the same time series still results in None - ts1b = dt.datetime.fromisoformat("2021-03-11T10:49:00.200+00:00") + ts1b = datetime.fromisoformat("2021-03-11T10:49:00.200+00:00") xb = ts.TimeSeriesEntry(ts1b, S(-7.0)) assert cache1.update("x", xb) is True assert f1.evaluate(cache1) is None # missing data @@ -620,47 +609,44 @@ def test_TimeSeriesFormula_ComplexPower() -> None: # timedelta tolerance (relative to the most recent observation across # all component_id keys); if a result is generated, its timestamp will # be equal to the most recent of the two required observations - ts2a = dt.datetime.fromisoformat("2021-03-11T10:49:00+00:00") + ts2a = datetime.fromisoformat("2021-03-11T10:49:00+00:00") ya = ts.TimeSeriesEntry(ts2a, S(5.0)) assert cache1.update("y", ya) is True expected_ya = ts.TimeSeriesEntry(ts1b, S(-2.0)) assert f1.evaluate(cache1) == expected_ya assert ( - f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.200)) - == expected_ya + f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.200)) == expected_ya ) - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.199)) is None - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.199)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) is None # a more recent observation will result in a change to both timestamp # and value of the formula result, and changes the tolerance limits - ts2b = dt.datetime.fromisoformat("2021-03-11T10:49:00.500+00:00") + ts2b = datetime.fromisoformat("2021-03-11T10:49:00.500+00:00") yb = ts.TimeSeriesEntry(ts2b, S(4.5 + 1.0j)) assert cache1.update("y", yb) is True expected_yb = ts.TimeSeriesEntry(ts2b, S(-2.5 + 1.0j)) assert f1.evaluate(cache1) == expected_yb assert ( - f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.300)) - == expected_yb + f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.300)) == expected_yb ) - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.299)) is None - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.299)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) is None # a still more recent observation for a component_id not in the formula # will not change the result of the formula itself but affects tolerance - ts3a = dt.datetime.fromisoformat("2021-03-11T10:49:00.750+00:00") + ts3a = datetime.fromisoformat("2021-03-11T10:49:00.750+00:00") za = ts.TimeSeriesEntry(ts3a, S(-99.9)) assert cache1.update("z", za) is True assert f1.evaluate(cache1) == expected_yb assert ( - f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.550)) - == expected_yb + f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.550)) == expected_yb ) - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.549)) is None - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.549)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) is None # if the timestamps of all observations are exactly equal then we will # get the same result all the way down to timedelta_tolerance == 0 @@ -670,18 +656,17 @@ def test_TimeSeriesFormula_ComplexPower() -> None: expected_xc = ts.TimeSeriesEntry(ts3a, S(-3.75 + 0.5j)) assert f1.evaluate(cache1) == expected_xc assert ( - f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.250)) - == expected_xc + f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.250)) == expected_xc ) - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(seconds=0.249)) is None - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(seconds=0.249)) is None + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) is None yc = ts.TimeSeriesEntry(ts3a, S(8.5 + 0.25j)) assert cache1.update("y", yc) is True expected_yc = ts.TimeSeriesEntry(ts3a, S(0.25 - 0.25j)) assert f1.evaluate(cache1) == expected_yc - assert f1.evaluate(cache1, timedelta_tolerance=dt.timedelta(0)) == expected_yc + assert f1.evaluate(cache1, timedelta_tolerance=timedelta(0)) == expected_yc # some alternative formulas involving all the supported operations # (addition, subtraction, multiplication, division): for simplicity @@ -694,7 +679,7 @@ def test_TimeSeriesFormula_ComplexPower() -> None: cache2 = ts.LatestEntryCache[str, ComplexPower]() - ts4a = dt.datetime.fromisoformat("2021-03-11T12:42:00+00:00") + ts4a = datetime.fromisoformat("2021-03-11T12:42:00+00:00") x4a = ts.TimeSeriesEntry(ts4a, S(-7.5)) assert cache2.update("x4", x4a) is True @@ -702,7 +687,7 @@ def test_TimeSeriesFormula_ComplexPower() -> None: assert f3.evaluate(cache2) is None assert f4.evaluate(cache2) is None - ts5a = dt.datetime.fromisoformat("2021-03-11T12:42:00.100+00:00") + ts5a = datetime.fromisoformat("2021-03-11T12:42:00.100+00:00") x5a = ts.TimeSeriesEntry(ts5a, S(4.5)) assert cache2.update("x5", x5a) is True @@ -710,70 +695,64 @@ def test_TimeSeriesFormula_ComplexPower() -> None: assert f3.evaluate(cache2) is None assert f4.evaluate(cache2) is None - ts6a = dt.datetime.fromisoformat("2021-03-11T12:42:00.200+00:00") + ts6a = datetime.fromisoformat("2021-03-11T12:42:00.200+00:00") x6a = ts.TimeSeriesEntry(ts6a, S(-3.0)) assert cache2.update("x6", x6a) is True expected_f2 = ts.TimeSeriesEntry(ts6a, S(-9.0)) assert f2.evaluate(cache2) == expected_f2 assert ( - f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.200)) - == expected_f2 + f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.200)) == expected_f2 ) - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.199)) is None - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.199)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None assert f3.evaluate(cache2) is None assert f4.evaluate(cache2) is None - ts7a = dt.datetime.fromisoformat("2021-03-11T12:42:00.300+00:00") + ts7a = datetime.fromisoformat("2021-03-11T12:42:00.300+00:00") x7a = ts.TimeSeriesEntry(timestamp=ts7a, value=S(-5.25)) assert cache2.update("x7", x7a) is True assert f2.evaluate(cache2) == expected_f2 assert ( - f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.300)) - == expected_f2 + f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.300)) == expected_f2 ) - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.299)) is None - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.299)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None expected_f3 = ts.TimeSeriesEntry(ts7a, S(17.25)) assert f3.evaluate(cache2) == expected_f3 assert ( - f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.200)) - == expected_f3 + f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.200)) == expected_f3 ) - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.199)) is None - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.199)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None assert f4.evaluate(cache2) is None - ts8a = dt.datetime.fromisoformat("2021-03-11T12:42:00.400+00:00") + ts8a = datetime.fromisoformat("2021-03-11T12:42:00.400+00:00") x8a = ts.TimeSeriesEntry(timestamp=ts8a, value=S(11.0)) assert cache2.update("x8", x8a) is True assert f2.evaluate(cache2) == expected_f2 assert ( - f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.400)) - == expected_f2 + f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.400)) == expected_f2 ) - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.399)) is None - assert f2.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.399)) is None + assert f2.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None assert f3.evaluate(cache2) == expected_f3 assert ( - f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.300)) - == expected_f3 + f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.300)) == expected_f3 ) - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.299)) is None - assert f3.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.299)) is None + assert f3.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None expected_f4 = ts.TimeSeriesEntry(ts8a, S(2.5)) assert f4.evaluate(cache2) == expected_f4 assert ( - f4.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.100)) - == expected_f4 + f4.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.100)) == expected_f4 ) - assert f4.evaluate(cache2, timedelta_tolerance=dt.timedelta(seconds=0.099)) is None - assert f4.evaluate(cache2, timedelta_tolerance=dt.timedelta(0)) is None + assert f4.evaluate(cache2, timedelta_tolerance=timedelta(seconds=0.099)) is None + assert f4.evaluate(cache2, timedelta_tolerance=timedelta(0)) is None diff --git a/tests/test_microgrid/test_client.py b/tests/test_microgrid/test_client.py index 2f6ac2c6c..0cf5e804b 100644 --- a/tests/test_microgrid/test_client.py +++ b/tests/test_microgrid/test_client.py @@ -47,235 +47,246 @@ async def test_components(self) -> None: server = mock_api.MockGrpcServer(servicer, port=57899) await server.start() - microgrid = self.create_client(57899) + try: + microgrid = self.create_client(57899) - assert set(await microgrid.components()) == set() + assert set(await microgrid.components()) == set() - servicer.add_component( - 0, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER - ) - assert set(await microgrid.components()) == { - Component(0, ComponentCategory.METER) - } + servicer.add_component( + 0, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER + ) + assert set(await microgrid.components()) == { + Component(0, ComponentCategory.METER) + } - servicer.add_component( - 0, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY - ) - assert set(await microgrid.components()) == { - Component(0, ComponentCategory.METER), - Component(0, ComponentCategory.BATTERY), - } + servicer.add_component( + 0, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY + ) + assert set(await microgrid.components()) == { + Component(0, ComponentCategory.METER), + Component(0, ComponentCategory.BATTERY), + } - servicer.add_component( - 0, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER - ) - assert set(await microgrid.components()) == { - Component(0, ComponentCategory.METER), - Component(0, ComponentCategory.BATTERY), - Component(0, ComponentCategory.METER), - } - - # sensors/loads are not counted as components by the API client - servicer.add_component( - 1, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR - ) - servicer.add_component( - 2, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_LOAD - ) - assert set(await microgrid.components()) == { - Component(0, ComponentCategory.METER), - Component(0, ComponentCategory.BATTERY), - Component(0, ComponentCategory.METER), - } - - servicer.set_components( - [ - (9, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER), - (99, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER), - (66, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_LOAD), - (666, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR), - (999, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY), - ] - ) - assert set(await microgrid.components()) == { - Component(9, ComponentCategory.METER), - Component(99, ComponentCategory.INVERTER), - Component(999, ComponentCategory.BATTERY), - } - - servicer.set_components( - [ - (66, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_LOAD), - (99, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR), - (100, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_UNSPECIFIED), - (101, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_GRID), - (103, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_JUNCTION), - (104, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER), - (105, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER), - (106, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY), - (107, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_EV_CHARGER), - (999, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR), - ] - ) - assert set(await microgrid.components()) == { - Component(100, ComponentCategory.NONE), - Component(101, ComponentCategory.GRID), - Component(103, ComponentCategory.JUNCTION), - Component(104, ComponentCategory.METER), - Component(105, ComponentCategory.INVERTER), - Component(106, ComponentCategory.BATTERY), - Component(107, ComponentCategory.EV_CHARGER), - } - - await server.stop(grace=1.0) + servicer.add_component( + 0, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER + ) + assert set(await microgrid.components()) == { + Component(0, ComponentCategory.METER), + Component(0, ComponentCategory.BATTERY), + Component(0, ComponentCategory.METER), + } + + # sensors/loads are not counted as components by the API client + servicer.add_component( + 1, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR + ) + servicer.add_component( + 2, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_LOAD + ) + assert set(await microgrid.components()) == { + Component(0, ComponentCategory.METER), + Component(0, ComponentCategory.BATTERY), + Component(0, ComponentCategory.METER), + } + + servicer.set_components( + [ + (9, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER), + (99, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER), + (66, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_LOAD), + (666, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR), + (999, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY), + ] + ) + assert set(await microgrid.components()) == { + Component(9, ComponentCategory.METER), + Component(99, ComponentCategory.INVERTER), + Component(999, ComponentCategory.BATTERY), + } + + servicer.set_components( + [ + (66, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_LOAD), + (99, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR), + ( + 100, + microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_UNSPECIFIED, + ), + (101, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_GRID), + (103, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_JUNCTION), + (104, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER), + (105, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER), + (106, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY), + (107, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_EV_CHARGER), + (999, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_SENSOR), + ] + ) + assert set(await microgrid.components()) == { + Component(100, ComponentCategory.NONE), + Component(101, ComponentCategory.GRID), + Component(103, ComponentCategory.JUNCTION), + Component(104, ComponentCategory.METER), + Component(105, ComponentCategory.INVERTER), + Component(106, ComponentCategory.BATTERY), + Component(107, ComponentCategory.EV_CHARGER), + } + + finally: + await server.stop(grace=1.0) async def test_connections(self) -> None: servicer = mock_api.MockMicrogridServicer() server = mock_api.MockGrpcServer(servicer, port=57898) await server.start() - microgrid = self.create_client(57898) + try: + microgrid = self.create_client(57898) - assert set(await microgrid.connections()) == set() + assert set(await microgrid.connections()) == set() - servicer.add_connection(0, 0) - assert set(await microgrid.connections()) == {Connection(0, 0)} + servicer.add_connection(0, 0) + assert set(await microgrid.connections()) == {Connection(0, 0)} - servicer.add_connection(7, 9) - servicer.add_component( - 7, - component_category=microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY, - ) - servicer.add_component( - 9, - component_category=microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER, - ) - assert set(await microgrid.connections()) == { - Connection(0, 0), - Connection(7, 9), - } - - servicer.add_connection(0, 0) - assert set(await microgrid.connections()) == { - Connection(0, 0), - Connection(7, 9), - Connection(0, 0), - } - - servicer.set_connections([(999, 9), (99, 19), (909, 101), (99, 91)]) - for component_id in [999, 99, 19, 909, 101, 91]: + servicer.add_connection(7, 9) servicer.add_component( - component_id, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY + 7, + component_category=microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY, ) - - assert set(await microgrid.connections()) == { - Connection(999, 9), - Connection(99, 19), - Connection(909, 101), - Connection(99, 91), - } - - for component_id in [1, 2, 3, 4, 5, 6, 7, 8]: servicer.add_component( - component_id, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY + 9, + component_category=microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER, ) - - servicer.set_connections( - [ - (1, 2), - (2, 3), - (2, 4), - (2, 5), - (4, 3), - (4, 5), - (4, 6), - (5, 4), - (5, 7), - (5, 8), - ] - ) - assert set(await microgrid.connections()) == { - Connection(1, 2), - Connection(2, 3), - Connection(2, 4), - Connection(2, 5), - Connection(4, 3), - Connection(4, 5), - Connection(4, 6), - Connection(5, 4), - Connection(5, 7), - Connection(5, 8), - } - - # passing empty sets is the same as passing `None`, - # filter is ignored - assert set(await microgrid.connections(starts=set([]), ends=set([]))) == { - Connection(1, 2), - Connection(2, 3), - Connection(2, 4), - Connection(2, 5), - Connection(4, 3), - Connection(4, 5), - Connection(4, 6), - Connection(5, 4), - Connection(5, 7), - Connection(5, 8), - } - - # include filter for connection start - assert set(await microgrid.connections(starts={1})) == {Connection(1, 2)} - - assert set(await microgrid.connections(starts={2})) == { - Connection(2, 3), - Connection(2, 4), - Connection(2, 5), - } - assert set(await microgrid.connections(starts={3})) == set() - - assert set(await microgrid.connections(starts={4, 5})) == { - Connection(4, 3), - Connection(4, 5), - Connection(4, 6), - Connection(5, 4), - Connection(5, 7), - Connection(5, 8), - } - - # include filter for connection end - assert set(await microgrid.connections(ends={1})) == set() - - assert set(await microgrid.connections(ends={3})) == { - Connection(2, 3), - Connection(4, 3), - } - - assert set(await microgrid.connections(ends={2, 4, 5})) == { - Connection(1, 2), - Connection(2, 4), - Connection(2, 5), - Connection(4, 5), - Connection(5, 4), - } - - # different filters combine with AND logic - assert set(await microgrid.connections(starts={1, 2, 4}, ends={4, 5, 6})) == { - Connection(2, 4), - Connection(2, 5), - Connection(4, 5), - Connection(4, 6), - } - - assert set(await microgrid.connections(starts={3, 5}, ends={7, 8})) == { - Connection(5, 7), - Connection(5, 8), - } - - assert set(await microgrid.connections(starts={1, 5}, ends={2, 7})) == { - Connection(1, 2), - Connection(5, 7), - } - - await server.stop(grace=1.0) + assert set(await microgrid.connections()) == { + Connection(0, 0), + Connection(7, 9), + } + + servicer.add_connection(0, 0) + assert set(await microgrid.connections()) == { + Connection(0, 0), + Connection(7, 9), + Connection(0, 0), + } + + servicer.set_connections([(999, 9), (99, 19), (909, 101), (99, 91)]) + for component_id in [999, 99, 19, 909, 101, 91]: + servicer.add_component( + component_id, + microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY, + ) + + assert set(await microgrid.connections()) == { + Connection(999, 9), + Connection(99, 19), + Connection(909, 101), + Connection(99, 91), + } + + for component_id in [1, 2, 3, 4, 5, 6, 7, 8]: + servicer.add_component( + component_id, + microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY, + ) + + servicer.set_connections( + [ + (1, 2), + (2, 3), + (2, 4), + (2, 5), + (4, 3), + (4, 5), + (4, 6), + (5, 4), + (5, 7), + (5, 8), + ] + ) + assert set(await microgrid.connections()) == { + Connection(1, 2), + Connection(2, 3), + Connection(2, 4), + Connection(2, 5), + Connection(4, 3), + Connection(4, 5), + Connection(4, 6), + Connection(5, 4), + Connection(5, 7), + Connection(5, 8), + } + + # passing empty sets is the same as passing `None`, + # filter is ignored + assert set(await microgrid.connections(starts=set([]), ends=set([]))) == { + Connection(1, 2), + Connection(2, 3), + Connection(2, 4), + Connection(2, 5), + Connection(4, 3), + Connection(4, 5), + Connection(4, 6), + Connection(5, 4), + Connection(5, 7), + Connection(5, 8), + } + + # include filter for connection start + assert set(await microgrid.connections(starts={1})) == {Connection(1, 2)} + + assert set(await microgrid.connections(starts={2})) == { + Connection(2, 3), + Connection(2, 4), + Connection(2, 5), + } + assert set(await microgrid.connections(starts={3})) == set() + + assert set(await microgrid.connections(starts={4, 5})) == { + Connection(4, 3), + Connection(4, 5), + Connection(4, 6), + Connection(5, 4), + Connection(5, 7), + Connection(5, 8), + } + + # include filter for connection end + assert set(await microgrid.connections(ends={1})) == set() + + assert set(await microgrid.connections(ends={3})) == { + Connection(2, 3), + Connection(4, 3), + } + + assert set(await microgrid.connections(ends={2, 4, 5})) == { + Connection(1, 2), + Connection(2, 4), + Connection(2, 5), + Connection(4, 5), + Connection(5, 4), + } + + # different filters combine with AND logic + assert set( + await microgrid.connections(starts={1, 2, 4}, ends={4, 5, 6}) + ) == { + Connection(2, 4), + Connection(2, 5), + Connection(4, 5), + Connection(4, 6), + } + + assert set(await microgrid.connections(starts={3, 5}, ends={7, 8})) == { + Connection(5, 7), + Connection(5, 8), + } + + assert set(await microgrid.connections(starts={1, 5}, ends={2, 7})) == { + Connection(1, 2), + Connection(5, 7), + } + + finally: + await server.stop(grace=1.0) async def test_bad_connections(self) -> None: """Validate that the client does not apply connection filters itself.""" @@ -299,74 +310,82 @@ def ListAllComponents( server = mock_api.MockGrpcServer(servicer, port=57897) await server.start() - microgrid = self.create_client(57897) - - assert list(await microgrid.connections()) == [] - for component_id in [1, 2, 3, 4, 5, 6, 7, 8, 9]: - servicer.add_component( - component_id, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY + try: + microgrid = self.create_client(57897) + + assert list(await microgrid.connections()) == [] + for component_id in [1, 2, 3, 4, 5, 6, 7, 8, 9]: + servicer.add_component( + component_id, + microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY, + ) + servicer.set_connections( + [ + (1, 2), + (1, 9), + (2, 3), + (3, 4), + (4, 5), + (5, 6), + (6, 7), + (7, 6), + (7, 9), + ] ) - servicer.set_connections( - [ - (1, 2), - (1, 9), - (2, 3), - (3, 4), - (4, 5), - (5, 6), - (6, 7), - (7, 6), - (7, 9), - ] - ) - unfiltered = { - Connection(1, 2), - Connection(1, 9), - Connection(2, 3), - Connection(3, 4), - Connection(4, 5), - Connection(5, 6), - Connection(6, 7), - Connection(7, 6), - Connection(7, 9), - } - - # because the application of filters is left to the server side, - # it doesn't matter what filters we set in the client if the - # server doesn't do its part - assert set(await microgrid.connections()) == unfiltered - assert set(await microgrid.connections(starts={1})) == unfiltered - assert set(await microgrid.connections(ends={9})) == unfiltered - assert ( - set(await microgrid.connections(starts={1, 7}, ends={3, 9})) == unfiltered - ) + unfiltered = { + Connection(1, 2), + Connection(1, 9), + Connection(2, 3), + Connection(3, 4), + Connection(4, 5), + Connection(5, 6), + Connection(6, 7), + Connection(7, 6), + Connection(7, 9), + } + + # because the application of filters is left to the server side, + # it doesn't matter what filters we set in the client if the + # server doesn't do its part + assert set(await microgrid.connections()) == unfiltered + assert set(await microgrid.connections(starts={1})) == unfiltered + assert set(await microgrid.connections(ends={9})) == unfiltered + assert ( + set(await microgrid.connections(starts={1, 7}, ends={3, 9})) + == unfiltered + ) - await server.stop(grace=1.0) + finally: + await server.stop(grace=1.0) async def test_meter_data(self) -> None: servicer = mock_api.MockMicrogridServicer() server = mock_api.MockGrpcServer(servicer, port=57899) await server.start() - microgrid = self.create_client(57899) - servicer.add_component( - 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER - ) - servicer.add_component( - 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY - ) + try: + microgrid = self.create_client(57899) - with pytest.raises(ValueError): - ## should raise a ValueError for missing component_id - await microgrid.meter_data(20) + servicer.add_component( + 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER + ) + servicer.add_component( + 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY + ) - with pytest.raises(ValueError): - ## should raise a ValueError for wrong component category - await microgrid.meter_data(38) - peekable = (await microgrid.meter_data(83)).into_peekable() - await asyncio.sleep(0.2) - await server.stop(0.1) + with pytest.raises(ValueError): + ## should raise a ValueError for missing component_id + await microgrid.meter_data(20) + + with pytest.raises(ValueError): + ## should raise a ValueError for wrong component category + await microgrid.meter_data(38) + peekable = (await microgrid.meter_data(83)).into_peekable() + await asyncio.sleep(0.2) + + finally: + await server.stop(0.1) latest = peekable.peek() assert isinstance(latest, MeterData) @@ -376,25 +395,29 @@ async def test_battery_data(self) -> None: servicer = mock_api.MockMicrogridServicer() server = mock_api.MockGrpcServer(servicer, port=57899) await server.start() - microgrid = self.create_client(57899) - servicer.add_component( - 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY - ) - servicer.add_component( - 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER - ) + try: + microgrid = self.create_client(57899) - with pytest.raises(ValueError): - ## should raise a ValueError for missing component_id - await microgrid.meter_data(20) + servicer.add_component( + 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY + ) + servicer.add_component( + 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER + ) + + with pytest.raises(ValueError): + ## should raise a ValueError for missing component_id + await microgrid.meter_data(20) - with pytest.raises(ValueError): - ## should raise a ValueError for wrong component category - await microgrid.meter_data(38) - peekable = (await microgrid.battery_data(83)).into_peekable() - await asyncio.sleep(0.2) - await server.stop(0.1) + with pytest.raises(ValueError): + ## should raise a ValueError for wrong component category + await microgrid.meter_data(38) + peekable = (await microgrid.battery_data(83)).into_peekable() + await asyncio.sleep(0.2) + + finally: + await server.stop(0.1) latest = peekable.peek() assert isinstance(latest, BatteryData) @@ -404,25 +427,29 @@ async def test_inverter_data(self) -> None: servicer = mock_api.MockMicrogridServicer() server = mock_api.MockGrpcServer(servicer, port=57899) await server.start() - microgrid = self.create_client(57899) - servicer.add_component( - 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER - ) - servicer.add_component( - 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY - ) + try: + microgrid = self.create_client(57899) + + servicer.add_component( + 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER + ) + servicer.add_component( + 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY + ) + + with pytest.raises(ValueError): + ## should raise a ValueError for missing component_id + await microgrid.meter_data(20) - with pytest.raises(ValueError): - ## should raise a ValueError for missing component_id - await microgrid.meter_data(20) + with pytest.raises(ValueError): + ## should raise a ValueError for wrong component category + await microgrid.meter_data(38) + peekable = (await microgrid.inverter_data(83)).into_peekable() + await asyncio.sleep(0.2) - with pytest.raises(ValueError): - ## should raise a ValueError for wrong component category - await microgrid.meter_data(38) - peekable = (await microgrid.inverter_data(83)).into_peekable() - await asyncio.sleep(0.2) - await server.stop(0.1) + finally: + await server.stop(0.1) latest = peekable.peek() assert isinstance(latest, InverterData) @@ -432,25 +459,29 @@ async def test_ev_charger_data(self) -> None: servicer = mock_api.MockMicrogridServicer() server = mock_api.MockGrpcServer(servicer, port=57899) await server.start() - microgrid = self.create_client(57899) - servicer.add_component( - 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_EV_CHARGER - ) - servicer.add_component( - 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY - ) + try: + microgrid = self.create_client(57899) - with pytest.raises(ValueError): - ## should raise a ValueError for missing component_id - await microgrid.meter_data(20) + servicer.add_component( + 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_EV_CHARGER + ) + servicer.add_component( + 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY + ) + + with pytest.raises(ValueError): + ## should raise a ValueError for missing component_id + await microgrid.meter_data(20) + + with pytest.raises(ValueError): + ## should raise a ValueError for wrong component category + await microgrid.meter_data(38) + peekable = (await microgrid.ev_charger_data(83)).into_peekable() + await asyncio.sleep(0.2) - with pytest.raises(ValueError): - ## should raise a ValueError for wrong component category - await microgrid.meter_data(38) - peekable = (await microgrid.ev_charger_data(83)).into_peekable() - await asyncio.sleep(0.2) - await server.stop(0.1) + finally: + await server.stop(0.1) latest = peekable.peek() assert isinstance(latest, EVChargerData) @@ -462,18 +493,22 @@ async def test_charge(self) -> None: server = mock_api.MockGrpcServer(servicer, port=57899) await server.start() - microgrid = self.create_client(57899) - servicer.add_component( - 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER - ) + try: + microgrid = self.create_client(57899) + + servicer.add_component( + 83, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER + ) + + await microgrid.set_power(component_id=83, power_w=12) - await microgrid.set_power(component_id=83, power_w=12) + assert servicer.latest_charge is not None + assert servicer.latest_charge.component_id == 83 + assert servicer.latest_charge.power_w == 12 - assert servicer.latest_charge is not None - assert servicer.latest_charge.component_id == 83 - assert servicer.latest_charge.power_w == 12 - await server.stop(0.1) + finally: + await server.stop(0.1) async def test_discharge(self) -> None: """Check if discharge is able to discharge component.""" @@ -481,44 +516,51 @@ async def test_discharge(self) -> None: server = mock_api.MockGrpcServer(servicer, port=57899) await server.start() - microgrid = self.create_client(57899) - servicer.add_component( - 73, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER - ) + try: + microgrid = self.create_client(57899) - await microgrid.set_power(component_id=73, power_w=-15) + servicer.add_component( + 73, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_METER + ) - assert servicer.latest_discharge is not None - assert servicer.latest_discharge.component_id == 73 - assert servicer.latest_discharge.power_w == 15 - await server.stop(0.1) + await microgrid.set_power(component_id=73, power_w=-15) + + assert servicer.latest_discharge is not None + assert servicer.latest_discharge.component_id == 73 + assert servicer.latest_discharge.power_w == 15 + finally: + await server.stop(0.1) async def test_set_bounds(self) -> None: servicer = mock_api.MockMicrogridServicer() server = mock_api.MockGrpcServer(servicer, port=57899) await server.start() - microgrid = self.create_client(57899) - - servicer.add_component( - 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER - ) - num_calls = 4 + try: + microgrid = self.create_client(57899) - expected_bounds = [ - microgrid_pb.SetBoundsParam( - component_id=comp_id, - target_metric=microgrid_pb.SetBoundsParam.TargetMetric.TARGET_METRIC_POWER_ACTIVE, - bounds=common_pb.Bounds(lower=-10, upper=2), + servicer.add_component( + 38, microgrid_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER ) - for comp_id in range(num_calls) - ] - for cid in range(num_calls): - await microgrid.set_bounds(cid, -10.0, 2.0) - await asyncio.sleep(0.1) - await server.stop(0.1) + num_calls = 4 + + target_metric = microgrid_pb.SetBoundsParam.TargetMetric + expected_bounds = [ + microgrid_pb.SetBoundsParam( + component_id=comp_id, + target_metric=target_metric.TARGET_METRIC_POWER_ACTIVE, + bounds=common_pb.Bounds(lower=-10, upper=2), + ) + for comp_id in range(num_calls) + ] + for cid in range(num_calls): + await microgrid.set_bounds(cid, -10.0, 2.0) + await asyncio.sleep(0.1) + + finally: + await server.stop(0.1) assert len(expected_bounds) == len(servicer.get_bounds()) diff --git a/tests/test_microgrid/test_component_data.py b/tests/test_microgrid/test_component_data.py index 7903ce6e5..d36996e29 100644 --- a/tests/test_microgrid/test_component_data.py +++ b/tests/test_microgrid/test_component_data.py @@ -7,7 +7,8 @@ License MIT """ -import datetime + +from datetime import datetime, timezone import pytest @@ -18,4 +19,4 @@ def test_component_data_abstract_class() -> None: """Verify the base class ComponentData may not be instantiated.""" with pytest.raises(TypeError): # pylint: disable=abstract-class-instantiated - ComponentData(0, datetime.datetime.utcnow()) # type: ignore + ComponentData(0, datetime.now(timezone.utc)) # type: ignore diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py new file mode 100644 index 000000000..1fa8ada2f --- /dev/null +++ b/tests/timeseries/test_resampling.py @@ -0,0 +1,174 @@ +""" +Tests for the `TimeSeriesResampler` + +Copyright +Copyright © 2021 Frequenz Energy-as-a-Service GmbH + +License +MIT +""" + +from datetime import datetime, timedelta, timezone +from typing import Sequence + +import time_machine + +from frequenz.sdk.timeseries import GroupResampler, Resampler, Sample + + +# pylint: disable=unused-argument +def resampling_function_sum( + samples: Sequence[Sample], resampling_period_s: float +) -> float: + """Calculate sum of the provided values. + + Args: + samples: sequences of samples to apply the average to + resampling_period_s: value describing how often resampling should be performed, + in seconds + + Returns: + sum of all the sample values + + Raises: + AssertionError if there are no provided samples + """ + assert len(samples) > 0, "Avg function cannot be given an empty list of samples" + return sum(sample.value for sample in samples if sample.value is not None) + + +@time_machine.travel(0, tick=False) +def test_component_metric_resampler_remove_outdated_samples() -> None: + """Test if outdated samples are being properly removed.""" + resampler = Resampler( + resampling_period_s=0.2, + max_data_age_in_periods=1.0, + resampling_function=resampling_function_sum, + ) + + timestamp = datetime.now(timezone.utc) + sample1 = Sample(timestamp, value=5.0) + sample2 = Sample(timestamp + timedelta(seconds=1), value=12.0) + resampler.add_sample(sample1) + resampler.add_sample(sample2) + + resampler._remove_outdated_samples(threshold=timestamp) + assert list(resampler._buffer) == [ + sample1, + sample2, + ] # pylint: disable=protected-access + + resampler._remove_outdated_samples(threshold=timestamp + timedelta(seconds=0.5)) + assert list(resampler._buffer) == [sample2] # pylint: disable=protected-access + + resampler._remove_outdated_samples(threshold=timestamp + timedelta(seconds=1.01)) + assert len(resampler._buffer) == 0 # pylint: disable=protected-access + + +@time_machine.travel(0, tick=False) +def test_component_metric_resampler_resample() -> None: + """Test if resampling function works as expected.""" + resampler = Resampler( + resampling_period_s=0.2, + max_data_age_in_periods=5.0, + resampling_function=resampling_function_sum, + ) + + now = datetime.now(timezone.utc) + timestamp1 = now - timedelta(seconds=0.5) + timestamp2 = now - timedelta(seconds=0.2) + + value1 = 5.0 + value2 = 15.0 + + sample1 = Sample(timestamp1, value=value1) + sample2 = Sample(timestamp2, value=value2) + + resampler.add_sample(sample1) + resampler.add_sample(sample2) + + value = resampler.resample() + assert value is not None + assert value == sum([value1, value2]) + + value = resampler.resample(now + timedelta(seconds=0.6)) + assert value is not None + assert value == value2 + + +@time_machine.travel(0, tick=False) +def test_component_metric_resampler_resample_with_outdated_samples() -> None: + """Test that resampling function doesn't take outdated samples into account.""" + resampler = Resampler( + resampling_period_s=0.2, + max_data_age_in_periods=5.0, + resampling_function=resampling_function_sum, + ) + + timestamp = datetime.now(timezone.utc) + + value1 = 100.0 + value2 = 15.0 + value3 = 5.0 + + sample1 = Sample(timestamp - timedelta(seconds=1.01), value=value1) + sample2 = Sample(timestamp - timedelta(seconds=0.7), value=value2) + sample3 = Sample(timestamp - timedelta(seconds=0.5), value=value3) + + resampler.add_sample(sample1) + resampler.add_sample(sample2) + resampler.add_sample(sample3) + + value = resampler.resample() + assert value is not None + assert value == sum([value2, value3]) + + +@time_machine.travel(0, tick=False) +def test_component_metric_group_resampler() -> None: + """Test if resampling is properly delegated to component metric resamplers.""" + resampler = GroupResampler( + resampling_period_s=0.2, + max_data_age_in_periods=5.0, + initial_resampling_function=resampling_function_sum, + ) + + time_series_id_1 = "123_active_power" + time_series_id_2 = "99_active_power" + + resampler.add_time_series(time_series_id=time_series_id_1) + resampler.add_time_series(time_series_id=time_series_id_2) + + now = datetime.now(timezone.utc) + + value11 = 5.0 + value12 = 15.0 + value21 = 100.0 + value22 = 999.0 + + sample11 = Sample(now - timedelta(seconds=0.7), value=value11) + sample12 = Sample(now - timedelta(seconds=0.5), value=value12) + sample21 = Sample(now - timedelta(seconds=5.05), value=value21) + sample22 = Sample(now - timedelta(seconds=0.99), value=value22) + + resampler.add_sample(time_series_id=time_series_id_1, sample=sample11) + resampler.add_sample(time_series_id=time_series_id_1, sample=sample12) + resampler.add_sample(time_series_id=time_series_id_2, sample=sample21) + resampler.add_sample(time_series_id=time_series_id_2, sample=sample22) + + resampled_samples = dict(resampler.resample()) + + assert resampled_samples[time_series_id_1].timestamp >= now + assert resampled_samples[time_series_id_1].value == sum([value11, value12]) + + assert resampled_samples[time_series_id_2].timestamp >= now + assert resampled_samples[time_series_id_2].value == value22 + + timestamp = now + timedelta(seconds=0.5) + resampled_samples = dict(resampler.resample(timestamp)) + + assert resampled_samples[time_series_id_1].timestamp == timestamp + assert resampled_samples[time_series_id_1].value == value12 + + assert resampled_samples[time_series_id_2].timestamp == timestamp + assert resampled_samples[time_series_id_2].value is None