Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/workflows/services/sample_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ def consume_message(self, header, message):
t = (time.time() % 1000) * 1000

if header:
header = json.dumps(header, indent=2) + "\n" + "----------------" + "\n"
header_str = json.dumps(header, indent=2) + "\n" + "----------------" + "\n"
else:
header = ""
header_str = ""
if isinstance(message, dict):
message = json.dumps(message, indent=2) + "\n" + "----------------" + "\n"

self.log.info(
f"=== Consume ====\n{header}{message}\nReceived message @{t:10.3f} ms"
f"=== Consume ====\n{header_str}{message}\nReceived message @{t:10.3f} ms"
)
time.sleep(0.1)
38 changes: 20 additions & 18 deletions src/workflows/transport/common_transport.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations
Comment thread
ndevenish marked this conversation as resolved.

import decimal
import logging
from typing import Any, Callable, Dict, Mapping, Set
from typing import Any, Callable, Dict, Mapping, Optional, Set

import workflows

Expand All @@ -12,10 +14,10 @@ class CommonTransport:
subscriptions and transactions."""

__callback_interceptor = None
__subscriptions: Dict[Any, Any] = {}
__subscription_id = 0
__transactions: Set[Any] = set()
__transaction_id = 0
__subscriptions: Dict[int, Dict[str, Any]] = {}
__subscription_id: int = 0
__transactions: Set[int] = set()
__transaction_id: int = 0

log = logging.getLogger("workflows.transport")

Expand Down Expand Up @@ -44,7 +46,7 @@ def disconnect(self):
"""Gracefully disconnect the transport class. This function should be
overridden."""

def subscribe(self, channel, callback, **kwargs):
def subscribe(self, channel, callback, **kwargs) -> int:
"""Listen to a queue, notify via callback function.
:param channel: Queue name to subscribe to
:param callback: Function to be called when messages are received.
Expand Down Expand Up @@ -76,7 +78,7 @@ def mangled_callback(header, message):
self._subscribe(self.__subscription_id, channel, mangled_callback, **kwargs)
return self.__subscription_id

def unsubscribe(self, subscription, drop_callback_reference=False, **kwargs):
def unsubscribe(self, subscription: int, drop_callback_reference=False, **kwargs):
"""Stop listening to a queue or a broadcast
:param subscription: Subscription ID to cancel
:param drop_callback_reference: Drop the reference to the registered
Expand All @@ -98,7 +100,7 @@ def unsubscribe(self, subscription, drop_callback_reference=False, **kwargs):
if drop_callback_reference:
self.drop_callback_reference(subscription)

def drop_callback_reference(self, subscription):
def drop_callback_reference(self, subscription: int):
"""Drop reference to the callback function after unsubscribing.
Any future messages arriving for that subscription will result in
exceptions being raised.
Expand All @@ -114,7 +116,7 @@ def drop_callback_reference(self, subscription):
)
del self.__subscriptions[subscription]

def subscribe_broadcast(self, channel, callback, **kwargs):
def subscribe_broadcast(self, channel, callback, **kwargs) -> int:
"""Listen to a broadcast topic, notify via callback function.
:param channel: Topic name to subscribe to
:param callback: Function to be called when messages are received.
Expand Down Expand Up @@ -150,7 +152,7 @@ def mangled_callback(header, message):
)
return self.__subscription_id

def subscription_callback(self, subscription) -> MessageCallback:
def subscription_callback(self, subscription: int) -> MessageCallback:
"""Retrieve the callback function for a subscription. Raise a
workflows.Error if the subscription does not exist.
All transport callbacks can be intercepted by setting an
Expand Down Expand Up @@ -232,7 +234,7 @@ def raw_broadcast(self, destination, message, **kwargs):
"""
self._broadcast(destination, message, **kwargs)

def ack(self, message, subscription_id=None, **kwargs):
def ack(self, message, subscription_id: Optional[int] = None, **kwargs):
"""Acknowledge receipt of a message. This only makes sense when the
'acknowledgement' flag was set for the relevant subscription.
:param message: ID of the message to be acknowledged, OR a dictionary
Expand All @@ -259,7 +261,7 @@ def ack(self, message, subscription_id=None, **kwargs):
)
self._ack(message_id, subscription_id=subscription_id, **kwargs)

def nack(self, message, subscription_id=None, **kwargs):
def nack(self, message, subscription_id: Optional[int] = None, **kwargs):
"""Reject receipt of a message. This only makes sense when the
'acknowledgement' flag was set for the relevant subscription.
:param message: ID of the message to be rejected, OR a dictionary
Expand All @@ -282,11 +284,11 @@ def nack(self, message, subscription_id=None, **kwargs):
if not subscription_id:
raise workflows.Error("Cannot reject message without subscription ID")
self.log.debug(
"Rejecting message %s on subscription %s", message_id, subscription_id
"Rejecting message %s on subscription %d", message_id, subscription_id
)
self._nack(message_id, subscription_id=subscription_id, **kwargs)

def transaction_begin(self, **kwargs):
def transaction_begin(self, **kwargs) -> int:
"""Start a new transaction.
:param **kwargs: Further parameters for the transport layer. For example
:return: A transaction ID that can be passed to other functions.
Expand All @@ -297,7 +299,7 @@ def transaction_begin(self, **kwargs):
self._transaction_begin(self.__transaction_id, **kwargs)
return self.__transaction_id

def transaction_abort(self, transaction_id, **kwargs):
def transaction_abort(self, transaction_id: int, **kwargs):
"""Abort a transaction and roll back all operations.
:param transaction_id: ID of transaction to be aborted.
:param **kwargs: Further parameters for the transport layer.
Expand All @@ -308,7 +310,7 @@ def transaction_abort(self, transaction_id, **kwargs):
self.__transactions.remove(transaction_id)
self._transaction_abort(transaction_id, **kwargs)

def transaction_commit(self, transaction_id, **kwargs):
def transaction_commit(self, transaction_id: int, **kwargs):
"""Commit a transaction.
:param transaction_id: ID of transaction to be committed.
:param **kwargs: Further parameters for the transport layer.
Expand Down Expand Up @@ -341,7 +343,7 @@ def _subscribe(self, sub_id: int, channel, callback, **kwargs):
"""
raise NotImplementedError("Transport interface not implemented")

def _subscribe_broadcast(self, sub_id, channel, callback, **kwargs):
def _subscribe_broadcast(self, sub_id: int, channel, callback, **kwargs):
"""Listen to a broadcast topic, notify via callback function.
:param sub_id: ID for this subscription in the transport layer
:param channel: Topic name to subscribe to
Expand All @@ -351,7 +353,7 @@ def _subscribe_broadcast(self, sub_id, channel, callback, **kwargs):
"""
raise NotImplementedError("Transport interface not implemented")

def _unsubscribe(self, sub_id):
def _unsubscribe(self, sub_id: int, **kwargs):
"""Stop listening to a queue or a broadcast
:param sub_id: ID for this subscription in the transport layer
"""
Expand Down
Loading