diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index afb10a8ef..5d0ea5456 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,12 +6,35 @@ ## Upgrading - +* The resampler now takes a `name` argument for `add_timeseries()`. This is only used for logging purposes. + +* The resampler and resampling actor now take a `ResamplerConfig` object in the constructor instead of the individual values. + +* The resampler and resampling actor can emit errors or warnings if the buffer needed to resample is too big. If it is bigger than `ResamplerConfig.max_buffer_len`, the buffer will be truncated to that length, so the resampling can lose accuracy. + +* The `ResamplingFunction` now takes different arguments: + + * `resampling_period_s` was removed. + * `resampler_config` is the configuration of the resampler calling the resampling function. + * `source_properties` is the properties of the source being resampled. ## New Features - +* The resampler and resampling actor can now take a few new options via the new `ResamplerConfig` object: + + * `warn_buffer_len`: The minimum length of the resampling buffer that will emit a warning. + * `max_buffer_len`: The maximum length of the resampling buffer. + +* The resampler now infers the input sampling rate of sources and uses a buffer size according to it. + + This information can be consulted via `resampler.get_source_properties(source)`. The buffer size is now calculated so it can store all the needed samples requested via the combination of `resampling_period_s`, `max_data_age_in_periods` and the calculated `input_sampling_period_s`. + + If we are upsampling, one sample could be enough for back-filling, but we store `max_data_age_in_periods` using `input_sampling_period_s` as period, so the resampling functions can do more complex inter/extrapolation if they need to.
+ + If we are downsampling, we want a buffer that can hold `max_data_age_in_periods * resampling_period_s` seconds of data, and we have one sample every `input_sampling_period_s`, so we use a buffer length of: `max_data_age_in_periods * resampling_period_s / input_sampling_period_s` ## Bug Fixes - +* Fixed logger creation for some modules. + + Some modules didn't create the logger properly so there was no way to configure them using the standard logger configuration system. Because of this, it might have happened that some log messages were never shown, or some messages that the user didn't want to get were emitted anyway. diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py index 6e85708dc..f4ff601f4 100644 --- a/benchmarks/timeseries/resampling.py +++ b/benchmarks/timeseries/resampling.py @@ -8,11 +8,17 @@ from typing import Sequence from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper +from frequenz.sdk.timeseries._resampling import ( + ResamplerConfig, + SourceProperties, + _ResamplingHelper, +) def nop( # pylint: disable=unused-argument - samples: Sequence[Sample], resampling_period_s: float + samples: Sequence[Sample], + resampler_config: ResamplerConfig, + source_properties: SourceProperties, ) -> float: """Return 0.0.""" return 0.0 @@ -21,12 +27,15 @@ def nop( # pylint: disable=unused-argument def _benchmark_resampling_helper(resamples: int, samples: int) -> None: """Benchmark the resampling helper.""" helper = _ResamplingHelper( + "benchmark", ResamplerConfig( resampling_period_s=1.0, max_data_age_in_periods=3.0, resampling_function=nop, initial_buffer_len=samples * 3, - ) + warn_buffer_len=samples * 3 + 2, + max_buffer_len=samples * 3 + 3, + ), ) now = datetime.now(timezone.utc) diff --git a/examples/resampling.py b/examples/resampling.py index 6412a240d..ce01782e1 100644 --- a/examples/resampling.py +++ b/examples/resampling.py @@ -105,7 +105,9 @@
async def run() -> None: # pylint: disable=too-many-locals average_chan = Broadcast[Sample]("average") second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0)) - second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample) + second_stage_resampler.add_timeseries( + "avg", average_chan.new_receiver(), _print_sample + ) average_sender = average_chan.new_sender() # Needed until channels Senders raises exceptions on errors diff --git a/src/frequenz/sdk/_data_handling/time_series.py b/src/frequenz/sdk/_data_handling/time_series.py index cea613486..4d5474344 100644 --- a/src/frequenz/sdk/_data_handling/time_series.py +++ b/src/frequenz/sdk/_data_handling/time_series.py @@ -15,7 +15,7 @@ Key = TypeVar("Key") Value = TypeVar("Value") -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) SYMBOL_SEGMENT_SEPARATOR = "_" diff --git a/src/frequenz/sdk/_data_ingestion/component_info.py b/src/frequenz/sdk/_data_ingestion/component_info.py index 6825eb9b0..9a273025f 100644 --- a/src/frequenz/sdk/_data_ingestion/component_info.py +++ b/src/frequenz/sdk/_data_ingestion/component_info.py @@ -10,7 +10,7 @@ from ..microgrid import ComponentGraph from ..microgrid.component import ComponentCategory -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) @dataclass diff --git a/src/frequenz/sdk/_data_ingestion/formula_calculator.py b/src/frequenz/sdk/_data_ingestion/formula_calculator.py index 8a31d7d2f..3b9cd1ee4 100644 --- a/src/frequenz/sdk/_data_ingestion/formula_calculator.py +++ b/src/frequenz/sdk/_data_ingestion/formula_calculator.py @@ -36,7 +36,7 @@ METRIC_PV_PROD, ) -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) @dataclass(frozen=True) diff --git a/src/frequenz/sdk/_data_ingestion/load_historic_data.py b/src/frequenz/sdk/_data_ingestion/load_historic_data.py index 3aa5c4eac..6e9218b28 100644 --- a/src/frequenz/sdk/_data_ingestion/load_historic_data.py +++ 
b/src/frequenz/sdk/_data_ingestion/load_historic_data.py @@ -22,7 +22,7 @@ import pyarrow.parquet as pq from tqdm import tqdm -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) # directory path to all component data of a particular site HISTDATA_DIR = "/data" diff --git a/src/frequenz/sdk/_data_ingestion/microgrid_data.py b/src/frequenz/sdk/_data_ingestion/microgrid_data.py index 057ffd21e..d6a6c75b7 100644 --- a/src/frequenz/sdk/_data_ingestion/microgrid_data.py +++ b/src/frequenz/sdk/_data_ingestion/microgrid_data.py @@ -27,7 +27,7 @@ from .formula_calculator import FormulaCalculator from .gen_component_receivers import gen_component_receivers -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) CONFIG_FILE_FORMULA_PREFIX = "formula_" diff --git a/src/frequenz/sdk/actor/_decorator.py b/src/frequenz/sdk/actor/_decorator.py index ffdcd00b0..5bafffe4f 100644 --- a/src/frequenz/sdk/actor/_decorator.py +++ b/src/frequenz/sdk/actor/_decorator.py @@ -14,7 +14,7 @@ import logging from typing import Any, Generic, Optional, Type, TypeVar -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) OT = TypeVar("OT") diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py index db9aa3e5b..af3cbb49e 100644 --- a/src/frequenz/sdk/actor/_resampling.py +++ b/src/frequenz/sdk/actor/_resampling.py @@ -19,7 +19,7 @@ from ._data_sourcing import ComponentMetricRequest from ._decorator import actor -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) @actor @@ -85,7 +85,7 @@ async def sink_adapter(sample: Sample) -> None: if not await sender.send(sample): raise Exception(f"Error while sending with sender {sender}", sender) - self._resampler.add_timeseries(receiver, sink_adapter) + self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter) async def _process_resampling_requests(self) -> None: """Process resampling data requests.""" diff --git 
a/src/frequenz/sdk/microgrid/client/_client.py b/src/frequenz/sdk/microgrid/client/_client.py index e296f1b10..68a16f257 100644 --- a/src/frequenz/sdk/microgrid/client/_client.py +++ b/src/frequenz/sdk/microgrid/client/_client.py @@ -44,7 +44,7 @@ EVChargerData, ) -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) class MicrogridApiClient(ABC): diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 93319ff18..ea374a343 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -13,25 +13,40 @@ from collections import deque from dataclasses import dataclass from datetime import datetime, timedelta -from typing import AsyncIterator, Callable, Coroutine, Sequence +from typing import AsyncIterator, Callable, Coroutine, Optional, Sequence from frequenz.channels.util import Timer from ..util.asyncio import cancel_and_await from . import Sample -_logger = logging.Logger(__name__) +_logger = logging.getLogger(__name__) DEFAULT_BUFFER_LEN_INIT = 16 """Default initial buffer length. Buffers will be created initially with this length, but they could grow or -shrink depending on the source characteristics, like sampling rate, to make +shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored. """ +DEFAULT_BUFFER_LEN_MAX = 1024 +"""Default maximum allowed buffer length. + +If a buffer length would get bigger than this, it will be truncated to this +length. +""" + + +DEFAULT_BUFFER_LEN_WARN = 128 +"""Default minimum buffer length that will produce a warning. + +If a buffer length would get bigger than this, a warning will be logged. +""" + + Source = AsyncIterator[Sample] """A source for a timeseries. 
@@ -56,7 +71,9 @@ """ -ResamplingFunction = Callable[[Sequence[Sample], float], float] +ResamplingFunction = Callable[ + [Sequence[Sample], "ResamplerConfig", "SourceProperties"], float +] """Resampling function type. A resampling function produces a new sample based on a list of pre-existing @@ -71,9 +88,11 @@ timestamp in the input data). Args: - input_samples (Sequence[Sample]): the sequence of pre-existing samples. - resampling_period_s (float): the period in seconds (i.e. how ofter a new sample is - produced. + input_samples (Sequence[Sample]): The sequence of pre-existing samples. + resampler_config (ResamplerConfig): The configuration of the resampling + calling this function. + source_properties (SourceProperties): The properties of the source being + resampled. Returns: new_sample (float): The value of new sample produced after the resampling. @@ -81,13 +100,18 @@ # pylint: disable=unused-argument -def average(samples: Sequence[Sample], resampling_period_s: float) -> float: +def average( + samples: Sequence[Sample], + resampler_config: ResamplerConfig, + source_properties: SourceProperties, +) -> float: """Calculate average of all the provided values. Args: samples: The samples to apply the average to. It must be non-empty. - resampling_period_s: The time it passes between resampled data is - produced (in seconds). + resampler_config: The configuration of the resampler calling this + function. + source_properties: The properties of the source being resampled. Returns: The average of all `samples` values. @@ -105,15 +129,30 @@ class ResamplerConfig: """The resampling period in seconds. This is the time it passes between resampled data should be calculated. + + It must be a positive number. """ max_data_age_in_periods: float = 3.0 """The maximum age a sample can have to be considered *relevant* for resampling. - Expressed in number of resampling periods. 
For example if - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, then data - older than 3*2 = 6 secods will be discarded when creating a new sample and - never passed to the resampling function. + Expressed in number of periods, where period is the `resampling_period_s` + if we are downsampling (resampling period bigger than the input period) or + the input sampling period if we are upsampling (input period bigger than + the resampling period). + + It must be at least 1.0. + + Example: + If `resampling_period_s` is 3, the input sampling period is + 1 and `max_data_age_in_periods` is 2, then data older than 3*2 + = 6 seconds will be discarded when creating a new sample and never + passed to the resampling function. + + If `resampling_period_s` is 3, the input sampling period is + 5 and `max_data_age_in_periods` is 2, then data older than 5*2 + = 10 seconds will be discarded when creating a new sample and never + passed to the resampling function. """ resampling_function: ResamplingFunction = average @@ -127,11 +166,74 @@ class ResamplerConfig: initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. - The buffer could grow or shrink depending on the source characteristics, + The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored. + + It must be at least 1 and at most `max_buffer_len`. + """ + + warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN + """The minimum length of the resampling buffer that will emit a warning. + + If a buffer grows bigger than this value, it will emit a warning in the + logs, so buffers don't grow too big inadvertently. + + It must be at least 1 and smaller than `max_buffer_len`. """ + max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX + """The maximum length of the resampling buffer.
+ + Buffers won't be allowed to grow beyond this point even if it would be + needed to keep all the requested past sampling periods. An error will be + emitted in the logs if the buffer length needs to be truncated to this + value. + + It must be bigger than `warn_buffer_len`. + """ + + def __post_init__(self) -> None: + """Check that config values are valid. + + Raises: + ValueError: If any value is out of range. + """ + if self.resampling_period_s < 0.0: + raise ValueError( + f"resampling_period_s ({self.resampling_period_s}) must be positive" + ) + if self.max_data_age_in_periods < 1.0: + raise ValueError( + f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0" + ) + if self.warn_buffer_len < 1: + raise ValueError( + f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1" + ) + if self.max_buffer_len <= self.warn_buffer_len: + raise ValueError( + f"max_buffer_len ({self.max_buffer_len}) should " + f"be bigger than warn_buffer_len ({self.warn_buffer_len})" + ) + + if self.initial_buffer_len < 1: + raise ValueError( + f"initial_buffer_len ({self.initial_buffer_len}) should at least 1" + ) + if self.initial_buffer_len > self.max_buffer_len: + raise ValueError( + f"initial_buffer_len ({self.initial_buffer_len}) is bigger " + f"than max_buffer_len ({self.max_buffer_len}), use a smaller " + "initial_buffer_len or a bigger max_buffer_len" + ) + if self.initial_buffer_len > self.warn_buffer_len: + _logger.warning( + "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)", + self.initial_buffer_len, + self.warn_buffer_len, + ) + class SourceStoppedError(RuntimeError): """A timeseries stopped producing samples.""" @@ -199,6 +301,29 @@ def __repr__(self) -> str: return f"{self.__class__.__name__}({self.exceptions=})" +@dataclass +class SourceProperties: + """Properties of a resampling source.""" + + sampling_start: Optional[datetime] = None + """The time when resampling started for this source.
+ + `None` means it hasn't started yet. + """ + + received_samples: int = 0 + """Total samples received by this source so far.""" + + sampling_period_s: Optional[float] = None + """The sampling period of this source (in seconds). + + This means we receive (on average) one sample for this source every + `sampling_period_s` seconds. + + `None` means it is unknown. + """ + + class Resampler: """A timeseries resampler. @@ -233,14 +358,26 @@ def config(self) -> ResamplerConfig: """ return self._config + def get_source_properties(self, source: Source) -> SourceProperties: + """Get the properties of a timeseries source. + + Args: + source: The source from which to get the properties. + + Returns: + The timeseries source properties. + """ + return self._resamplers[source].source_properties + async def stop(self) -> None: """Cancel all receiving tasks.""" await asyncio.gather(*[helper.stop() for helper in self._resamplers.values()]) - def add_timeseries(self, source: Source, sink: Sink) -> bool: + def add_timeseries(self, name: str, source: Source, sink: Sink) -> bool: """Start resampling a new timeseries. Args: + name: The name of the timeseries (for logging purposes). source: The source of the timeseries to resample. sink: The sink to use to send the resampled data. @@ -252,7 +389,9 @@ def add_timeseries(self, source: Source, sink: Sink) -> bool: if source in self._resamplers: return False - resampler = _StreamingHelper(_ResamplingHelper(self._config), source, sink) + resampler = _StreamingHelper( + _ResamplingHelper(name, self._config), source, sink + ) self._resamplers[source] = resampler return True @@ -313,19 +452,32 @@ class _ResamplingHelper: """Keeps track of *relevant* samples to pass them to the resampling function. Samples are stored in an internal ring buffer.
All collected samples that - are newer than `resampling_period_s * max_data_age_in_periods` seconds are - considered *relevant* and are passed to the provided `resampling_function` - when calling the `resample()` method. All older samples are discarded. + are newer than `max(resampling_period_s, input_period_s) + * max_data_age_in_periods` seconds are considered *relevant* and are passed + to the provided `resampling_function` when calling the `resample()` method. + All older samples are discarded. """ - def __init__(self, config: ResamplerConfig) -> None: + def __init__(self, name: str, config: ResamplerConfig) -> None: """Initialize an instance. Args: - config: The configuration for the resampler. + name: The name of this resampler helper (for logging purposes). + config: The configuration for this resampler helper. """ + self._name = name self._config = config self._buffer: deque[Sample] = deque(maxlen=config.initial_buffer_len) + self._source_properties: SourceProperties = SourceProperties() + + @property + def source_properties(self) -> SourceProperties: + """Return the properties of the source. + + Returns: + The properties of the source. + """ + return self._source_properties def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. @@ -334,6 +486,109 @@ def add_sample(self, sample: Sample) -> None: sample: The sample to be added to the buffer. """ self._buffer.append(sample) + if self._source_properties.sampling_start is None: + self._source_properties.sampling_start = sample.timestamp + self._source_properties.received_samples += 1 + + def _update_source_sample_period(self, now: datetime) -> bool: + """Update the source sample period. + + Args: + now: The datetime in which this update happens. + + Returns: + Whether the source sample period was changed (was really updated). 
+ """ + assert ( + self._buffer.maxlen is not None and self._buffer.maxlen > 0 + ), "We need a maxlen of at least 1 to update the sample period" + + config = self._config + props = self._source_properties + + # We only update it if we didn't before and we have enough data + if ( + props.sampling_period_s is not None + or props.sampling_start is None + or props.received_samples + < config.resampling_period_s * config.max_data_age_in_periods + or len(self._buffer) < self._buffer.maxlen + # There might be a race between the first sample being received and + # this function being called + or now <= props.sampling_start + ): + return False + + samples_time_delta = now - props.sampling_start + props.sampling_period_s = ( + samples_time_delta.total_seconds() + ) / props.received_samples + + _logger.info( + "New input sampling period calculated for %r: %ss", + self._name, + props.sampling_period_s, + ) + return True + + def _update_buffer_len(self) -> bool: + """Update the length of the buffer based on the source properties. + + Returns: + Whether the buffer length was changed (was really updated). + """ + input_sampling_period_s = self._source_properties.sampling_period_s + + # To make type checking happy + assert input_sampling_period_s is not None + assert self._buffer.maxlen is not None + + config = self._config + + # If we are upsampling, one sample could be enough for back-filling, but + # we store max_data_age_in_periods for input periods, so resampling + # functions can do more complex inter/extrapolation if they need to. + if input_sampling_period_s > config.resampling_period_s: + new_buffer_len = input_sampling_period_s * config.max_data_age_in_periods + # If we are downsampling, we want a buffer that can hold + # max_data_age_in_periods * resampling_period_s seconds of data, and we + # have one sample every input_sampling_period_s.
+ else: + new_buffer_len = ( + config.resampling_period_s + / input_sampling_period_s + * config.max_data_age_in_periods + ) + + new_buffer_len = max(1, math.ceil(new_buffer_len)) + if new_buffer_len > config.max_buffer_len: + _logger.error( + "The new buffer length (%s) for timeseries %s is too big, using %s instead", + new_buffer_len, + self._name, + config.max_buffer_len, + ) + new_buffer_len = config.max_buffer_len + elif new_buffer_len > config.warn_buffer_len: + _logger.warning( + "The new buffer length (%s) for timeseries %s bigger than %s", + new_buffer_len, + self._name, + config.warn_buffer_len, + ) + + if new_buffer_len == self._buffer.maxlen: + return False + + _logger.info( + "New buffer length calculated for %r: %s", + self._name, + new_buffer_len, + ) + + self._buffer = deque(self._buffer, maxlen=new_buffer_len) + + return True def resample(self, timestamp: datetime) -> Sample: """Generate a new sample based on all the current *relevant* samples. @@ -347,10 +602,23 @@ def resample(self, timestamp: datetime) -> Sample: If there are no *relevant* samples, then the new sample will have `None` as `value`. """ + if self._update_source_sample_period(timestamp): + self._update_buffer_len() + conf = self._config + props = self._source_properties + + # To see which samples are relevant we need to consider if we are down + # or upsampling. + period = ( + max(conf.resampling_period_s, props.sampling_period_s) + if props.sampling_period_s is not None + else conf.resampling_period_s + ) minimum_relevant_timestamp = timestamp - timedelta( - seconds=conf.resampling_period_s * conf.max_data_age_in_periods + seconds=period * conf.max_data_age_in_periods ) + # We need to pass a dummy Sample to bisect because it only support # specifying a key extraction function in Python 3.10, so we need to # compare samples at the moment. @@ -363,7 +631,7 @@ def resample(self, timestamp: datetime) -> Sample: # resort to some C (or similar) implementation. 
relevant_samples = list(itertools.islice(self._buffer, cut_index, None)) value = ( - conf.resampling_function(relevant_samples, conf.resampling_period_s) + conf.resampling_function(relevant_samples, conf, props) if relevant_samples else None ) @@ -393,6 +661,15 @@ def __init__( self._receive_samples() ) + @property + def source_properties(self) -> SourceProperties: + """Get the source properties. + + Returns: + The source properties. + """ + return self._helper.source_properties + async def stop(self) -> None: """Cancel the receiving task.""" await cancel_and_await(self._receiving_task) diff --git a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py index e41741c72..8dcabf469 100644 --- a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py +++ b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py @@ -25,7 +25,7 @@ ) from ._resampled_formula_builder import ResampledFormulaBuilder -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) class LogicalMeter: diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 61b0fdb28..78ac92c62 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -5,6 +5,9 @@ Tests for the `TimeSeriesResampler` """ +from __future__ import annotations + +import logging from datetime import datetime, timedelta, timezone from typing import AsyncIterator, Iterator from unittest.mock import AsyncMock, MagicMock @@ -16,13 +19,17 @@ from frequenz.sdk.timeseries import Sample from frequenz.sdk.timeseries._resampling import ( + DEFAULT_BUFFER_LEN_MAX, + DEFAULT_BUFFER_LEN_WARN, Resampler, ResamplerConfig, ResamplingError, ResamplingFunction, Sink, Source, + SourceProperties, SourceStoppedError, + _ResamplingHelper, ) from ..utils import a_sequence @@ -78,6 +85,94 @@ async def _assert_no_more_samples( # pylint: disable=too-many-arguments resampling_fun_mock.reset_mock() 
+@pytest.mark.parametrize("init_len", list(range(1, DEFAULT_BUFFER_LEN_WARN + 1, 16))) +async def test_resampler_config_len_ok( + init_len: int, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test checks on the resampling buffer.""" + config = ResamplerConfig( + resampling_period_s=1.0, + initial_buffer_len=init_len, + ) + assert config.initial_buffer_len == init_len + # Ignore errors produced by wrongly finalized gRPC server in unrelated tests + assert _filter_logs(caplog.record_tuples, logger_name="") == [] + + +@pytest.mark.parametrize( + "init_len", + range(DEFAULT_BUFFER_LEN_WARN + 1, DEFAULT_BUFFER_LEN_MAX + 1, 64), +) +async def test_resampler_config_len_warn( + init_len: int, caplog: pytest.LogCaptureFixture +) -> None: + """Test checks on the resampling buffer.""" + config = ResamplerConfig( + resampling_period_s=1.0, + initial_buffer_len=init_len, + ) + assert config.initial_buffer_len == init_len + # Ignore errors produced by wrongly finalized gRPC server in unrelated tests + assert _filter_logs( + caplog.record_tuples, + logger_name="frequenz.sdk.timeseries._resampling", + ) == [ + ( + "frequenz.sdk.timeseries._resampling", + logging.WARNING, + f"initial_buffer_len ({init_len}) is bigger than " + f"warn_buffer_len ({DEFAULT_BUFFER_LEN_WARN})", + ) + ] + + +@pytest.mark.parametrize( + "init_len", + list(range(-2, 1)) + [DEFAULT_BUFFER_LEN_MAX + 1, DEFAULT_BUFFER_LEN_MAX + 2], +) +async def test_resampler_config_len_error(init_len: int) -> None: + """Test checks on the resampling buffer.""" + with pytest.raises(ValueError): + _ = ResamplerConfig( + resampling_period_s=1.0, + initial_buffer_len=init_len, + ) + + +async def test_helper_buffer_too_big( + fake_time: time_machine.Coordinates, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test checks on the resampling buffer.""" + config = ResamplerConfig( + resampling_period_s=DEFAULT_BUFFER_LEN_MAX + 1, + max_data_age_in_periods=1, + ) + helper = _ResamplingHelper("test", config) + + for i in 
range(DEFAULT_BUFFER_LEN_MAX + 1): + sample = Sample(datetime.now(timezone.utc), i) + helper.add_sample(sample) + fake_time.shift(1) + + _ = helper.resample(datetime.now(timezone.utc)) + # Ignore errors produced by wrongly finalized gRPC server in unrelated tests + assert _filter_logs( + caplog.record_tuples, + logger_name="frequenz.sdk.timeseries._resampling", + ) == [ + ( + "frequenz.sdk.timeseries._resampling", + logging.ERROR, + f"The new buffer length ({DEFAULT_BUFFER_LEN_MAX + 1}) " + f"for timeseries test is too big, using {DEFAULT_BUFFER_LEN_MAX} instead", + ) + ] + # pylint: disable=protected-access + assert helper._buffer.maxlen == DEFAULT_BUFFER_LEN_MAX + + async def test_resampling_with_one_window( fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample] ) -> None: @@ -90,20 +185,21 @@ async def test_resampling_with_one_window( resampling_fun_mock = MagicMock( spec=ResamplingFunction, return_value=expected_resampled_value ) - resampler = Resampler( - ResamplerConfig( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.0, - resampling_function=resampling_fun_mock, - ) + config = ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + initial_buffer_len=4, ) + resampler = Resampler(config) source_recvr = source_chan.new_receiver() source_sendr = source_chan.new_sender() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(source_recvr, sink_mock) + resampler.add_timeseries("test", source_recvr, sink_mock) + source_props = resampler.get_source_properties(source_recvr) # Test timeline # @@ -128,8 +224,12 @@ async def test_resampling_with_one_window( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s), resampling_period_s + a_sequence(sample1s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=2, sampling_period_s=None ) + assert 
_get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -151,8 +251,16 @@ async def test_resampling_with_one_window( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s), resampling_period_s + a_sequence(sample2_5s, sample3s, sample4s), config, source_props ) + # By now we have a full buffer (5 samples and a buffer of length 4), which + # we received in 4 seconds, so we have an input period of 0.8s. + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=5, sampling_period_s=0.8 + ) + # The buffer should be able to hold 2 seconds of data, and data is coming + # every 0.8 seconds, so we should be able to store 3 samples. + assert _get_buffer_len(resampler, source_recvr) == 3 sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -165,6 +273,10 @@ async def test_resampling_with_one_window( resampling_period_s, current_iteration=3, ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=5, sampling_period_s=0.8 + ) + assert _get_buffer_len(resampler, source_recvr) == 3 # Even when a lot could be refactored to use smaller functions, I'm allowing @@ -182,20 +294,21 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma resampling_fun_mock = MagicMock( spec=ResamplingFunction, return_value=expected_resampled_value ) - resampler = Resampler( - ResamplerConfig( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.5, - resampling_function=resampling_fun_mock, - ) + config = ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.5, + resampling_function=resampling_fun_mock, + initial_buffer_len=7, ) + resampler = Resampler(config) source_recvr = source_chan.new_receiver() source_sendr = source_chan.new_sender() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(source_recvr, sink_mock) + 
resampler.add_timeseries("test", source_recvr, sink_mock) + source_props = resampler.get_source_properties(source_recvr) # Test timeline # @@ -220,8 +333,12 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s, sample1s), resampling_period_s + a_sequence(sample0s, sample1s), config, source_props ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=2, sampling_period_s=None + ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -244,8 +361,12 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (1, 4] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s), resampling_period_s + a_sequence(sample2_5s, sample3s, sample4s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=5, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -266,8 +387,17 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (3, 6] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample4s, sample5s, sample6s), resampling_period_s + a_sequence(sample4s, sample5s, sample6s), config, source_props ) + # By now we have a full buffer (7 samples and a buffer of length 7), which + # we received in 6 seconds, so we have an input period of 6/7s.
+ assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=6 / 7 + ) + # The buffer should be able to hold 2 * 1.5 (3) seconds of data, and data + # is coming every 6/7 seconds (~0.857s), so we should be able to store + # 4 samples. + assert _get_buffer_len(resampler, source_recvr) == 4 sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -284,7 +414,9 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (5, 8] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample6s), resampling_period_s + a_sequence(sample6s), + config, + source_props, ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -298,6 +430,10 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma resampling_period_s, current_iteration=5, ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=6 / 7 + ) + assert _get_buffer_len(resampler, source_recvr) == 4 # Even when a lot could be refactored to use smaller functions, I'm allowing @@ -315,20 +451,21 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen resampling_fun_mock = MagicMock( spec=ResamplingFunction, return_value=expected_resampled_value ) - resampler = Resampler( - ResamplerConfig( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, - ) + config = ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + initial_buffer_len=16, ) + resampler = Resampler(config) source_recvr = source_chan.new_receiver() source_sendr = source_chan.new_sender() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(source_recvr, sink_mock) + resampler.add_timeseries("test", source_recvr, sink_mock) + source_props = 
resampler.get_source_properties(source_recvr) # Test timeline # @@ -353,8 +490,12 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s, sample1s), resampling_period_s + a_sequence(sample0s, sample1s), config, source_props ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=2, sampling_period_s=None + ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -377,9 +518,12 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) # It should include samples in the interval (0, 4] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s, sample2_5s, sample3s, sample4s), - resampling_period_s, + a_sequence(sample1s, sample2_5s, sample3s, sample4s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=5, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -401,8 +545,13 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen # It should include samples in the interval (2, 6] seconds resampling_fun_mock.assert_called_once_with( a_sequence(sample2_5s, sample3s, sample4s, sample5s, sample6s), - resampling_period_s, + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -419,9 +568,12 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) # It should include samples in the interval (4, 8] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample5s, 
sample6s), - resampling_period_s, + a_sequence(sample5s, sample6s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -434,6 +586,10 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen resampling_period_s, current_iteration=5, ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=None + ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len async def test_receiving_stopped_resampling_error( @@ -448,20 +604,20 @@ async def test_receiving_stopped_resampling_error( resampling_fun_mock = MagicMock( spec=ResamplingFunction, return_value=expected_resampled_value ) - resampler = Resampler( - ResamplerConfig( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, - ) + config = ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, ) + resampler = Resampler(config) source_recvr = source_chan.new_receiver() source_sendr = source_chan.new_sender() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(source_recvr, sink_mock) + resampler.add_timeseries("test", source_recvr, sink_mock) + source_props = resampler.get_source_properties(source_recvr) # Send a sample and run a resample tick, advancing the fake time by one period sample0s = Sample(timestamp, value=5.0) @@ -476,7 +632,7 @@ async def test_receiving_stopped_resampling_error( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s), resampling_period_s + a_sequence(sample0s), config, source_props ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -527,7 +683,7 @@ async def make_fake_source() -> 
Source: fake_source = make_fake_source() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(fake_source, sink_mock) + resampler.add_timeseries("test", fake_source, sink_mock) # Try to resample fake_time.shift(resampling_period_s) @@ -540,3 +696,16 @@ async def make_fake_source() -> Source: assert fake_source in exceptions timeseries_error = exceptions[fake_source] assert isinstance(timeseries_error, TestException) + + +def _get_buffer_len(resampler: Resampler, source_recvr: Source) -> int: + # pylint: disable=protected-access + blen = resampler._resamplers[source_recvr]._helper._buffer.maxlen + assert blen is not None + return blen + + +def _filter_logs( + record_tuples: list[tuple[str, int, str]], *, logger_name: str +) -> list[tuple[str, int, str]]: + return [t for t in record_tuples if t[0] == logger_name]