Skip to content

Commit dd449bf

Browse files
authored
PikaTransport: use numeric subscription IDs internally (#85)
and only convert to strings at the pika API boundary. Write the numeric subscription ID into message headers when passing them to the subscribed callback functions, so they can be used to ACK/NACK later. The assertion preventing acknowledgements with multiple channels was removed This commit also adds some type annotations to the common transport class, and make existing ones more precise. The SampleConsumer is slightly modified to make it easier to use as a base for experiments.
1 parent 564147c commit dd449bf

4 files changed

Lines changed: 92 additions & 89 deletions

File tree

src/workflows/services/sample_consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ def consume_message(self, header, message):
2424
t = (time.time() % 1000) * 1000
2525

2626
if header:
27-
header = json.dumps(header, indent=2) + "\n" + "----------------" + "\n"
27+
header_str = json.dumps(header, indent=2) + "\n" + "----------------" + "\n"
2828
else:
29-
header = ""
29+
header_str = ""
3030
if isinstance(message, dict):
3131
message = json.dumps(message, indent=2) + "\n" + "----------------" + "\n"
3232

3333
self.log.info(
34-
f"=== Consume ====\n{header}{message}\nReceived message @{t:10.3f} ms"
34+
f"=== Consume ====\n{header_str}{message}\nReceived message @{t:10.3f} ms"
3535
)
3636
time.sleep(0.1)

src/workflows/transport/common_transport.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
from __future__ import annotations
2+
13
import decimal
24
import logging
3-
from typing import Any, Callable, Dict, Mapping, Set
5+
from typing import Any, Callable, Dict, Mapping, Optional, Set
46

57
import workflows
68

@@ -12,10 +14,10 @@ class CommonTransport:
1214
subscriptions and transactions."""
1315

1416
__callback_interceptor = None
15-
__subscriptions: Dict[Any, Any] = {}
16-
__subscription_id = 0
17-
__transactions: Set[Any] = set()
18-
__transaction_id = 0
17+
__subscriptions: Dict[int, Dict[str, Any]] = {}
18+
__subscription_id: int = 0
19+
__transactions: Set[int] = set()
20+
__transaction_id: int = 0
1921

2022
log = logging.getLogger("workflows.transport")
2123

@@ -44,7 +46,7 @@ def disconnect(self):
4446
"""Gracefully disconnect the transport class. This function should be
4547
overridden."""
4648

47-
def subscribe(self, channel, callback, **kwargs):
49+
def subscribe(self, channel, callback, **kwargs) -> int:
4850
"""Listen to a queue, notify via callback function.
4951
:param channel: Queue name to subscribe to
5052
:param callback: Function to be called when messages are received.
@@ -76,7 +78,7 @@ def mangled_callback(header, message):
7678
self._subscribe(self.__subscription_id, channel, mangled_callback, **kwargs)
7779
return self.__subscription_id
7880

79-
def unsubscribe(self, subscription, drop_callback_reference=False, **kwargs):
81+
def unsubscribe(self, subscription: int, drop_callback_reference=False, **kwargs):
8082
"""Stop listening to a queue or a broadcast
8183
:param subscription: Subscription ID to cancel
8284
:param drop_callback_reference: Drop the reference to the registered
@@ -98,7 +100,7 @@ def unsubscribe(self, subscription, drop_callback_reference=False, **kwargs):
98100
if drop_callback_reference:
99101
self.drop_callback_reference(subscription)
100102

101-
def drop_callback_reference(self, subscription):
103+
def drop_callback_reference(self, subscription: int):
102104
"""Drop reference to the callback function after unsubscribing.
103105
Any future messages arriving for that subscription will result in
104106
exceptions being raised.
@@ -114,7 +116,7 @@ def drop_callback_reference(self, subscription):
114116
)
115117
del self.__subscriptions[subscription]
116118

