From 695bb9161ec36d001b095ca191bb3b8edb10a9e2 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 19 Jun 2024 12:17:53 +0200 Subject: [PATCH 1/5] Improve speed of battery pool control tests This is done by patching the wait time for component data in the battery pool to a lower value. Signed-off-by: Sahas Subramanian --- .../_battery_pool/test_battery_pool_control_methods.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py index 7896711c3..72cb3991a 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py +++ b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py @@ -89,6 +89,11 @@ async def _patch_battery_pool_status( If `battery_ids` is not None, the mock will always return `battery_ids`. Otherwise, it will return the requested batteries. """ + mocker.patch.object( + timeseries.battery_pool._methods, # pylint: disable=protected-access + "WAIT_FOR_COMPONENT_DATA_SEC", + 0.1, + ) if battery_ids: mock = MagicMock(spec=ComponentPoolStatusTracker) mock.get_working_components.return_value = battery_ids From 0359bac1df83877e3618199219b3fcdb10e31f68 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 19 Jun 2024 12:31:05 +0200 Subject: [PATCH 2/5] Fix bug in `_battery_manager.py` A function's existence was being checked rather than its (boolean) result. This wasn't an issue in practice, because ever since we've had the PowerManager, the PowerDistributor has not received requests when there's no data. Signed-off-by: Sahas Subramanian --- .../_component_managers/_battery_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py index f0c32a7ca..8c0b14986 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -446,9 +446,9 @@ def _get_battery_inverter_data( # This should be handled by BatteryStatus. BatteryStatus should not return # this batteries as working. if not all( - self._battery_caches[bat_id].has_value for bat_id in battery_ids + self._battery_caches[bat_id].has_value() for bat_id in battery_ids ) or not all( - self._inverter_caches[inv_id].has_value for inv_id in inverter_ids + self._inverter_caches[inv_id].has_value() for inv_id in inverter_ids ): _logger.error( "Battery %s or inverter %s send no data, yet. They should be not used.", From b45a810bd7fa9d186effc6ad153148f687aff426 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 19 Jun 2024 12:52:56 +0200 Subject: [PATCH 3/5] Remove patching of power distributor wait time in `*_pool` tests This is because the initial wait time is going to be removed from the power distributor. Signed-off-by: Sahas Subramanian --- .../test_ev_charger_pool_control_methods.py | 8 -------- tests/timeseries/_pv_pool/test_pv_pool_control_methods.py | 8 -------- 2 files changed, 16 deletions(-) diff --git a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py index d4d297235..75855e43e 100644 --- a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py +++ b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py @@ -96,13 +96,6 @@ async def _patch_ev_pool_status( ComponentPoolStatus(working=set(mocks.microgrid.evc_ids), uncertain=set()) ) - async def _patch_data_pipeline(self, mocker: MockerFixture) -> None: - mocker.patch( - "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._ev_power_wrapper" - "._pd_wait_for_data_sec", - 0.1, - ) - async def _patch_power_distributing_actor( self, mocker: MockerFixture, @@ -213,7 +206,6 @@ async def test_setting_power( ) await self._init_ev_chargers(mocks) - await self._patch_data_pipeline(mocker) ev_charger_pool = microgrid.ev_charger_pool(priority=5) await self._patch_ev_pool_status(mocks, mocker) await self._patch_power_distributing_actor(mocker) diff --git a/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py b/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py index 4f6936062..faa00662c 100644 --- a/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py +++ b/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py @@ -52,13 +52,6 @@ async def mocks(mocker: MockerFixture) -> typing.AsyncIterator[_Mocks]: class TestPVPoolControl: """Test control methods for the PVPool.""" - async def _patch_data_pipeline(self, mocker: MockerFixture) -> None: - mocker.patch( - "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._pv_power_wrapper" - "._pd_wait_for_data_sec", - 0.1, - ) - async def _init_pv_inverters(self, mocks: _Mocks) -> None: now = datetime.now(tz=timezone.utc) for idx, comp_id in enumerate(mocks.microgrid.pv_inverter_ids): @@ -138,7 +131,6 @@ async def test_setting_power( # pylint: disable=too-many-statements ) await self._init_pv_inverters(mocks) - await self._patch_data_pipeline(mocker) pv_pool = microgrid.pv_pool(priority=5) bounds_rx = pv_pool.power_status.new_receiver() await self._recv_reports_until( From 2337228f1853f3c7afeaaee80847ce0aff8f4909 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 19 Jun 2024 12:57:16 +0200 Subject: [PATCH 4/5] Remove the initial wait time in the power distributor The power distributor is no longer controlled directly by the users, but instead gets requests only from the power manager. So if will not receive requests until there's data. But if it receives requests when there's no data, it will return an `Error` response, saying there's no data. Signed-off-by: Sahas Subramanian --- benchmarks/power_distribution/power_distributor.py | 1 - .../sdk/actor/power_distributing/power_distributing.py | 9 --------- src/frequenz/sdk/microgrid/_power_wrapper.py | 4 ---- 3 files changed, 14 deletions(-) diff --git a/benchmarks/power_distribution/power_distributor.py b/benchmarks/power_distribution/power_distributor.py index 0223acd11..62ea66906 100644 --- a/benchmarks/power_distribution/power_distributor.py +++ b/benchmarks/power_distribution/power_distributor.py @@ -120,7 +120,6 @@ async def run_test( # pylint: disable=too-many-locals requests_receiver=power_request_channel.new_receiver(), results_sender=power_result_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=2.0, ): tasks: list[Coroutine[Any, Any, list[Result]]] = [] tasks.append(send_requests(batteries, num_requests)) diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index c6473343b..fb0378135 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -12,8 +12,6 @@ """ -import asyncio - from frequenz.channels import Receiver, Sender from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType @@ -60,7 +58,6 @@ def __init__( # pylint: disable=too-many-arguments requests_receiver: Receiver[Request], results_sender: Sender[Result], component_pool_status_sender: Sender[ComponentPoolStatus], - wait_for_data_sec: float, *, component_category: ComponentCategory, component_type: ComponentType | None = None, @@ -74,8 +71,6 @@ def __init__( # pylint: disable=too-many-arguments results_sender: Sender for sending results to the power manager. component_pool_status_sender: Channel for sending information about which components are expected to be working. - wait_for_data_sec: How long actor should wait before processing first - request. It is a time needed to collect first components data. component_category: The category of the components that this actor is responsible for. component_type: The type of the component of the given category that this @@ -96,7 +91,6 @@ def __init__( # pylint: disable=too-many-arguments self._component_type = component_type self._requests_receiver = requests_receiver self._result_sender = results_sender - self._wait_for_data_sec = wait_for_data_sec self._component_manager: ComponentManager if component_category == ComponentCategory.BATTERY: @@ -130,9 +124,6 @@ async def _run(self) -> None: # pylint: disable=too-many-locals """ await self._component_manager.start() - # Wait few seconds to get data from the channels created above. - await asyncio.sleep(self._wait_for_data_sec) - async for request in self._requests_receiver: await self._component_manager.distribute_power(request) diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index 2749f5f50..fc13e47dd 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -30,8 +30,6 @@ _logger = logging.getLogger(__name__) -_POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC = 2.0 - class PowerWrapper: """Wrapper around the power managing and power distributing actors.""" @@ -80,7 +78,6 @@ def __init__( self._power_distributing_actor: PowerDistributingActor | None = None self._power_managing_actor: _power_managing.PowerManagingActor | None = None - self._pd_wait_for_data_sec: float = _POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC def _start_power_managing_actor(self) -> None: """Start the power managing actor if it is not already running.""" @@ -151,7 +148,6 @@ def _start_power_distributing_actor(self) -> None: requests_receiver=self._power_distribution_requests_channel.new_receiver(), results_sender=self._power_distribution_results_channel.new_sender(), component_pool_status_sender=self.status_channel.new_sender(), - wait_for_data_sec=self._pd_wait_for_data_sec, ) self._power_distributing_actor.start() From f791f4f35c31dd857d9e82a718c5fef35f91a52b Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 19 Jun 2024 13:00:04 +0200 Subject: [PATCH 5/5] Update PowerDistributor tests to not use the wait flag anymore And instead sleep for the same amount of time, before sending requests to the power distributor. This sleep is needed only in the direct tests of the PowerDistributor. In the `*_pool_control_methods` tests, PowerDistributor receives requests from the PowerManager, which would happen only when there are data, so additional sleep is not required in those tests. Signed-off-by: Sahas Subramanian --- .../test_power_distributing.py | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/tests/actor/power_distributing/test_power_distributing.py b/tests/actor/power_distributing/test_power_distributing.py index 761b9f5b1..f87e5a502 100644 --- a/tests/actor/power_distributing/test_power_distributing.py +++ b/tests/actor/power_distributing/test_power_distributing.py @@ -112,7 +112,6 @@ async def test_constructor_with_grid_meter(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.0, ) as distributor: assert isinstance(distributor._component_manager, BatteryManager) assert distributor._component_manager._bat_invs_map == { @@ -144,7 +143,6 @@ async def test_constructor_without_grid_meter(self, mocker: MockerFixture) -> No requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.0, ) as distributor: assert isinstance(distributor._component_manager, BatteryManager) assert distributor._component_manager._bat_invs_map == { @@ -210,8 +208,9 @@ async def test_power_distributor_one_user(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -271,8 +270,9 @@ async def test_power_distributor_exclusion_bounds( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + # zero power requests should pass through despite the exclusion bounds. request = Request( power=Power.zero(), @@ -374,8 +374,9 @@ async def test_two_batteries_one_inverters(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -454,7 +455,6 @@ async def test_two_batteries_one_broken_one_inverters( requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -510,8 +510,9 @@ async def test_battery_two_inverters(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -562,8 +563,9 @@ async def test_two_batteries_three_inverters(self, mocker: MockerFixture) -> Non requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -648,8 +650,9 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds_2( requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -735,8 +738,9 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds( requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -801,7 +805,6 @@ async def test_connected_but_not_requested_batteries( requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -859,8 +862,9 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -914,8 +918,9 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -988,8 +993,9 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1034,7 +1040,6 @@ async def test_power_distributor_invalid_battery_id( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1079,8 +1084,9 @@ async def test_power_distributor_one_user_adjust_power_consume( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1126,8 +1132,9 @@ async def test_power_distributor_one_user_adjust_power_supply( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1173,8 +1180,9 @@ async def test_power_distributor_one_user_adjust_power_success( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1213,8 +1221,9 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + request = Request( power=Power.from_kilowatts(1.2), component_ids=batteries, @@ -1267,8 +1276,9 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + request = Request( power=Power.from_kilowatts(1.70), component_ids=batteries,