From 3d80455ee0116fbbf021fc999944d9bf2b553cc9 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 29 Dec 2022 16:00:21 -0300 Subject: [PATCH 1/7] Fix wrong logger creation Loggers need to be created using logging.getLogger() instead of logging.Logger(), otherwise they don't go through the configuration system, and they can't be controlled in tests for example. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/_data_handling/time_series.py | 2 +- src/frequenz/sdk/_data_ingestion/component_info.py | 2 +- src/frequenz/sdk/_data_ingestion/formula_calculator.py | 2 +- src/frequenz/sdk/_data_ingestion/load_historic_data.py | 2 +- src/frequenz/sdk/_data_ingestion/microgrid_data.py | 2 +- src/frequenz/sdk/actor/_decorator.py | 2 +- src/frequenz/sdk/actor/_resampling.py | 2 +- src/frequenz/sdk/microgrid/client/_client.py | 2 +- src/frequenz/sdk/timeseries/_resampling.py | 2 +- src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/frequenz/sdk/_data_handling/time_series.py b/src/frequenz/sdk/_data_handling/time_series.py index cea613486..4d5474344 100644 --- a/src/frequenz/sdk/_data_handling/time_series.py +++ b/src/frequenz/sdk/_data_handling/time_series.py @@ -15,7 +15,7 @@ Key = TypeVar("Key") Value = TypeVar("Value") -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) SYMBOL_SEGMENT_SEPARATOR = "_" diff --git a/src/frequenz/sdk/_data_ingestion/component_info.py b/src/frequenz/sdk/_data_ingestion/component_info.py index 6825eb9b0..9a273025f 100644 --- a/src/frequenz/sdk/_data_ingestion/component_info.py +++ b/src/frequenz/sdk/_data_ingestion/component_info.py @@ -10,7 +10,7 @@ from ..microgrid import ComponentGraph from ..microgrid.component import ComponentCategory -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) @dataclass diff --git a/src/frequenz/sdk/_data_ingestion/formula_calculator.py 
b/src/frequenz/sdk/_data_ingestion/formula_calculator.py index 8a31d7d2f..3b9cd1ee4 100644 --- a/src/frequenz/sdk/_data_ingestion/formula_calculator.py +++ b/src/frequenz/sdk/_data_ingestion/formula_calculator.py @@ -36,7 +36,7 @@ METRIC_PV_PROD, ) -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) @dataclass(frozen=True) diff --git a/src/frequenz/sdk/_data_ingestion/load_historic_data.py b/src/frequenz/sdk/_data_ingestion/load_historic_data.py index 3aa5c4eac..6e9218b28 100644 --- a/src/frequenz/sdk/_data_ingestion/load_historic_data.py +++ b/src/frequenz/sdk/_data_ingestion/load_historic_data.py @@ -22,7 +22,7 @@ import pyarrow.parquet as pq from tqdm import tqdm -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) # directory path to all component data of a particular site HISTDATA_DIR = "/data" diff --git a/src/frequenz/sdk/_data_ingestion/microgrid_data.py b/src/frequenz/sdk/_data_ingestion/microgrid_data.py index 057ffd21e..d6a6c75b7 100644 --- a/src/frequenz/sdk/_data_ingestion/microgrid_data.py +++ b/src/frequenz/sdk/_data_ingestion/microgrid_data.py @@ -27,7 +27,7 @@ from .formula_calculator import FormulaCalculator from .gen_component_receivers import gen_component_receivers -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) CONFIG_FILE_FORMULA_PREFIX = "formula_" diff --git a/src/frequenz/sdk/actor/_decorator.py b/src/frequenz/sdk/actor/_decorator.py index ffdcd00b0..5bafffe4f 100644 --- a/src/frequenz/sdk/actor/_decorator.py +++ b/src/frequenz/sdk/actor/_decorator.py @@ -14,7 +14,7 @@ import logging from typing import Any, Generic, Optional, Type, TypeVar -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) OT = TypeVar("OT") diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py index db9aa3e5b..e87afcce5 100644 --- a/src/frequenz/sdk/actor/_resampling.py +++ b/src/frequenz/sdk/actor/_resampling.py @@ -19,7 +19,7 @@ from 
._data_sourcing import ComponentMetricRequest from ._decorator import actor -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) @actor diff --git a/src/frequenz/sdk/microgrid/client/_client.py b/src/frequenz/sdk/microgrid/client/_client.py index e296f1b10..68a16f257 100644 --- a/src/frequenz/sdk/microgrid/client/_client.py +++ b/src/frequenz/sdk/microgrid/client/_client.py @@ -44,7 +44,7 @@ EVChargerData, ) -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) class MicrogridApiClient(ABC): diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 93319ff18..87aae5c70 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -20,7 +20,7 @@ from ..util.asyncio import cancel_and_await from . import Sample -_logger = logging.Logger(__name__) +_logger = logging.getLogger(__name__) DEFAULT_BUFFER_LEN_INIT = 16 diff --git a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py index e41741c72..8dcabf469 100644 --- a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py +++ b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py @@ -25,7 +25,7 @@ ) from ._resampled_formula_builder import ResampledFormulaBuilder -logger = logging.Logger(__name__) +logger = logging.getLogger(__name__) class LogicalMeter: From eb12a3b510edd0813b77a9b87165a3b682f0915b Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 23 Dec 2022 12:40:11 -0300 Subject: [PATCH 2/7] Add a maximum resampling buffer size This is mainly because in a near future the resampling buffer size will dynamically change based on the input sampling rate, so we need to make sure it doesn't get out of hand in the event of some unlikely high input sampling rate combined with a very low frequency resampling. 
We also add a warning size, for which we just emit a warning in the logs; this is also to try to minimize the presence of very big buffers without affecting the running apps (unless, again, it gets out of hand). When this happens users should either try to use a different resampling period to keep the buffer smaller, or change the (default) warning buffer length if they know it is still manageable. Signed-off-by: Leandro Lucarella --- benchmarks/timeseries/resampling.py | 2 + src/frequenz/sdk/timeseries/_resampling.py | 60 ++++++++++++++++++++++ tests/timeseries/test_resampling.py | 58 +++++++++++++++++++++ 3 files changed, 120 insertions(+) diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py index 6e85708dc..87c1cf5d6 100644 --- a/benchmarks/timeseries/resampling.py +++ b/benchmarks/timeseries/resampling.py @@ -26,6 +26,8 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None: max_data_age_in_periods=3.0, resampling_function=nop, initial_buffer_len=samples * 3, + max_buffer_len=samples * 3, + warn_buffer_len=samples * 3, ) ) now = datetime.now(timezone.utc) diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 87aae5c70..940e0658b 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -32,6 +32,21 @@ """ +DEFAULT_BUFFER_LEN_MAX = 1024 +"""Default maximum allowed buffer length. + +If a buffer length would get bigger than this, it will be truncated to this +length. +""" + + +DEFAULT_BUFFER_LEN_WARN = 128 +"""Default minimum buffer length that will produce a warning. + +If a buffer length would get bigger than this, a warning will be logged. +""" + + Source = AsyncIterator[Sample] """A source for a timeseries. @@ -132,6 +147,51 @@ class ResamplerConfig: can be stored. """ + warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN + """The minimum length of the resampling buffer that will emit a warning. 
+ + If a buffer grows bigger than this value, it will emit a warning in the + logs, so buffers don't grow too big inadvertly. + """ + + max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX + """The maximum length of the resampling buffer. + + Buffers won't be allowed to grow beyond this point even if it would be + needed to keep all the requested past sampling periods. An error will be + emitted in the logs if the buffer length needs to be truncated to this + value. + """ + + def __post_init__(self) -> None: + """Check config values are valid. + + Raises: + ValueError: if the initial buffer length is too small (less than 2) + or too big (more than `max_buffer_len`). + """ + if self.initial_buffer_len < 2: + raise ValueError( + f"initial_buffer_len must be at least 2, got {self.initial_buffer_len}" + ) + if self.initial_buffer_len > self.max_buffer_len: + raise ValueError( + f"initial_buffer_len be smaller than {self.max_buffer_len}, " + "got {self.initial_buffer_len}" + ) + if self.initial_buffer_len > self.warn_buffer_len: + _logger.warning( + "initial_buffer_len (%s) is bigger than %s", + self.initial_buffer_len, + self.warn_buffer_len, + ) + if self.initial_buffer_len > self.warn_buffer_len: + _logger.warning( + "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)", + self.initial_buffer_len, + self.warn_buffer_len, + ) + class SourceStoppedError(RuntimeError): """A timeseries stopped producing samples.""" diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 61b0fdb28..2f4b471af 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -16,6 +16,8 @@ from frequenz.sdk.timeseries import Sample from frequenz.sdk.timeseries._resampling import ( + DEFAULT_BUFFER_LEN_MAX, + DEFAULT_BUFFER_LEN_WARN, Resampler, ResamplerConfig, ResamplingError, @@ -78,6 +80,62 @@ async def _assert_no_more_samples( # pylint: disable=too-many-arguments resampling_fun_mock.reset_mock() 
+@pytest.mark.parametrize("init_len", list(range(2, DEFAULT_BUFFER_LEN_WARN + 1, 16))) +async def test_resampler_config_len_ok( + init_len: int, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test checks on the resampling buffer.""" + config = ResamplerConfig( + resampling_period_s=1.0, + initial_buffer_len=init_len, + ) + assert config.initial_buffer_len == init_len + assert caplog.records == [] + + +@pytest.mark.parametrize( + "init_len", + range(DEFAULT_BUFFER_LEN_WARN + 1, DEFAULT_BUFFER_LEN_MAX + 1, 64), +) +async def test_resampler_config_len_warn( + init_len: int, caplog: pytest.LogCaptureFixture +) -> None: + """Test checks on the resampling buffer.""" + config = ResamplerConfig( + resampling_period_s=1.0, + initial_buffer_len=init_len, + ) + assert config.initial_buffer_len == init_len + for record in caplog.records: + assert record.levelname == "WARNING" + assert caplog.text.startswith("") + assert ( + caplog.text + == f"initial_buffer_len ({init_len}) is bigger than {DEFAULT_BUFFER_LEN_WARN}" + assert caplog.record_tuples == [ + ( + "frequenz.sdk.timeseries._resampling", + logging.WARNING, + f"initial_buffer_len ({init_len}) is bigger than " + f"warn_buffer_len ({DEFAULT_BUFFER_LEN_WARN})", + ) + ] + + +@pytest.mark.parametrize( + "init_len", + list(range(-2, 2)) + [DEFAULT_BUFFER_LEN_MAX + 1, DEFAULT_BUFFER_LEN_MAX + 2], +) +async def test_resampler_config_len_error(init_len: int) -> None: + """Test checks on the resampling buffer.""" + with pytest.raises(ValueError): + _ = ResamplerConfig( + resampling_period_s=1.0, + initial_buffer_len=init_len, + ) + + async def test_resampling_with_one_window( fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample] ) -> None: From 34a1dd508c648d71125ff2bc27cf6196938809eb Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 23 Dec 2022 17:04:48 -0300 Subject: [PATCH 3/7] Pass the whole ResamplerConfig to the ResamplingFunction Instead of only passing the resampling period, we pass the whole 
ResamplerConfig so the resampling function has more information on how the resampling is being done to make better decisions. Signed-off-by: Leandro Lucarella --- benchmarks/timeseries/resampling.py | 2 +- src/frequenz/sdk/timeseries/_resampling.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py index 87c1cf5d6..b90480835 100644 --- a/benchmarks/timeseries/resampling.py +++ b/benchmarks/timeseries/resampling.py @@ -12,7 +12,7 @@ def nop( # pylint: disable=unused-argument - samples: Sequence[Sample], resampling_period_s: float + samples: Sequence[Sample], resampler_config: ResamplerConfig ) -> float: """Return 0.0.""" return 0.0 diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 940e0658b..c743e4605 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -71,7 +71,7 @@ """ -ResamplingFunction = Callable[[Sequence[Sample], float], float] +ResamplingFunction = Callable[[Sequence[Sample], "ResamplerConfig"], float] """Resampling function type. A resampling function produces a new sample based on a list of pre-existing @@ -86,9 +86,9 @@ timestamp in the input data). Args: - input_samples (Sequence[Sample]): the sequence of pre-existing samples. - resampling_period_s (float): the period in seconds (i.e. how ofter a new sample is - produced. + input_samples (Sequence[Sample]): The sequence of pre-existing samples. + resampler_config (ResamplerConfig): The configuration of the resampling + calling this function. Returns: new_sample (float): The value of new sample produced after the resampling. @@ -96,13 +96,13 @@ # pylint: disable=unused-argument -def average(samples: Sequence[Sample], resampling_period_s: float) -> float: +def average(samples: Sequence[Sample], resampler_config: ResamplerConfig) -> float: """Calculate average of all the provided values. 
Args: samples: The samples to apply the average to. It must be non-empty. - resampling_period_s: The time it passes between resampled data is - produced (in seconds). + resampler_config: The configuration of the resampler calling this + function. Returns: The average of all `samples` values. From 3513f90d600d2537c24fbc3451d6c1892c2cfef5 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 23 Dec 2022 17:24:20 -0300 Subject: [PATCH 4/7] Calculate the input sampling period The input sampling period is calculated by counting the total received samples, and dividing the total resampling time until the internal buffer is full by the total received samples. To store all the source properties (sampling period, sampling start, total received samples) a new class is used (SourceProperties) and this information is passed to the resampling function, so it can have more information about the source to perform a proper resampling. This calculation is performed once and then remains constant, but at some point we could add a timer to re-calculate it. This also improves slightly the documentation and validation of the ResamplingConfig class. The add_timeseries() methods now also takes a name as a way to identify different sources in log messages (the actor uses the channel name as timeseries name). 
Signed-off-by: Leandro Lucarella --- benchmarks/timeseries/resampling.py | 17 +- examples/resampling.py | 4 +- src/frequenz/sdk/actor/_resampling.py | 2 +- src/frequenz/sdk/timeseries/_resampling.py | 219 ++++++++++++++++++--- tests/timeseries/test_resampling.py | 133 ++++++++----- 5 files changed, 293 insertions(+), 82 deletions(-) diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py index b90480835..f4ff601f4 100644 --- a/benchmarks/timeseries/resampling.py +++ b/benchmarks/timeseries/resampling.py @@ -8,11 +8,17 @@ from typing import Sequence from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper +from frequenz.sdk.timeseries._resampling import ( + ResamplerConfig, + SourceProperties, + _ResamplingHelper, +) def nop( # pylint: disable=unused-argument - samples: Sequence[Sample], resampler_config: ResamplerConfig + samples: Sequence[Sample], + resampler_config: ResamplerConfig, + source_properties: SourceProperties, ) -> float: """Return 0.0.""" return 0.0 @@ -21,14 +27,15 @@ def nop( # pylint: disable=unused-argument def _benchmark_resampling_helper(resamples: int, samples: int) -> None: """Benchmark the resampling helper.""" helper = _ResamplingHelper( + "benchmark", ResamplerConfig( resampling_period_s=1.0, max_data_age_in_periods=3.0, resampling_function=nop, initial_buffer_len=samples * 3, - max_buffer_len=samples * 3, - warn_buffer_len=samples * 3, - ) + warn_buffer_len=samples * 3 + 2, + max_buffer_len=samples * 3 + 3, + ), ) now = datetime.now(timezone.utc) diff --git a/examples/resampling.py b/examples/resampling.py index 6412a240d..ce01782e1 100644 --- a/examples/resampling.py +++ b/examples/resampling.py @@ -105,7 +105,9 @@ async def run() -> None: # pylint: disable=too-many-locals average_chan = Broadcast[Sample]("average") second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0)) - 
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample) + second_stage_resampler.add_timeseries( + "avg", average_chan.new_receiver(), _print_sample + ) average_sender = average_chan.new_sender() # Needed until channels Senders raises exceptions on errors diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py index e87afcce5..af3cbb49e 100644 --- a/src/frequenz/sdk/actor/_resampling.py +++ b/src/frequenz/sdk/actor/_resampling.py @@ -85,7 +85,7 @@ async def sink_adapter(sample: Sample) -> None: if not await sender.send(sample): raise Exception(f"Error while sending with sender {sender}", sender) - self._resampler.add_timeseries(receiver, sink_adapter) + self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter) async def _process_resampling_requests(self) -> None: """Process resampling data requests.""" diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index c743e4605..146c2164b 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -13,7 +13,7 @@ from collections import deque from dataclasses import dataclass from datetime import datetime, timedelta -from typing import AsyncIterator, Callable, Coroutine, Sequence +from typing import AsyncIterator, Callable, Coroutine, Optional, Sequence from frequenz.channels.util import Timer @@ -27,7 +27,7 @@ """Default initial buffer length. Buffers will be created initially with this length, but they could grow or -shrink depending on the source characteristics, like sampling rate, to make +shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored. 
""" @@ -71,7 +71,9 @@ """ -ResamplingFunction = Callable[[Sequence[Sample], "ResamplerConfig"], float] +ResamplingFunction = Callable[ + [Sequence[Sample], "ResamplerConfig", "SourceProperties"], float +] """Resampling function type. A resampling function produces a new sample based on a list of pre-existing @@ -89,6 +91,8 @@ input_samples (Sequence[Sample]): The sequence of pre-existing samples. resampler_config (ResamplerConfig): The configuration of the resampling calling this function. + source_properties (SourceProperties): The properties of the source being + resampled. Returns: new_sample (float): The value of new sample produced after the resampling. @@ -96,13 +100,18 @@ # pylint: disable=unused-argument -def average(samples: Sequence[Sample], resampler_config: ResamplerConfig) -> float: +def average( + samples: Sequence[Sample], + resampler_config: ResamplerConfig, + source_properties: SourceProperties, +) -> float: """Calculate average of all the provided values. Args: samples: The samples to apply the average to. It must be non-empty. resampler_config: The configuration of the resampler calling this function. + source_properties: The properties of the source being resampled. Returns: The average of all `samples` values. @@ -120,15 +129,30 @@ class ResamplerConfig: """The resampling period in seconds. This is the time it passes between resampled data should be calculated. + + It must be a positive number. """ max_data_age_in_periods: float = 3.0 """The maximum age a sample can have to be considered *relevant* for resampling. - Expressed in number of resampling periods. For example if - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, then data - older than 3*2 = 6 secods will be discarded when creating a new sample and - never passed to the resampling function. 
+ Expressed in number of periods, where period is the `resampling_period_s` + if we are downsampling (resampling period bigger than the input period) or + the input sampling period if we are upsampling (input period bigger than + the resampling period). + + It must be bigger than 1.0. + + Example: + If `resampling_period_s` is 3, the input sampling period is + 1 and `max_data_age_in_periods` is 2, then data older than 3*2 + = 6 secods will be discarded when creating a new sample and never + passed to the resampling function. + + If `resampling_period_s` is 3, the input sampling period is + 5 and `max_data_age_in_periods` is 2, then data older than 5*2 + = 10 secods will be discarded when creating a new sample and never + passed to the resampling function. """ resampling_function: ResamplingFunction = average @@ -142,9 +166,11 @@ class ResamplerConfig: initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. - The buffer could grow or shrink depending on the source characteristics, + The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored. + + It must be at least 1 and at most `max_buffer_len`. """ warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN @@ -152,6 +178,8 @@ class ResamplerConfig: If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertly. + + It must be at least 1 and at most `max_buffer_len`. """ max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX @@ -161,29 +189,43 @@ class ResamplerConfig: needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value. + + It must be at bigger than `warn_buffer_len`. """ def __post_init__(self) -> None: - """Check config values are valid. + """Check that config values are valid. 
Raises: - ValueError: if the initial buffer length is too small (less than 2) - or too big (more than `max_buffer_len`). + ValueError: If any value is out of range. """ - if self.initial_buffer_len < 2: + if self.resampling_period_s < 0.0: raise ValueError( - f"initial_buffer_len must be at least 2, got {self.initial_buffer_len}" + f"resampling_period_s ({self.resampling_period_s}) must be positive" ) - if self.initial_buffer_len > self.max_buffer_len: + if self.max_data_age_in_periods < 1.0: raise ValueError( - f"initial_buffer_len be smaller than {self.max_buffer_len}, " - "got {self.initial_buffer_len}" + f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0" ) - if self.initial_buffer_len > self.warn_buffer_len: - _logger.warning( - "initial_buffer_len (%s) is bigger than %s", - self.initial_buffer_len, - self.warn_buffer_len, + if self.warn_buffer_len < 1: + raise ValueError( + f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1" + ) + if self.max_buffer_len <= self.warn_buffer_len: + raise ValueError( + f"max_buffer_len ({self.max_buffer_len}) should " + f"be bigger than warn_buffer_len ({self.warn_buffer_len})" + ) + + if self.initial_buffer_len < 1: + raise ValueError( + f"initial_buffer_len ({self.initial_buffer_len}) should at least 1" + ) + if self.initial_buffer_len > self.max_buffer_len: + raise ValueError( + f"initial_buffer_len ({self.initial_buffer_len}) is bigger " + f"than max_buffer_len ({self.max_buffer_len}), use a smaller " + "initial_buffer_len or a bigger max_buffer_len" ) if self.initial_buffer_len > self.warn_buffer_len: _logger.warning( @@ -259,6 +301,29 @@ def __repr__(self) -> str: return f"{self.__class__.__name__}({self.exceptions=})" +@dataclass +class SourceProperties: + """Properties of a resampling source.""" + + sampling_start: Optional[datetime] = None + """The time when resampling started for this source. + + `None` means it didn't started yet. 
+ """ + + received_samples: int = 0 + """Total samples received by this source so far.""" + + sampling_period_s: Optional[float] = None + """The sampling period of this source (in seconds). + + This is we receive (on average) one sample for this source every + `sampling_period_s` seconds. + + `None` means it is unknown. + """ + + class Resampler: """A timeseries resampler. @@ -293,14 +358,26 @@ def config(self) -> ResamplerConfig: """ return self._config + def get_source_properties(self, source: Source) -> SourceProperties: + """Get the properties of a timeseries source. + + Args: + source: The source from which to get the properties. + + Returns: + The timeseries source properties. + """ + return self._resamplers[source].source_properties + async def stop(self) -> None: """Cancel all receiving tasks.""" await asyncio.gather(*[helper.stop() for helper in self._resamplers.values()]) - def add_timeseries(self, source: Source, sink: Sink) -> bool: + def add_timeseries(self, name: str, source: Source, sink: Sink) -> bool: """Start resampling a new timeseries. Args: + name: The name of the timeseries (for logging purposes). source: The source of the timeseries to resample. sink: The sink to use to send the resampled data. @@ -312,7 +389,9 @@ def add_timeseries(self, source: Source, sink: Sink) -> bool: if source in self._resamplers: return False - resampler = _StreamingHelper(_ResamplingHelper(self._config), source, sink) + resampler = _StreamingHelper( + _ResamplingHelper(name, self._config), source, sink + ) self._resamplers[source] = resampler return True @@ -373,19 +452,32 @@ class _ResamplingHelper: """Keeps track of *relevant* samples to pass them to the resampling function. Samples are stored in an internal ring buffer. All collected samples that - are newer than `resampling_period_s * max_data_age_in_periods` seconds are - considered *relevant* and are passed to the provided `resampling_function` - when calling the `resample()` method. 
All older samples are discarded. + are newer than `max(resampling_period_s, input_period_s) + * max_data_age_in_periods` seconds are considered *relevant* and are passed + to the provided `resampling_function` when calling the `resample()` method. + All older samples are discarded. """ - def __init__(self, config: ResamplerConfig) -> None: + def __init__(self, name: str, config: ResamplerConfig) -> None: """Initialize an instance. Args: - config: The configuration for the resampler. + name: The name of this resampler helper (for logging purposes). + config: The configuration for this resampler helper. """ + self._name = name self._config = config self._buffer: deque[Sample] = deque(maxlen=config.initial_buffer_len) + self._source_properties: SourceProperties = SourceProperties() + + @property + def source_properties(self) -> SourceProperties: + """Return the properties of the source. + + Returns: + The properties of the source. + """ + return self._source_properties def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. @@ -394,6 +486,50 @@ def add_sample(self, sample: Sample) -> None: sample: The sample to be added to the buffer. """ self._buffer.append(sample) + if self._source_properties.sampling_start is None: + self._source_properties.sampling_start = sample.timestamp + self._source_properties.received_samples += 1 + + def _update_source_sample_period(self, now: datetime) -> bool: + """Update the source sample period. + + Args: + now: The datetime in which this update happens. + + Returns: + Whether the source sample period was changed (was really updated). 
+ """ + assert ( + self._buffer.maxlen is not None and self._buffer.maxlen > 0 + ), "We need a maxlen of at least 1 to update the sample period" + + config = self._config + props = self._source_properties + + # We only update it if we didn't before and we have enough data + if ( + props.sampling_period_s is not None + or props.sampling_start is None + or props.received_samples + < config.resampling_period_s * config.max_data_age_in_periods + or len(self._buffer) < self._buffer.maxlen + # There might be a race between the first sample being received and + # this function being called + or now <= props.sampling_start + ): + return False + + samples_time_delta = now - props.sampling_start + props.sampling_period_s = ( + samples_time_delta.total_seconds() + ) / props.received_samples + + _logger.info( + "New input sampling period calculated for %r: %ss", + self._name, + props.sampling_period_s, + ) + return True def resample(self, timestamp: datetime) -> Sample: """Generate a new sample based on all the current *relevant* samples. @@ -407,10 +543,22 @@ def resample(self, timestamp: datetime) -> Sample: If there are no *relevant* samples, then the new sample will have `None` as `value`. """ + self._update_source_sample_period(timestamp) + conf = self._config + props = self._source_properties + + # To see which samples are relevant we need to consider if we are down + # or upsampling. + period = ( + max(conf.resampling_period_s, props.sampling_period_s) + if props.sampling_period_s is not None + else conf.resampling_period_s + ) minimum_relevant_timestamp = timestamp - timedelta( - seconds=conf.resampling_period_s * conf.max_data_age_in_periods + seconds=period * conf.max_data_age_in_periods ) + # We need to pass a dummy Sample to bisect because it only support # specifying a key extraction function in Python 3.10, so we need to # compare samples at the moment. 
@@ -423,7 +571,7 @@ def resample(self, timestamp: datetime) -> Sample: # resort to some C (or similar) implementation. relevant_samples = list(itertools.islice(self._buffer, cut_index, None)) value = ( - conf.resampling_function(relevant_samples, conf.resampling_period_s) + conf.resampling_function(relevant_samples, conf, props) if relevant_samples else None ) @@ -453,6 +601,15 @@ def __init__( self._receive_samples() ) + @property + def source_properties(self) -> SourceProperties: + """Get the source properties. + + Returns: + The source properties. + """ + return self._helper.source_properties + async def stop(self) -> None: """Cancel the receiving task.""" await cancel_and_await(self._receiving_task) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 2f4b471af..7b3c8b284 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -24,6 +24,7 @@ ResamplingFunction, Sink, Source, + SourceProperties, SourceStoppedError, ) @@ -80,7 +81,7 @@ async def _assert_no_more_samples( # pylint: disable=too-many-arguments resampling_fun_mock.reset_mock() -@pytest.mark.parametrize("init_len", list(range(2, DEFAULT_BUFFER_LEN_WARN + 1, 16))) +@pytest.mark.parametrize("init_len", list(range(1, DEFAULT_BUFFER_LEN_WARN + 1, 16))) async def test_resampler_config_len_ok( init_len: int, caplog: pytest.LogCaptureFixture, @@ -125,7 +126,7 @@ async def test_resampler_config_len_warn( @pytest.mark.parametrize( "init_len", - list(range(-2, 2)) + [DEFAULT_BUFFER_LEN_MAX + 1, DEFAULT_BUFFER_LEN_MAX + 2], + list(range(-2, 1)) + [DEFAULT_BUFFER_LEN_MAX + 1, DEFAULT_BUFFER_LEN_MAX + 2], ) async def test_resampler_config_len_error(init_len: int) -> None: """Test checks on the resampling buffer.""" @@ -148,20 +149,21 @@ async def test_resampling_with_one_window( resampling_fun_mock = MagicMock( spec=ResamplingFunction, return_value=expected_resampled_value ) - resampler = Resampler( - ResamplerConfig( - 
resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.0, - resampling_function=resampling_fun_mock, - ) + config = ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + initial_buffer_len=4, ) + resampler = Resampler(config) source_recvr = source_chan.new_receiver() source_sendr = source_chan.new_sender() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(source_recvr, sink_mock) + resampler.add_timeseries("test", source_recvr, sink_mock) + source_props = resampler.get_source_properties(source_recvr) # Test timeline # @@ -186,7 +188,10 @@ async def test_resampling_with_one_window( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s), resampling_period_s + a_sequence(sample1s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=2, sampling_period_s=None ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -209,7 +214,12 @@ async def test_resampling_with_one_window( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s), resampling_period_s + a_sequence(sample2_5s, sample3s, sample4s), config, source_props + ) + # By now we have a full buffer (5 samples and a buffer of length 4), which + # we received in 4 seconds, so we have an input period of 0.8s. 
+ assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=5, sampling_period_s=0.8 ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -223,6 +233,9 @@ async def test_resampling_with_one_window( resampling_period_s, current_iteration=3, ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=5, sampling_period_s=0.8 + ) # Even when a lot could be refactored to use smaller functions, I'm allowing @@ -240,20 +253,21 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma resampling_fun_mock = MagicMock( spec=ResamplingFunction, return_value=expected_resampled_value ) - resampler = Resampler( - ResamplerConfig( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.5, - resampling_function=resampling_fun_mock, - ) + config = ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.5, + resampling_function=resampling_fun_mock, + initial_buffer_len=7, ) + resampler = Resampler(config) source_recvr = source_chan.new_receiver() source_sendr = source_chan.new_sender() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(source_recvr, sink_mock) + resampler.add_timeseries("test", source_recvr, sink_mock) + source_props = resampler.get_source_properties(source_recvr) # Test timeline # @@ -278,7 +292,10 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s, sample1s), resampling_period_s + a_sequence(sample0s, sample1s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=2, sampling_period_s=None ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -302,7 +319,10 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (1, 4] seconds 
resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s), resampling_period_s + a_sequence(sample2_5s, sample3s, sample4s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=5, sampling_period_s=None ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -324,7 +344,12 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (3, 6] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample4s, sample5s, sample6s), resampling_period_s + a_sequence(sample4s, sample5s, sample6s), config, source_props + ) + # By now we have a full buffer (7 samples and a buffer of length 7), which + # we received in 6 seconds, so we have an input period of 6/7s. + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=6 / 7 ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -342,7 +367,9 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (5, 8] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample6s), resampling_period_s + a_sequence(sample6s), + config, + source_props, ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -356,6 +383,9 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma resampling_period_s, current_iteration=5, ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=6 / 7 + ) # Even when a lot could be refactored to use smaller functions, I'm allowing @@ -373,20 +403,21 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen resampling_fun_mock = MagicMock( spec=ResamplingFunction, return_value=expected_resampled_value ) - resampler = Resampler( - ResamplerConfig( - resampling_period_s=resampling_period_s, - 
max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, - ) + config = ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + initial_buffer_len=16, ) + resampler = Resampler(config) source_recvr = source_chan.new_receiver() source_sendr = source_chan.new_sender() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(source_recvr, sink_mock) + resampler.add_timeseries("test", source_recvr, sink_mock) + source_props = resampler.get_source_properties(source_recvr) # Test timeline # @@ -411,7 +442,10 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s, sample1s), resampling_period_s + a_sequence(sample0s, sample1s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=2, sampling_period_s=None ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -435,8 +469,10 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) # It should include samples in the interval (0, 4] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s, sample2_5s, sample3s, sample4s), - resampling_period_s, + a_sequence(sample1s, sample2_5s, sample3s, sample4s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=5, sampling_period_s=None ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -459,7 +495,11 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen # It should include samples in the interval (2, 6] seconds resampling_fun_mock.assert_called_once_with( a_sequence(sample2_5s, sample3s, sample4s, sample5s, sample6s), - resampling_period_s, + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, 
sampling_period_s=None ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -477,8 +517,10 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) # It should include samples in the interval (4, 8] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample5s, sample6s), - resampling_period_s, + a_sequence(sample5s, sample6s), config, source_props + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=None ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -492,6 +534,9 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen resampling_period_s, current_iteration=5, ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=7, sampling_period_s=None + ) async def test_receiving_stopped_resampling_error( @@ -506,20 +551,20 @@ async def test_receiving_stopped_resampling_error( resampling_fun_mock = MagicMock( spec=ResamplingFunction, return_value=expected_resampled_value ) - resampler = Resampler( - ResamplerConfig( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, - ) + config = ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, ) + resampler = Resampler(config) source_recvr = source_chan.new_receiver() source_sendr = source_chan.new_sender() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(source_recvr, sink_mock) + resampler.add_timeseries("test", source_recvr, sink_mock) + source_props = resampler.get_source_properties(source_recvr) # Send a sample and run a resample tick, advancing the fake time by one period sample0s = Sample(timestamp, value=5.0) @@ -534,7 +579,7 @@ async def test_receiving_stopped_resampling_error( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s), resampling_period_s + 
a_sequence(sample0s), config, source_props ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -585,7 +630,7 @@ async def make_fake_source() -> Source: fake_source = make_fake_source() sink_mock = AsyncMock(spec=Sink, return_value=True) - resampler.add_timeseries(fake_source, sink_mock) + resampler.add_timeseries("test", fake_source, sink_mock) # Try to resample fake_time.shift(resampling_period_s) From 541d3c90b5c204c3f8e8836d818c212570582f89 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 23 Dec 2022 17:27:51 -0300 Subject: [PATCH 5/7] Update the buffer length based on the input sampling period When the input sampling period is known, we can adjust the internal buffer to have a good size to be able to store all the requested resampling and not waste space if the initial buffer was too big. If we are upsampling, one sample could be enough for back-filling, but we store max_data_age_in_periods using the input sampling period, so the resampling functions can do more complex inter/extrapolation if they need to. If we are downsampling, we want a buffer that can hold max_data_age_in_periods * resampling_period_s seconds of data, and we have one sample every input_sampling_period_s, so we use a buffer length of: max_data_age_in_periods * resampling_period_s / input_sampling_period_s The buffer size, like the input sampling period, is only calculated once. The buffer data is copied to a new buffer when/if the buffer needs resizing. The new size is validated against config.warn_buffer_len and config.max_buffer_len and will emit a warning if it goes above any of those and if it goes above config.max_buffer_len it will be truncated to config.max_buffer_len as a safety measure. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/timeseries/_resampling.py | 62 +++++++++++++++++++++- tests/timeseries/test_resampling.py | 55 +++++++++++++++++++ 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 146c2164b..ea374a343 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -531,6 +531,65 @@ def _update_source_sample_period(self, now: datetime) -> bool: ) return True + def _update_buffer_len(self) -> bool: + """Update the length of the buffer based on the source properties. + + Returns: + Whether the buffer length was changed (was really updated). + """ + input_sampling_period_s = self._source_properties.sampling_period_s + + # To make type checking happy + assert input_sampling_period_s is not None + assert self._buffer.maxlen is not None + + config = self._config + + # If we are upsampling, one sample could be enough for back-filling, but + # we store max_data_age_in_periods for input periods, so resampling + # functions can do more complex inter/extrapolation if they need to. + if input_sampling_period_s > config.resampling_period_s: + new_buffer_len = input_sampling_period_s * config.max_data_age_in_periods + # If we are downsampling, we want a buffer that can hold + # max_data_age_in_periods * resampling_period_s seconds of data, and we + # have one sample every input_sampling_period_s. 
+ else: + new_buffer_len = ( + config.resampling_period_s + / input_sampling_period_s + * config.max_data_age_in_periods + ) + + new_buffer_len = max(1, math.ceil(new_buffer_len)) + if new_buffer_len > config.max_buffer_len: + _logger.error( + "The new buffer length (%s) for timeseries %s is too big, using %s instead", + new_buffer_len, + self._name, + config.max_buffer_len, + ) + new_buffer_len = config.max_buffer_len + elif new_buffer_len > config.warn_buffer_len: + _logger.warning( + "The new buffer length (%s) for timeseries %s bigger than %s", + new_buffer_len, + self._name, + config.warn_buffer_len, + ) + + if new_buffer_len == self._buffer.maxlen: + return False + + _logger.info( + "New buffer length calculated for %r: %s", + self._name, + new_buffer_len, + ) + + self._buffer = deque(self._buffer, maxlen=new_buffer_len) + + return True + def resample(self, timestamp: datetime) -> Sample: """Generate a new sample based on all the current *relevant* samples. @@ -543,7 +602,8 @@ def resample(self, timestamp: datetime) -> Sample: If there are no *relevant* samples, then the new sample will have `None` as `value`. 
""" - self._update_source_sample_period(timestamp) + if self._update_source_sample_period(timestamp): + self._update_buffer_len() conf = self._config props = self._source_properties diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 7b3c8b284..4c0820cdb 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -5,6 +5,7 @@ Tests for the `TimeSeriesResampler` """ +import logging from datetime import datetime, timedelta, timezone from typing import AsyncIterator, Iterator from unittest.mock import AsyncMock, MagicMock @@ -26,6 +27,7 @@ Source, SourceProperties, SourceStoppedError, + _ResamplingHelper, ) from ..utils import a_sequence @@ -137,6 +139,35 @@ async def test_resampler_config_len_error(init_len: int) -> None: ) +async def test_helper_buffer_too_big( + fake_time: time_machine.Coordinates, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test checks on the resampling buffer.""" + config = ResamplerConfig( + resampling_period_s=DEFAULT_BUFFER_LEN_MAX + 1, + max_data_age_in_periods=1, + ) + helper = _ResamplingHelper("test", config) + + for i in range(DEFAULT_BUFFER_LEN_MAX + 1): + sample = Sample(datetime.now(timezone.utc), i) + helper.add_sample(sample) + fake_time.shift(1) + + _ = helper.resample(datetime.now(timezone.utc)) + assert caplog.record_tuples == [ + ( + "frequenz.sdk.timeseries._resampling", + logging.ERROR, + f"The new buffer length ({DEFAULT_BUFFER_LEN_MAX + 1}) " + f"for timeseries test is too big, using {DEFAULT_BUFFER_LEN_MAX} instead", + ) + ] + # pylint: disable=protected-access + assert helper._buffer.maxlen == DEFAULT_BUFFER_LEN_MAX + + async def test_resampling_with_one_window( fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample] ) -> None: @@ -193,6 +224,7 @@ async def test_resampling_with_one_window( assert source_props == SourceProperties( sampling_start=timestamp, received_samples=2, sampling_period_s=None ) + assert 
_get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -221,6 +253,9 @@ async def test_resampling_with_one_window( assert source_props == SourceProperties( sampling_start=timestamp, received_samples=5, sampling_period_s=0.8 ) + # The buffer should be able to hold 2 seconds of data, and data is coming + # every 0.8 seconds, so we should be able to store 3 samples. + assert _get_buffer_len(resampler, source_recvr) == 3 sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -236,6 +271,7 @@ async def test_resampling_with_one_window( assert source_props == SourceProperties( sampling_start=timestamp, received_samples=5, sampling_period_s=0.8 ) + assert _get_buffer_len(resampler, source_recvr) == 3 # Even when a lot could be refactored to use smaller functions, I'm allowing @@ -297,6 +333,7 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma assert source_props == SourceProperties( sampling_start=timestamp, received_samples=2, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -324,6 +361,7 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma assert source_props == SourceProperties( sampling_start=timestamp, received_samples=5, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -351,6 +389,10 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma assert source_props == SourceProperties( sampling_start=timestamp, received_samples=7, sampling_period_s=6 / 7 ) + # The buffer should be able to hold 2 * 1.5 (3) seconds of data, and data + # is coming every 6/7 seconds (~0.857s), so we should be able to store + # 4 samples. 
+ assert _get_buffer_len(resampler, source_recvr) == 4 sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -386,6 +428,7 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma assert source_props == SourceProperties( sampling_start=timestamp, received_samples=7, sampling_period_s=6 / 7 ) + assert _get_buffer_len(resampler, source_recvr) == 4 # Even when a lot could be refactored to use smaller functions, I'm allowing @@ -447,6 +490,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen assert source_props == SourceProperties( sampling_start=timestamp, received_samples=2, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -474,6 +518,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen assert source_props == SourceProperties( sampling_start=timestamp, received_samples=5, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -501,6 +546,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen assert source_props == SourceProperties( sampling_start=timestamp, received_samples=7, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -522,6 +568,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen assert source_props == SourceProperties( sampling_start=timestamp, received_samples=7, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -537,6 +584,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen assert source_props == SourceProperties( 
sampling_start=timestamp, received_samples=7, sampling_period_s=None ) + assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len async def test_receiving_stopped_resampling_error( @@ -643,3 +691,10 @@ async def make_fake_source() -> Source: assert fake_source in exceptions timeseries_error = exceptions[fake_source] assert isinstance(timeseries_error, TestException) + + +def _get_buffer_len(resampler: Resampler, source_recvr: Source) -> int: + # pylint: disable=protected-access + blen = resampler._resamplers[source_recvr]._helper._buffer.maxlen + assert blen is not None + return blen From 8ce069a7642ca4863329180584d996e539a91513 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 30 Dec 2022 12:15:09 -0300 Subject: [PATCH 6/7] Update release notes Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index afb10a8ef..5d0ea5456 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,12 +6,35 @@ ## Upgrading - +* The resampler now takes a `name` argument for `add_timeseries()`. This is only used for logging purposes. + +* The resampler and resampling actor now take a `ResamplerConfig` object in the constructor instead of the individual values. + +* The resampler and resampling actor can emit errors or warnings if the buffer needed to resample is too big. If it is bigger than `ResamplerConfig.max_buffer_len`, the buffer will be truncated to that length, so the resampling can lose accuracy. + +* The `ResamplingFunction` now takes different arguments: + + * `resampling_period_s` was removed. + * `resampler_config` is the configuration of the resampler calling the resampling function. + * `source_properties` are the properties of the source being resampled. 
## New Features - +* The resampler and resampling actor can now take a few new options via the new `ResamplerConfig` object: + + * `warn_buffer_len`: The minimum length of the resampling buffer that will emit a warning. + * `max_buffer_len`: The maximum length of the resampling buffer. + +* The resampler now infers the input sampling rate of sources and uses a buffer size according to it. + + This information can be consulted via `resampler.get_source_properties(source)`. The buffer size is now calculated so it can store all the needed samples requested via the combination of `resampling_period_s`, `max_data_age_in_periods` and the calculated `input_sampling_period_s`. + + If we are upsampling, one sample could be enough for back-filling, but we store `max_data_age_in_periods` using `input_sampling_period_s` as period, so the resampling functions can do more complex inter/extrapolation if they need to. + + If we are downsampling, we want a buffer that can hold `max_data_age_in_periods * resampling_period_s` seconds of data, and we have one sample every `input_sampling_period_s`, so we use a buffer length of: `max_data_age_in_periods * resampling_period_s / input_sampling_period_s` ## Bug Fixes - +* Fixed logger creation for some modules. + + Some modules didn't create the logger properly so there was no way to configure them using the standard logger configuration system. Because of this, it might have happened that some log messages were never shown, or some messages that the user didn't want to get were emitted anyway. From 23931fd89845e8ec41e4a68ca19f6e78806f1dad Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 6 Jan 2023 23:20:10 -0300 Subject: [PATCH 7/7] Add a hack to ignore logs unrelated to the tests Some resampling tests started getting some log errors about some gRPC task not being properly closed, which are clearly artefacts from other tests that shouldn't appear there. 
For example: assert config.initial_buffer_len == init_len > assert caplog.record_tuples == [ ( "frequenz.sdk.timeseries._resampling", logging.WARNING, f"initial_buffer_len ({init_len}) is bigger than " f"warn_buffer_len ({DEFAULT_BUFFER_LEN_WARN})", ) ] E assert [('asyncio', 40, "Task was destroyed but it is pending!\ntask: wait_for=()]>>"), ('asyncio', 40, "Task was destroyed but it is pending!\ntask: wait_for=()]>>"), ('frequenz.sdk.timeseries._resampling', 30, 'initial_buffer_len (257) is bigger than warn_buffer_len (128)')] == [('frequenz.sdk.timeseries._resampling', 30, 'initial_buffer_len (257) is bigger than warn_buffer_len (128)')] E At index 0 diff: ('asyncio', 40, "Task was destroyed but it is pending!\ntask: wait_for=()]>>") != ('frequenz.sdk.timeseries._resampling', 30, 'initial_buffer_len (257) is bigger than warn_buffer_len (128)') E Left contains 2 more items, first extra item: ('asyncio', 40, "Task was destroyed but it is pending!\ntask: wait_for=()]>>") E Full diff: E [ E + ('asyncio', E + 40, E + 'Task was destroyed but it is pending!\n' E + "task: ' E + 'wait_for=()]>>'), E + ('asyncio', E + 40, E + 'Task was destroyed but it is pending!\n' E + "task: ' E + 'wait_for=()]>>'), E ('frequenz.sdk.timeseries._resampling', E 30, E 'initial_buffer_len (257) is bigger than warn_buffer_len (128)'), E ] tests/timeseries/test_resampling.py:113: AssertionError Signed-off-by: Leandro Lucarella --- tests/timeseries/test_resampling.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 4c0820cdb..78ac92c62 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -5,6 +5,8 @@ Tests for the `TimeSeriesResampler` """ +from __future__ import annotations + import logging from datetime import datetime, timedelta, timezone from typing import AsyncIterator, Iterator @@ -94,7 +96,8 @@ async def 
test_resampler_config_len_ok( initial_buffer_len=init_len, ) assert config.initial_buffer_len == init_len - assert caplog.records == [] + # Ignore errors produced by wrongly finalized gRPC server in unrelated tests + assert _filter_logs(caplog.record_tuples, logger_name="") == [] @pytest.mark.parametrize( @@ -110,13 +113,11 @@ async def test_resampler_config_len_warn( initial_buffer_len=init_len, ) assert config.initial_buffer_len == init_len - for record in caplog.records: - assert record.levelname == "WARNING" - assert caplog.text.startswith("") - assert ( - caplog.text - == f"initial_buffer_len ({init_len}) is bigger than {DEFAULT_BUFFER_LEN_WARN}" - assert caplog.record_tuples == [ + # Ignore errors produced by wrongly finalized gRPC server in unrelated tests + assert _filter_logs( + caplog.record_tuples, + logger_name="frequenz.sdk.timeseries._resampling", + ) == [ ( "frequenz.sdk.timeseries._resampling", logging.WARNING, @@ -156,7 +157,11 @@ async def test_helper_buffer_too_big( fake_time.shift(1) _ = helper.resample(datetime.now(timezone.utc)) - assert caplog.record_tuples == [ + # Ignore errors produced by wrongly finalized gRPC server in unrelated tests + assert _filter_logs( + caplog.record_tuples, + logger_name="frequenz.sdk.timeseries._resampling", + ) == [ ( "frequenz.sdk.timeseries._resampling", logging.ERROR, @@ -698,3 +703,9 @@ def _get_buffer_len(resampler: Resampler, source_recvr: Source) -> int: blen = resampler._resamplers[source_recvr]._helper._buffer.maxlen assert blen is not None return blen + + +def _filter_logs( + record_tuples: list[tuple[str, int, str]], *, logger_name: str +) -> list[tuple[str, int, str]]: + return [t for t in record_tuples if t[0] == logger_name]