Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion benchmarks/power_distribution/power_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ async def run_test( # pylint: disable=too-many-locals
requests_receiver=power_request_channel.new_receiver(),
results_sender=power_result_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=2.0,
):
tasks: list[Coroutine[Any, Any, list[Result]]] = []
tasks.append(send_requests(batteries, num_requests))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,9 @@ def _get_battery_inverter_data(
# This should be handled by BatteryStatus. BatteryStatus should not return
# this batteries as working.
if not all(
self._battery_caches[bat_id].has_value for bat_id in battery_ids
self._battery_caches[bat_id].has_value() for bat_id in battery_ids
) or not all(
self._inverter_caches[inv_id].has_value for inv_id in inverter_ids
self._inverter_caches[inv_id].has_value() for inv_id in inverter_ids
):
_logger.error(
"Battery %s or inverter %s send no data, yet. They should be not used.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
"""


import asyncio

from frequenz.channels import Receiver, Sender
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType

Expand Down Expand Up @@ -60,7 +58,6 @@ def __init__( # pylint: disable=too-many-arguments
requests_receiver: Receiver[Request],
results_sender: Sender[Result],
component_pool_status_sender: Sender[ComponentPoolStatus],
wait_for_data_sec: float,
*,
component_category: ComponentCategory,
component_type: ComponentType | None = None,
Expand All @@ -74,8 +71,6 @@ def __init__( # pylint: disable=too-many-arguments
results_sender: Sender for sending results to the power manager.
component_pool_status_sender: Channel for sending information about which
components are expected to be working.
wait_for_data_sec: How long actor should wait before processing first
request. It is a time needed to collect first components data.
component_category: The category of the components that this actor is
responsible for.
component_type: The type of the component of the given category that this
Expand All @@ -96,7 +91,6 @@ def __init__( # pylint: disable=too-many-arguments
self._component_type = component_type
self._requests_receiver = requests_receiver
self._result_sender = results_sender
self._wait_for_data_sec = wait_for_data_sec

self._component_manager: ComponentManager
if component_category == ComponentCategory.BATTERY:
Expand Down Expand Up @@ -130,9 +124,6 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
"""
await self._component_manager.start()

# Wait few seconds to get data from the channels created above.
await asyncio.sleep(self._wait_for_data_sec)

async for request in self._requests_receiver:
await self._component_manager.distribute_power(request)

Expand Down
4 changes: 0 additions & 4 deletions src/frequenz/sdk/microgrid/_power_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

_logger = logging.getLogger(__name__)

_POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC = 2.0


class PowerWrapper:
"""Wrapper around the power managing and power distributing actors."""
Expand Down Expand Up @@ -80,7 +78,6 @@ def __init__(

self._power_distributing_actor: PowerDistributingActor | None = None
self._power_managing_actor: _power_managing.PowerManagingActor | None = None
self._pd_wait_for_data_sec: float = _POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC

def _start_power_managing_actor(self) -> None:
"""Start the power managing actor if it is not already running."""
Expand Down Expand Up @@ -151,7 +148,6 @@ def _start_power_distributing_actor(self) -> None:
requests_receiver=self._power_distribution_requests_channel.new_receiver(),
results_sender=self._power_distribution_results_channel.new_sender(),
component_pool_status_sender=self.status_channel.new_sender(),
wait_for_data_sec=self._pd_wait_for_data_sec,
)
self._power_distributing_actor.start()

Expand Down
50 changes: 30 additions & 20 deletions tests/actor/power_distributing/test_power_distributing.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ async def test_constructor_with_grid_meter(self, mocker: MockerFixture) -> None:
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.0,
) as distributor:
assert isinstance(distributor._component_manager, BatteryManager)
assert distributor._component_manager._bat_invs_map == {
Expand Down Expand Up @@ -144,7 +143,6 @@ async def test_constructor_without_grid_meter(self, mocker: MockerFixture) -> No
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.0,
) as distributor:
assert isinstance(distributor._component_manager, BatteryManager)
assert distributor._component_manager._bat_invs_map == {
Expand Down Expand Up @@ -210,8 +208,9 @@ async def test_power_distributor_one_user(self, mocker: MockerFixture) -> None:
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this PR just moving the issue elsewhere instead? Or is this synchronization only needed for the tests but when actually used one shouldn't care when there is data and when there isn't?

In any case, having tests depend on random sleep makes me sad (as I see the flakiness coming to bite us eventually). 😢 😞

If we can really really get rid of the need for synchronization, then I completely agree we shouldn't provide any way to synchronize, but otherwise I think having a way to explicitly synchronize that is it not a flaky sleep it is a better approach. In terms of performance, even if it is a hot path, it is just one extra boolean evaluation and branch, which modern CPUs probably won´t even care about because they will do branch prediction so the performance might not even be affected at all after the initialization.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in the commit message, these are needed only in tests where we control the PowerDistributor directly.

And they are needed only for those tests that need component data, so that PowerDistributor can return Success. Else it will return Error("No data"), but that's not the case we're testing.

It is only a testing problem, not with behaviour in production.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'm still not happy about potential flaky tests, but this is still an improvement over what we had before, so approving 👍


await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -271,8 +270,9 @@ async def test_power_distributor_exclusion_bounds(
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

# zero power requests should pass through despite the exclusion bounds.
request = Request(
power=Power.zero(),
Expand Down Expand Up @@ -374,8 +374,9 @@ async def test_two_batteries_one_inverters(self, mocker: MockerFixture) -> None:
requests_receiver=requests_channel.new_receiver(),
component_pool_status_sender=battery_status_channel.new_sender(),
results_sender=results_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -454,7 +455,6 @@ async def test_two_batteries_one_broken_one_inverters(
requests_receiver=requests_channel.new_receiver(),
component_pool_status_sender=battery_status_channel.new_sender(),
results_sender=results_channel.new_sender(),
wait_for_data_sec=0.1,
):
await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()
Expand Down Expand Up @@ -510,8 +510,9 @@ async def test_battery_two_inverters(self, mocker: MockerFixture) -> None:
requests_receiver=requests_channel.new_receiver(),
component_pool_status_sender=battery_status_channel.new_sender(),
results_sender=results_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -562,8 +563,9 @@ async def test_two_batteries_three_inverters(self, mocker: MockerFixture) -> Non
requests_receiver=requests_channel.new_receiver(),
component_pool_status_sender=battery_status_channel.new_sender(),
results_sender=results_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -648,8 +650,9 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds_2(
requests_receiver=requests_channel.new_receiver(),
component_pool_status_sender=battery_status_channel.new_sender(),
results_sender=results_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -735,8 +738,9 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds(
requests_receiver=requests_channel.new_receiver(),
component_pool_status_sender=battery_status_channel.new_sender(),
results_sender=results_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -801,7 +805,6 @@ async def test_connected_but_not_requested_batteries(
requests_receiver=requests_channel.new_receiver(),
component_pool_status_sender=battery_status_channel.new_sender(),
results_sender=results_channel.new_sender(),
wait_for_data_sec=0.1,
):
await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()
Expand Down Expand Up @@ -859,8 +862,9 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None:
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -914,8 +918,9 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None:
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -988,8 +993,9 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None:
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -1034,7 +1040,6 @@ async def test_power_distributor_invalid_battery_id(
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()
Expand Down Expand Up @@ -1079,8 +1084,9 @@ async def test_power_distributor_one_user_adjust_power_consume(
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -1126,8 +1132,9 @@ async def test_power_distributor_one_user_adjust_power_supply(
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -1173,8 +1180,9 @@ async def test_power_distributor_one_user_adjust_power_success(
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

await requests_channel.new_sender().send(request)
result_rx = results_channel.new_receiver()

Expand Down Expand Up @@ -1213,8 +1221,9 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

request = Request(
power=Power.from_kilowatts(1.2),
component_ids=batteries,
Expand Down Expand Up @@ -1267,8 +1276,9 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None:
requests_receiver=requests_channel.new_receiver(),
results_sender=results_channel.new_sender(),
component_pool_status_sender=battery_status_channel.new_sender(),
wait_for_data_sec=0.1,
):
await asyncio.sleep(0.1) # wait for actor to collect data

request = Request(
power=Power.from_kilowatts(1.70),
component_ids=batteries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ async def _patch_battery_pool_status(
If `battery_ids` is not None, the mock will always return `battery_ids`.
Otherwise, it will return the requested batteries.
"""
mocker.patch.object(
timeseries.battery_pool._methods, # pylint: disable=protected-access
"WAIT_FOR_COMPONENT_DATA_SEC",
0.1,
)
Comment on lines +92 to +96

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we still have some other waiting for data as a simple sleep? Shouldn't that wait be removed too?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the metric streaming in the battery pool. It won't stream any metrics until it has data for all the batteries, etc. So we don't send a new value for capacity or soc as we get data from more and more batteries.

This is independent of the power distributor.

if battery_ids:
mock = MagicMock(spec=ComponentPoolStatusTracker)
mock.get_working_components.return_value = battery_ids
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,6 @@ async def _patch_ev_pool_status(
ComponentPoolStatus(working=set(mocks.microgrid.evc_ids), uncertain=set())
)

async def _patch_data_pipeline(self, mocker: MockerFixture) -> None:
mocker.patch(
"frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._ev_power_wrapper"
"._pd_wait_for_data_sec",
0.1,
)

async def _patch_power_distributing_actor(
self,
mocker: MockerFixture,
Expand Down Expand Up @@ -213,7 +206,6 @@ async def test_setting_power(
)

await self._init_ev_chargers(mocks)
await self._patch_data_pipeline(mocker)
ev_charger_pool = microgrid.ev_charger_pool(priority=5)
await self._patch_ev_pool_status(mocks, mocker)
await self._patch_power_distributing_actor(mocker)
Expand Down
8 changes: 0 additions & 8 deletions tests/timeseries/_pv_pool/test_pv_pool_control_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,6 @@ async def mocks(mocker: MockerFixture) -> typing.AsyncIterator[_Mocks]:
class TestPVPoolControl:
"""Test control methods for the PVPool."""

async def _patch_data_pipeline(self, mocker: MockerFixture) -> None:
mocker.patch(
"frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._pv_power_wrapper"
"._pd_wait_for_data_sec",
0.1,
)

async def _init_pv_inverters(self, mocks: _Mocks) -> None:
now = datetime.now(tz=timezone.utc)
for idx, comp_id in enumerate(mocks.microgrid.pv_inverter_ids):
Expand Down Expand Up @@ -138,7 +131,6 @@ async def test_setting_power( # pylint: disable=too-many-statements
)

await self._init_pv_inverters(mocks)
await self._patch_data_pipeline(mocker)
pv_pool = microgrid.pv_pool(priority=5)
bounds_rx = pv_pool.power_status.new_receiver()
await self._recv_reports_until(
Expand Down