Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 78 additions & 4 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,90 @@

## Summary

<!-- Here goes a general summary of what this release is about -->
The minimum Python supported version was bumped to 3.11 and the `Select` class replaced by the new `select()` function.

## Upgrading

* The minimum supported Python version was bumped to 3.11, downstream projects will need to upgrade too to use this version.

* The `Select` class was replaced by a new `select()` function, with the following improvements:

* Type-safe: proper type hinting by using the new helper type guard `selected_from()`.
* Fixes potential starvation issues.
* Simplifies the interface by providing values one-by-one.
* Guarantees there are no dangling tasks left behind when used as an async context manager.

This new function is an [async iterator](https://docs.python.org/3.11/library/collections.abc.html#collections.abc.AsyncIterator), and makes sure no dangling tasks are left behind after a select loop is done.

Example:
```python
timer1 = Timer.periodic(datetime.timedelta(seconds=1))
timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))

async for selected in selector(timer1, timer2):
if selected_from(selected, timer1):
# Beware: `selected.value` might raise an exception, you can always
# check for exceptions with `selected.exception` first or use
# a try-except block. You can also quickly check if the receiver was
# stopped and let any other unexpected exceptions bubble up.
if selected.was_stopped():
print("timer1 was stopped")
continue
print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
timer2.stop()
elif selected_from(selected, timer2):
# Explicitly handling of exceptions
match selected.exception:
case ReceiverStoppedError():
print("timer2 was stopped")
case Exception() as exception:
print(f"timer2: exception={exception}")
case None:
# All good, no exception, we can use `selected.value` safely
print(
f"timer2: now={datetime.datetime.now()} "
f"drift={selected.value}"
)
case _ as unhanded:
assert_never(unhanded)
else:
# This is not necessary, as select() will check for exhaustiveness, but
# it is good practice to have it in case you forgot to handle a new
# receiver added to `select()` at a later point in time.
assert False
```

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* A new `select()` function was added, please look at the *Upgrading* section for details.

* A new `Event` utility receiver was added.

This receiver can be made ready manually. It is mainly useful for testing but can also become handy in scenarios where a simple, on-off signal needs to be sent to a select loop for example.

Example:

```python
import asyncio
from frequenz.channels import Receiver
from frequenz.channels.util import Event, select, selected_from

other_receiver: Receiver[int] = ...
exit_event = Event()

async def exit_after_10_seconds() -> None:
asyncio.sleep(10)
exit_event.set()

asyncio.ensure_future(exit_after_10_seconds())

## Bug Fixes
async for selected in selector(exit_event, other_receiver):
if selected_from(selected, exit_event):
break
if selected_from(selected, other_receiver):
print(selected.value)
else:
assert False, "Unknow receiver selected"
```

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
* The `Timer` class now has more descriptive `__str__` and `__repr__` methods.
2 changes: 1 addition & 1 deletion src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Anycast(Generic[T]):
thread-safe.

When there are multiple channel receivers, they can be awaited
simultaneously using [Select][frequenz.channels.util.Select],
simultaneously using [select][frequenz.channels.util.select],
[Merge][frequenz.channels.util.Merge] or
[MergeNamed][frequenz.channels.util.MergeNamed].

Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Broadcast(Generic[T]):
are thread-safe. Because of this, `Broadcast` channels are thread-safe.

When there are multiple channel receivers, they can be awaited
simultaneously using [Select][frequenz.channels.util.Select],
simultaneously using [select][frequenz.channels.util.select],
[Merge][frequenz.channels.util.Merge] or
[MergeNamed][frequenz.channels.util.MergeNamed].

Expand Down
28 changes: 22 additions & 6 deletions src/frequenz/channels/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

A module with several utilities to work with channels:

* [Event][frequenz.channels.util.Event]:
A [receiver][frequenz.channels.Receiver] that can be made ready through an event.

* [FileWatcher][frequenz.channels.util.FileWatcher]:
A [receiver][frequenz.channels.Receiver] that watches for file events.

Expand All @@ -20,15 +23,22 @@
* [Timer][frequenz.channels.util.Timer]:
A [receiver][frequenz.channels.Receiver] that ticks at certain intervals.

* [Select][frequenz.channels.util.Select]: A helper to select the next
available message for each [receiver][frequenz.channels.Receiver] in a group
of receivers.
* [select][frequenz.channels.util.select]: Iterate over the values of all
[receivers][frequenz.channels.Receiver] as new values become available.
"""

from ._event import Event
from ._file_watcher import FileWatcher
from ._merge import Merge
from ._merge_named import MergeNamed
from ._select import Select
from ._select import (
Selected,
SelectError,
SelectErrorGroup,
UnhandledSelectedError,
select,
selected_from,
)
from ._timer import (
MissedTickPolicy,
SkipMissedAndDrift,
Expand All @@ -38,13 +48,19 @@
)

__all__ = [
"Event",
"FileWatcher",
"Merge",
"MergeNamed",
"MissedTickPolicy",
"Timer",
"Select",
"SelectError",
"SelectErrorGroup",
"Selected",
"SkipMissedAndDrift",
"SkipMissedAndResync",
"Timer",
"TriggerAllMissed",
"UnhandledSelectedError",
"select",
"selected_from",
]
161 changes: 161 additions & 0 deletions src/frequenz/channels/util/_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# License: MIT
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH

"""A receiver that can be made ready through an event."""


import asyncio as _asyncio

from frequenz.channels import _base_classes, _exceptions


class Event(_base_classes.Receiver[None]):
"""A receiver that can be made ready through an event.

The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait
until [`set()`][frequenz.channels.util.Event.set] is called. At that point the
receiver will wait again after the event is
[`consume()`][frequenz.channels.Receiver.consume]d.

The receiver can be completely stopped by calling
[`stop()`][frequenz.channels.Receiver.stop].

Example:
```python
import asyncio
from frequenz.channels import Receiver
from frequenz.channels.util import Event, select, selected_from

other_receiver: Receiver[int] = ...
exit_event = Event()

async def exit_after_10_seconds() -> None:
asyncio.sleep(10)
exit_event.set()

asyncio.ensure_future(exit_after_10_seconds())

async for selected in select(exit_event, other_receiver):
if selected_from(selected, exit_event):
break
if selected_from(selected, other_receiver):
print(selected.value)
else:
assert False, "Unknow receiver selected"
```
"""

def __init__(self, name: str | None = None) -> None:
"""Create a new instance.

Args:
name: The name of the receiver. If `None` the `id(self)` will be used as
the name. This is only for debugging purposes, it will be shown in the
string representation of the receiver.
"""
self._event: _asyncio.Event = _asyncio.Event()
"""The event that is set when the receiver is ready."""

self._name: str = name or str(id(self))
"""The name of the receiver.

This is for debugging purposes, it will be shown in the string representation
of the receiver.
"""

self._is_set: bool = False
"""Whether the receiver is ready to be consumed.

This is used to differentiate between when the receiver was stopped (the event
is triggered too) but still there is an event to be consumed and when it was
stopped but was not explicitly set().
"""

self._is_stopped: bool = False
"""Whether the receiver is stopped."""

@property
def name(self) -> str:
"""The name of this receiver.

This is for debugging purposes, it will be shown in the string representation
of this receiver.

Returns:
The name of this receiver.
"""
return self._name

@property
def is_set(self) -> bool:
"""Whether this receiver is set (ready).

Returns:
Whether this receiver is set (ready).
"""
return self._is_set

@property
def is_stopped(self) -> bool:
"""Whether this receiver is stopped.

Returns:
Whether this receiver is stopped.
"""
return self._is_stopped

def stop(self) -> None:
"""Stop this receiver."""
self._is_stopped = True
self._event.set()

def set(self) -> None:
"""Trigger the event (make the receiver ready)."""
self._is_set = True
self._event.set()

async def ready(self) -> bool:
"""Wait until this receiver is ready.

Returns:
Whether this receiver is still running.
"""
if self._is_stopped:
return False
await self._event.wait()
return not self._is_stopped

def consume(self) -> None:
"""Consume the event.

This makes this receiver wait again until the event is set again.

Raises:
ReceiverStoppedError: If this receiver is stopped.
"""
if not self._is_set and self._is_stopped:
raise _exceptions.ReceiverStoppedError(self)

assert self._is_set, "calls to `consume()` must be follow a call to `ready()`"

self._is_set = False
self._event.clear()

def __str__(self) -> str:
"""Return a string representation of this receiver.

Returns:
A string representation of this receiver.
"""
return f"{type(self).__name__}({self._name!r})"

def __repr__(self) -> str:
"""Return a string representation of this receiver.

Returns:
A string representation of this receiver.
"""
return (
f"<{type(self).__name__} name={self._name!r} is_set={self.is_set!r} "
f"is_stopped={self.is_stopped!r}>"
)
Loading