From c3fbe591ccc772d9b6b417a68a3d31a97380f7f9 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Mon, 13 Feb 2023 15:16:31 +0100 Subject: [PATCH 1/9] Reset release notes Keep the changes added after v0.18.0 Signed-off-by: Daniel Zullo --- RELEASE_NOTES.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 9f1e9d964..9ea924e06 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,17 +2,16 @@ ## Summary + + ## Upgrading + + ## New Features -* A new class `OrderedRingBuffer` is now available, providing a sorted ring buffer of datetime-value pairs with tracking of any values that have not yet been written. -* Add logical meter formula for EV power. -* A `MovingWindow` class has been added that consumes a data stream from a logical meter and updates an `OrderedRingBuffer`. -* Add EVChargerPool implementation. It has only streaming state changes for ev chargers, now. -* Add 3-phase current formulas: `3-phase grid_current` and `3-phase ev_charger_current` to the LogicalMeter. * A new class `SerializableRingbuffer` is now available, extending the `OrderedRingBuffer` class with the ability to load & dump the data to disk. ## Bug Fixes -* Add COMPONENT_STATE_DISCHARGING as valid state for the inverter. DISCHARGING state was missing by mistake and this caused the power distributor to error out if the inverter is already discharging. + From 1f561d85c111fb7237f5309e5a2dcb0fa15b9598 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Mon, 13 Feb 2023 14:42:01 +0100 Subject: [PATCH 2/9] Fix logging exception in actor decorator The exception information is added by default when calling logger.exception and it is redundant to re-add it in the message. Signed-off-by: Daniel Zullo --- src/frequenz/sdk/actor/_decorator.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/frequenz/sdk/actor/_decorator.py b/src/frequenz/sdk/actor/_decorator.py index 5bafffe4f..0e6f0b97d 100644 --- a/src/frequenz/sdk/actor/_decorator.py +++ b/src/frequenz/sdk/actor/_decorator.py @@ -207,10 +207,8 @@ async def _start_actor(self) -> None: except asyncio.CancelledError: logger.debug("Cancelling actor: %s", cls.__name__) raise - except Exception as err: # pylint: disable=broad-except - logger.exception( - "Actor (%s) crashed with error: %s", cls.__name__, err - ) + except Exception: # pylint: disable=broad-except + logger.exception("Actor (%s) crashed", cls.__name__) if ( self.restart_limit is None or number_of_restarts < self.restart_limit From 37d499ec8ad7deade3485271df7d584dd5dc9578 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Thu, 9 Feb 2023 20:07:05 +0100 Subject: [PATCH 3/9] Add run() method to simplify awaiting of actors completion The manual join() of actors was causing issues with linting tools such as pylint and mypy due to the indirection created by the actor decorator. The new run() function simplifies and improves the synchronization of actors on the client side. Signed-off-by: Daniel Zullo --- RELEASE_NOTES.md | 1 + examples/power_distribution.py | 12 ++++-------- src/frequenz/sdk/actor/__init__.py | 2 ++ src/frequenz/sdk/actor/_run_utils.py | 27 +++++++++++++++++++++++++++ tests/actor/test_decorator.py | 5 ++--- 5 files changed, 36 insertions(+), 11 deletions(-) create mode 100644 src/frequenz/sdk/actor/_run_utils.py diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 9ea924e06..3c819d230 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,6 +11,7 @@ ## New Features * A new class `SerializableRingbuffer` is now available, extending the `OrderedRingBuffer` class with the ability to load & dump the data to disk. +* Add the `run(*actors)` function for running and synchronizing the execution of actors. This new function simplifies the way actors are managed on the client side, allowing for a cleaner and more streamlined approach. Users/apps can now run actors simply by calling run(actor1, actor2, actor3...) without the need to manually call join() and deal with linting errors. ## Bug Fixes diff --git a/examples/power_distribution.py b/examples/power_distribution.py index fcdbaed15..9e7bad5f7 100644 --- a/examples/power_distribution.py +++ b/examples/power_distribution.py @@ -18,14 +18,13 @@ from frequenz.channels import Bidirectional, Broadcast, Receiver, Sender -from frequenz.sdk import microgrid +from frequenz.sdk import actor, microgrid from frequenz.sdk.actor import ( ChannelRegistry, ComponentMetricRequest, ComponentMetricsResamplingActor, DataSourcingActor, ResamplerConfig, - actor, ) from frequenz.sdk.actor.power_distributing import ( PowerDistributingActor, @@ -42,7 +41,7 @@ PORT = 61060 -@actor +@actor.actor class DecisionMakingActor: """Actor that receives set receives power for given batteries.""" @@ -112,7 +111,7 @@ async def run(self) -> None: _logger.info("Set power with %d succeed.", power_to_set) -@actor +@actor.actor class DataCollectingActor: """Actor that makes decisions about how much to charge/discharge batteries.""" @@ -227,10 +226,7 @@ async def run() -> None: active_power_data=await logical_meter.grid_power(), ) - # pylint: disable=no-member - await service_actor.join() # type: ignore[attr-defined] - await client_actor.join() # type: ignore[attr-defined] - await power_distributor.join() # type: ignore[attr-defined] + await actor.run(service_actor, client_actor, power_distributor) asyncio.run(run()) diff --git a/src/frequenz/sdk/actor/__init__.py b/src/frequenz/sdk/actor/__init__.py index 6917895a7..b17162bef 100644 --- a/src/frequenz/sdk/actor/__init__.py +++ b/src/frequenz/sdk/actor/__init__.py @@ -9,6 +9,7 @@ from ._data_sourcing import ComponentMetricRequest, DataSourcingActor from ._decorator import actor from ._resampling import ComponentMetricsResamplingActor +from ._run_utils import run __all__ = [ "ChannelRegistry", @@ -18,4 +19,5 @@ "DataSourcingActor", "ResamplerConfig", "actor", + "run", ] diff --git a/src/frequenz/sdk/actor/_run_utils.py b/src/frequenz/sdk/actor/_run_utils.py new file mode 100644 index 000000000..14a6f10cb --- /dev/null +++ b/src/frequenz/sdk/actor/_run_utils.py @@ -0,0 +1,27 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Utility functions to run and synchronize the execution of actors.""" + + +import asyncio +from typing import Any + +from ._decorator import BaseActor + + +async def run(*actors: Any) -> None: + """Await the completion of all actors. + + Args: + actors: the actors to be awaited. + + Raises: + AssertionError: if any of the actors is not an instance of BaseActor. + """ + # Check that each actor is an instance of BaseActor at runtime, + # due to the indirection created by the actor decorator. + for actor in actors: + assert isinstance(actor, BaseActor), f"{actor} is not an instance of BaseActor" + + await asyncio.gather(*(actor.join() for actor in actors)) diff --git a/tests/actor/test_decorator.py b/tests/actor/test_decorator.py index 72644307f..33851671a 100644 --- a/tests/actor/test_decorator.py +++ b/tests/actor/test_decorator.py @@ -5,7 +5,7 @@ from frequenz.channels import Broadcast, Receiver, Sender from frequenz.channels.util import Select -from frequenz.sdk.actor import actor +from frequenz.sdk.actor import actor, run @actor @@ -111,5 +111,4 @@ async def test_actor_does_not_restart() -> None: ) await channel.new_sender().send(1) - # pylint: disable=no-member - await _faulty_actor.join() # type: ignore + await run(_faulty_actor) From 88b8efc2c7939cf0cef5c142bcd85f165af1e85d Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Fri, 10 Feb 2023 17:40:59 +0100 Subject: [PATCH 4/9] Replace asyncio gather() with wait() This will be only useful in the future if the actor decorator does no longer manage the lifecycle of the actor tasks. Logging the tasks done in this case might contain useful debugging information as they include cancellation information. Signed-off-by: Daniel Zullo --- src/frequenz/sdk/actor/_run_utils.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/frequenz/sdk/actor/_run_utils.py b/src/frequenz/sdk/actor/_run_utils.py index 14a6f10cb..434eb4b1a 100644 --- a/src/frequenz/sdk/actor/_run_utils.py +++ b/src/frequenz/sdk/actor/_run_utils.py @@ -5,10 +5,13 @@ import asyncio +import logging from typing import Any from ._decorator import BaseActor +_logger = logging.getLogger(__name__) + async def run(*actors: Any) -> None: """Await the completion of all actors. @@ -24,4 +27,27 @@ async def run(*actors: Any) -> None: for actor in actors: assert isinstance(actor, BaseActor), f"{actor} is not an instance of BaseActor" - await asyncio.gather(*(actor.join() for actor in actors)) + pending_tasks = set() + for actor in actors: + pending_tasks.add(asyncio.create_task(actor.join(), name=str(actor))) + + # Currently the actor decorator manages the life-cycle of the actor tasks + while pending_tasks: + done_tasks, pending_tasks = await asyncio.wait( + pending_tasks, return_when=asyncio.FIRST_COMPLETED + ) + + # This should always be only one task, but we handle many for extra safety + for task in done_tasks: + # Cancellation needs to be checked first, otherwise the other methods + # could raise a CancelledError + if task.cancelled(): + _logger.info("The actor %s was cancelled", task.get_name()) + elif exception := task.exception(): + _logger.error( + "The actor %s was finished due to an uncaught exception", + task.get_name(), + exc_info=exception, + ) + else: + _logger.info("The actor %s finished normally", task.get_name()) From a6a2006273154c6eecc56d69f66b15cfe78ccb6b Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Fri, 17 Feb 2023 18:39:17 +0100 Subject: [PATCH 5/9] Add unit tests for actor run utils Signed-off-by: Daniel Zullo --- tests/actor/test_run_utils.py | 120 ++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 tests/actor/test_run_utils.py diff --git a/tests/actor/test_run_utils.py b/tests/actor/test_run_utils.py new file mode 100644 index 000000000..c05efab77 --- /dev/null +++ b/tests/actor/test_run_utils.py @@ -0,0 +1,120 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Simple tests for the actor runner.""" + +import asyncio +import time +from typing import Iterator + +import async_solipsism +import pytest +import time_machine + +from frequenz.sdk.actor import actor, run + + +@pytest.fixture(autouse=True) +def event_loop() -> Iterator[async_solipsism.EventLoop]: + """Replace the loop with one that doesn't interact with the outside world.""" + loop = async_solipsism.EventLoop() + yield loop + loop.close() + + +@pytest.fixture +def fake_time() -> Iterator[time_machine.Coordinates]: + """Replace real time with a time machine that doesn't automatically tick.""" + with time_machine.travel(0, tick=False) as traveller: + yield traveller + + +@actor +class FaultyActor: + """A test faulty actor.""" + + def __init__(self, name: str) -> None: + """Initialize the faulty actor. + + Args: + name: the name of the faulty actor. + """ + self.name = name + self.is_cancelled = False + + async def run(self) -> None: + """Run the faulty actor. + + Raises: + CancelledError: the exception causes the actor to be cancelled + """ + self.is_cancelled = True + raise asyncio.CancelledError(f"Faulty Actor {self.name} failed") + + +@actor +class SleepyActor: + """A test actor that sleeps a short time.""" + + def __init__(self, name: str, sleep_duration: float) -> None: + """Initialize the sleepy actor. + + Args: + name: the name of the sleepy actor. + sleep_duration: the virtual duration to sleep while running. + """ + self.name = name + self.sleep_duration = sleep_duration + self.is_joined = False + + async def run(self) -> None: + """Run the sleepy actor.""" + while time.time() < self.sleep_duration: + await asyncio.sleep(0.1) + + self.is_joined = True + + +# pylint: disable=redefined-outer-name +async def test_all_actors_done(fake_time: time_machine.Coordinates) -> None: + """Test the completion of all actors.""" + + sleepy_actor_1 = SleepyActor("sleepy_actor_1", sleep_duration=1.0) + sleepy_actor_2 = SleepyActor("sleepy_actor_2", sleep_duration=2.0) + + test_task = asyncio.create_task(run(sleepy_actor_1, sleepy_actor_2)) + + sleep_duration = time.time() + + assert sleep_duration == 0 + assert sleepy_actor_1.is_joined is False + assert sleepy_actor_2.is_joined is False + + while not test_task.done(): + if sleep_duration < 1: + assert sleepy_actor_1.is_joined is False + assert sleepy_actor_2.is_joined is False + elif sleep_duration < 2: + assert sleepy_actor_1.is_joined is True + assert sleepy_actor_2.is_joined is False + elif sleep_duration == 2: + assert sleepy_actor_1.is_joined is True + assert sleepy_actor_2.is_joined is True + + fake_time.shift(0.5) + sleep_duration = time.time() + await asyncio.sleep(1) + + assert sleepy_actor_1.is_joined + assert sleepy_actor_2.is_joined + + +async def test_actors_cancelled() -> None: + """Test the completion of actors being cancelled.""" + + faulty_actors = [FaultyActor(f"faulty_actor_{idx}") for idx in range(5)] + + await asyncio.wait_for(run(*faulty_actors), timeout=1.0) + + for faulty_actor in faulty_actors: + assert faulty_actor.is_cancelled From ae6253ff9c79e310ff6cfbf35e3e58b9746a9acc Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Thu, 23 Feb 2023 19:48:36 +0100 Subject: [PATCH 6/9] Fix the asyncio import in test The module was imported from the wrong package. Signed-off-by: Daniel Zullo --- tests/actor/test_resampling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index af5439995..7a0b0a657 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -2,6 +2,7 @@ # Copyright © 2022 Frequenz Energy-as-a-Service GmbH """Frequenz Python SDK resampling example.""" +import asyncio import dataclasses from datetime import datetime, timezone from typing import Iterator @@ -9,7 +10,6 @@ import async_solipsism import pytest import time_machine -from async_solipsism.socket import asyncio from frequenz.channels import Broadcast from frequenz.sdk.actor import ( From 94b031e9d1820d94b224036e4277cfd86045e405 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Thu, 23 Feb 2023 20:03:36 +0100 Subject: [PATCH 7/9] Rename function fake_loop to event_loop in tests Using the function name `fake_loop` does not replace the loop with one that doesn't interact with the outside world. So the function needs to be named `event_loop` instead. Signed-off-by: Daniel Zullo --- tests/actor/test_resampling.py | 2 +- tests/timeseries/test_resampling.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 7a0b0a657..25d86e4a4 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -26,7 +26,7 @@ @pytest.fixture(autouse=True) -def fake_loop() -> Iterator[async_solipsism.EventLoop]: +def event_loop() -> Iterator[async_solipsism.EventLoop]: """Replace the loop with one that doesn't interact with the outside world.""" loop = async_solipsism.EventLoop() yield loop diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 78ac92c62..b264104ad 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -38,7 +38,7 @@ @pytest.fixture(autouse=True) -def fake_loop() -> Iterator[async_solipsism.EventLoop]: +def event_loop() -> Iterator[async_solipsism.EventLoop]: """Replace the loop with one that doesn't interact with the outside world.""" loop = async_solipsism.EventLoop() yield loop From 0836138dab00545512919f3deec7f05b86d4305e Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Thu, 23 Feb 2023 20:35:18 +0100 Subject: [PATCH 8/9] Fix infinite loop in actor decorator for finished actors Previously, the actor decorator was causing an infinite loop for actors that had finished normally (without exceptions) even when the restart_limit attribute was set to zero. This was particularly problematic in test cases where the restart_limit attribute was intended to limit the number of times an actor can be restarted. To address this issue, the patch modified the code to consider the restart_limit attribute in cases where exceptions different than CancelledError are raised or when an actor finishes normally. This ensures that the actor decorator does not cause an infinite loop for finished actors, and the restart_limit attribute works as expected. Signed-off-by: Daniel Zullo --- src/frequenz/sdk/actor/_decorator.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/frequenz/sdk/actor/_decorator.py b/src/frequenz/sdk/actor/_decorator.py index 0e6f0b97d..e93c74738 100644 --- a/src/frequenz/sdk/actor/_decorator.py +++ b/src/frequenz/sdk/actor/_decorator.py @@ -201,7 +201,12 @@ async def _start_actor(self) -> None: """ logger.debug("Starting actor: %s", cls.__name__) number_of_restarts = 0 - while True: + while ( + self.restart_limit is None or number_of_restarts <= self.restart_limit + ): + if number_of_restarts > 0: + logger.info("Restarting actor: %s", cls.__name__) + try: await super().run() except asyncio.CancelledError: @@ -209,15 +214,10 @@ async def _start_actor(self) -> None: raise except Exception: # pylint: disable=broad-except logger.exception("Actor (%s) crashed", cls.__name__) - if ( - self.restart_limit is None - or number_of_restarts < self.restart_limit - ): - number_of_restarts += 1 - logger.info("Restarting actor: %s", cls.__name__) - else: - logger.info("Shutting down actor: %s", cls.__name__) - break + finally: + number_of_restarts += 1 + + logger.info("Shutting down actor: %s", cls.__name__) async def _stop(self) -> None: """Stop an running actor.""" From dbea183c68b3c216048b686ff12a037c3b47191a Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Fri, 3 Mar 2023 15:34:01 +0100 Subject: [PATCH 9/9] Unset autouse for solipsism loop in tests Setting autouse has no effect for solipsism event_loop() as the method replaces the event loop for all tests in the file where it is used. Signed-off-by: Daniel Zullo --- tests/actor/test_resampling.py | 3 ++- tests/actor/test_run_utils.py | 3 ++- tests/timeseries/test_resampling.py | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 25d86e4a4..e105407a2 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -25,7 +25,8 @@ # -@pytest.fixture(autouse=True) +# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file. +@pytest.fixture() def event_loop() -> Iterator[async_solipsism.EventLoop]: """Replace the loop with one that doesn't interact with the outside world.""" loop = async_solipsism.EventLoop() diff --git a/tests/actor/test_run_utils.py b/tests/actor/test_run_utils.py index c05efab77..90f40ff6e 100644 --- a/tests/actor/test_run_utils.py +++ b/tests/actor/test_run_utils.py @@ -14,7 +14,8 @@ from frequenz.sdk.actor import actor, run -@pytest.fixture(autouse=True) +# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file. +@pytest.fixture() def event_loop() -> Iterator[async_solipsism.EventLoop]: """Replace the loop with one that doesn't interact with the outside world.""" loop = async_solipsism.EventLoop() diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index b264104ad..c0c70138f 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -37,7 +37,8 @@ # pylint: disable=too-many-locals,redefined-outer-name -@pytest.fixture(autouse=True) +# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file. +@pytest.fixture() def event_loop() -> Iterator[async_solipsism.EventLoop]: """Replace the loop with one that doesn't interact with the outside world.""" loop = async_solipsism.EventLoop()