diff --git a/benchmarks/power_distribution/power_distributor.py b/benchmarks/power_distribution/power_distributor.py index 0223acd11..62ea66906 100644 --- a/benchmarks/power_distribution/power_distributor.py +++ b/benchmarks/power_distribution/power_distributor.py @@ -120,7 +120,6 @@ async def run_test( # pylint: disable=too-many-locals requests_receiver=power_request_channel.new_receiver(), results_sender=power_result_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=2.0, ): tasks: list[Coroutine[Any, Any, list[Result]]] = [] tasks.append(send_requests(batteries, num_requests)) diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py index f0c32a7ca..8c0b14986 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -446,9 +446,9 @@ def _get_battery_inverter_data( # This should be handled by BatteryStatus. BatteryStatus should not return # this batteries as working. if not all( - self._battery_caches[bat_id].has_value for bat_id in battery_ids + self._battery_caches[bat_id].has_value() for bat_id in battery_ids ) or not all( - self._inverter_caches[inv_id].has_value for inv_id in inverter_ids + self._inverter_caches[inv_id].has_value() for inv_id in inverter_ids ): _logger.error( "Battery %s or inverter %s send no data, yet. They should be not used.", diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index c6473343b..fb0378135 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -12,8 +12,6 @@ """ -import asyncio - from frequenz.channels import Receiver, Sender from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType @@ -60,7 +58,6 @@ def __init__( # pylint: disable=too-many-arguments requests_receiver: Receiver[Request], results_sender: Sender[Result], component_pool_status_sender: Sender[ComponentPoolStatus], - wait_for_data_sec: float, *, component_category: ComponentCategory, component_type: ComponentType | None = None, @@ -74,8 +71,6 @@ def __init__( # pylint: disable=too-many-arguments results_sender: Sender for sending results to the power manager. component_pool_status_sender: Channel for sending information about which components are expected to be working. - wait_for_data_sec: How long actor should wait before processing first - request. It is a time needed to collect first components data. component_category: The category of the components that this actor is responsible for. component_type: The type of the component of the given category that this @@ -96,7 +91,6 @@ def __init__( # pylint: disable=too-many-arguments self._component_type = component_type self._requests_receiver = requests_receiver self._result_sender = results_sender - self._wait_for_data_sec = wait_for_data_sec self._component_manager: ComponentManager if component_category == ComponentCategory.BATTERY: @@ -130,9 +124,6 @@ async def _run(self) -> None: # pylint: disable=too-many-locals """ await self._component_manager.start() - # Wait few seconds to get data from the channels created above. - await asyncio.sleep(self._wait_for_data_sec) - async for request in self._requests_receiver: await self._component_manager.distribute_power(request) diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index 2749f5f50..fc13e47dd 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -30,8 +30,6 @@ _logger = logging.getLogger(__name__) -_POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC = 2.0 - class PowerWrapper: """Wrapper around the power managing and power distributing actors.""" @@ -80,7 +78,6 @@ def __init__( self._power_distributing_actor: PowerDistributingActor | None = None self._power_managing_actor: _power_managing.PowerManagingActor | None = None - self._pd_wait_for_data_sec: float = _POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC def _start_power_managing_actor(self) -> None: """Start the power managing actor if it is not already running.""" @@ -151,7 +148,6 @@ def _start_power_distributing_actor(self) -> None: requests_receiver=self._power_distribution_requests_channel.new_receiver(), results_sender=self._power_distribution_results_channel.new_sender(), component_pool_status_sender=self.status_channel.new_sender(), - wait_for_data_sec=self._pd_wait_for_data_sec, ) self._power_distributing_actor.start() diff --git a/tests/actor/power_distributing/test_power_distributing.py b/tests/actor/power_distributing/test_power_distributing.py index 761b9f5b1..f87e5a502 100644 --- a/tests/actor/power_distributing/test_power_distributing.py +++ b/tests/actor/power_distributing/test_power_distributing.py @@ -112,7 +112,6 @@ async def test_constructor_with_grid_meter(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.0, ) as distributor: assert isinstance(distributor._component_manager, BatteryManager) assert distributor._component_manager._bat_invs_map == { @@ -144,7 +143,6 @@ async def test_constructor_without_grid_meter(self, mocker: MockerFixture) -> No requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.0, ) as distributor: assert isinstance(distributor._component_manager, BatteryManager) assert distributor._component_manager._bat_invs_map == { @@ -210,8 +208,9 @@ async def test_power_distributor_one_user(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -271,8 +270,9 @@ async def test_power_distributor_exclusion_bounds( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + # zero power requests should pass through despite the exclusion bounds. request = Request( power=Power.zero(), @@ -374,8 +374,9 @@ async def test_two_batteries_one_inverters(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -454,7 +455,6 @@ async def test_two_batteries_one_broken_one_inverters( requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -510,8 +510,9 @@ async def test_battery_two_inverters(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -562,8 +563,9 @@ async def test_two_batteries_three_inverters(self, mocker: MockerFixture) -> Non requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -648,8 +650,9 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds_2( requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -735,8 +738,9 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds( requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -801,7 +805,6 @@ async def test_connected_but_not_requested_batteries( requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), - wait_for_data_sec=0.1, ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -859,8 +862,9 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -914,8 +918,9 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -988,8 +993,9 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1034,7 +1040,6 @@ async def test_power_distributor_invalid_battery_id( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1079,8 +1084,9 @@ async def test_power_distributor_one_user_adjust_power_consume( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1126,8 +1132,9 @@ async def test_power_distributor_one_user_adjust_power_supply( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1173,8 +1180,9 @@ async def test_power_distributor_one_user_adjust_power_success( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1213,8 +1221,9 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + request = Request( power=Power.from_kilowatts(1.2), component_ids=batteries, @@ -1267,8 +1276,9 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None: requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), - wait_for_data_sec=0.1, ): + await asyncio.sleep(0.1) # wait for actor to collect data + request = Request( power=Power.from_kilowatts(1.70), component_ids=batteries, diff --git a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py index 7896711c3..72cb3991a 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py +++ b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py @@ -89,6 +89,11 @@ async def _patch_battery_pool_status( If `battery_ids` is not None, the mock will always return `battery_ids`. Otherwise, it will return the requested batteries. """ + mocker.patch.object( + timeseries.battery_pool._methods, # pylint: disable=protected-access + "WAIT_FOR_COMPONENT_DATA_SEC", + 0.1, + ) if battery_ids: mock = MagicMock(spec=ComponentPoolStatusTracker) mock.get_working_components.return_value = battery_ids diff --git a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py index d4d297235..75855e43e 100644 --- a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py +++ b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py @@ -96,13 +96,6 @@ async def _patch_ev_pool_status( ComponentPoolStatus(working=set(mocks.microgrid.evc_ids), uncertain=set()) ) - async def _patch_data_pipeline(self, mocker: MockerFixture) -> None: - mocker.patch( - "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._ev_power_wrapper" - "._pd_wait_for_data_sec", - 0.1, - ) - async def _patch_power_distributing_actor( self, mocker: MockerFixture, @@ -213,7 +206,6 @@ async def test_setting_power( ) await self._init_ev_chargers(mocks) - await self._patch_data_pipeline(mocker) ev_charger_pool = microgrid.ev_charger_pool(priority=5) await self._patch_ev_pool_status(mocks, mocker) await self._patch_power_distributing_actor(mocker) diff --git a/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py b/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py index 4f6936062..faa00662c 100644 --- a/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py +++ b/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py @@ -52,13 +52,6 @@ async def mocks(mocker: MockerFixture) -> typing.AsyncIterator[_Mocks]: class TestPVPoolControl: """Test control methods for the PVPool.""" - async def _patch_data_pipeline(self, mocker: MockerFixture) -> None: - mocker.patch( - "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._pv_power_wrapper" - "._pd_wait_for_data_sec", - 0.1, - ) - async def _init_pv_inverters(self, mocks: _Mocks) -> None: now = datetime.now(tz=timezone.utc) for idx, comp_id in enumerate(mocks.microgrid.pv_inverter_ids): @@ -138,7 +131,6 @@ async def test_setting_power( # pylint: disable=too-many-statements ) await self._init_pv_inverters(mocks) - await self._patch_data_pipeline(mocker) pv_pool = microgrid.pv_pool(priority=5) bounds_rx = pv_pool.power_status.new_receiver() await self._recv_reports_until(