117-
def subscribe_broadcast(self, channel, callback, **kwargs):
119+
def subscribe_broadcast(self, channel, callback, **kwargs) -> int:
118120
"""Listen to a broadcast topic, notify via callback function.
119121
:param channel: Topic name to subscribe to
120122
:param callback: Function to be called when messages are received.
@@ -150,7 +152,7 @@ def mangled_callback(header, message):
150152
)
151153
return self.__subscription_id
152154

153-
def subscription_callback(self, subscription) -> MessageCallback:
155+
def subscription_callback(self, subscription: int) -> MessageCallback:
154156
"""Retrieve the callback function for a subscription. Raise a
155157
workflows.Error if the subscription does not exist.
156158
All transport callbacks can be intercepted by setting an
@@ -232,7 +234,7 @@ def raw_broadcast(self, destination, message, **kwargs):
232234
"""
233235
self._broadcast(destination, message, **kwargs)
234236

235-
def ack(self, message, subscription_id=None, **kwargs):
237+
def ack(self, message, subscription_id: Optional[int] = None, **kwargs):
236238
"""Acknowledge receipt of a message. This only makes sense when the
237239
'acknowledgement' flag was set for the relevant subscription.
238240
:param message: ID of the message to be acknowledged, OR a dictionary
@@ -259,7 +261,7 @@ def ack(self, message, subscription_id=None, **kwargs):
259261
)
260262
self._ack(message_id, subscription_id=subscription_id, **kwargs)
261263

262-
def nack(self, message, subscription_id=None, **kwargs):
264+
def nack(self, message, subscription_id: Optional[int] = None, **kwargs):
263265
"""Reject receipt of a message. This only makes sense when the
264266
'acknowledgement' flag was set for the relevant subscription.
265267
:param message: ID of the message to be rejected, OR a dictionary
@@ -282,11 +284,11 @@ def nack(self, message, subscription_id=None, **kwargs):
282284
if not subscription_id:
283285
raise workflows.Error("Cannot reject message without subscription ID")
284286
self.log.debug(
285-
"Rejecting message %s on subscription %s", message_id, subscription_id
287+
"Rejecting message %s on subscription %d", message_id, subscription_id
286288
)
287289
self._nack(message_id, subscription_id=subscription_id, **kwargs)
288290

289-
def transaction_begin(self, **kwargs):
291+
def transaction_begin(self, **kwargs) -> int:
290292
"""Start a new transaction.
291293
:param **kwargs: Further parameters for the transport layer. For example
292294
:return: A transaction ID that can be passed to other functions.
@@ -297,7 +299,7 @@ def transaction_begin(self, **kwargs):
297299
self._transaction_begin(self.__transaction_id, **kwargs)
298300
return self.__transaction_id
299301

300-
def transaction_abort(self, transaction_id, **kwargs):
302+
def transaction_abort(self, transaction_id: int, **kwargs):
301303
"""Abort a transaction and roll back all operations.
302304
:param transaction_id: ID of transaction to be aborted.
303305
:param **kwargs: Further parameters for the transport layer.
@@ -308,7 +310,7 @@ def transaction_abort(self, transaction_id, **kwargs):
308310
self.__transactions.remove(transaction_id)
309311
self._transaction_abort(transaction_id, **kwargs)
310312

311-
def transaction_commit(self, transaction_id, **kwargs):
313+
def transaction_commit(self, transaction_id: int, **kwargs):
312314
"""Commit a transaction.
313315
:param transaction_id: ID of transaction to be committed.
314316
:param **kwargs: Further parameters for the transport layer.
@@ -341,7 +343,7 @@ def _subscribe(self, sub_id: int, channel, callback, **kwargs):
341343
"""
342344
raise NotImplementedError("Transport interface not implemented")
343345

344-
def _subscribe_broadcast(self, sub_id, channel, callback, **kwargs):
346+
def _subscribe_broadcast(self, sub_id: int, channel, callback, **kwargs):
345347
"""Listen to a broadcast topic, notify via callback function.
346348
:param sub_id: ID for this subscription in the transport layer
347349
:param channel: Topic name to subscribe to
@@ -351,7 +353,7 @@ def _subscribe_broadcast(self, sub_id, channel, callback, **kwargs):
351353
"""
352354
raise NotImplementedError("Transport interface not implemented")
353355

354-
def _unsubscribe(self, sub_id):
356+
def _unsubscribe(self, sub_id: int, **kwargs):
355357
"""Stop listening to a queue or a broadcast
356358
:param sub_id: ID for this subscription in the transport layer
357359
"""

0 commit comments

Comments
 (0)