From 2c05e403f0cc78755648c3b92b757aad97cc5c3a Mon Sep 17 00:00:00 2001 From: Deepika Awasthi Date: Thu, 28 May 2026 11:10:45 -0700 Subject: [PATCH 1/4] Add custom_worker_tuner sample --- custom_worker_tuner/README.md | 71 +++++++++++++++++++++++++++++++ custom_worker_tuner/__init__.py | 0 custom_worker_tuner/downstream.py | 34 +++++++++++++++ custom_worker_tuner/shared.py | 39 +++++++++++++++++ custom_worker_tuner/starter.py | 49 +++++++++++++++++++++ custom_worker_tuner/supplier.py | 70 ++++++++++++++++++++++++++++++ custom_worker_tuner/worker.py | 57 +++++++++++++++++++++++++ 7 files changed, 320 insertions(+) create mode 100644 custom_worker_tuner/README.md create mode 100644 custom_worker_tuner/__init__.py create mode 100644 custom_worker_tuner/downstream.py create mode 100644 custom_worker_tuner/shared.py create mode 100644 custom_worker_tuner/starter.py create mode 100644 custom_worker_tuner/supplier.py create mode 100644 custom_worker_tuner/worker.py diff --git a/custom_worker_tuner/README.md b/custom_worker_tuner/README.md new file mode 100644 index 00000000..4a8ab12f --- /dev/null +++ b/custom_worker_tuner/README.md @@ -0,0 +1,71 @@ +# Custom Worker Tuner + +A `CustomSlotSupplier` is a sample that lets you gate slot grants on whatever you want. +This sample gates on a fake DB pool: the worker only polls for a new +activity when the pool has a free connection. + +## What this sample is +downstream.py - A static-capacity counter. Pretends to be a DB pool. Two methods: increment() (claim a slot, returns False if full), decrement() (release) +supplier.py - The custom slot supplier. On reserve_slot it polls downstream.increment() until it succeeds. On release_slot it calls downstream.decrement() +shared.py - A RunBatch workflow that runs N do_work activities in parallel. 
The activity just sleeps +worker.py - Wires Downstream + DownstreamAwareSupplier into a WorkerTuner +starter.py - Drives load + +The flow: + +When the downstream is at capacity, `reserve_slot` blocks until a +slot frees up. The excess work piles up on the Temporal server, not +inside the worker. + +## Run + +In three terminals from `samples-python/`: + +```bash +temporal server start-dev # terminal 1 +uv run custom_worker_tuner/worker.py # terminal 2 +uv run custom_worker_tuner/starter.py # terminal 3 +``` + +## What you'll see + +The worker prints one line per slot lifecycle event: + +``` + +TIME EVENT SLOT COUNT DETAIL +──────────────────────────────────────────────────────────── +10:31:49.842 reserve #1 1/10 ready to poll +10:31:49.842 reserve #2 2/10 ready to poll +10:31:49.843 reserve #3 3/10 ready to poll +10:31:49.843 reserve #4 4/10 ready to poll +10:31:49.843 reserve #5 5/10 ready to poll +10:31:49.843 reserve #6 6/10 ready to poll +10:31:56.763 reserve #7 7/10 eager dispatch +10:31:56.763 reserve #8 8/10 eager dispatch +10:31:56.764 reserve #9 9/10 eager dispatch +10:31:56.766 reserve #10 10/10 eager dispatch +10:31:56.767 release #7 9/10 no task arrived +10:31:56.768 release #8 8/10 no task arrived +10:31:56.768 release #9 7/10 no task arrived +10:31:56.768 reserve #11 8/10 eager dispatch +10:31:56.768 reserve #12 9/10 eager dispatch +10:31:56.768 reserve #13 10/10 eager dispatch +10:31:56.771 used #1 10/10 activity running +10:31:56.771 release #10 9/10 no task arrived +``` + +Under load, with more activities than capacity, COUNT pins at +10/10 — that's the supplier refusing to poll past the gate. 
+We chose 10 because the Python SDK uses 5 pollers by default + +## Knobs + +worker.py: + +CAPACITY — downstream capacity (the gate) +POLL_INTERVAL_MS — how often the supplier rechecks when full + +starter.py: + +WORKFLOWS, ACTIVITIES_PER_WORKFLOW, SECONDS_PER_ACTIVITY — amount and duration of load diff --git a/custom_worker_tuner/__init__.py b/custom_worker_tuner/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/custom_worker_tuner/downstream.py b/custom_worker_tuner/downstream.py new file mode 100644 index 00000000..f8c77a0a --- /dev/null +++ b/custom_worker_tuner/downstream.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import logging +import threading + +logger = logging.getLogger(__name__) + + +class Downstream: + """A counter with a fixed capacity. Thread-safe.""" + + def __init__(self, allowed_connections: int, name: str = "downstream") -> None: + self.allowed_connections = allowed_connections + self.name = name + self.currently_connected = 0 + self.connection_pool = threading.Lock() + logger.info( + "Downstream ready: name=%s allowed_connections=%d", + name, + allowed_connections, + ) + + def increment(self) -> bool: + """allow one connection. Returns False if at capacity.""" + with self.connection_pool: + if self.currently_connected >= self.allowed_connections: + return False + self.currently_connected += 1 + return True + + def decrement(self) -> None: + """Release one slot. 
Floored at 0 so a buggy caller can't go negative.""" + with self.connection_pool: + self.currently_connected = max(0, self.currently_connected - 1) diff --git a/custom_worker_tuner/shared.py b/custom_worker_tuner/shared.py new file mode 100644 index 00000000..61cb8c3b --- /dev/null +++ b/custom_worker_tuner/shared.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from datetime import timedelta + +from temporalio import activity, workflow + +TASK_QUEUE = "custom-worker-tuner" + + +@dataclass +class BatchInput: + activities: int + seconds: float + + +@activity.defn +async def do_work(seconds: float) -> None: + """Sleep, simulating an I/O-bound activity.""" + await asyncio.sleep(seconds) + + +@workflow.defn +class RunBatch: + """Runs N do_work activities in parallel.""" + + @workflow.run + async def run(self, inp: BatchInput) -> None: + await asyncio.gather( + *( + workflow.execute_activity( + do_work, + inp.seconds, + start_to_close_timeout=timedelta(minutes=2), + ) + for _ in range(inp.activities) + ) + ) diff --git a/custom_worker_tuner/starter.py b/custom_worker_tuner/starter.py new file mode 100644 index 00000000..84ed770b --- /dev/null +++ b/custom_worker_tuner/starter.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import asyncio +import time +import uuid + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig + +from custom_worker_tuner.shared import TASK_QUEUE, BatchInput, RunBatch + +# Tweak these to push more or less load. 
+WORKFLOWS = 10 +ACTIVITIES_PER_WORKFLOW = 20 +SECONDS_PER_ACTIVITY = 2.0 + + +async def main() -> None: + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + run_id = uuid.uuid4().hex[:8] + inp = BatchInput(activities=ACTIVITIES_PER_WORKFLOW, seconds=SECONDS_PER_ACTIVITY) + total = WORKFLOWS * ACTIVITIES_PER_WORKFLOW + + print( + f"starting {WORKFLOWS} workflows × {ACTIVITIES_PER_WORKFLOW} activities × {SECONDS_PER_ACTIVITY}s" + ) + t0 = time.perf_counter() + + handles = await asyncio.gather( + *( + client.start_workflow( + RunBatch.run, + inp, + id=f"batch-{run_id}-{i}", + task_queue=TASK_QUEUE, + ) + for i in range(WORKFLOWS) + ) + ) + await asyncio.gather(*(h.result() for h in handles)) + + wall = time.perf_counter() - t0 + print(f"done in {wall:.1f}s ({total} activities, {total / wall:.0f}/s)") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/custom_worker_tuner/supplier.py b/custom_worker_tuner/supplier.py new file mode 100644 index 00000000..c43e2779 --- /dev/null +++ b/custom_worker_tuner/supplier.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import asyncio +import itertools +import logging + +from temporalio.worker import ( + CustomSlotSupplier, + SlotMarkUsedContext, + SlotPermit, + SlotReleaseContext, + SlotReserveContext, +) + +from custom_worker_tuner.downstream import Downstream + +logger = logging.getLogger(__name__) + +_slot_id_gen = itertools.count(1) + + +class _Permit(SlotPermit): + """SlotPermit subclass that just carries a sequential id for logs.""" + + def __init__(self, slot_id: int) -> None: + super().__init__() + self.slot_id = slot_id + + +class DownstreamAwareSupplier(CustomSlotSupplier): + def __init__(self, downstream: Downstream, poll_interval_ms: int = 100) -> None: + self.downstream = downstream + self.poll_interval_ms = poll_interval_ms + logger.info( + "DownstreamAwareSupplier ready: downstream=%s 
poll_interval_ms=%d", + downstream.name, + poll_interval_ms, + ) + + async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit: + """block downstream until it has capacity to get incremented and then grant a slot.""" + slot_id = next(_slot_id_gen) + while not self.downstream.increment(): + await asyncio.sleep(self.poll_interval_ms / 1000.0) + self._log("reserve", slot_id, "ready to poll") + return _Permit(slot_id) + + def try_reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit | None: + """Eager path: can i run this activity right now?""" + if self.downstream.increment(): + slot_id = next(_slot_id_gen) + self._log("reserve", slot_id, "eager dispatch") + return _Permit(slot_id) + return None + + def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None: + """A task arrived for a reserved slot""" + slot_id = getattr(ctx.permit, "slot_id", "?") + self._log("used", slot_id, "activity running") + + def release_slot(self, ctx: SlotReleaseContext) -> None: + """Return the slot to the downstream.""" + slot_id = getattr(ctx.permit, "slot_id", "?") + detail = "no task arrived" if ctx.slot_info is None else "activity done" + self.downstream.decrement() + self._log("release", slot_id, detail) + + def _log(self, event: str, slot_id, note: str) -> None: + count = f"{self.downstream.currently_connected}/{self.downstream.allowed_connections}" + logger.info(f"{event:<8} {count:>5} {note}") diff --git a/custom_worker_tuner/worker.py b/custom_worker_tuner/worker.py new file mode 100644 index 00000000..7340fcf7 --- /dev/null +++ b/custom_worker_tuner/worker.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import FixedSizeSlotSupplier, Worker, WorkerTuner + +from custom_worker_tuner.downstream import Downstream +from custom_worker_tuner.shared import TASK_QUEUE, RunBatch, do_work +from custom_worker_tuner.supplier 
import DownstreamAwareSupplier + +CAPACITY = 10 # number of connections allowed at a time +POLL_INTERVAL_MS = 500 +LOG_LEVEL = "INFO" # flip to "DEBUG" to see every increment/decrement + + +async def main() -> None: + logging.basicConfig( + level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), + format="%(asctime)s.%(msecs)03d %(message)s", + datefmt="%H:%M:%S", + ) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + downstream = Downstream(allowed_connections=CAPACITY, name="db") + supplier = DownstreamAwareSupplier(downstream, poll_interval_ms=POLL_INTERVAL_MS) + tuner = WorkerTuner.create_composite( + workflow_supplier=FixedSizeSlotSupplier(100), + activity_supplier=supplier, + local_activity_supplier=FixedSizeSlotSupplier(100), + nexus_supplier=FixedSizeSlotSupplier(100), + ) + + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[RunBatch], + activities=[do_work], + tuner=tuner, + ) + + print(f"\nworker started — capacity={CAPACITY}, poll={POLL_INTERVAL_MS}ms\n") + print("TIME EVENT COUNT DETAIL") + print("─" * 60) + await worker.run() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass From 1ce12944e8a913b8b2fb0e07f2bed14bfc9856ff Mon Sep 17 00:00:00 2001 From: Deepika Awasthi Date: Fri, 29 May 2026 13:16:34 -0700 Subject: [PATCH 2/4] Address review feedback --- custom_worker_tuner/README.md | 2 +- custom_worker_tuner/db_pool.py | 33 ++++++++++++++++++++++++++++++ custom_worker_tuner/downstream.py | 34 ------------------------------- custom_worker_tuner/supplier.py | 32 ++++++++++++----------------- custom_worker_tuner/worker.py | 15 +++++++------- 5 files changed, 54 insertions(+), 62 deletions(-) create mode 100644 custom_worker_tuner/db_pool.py delete mode 100644 custom_worker_tuner/downstream.py diff --git a/custom_worker_tuner/README.md b/custom_worker_tuner/README.md index 4a8ab12f..da2afed6 
100644 --- a/custom_worker_tuner/README.md +++ b/custom_worker_tuner/README.md @@ -5,7 +5,7 @@ This sample gates on a fake DB pool: the worker only polls for a new activity when the pool has a free connection. ## What this sample is -downstream.py - A static-capacity counter. Pretends to be a DB pool. Two methods: increment() (claim a slot, returns False if full), decrement() (release) +db_pool.py - A static-capacity counter. Pretends to be a DB pool. Two methods: increment() (claim a slot, returns False if full), decrement() (release) supplier.py - The custom slot supplier. On reserve_slot it polls downstream.increment() until it succeeds. On release_slot it calls downstream.decrement() shared.py - A RunBatch workflow that runs N do_work activities in parallel. The activity just sleeps worker.py - Wires Downstream + DownstreamAwareSupplier into a WorkerTuner diff --git a/custom_worker_tuner/db_pool.py b/custom_worker_tuner/db_pool.py new file mode 100644 index 00000000..f427e488 --- /dev/null +++ b/custom_worker_tuner/db_pool.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import logging +import threading + +logger = logging.getLogger(__name__) + + +class FakeDatabaseConnectionPool: + """Pretend connection pool with a fixed capacity, backed by a Semaphore.""" + + def __init__(self, allowed_connections: int, name: str = "db") -> None: + self.allowed_connections = allowed_connections + self.name = name + self._connection_pool = threading.BoundedSemaphore(allowed_connections) + logger.info( + "FakeDatabaseConnectionPool ready: name=%s allowed_connections=%d", + name, + allowed_connections, + ) + + def acquire(self, blocking: bool = True) -> bool: + """Claim a connection. 
When blocking, waits until one is free.""" + return self._connection_pool.acquire(blocking=blocking) + + def release(self) -> None: + """Return a connection to the pool.""" + self._connection_pool.release() + + @property + def in_use(self) -> int: + """Derived from the semaphore — single source of truth.""" + return self.allowed_connections - self._connection_pool._value diff --git a/custom_worker_tuner/downstream.py b/custom_worker_tuner/downstream.py deleted file mode 100644 index f8c77a0a..00000000 --- a/custom_worker_tuner/downstream.py +++ /dev/null @@ -1,34 +0,0 @@ -from __future__ import annotations - -import logging -import threading - -logger = logging.getLogger(__name__) - - -class Downstream: - """A counter with a fixed capacity. Thread-safe.""" - - def __init__(self, allowed_connections: int, name: str = "downstream") -> None: - self.allowed_connections = allowed_connections - self.name = name - self.currently_connected = 0 - self.connection_pool = threading.Lock() - logger.info( - "Downstream ready: name=%s allowed_connections=%d", - name, - allowed_connections, - ) - - def increment(self) -> bool: - """allow one connection. Returns False if at capacity.""" - with self.connection_pool: - if self.currently_connected >= self.allowed_connections: - return False - self.currently_connected += 1 - return True - - def decrement(self) -> None: - """Release one slot. 
Floored at 0 so a buggy caller can't go negative.""" - with self.connection_pool: - self.currently_connected = max(0, self.currently_connected - 1) diff --git a/custom_worker_tuner/supplier.py b/custom_worker_tuner/supplier.py index c43e2779..8625f583 100644 --- a/custom_worker_tuner/supplier.py +++ b/custom_worker_tuner/supplier.py @@ -12,7 +12,7 @@ SlotReserveContext, ) -from custom_worker_tuner.downstream import Downstream +from custom_worker_tuner.db_pool import FakeDatabaseConnectionPool logger = logging.getLogger(__name__) @@ -27,44 +27,38 @@ def __init__(self, slot_id: int) -> None: self.slot_id = slot_id -class DownstreamAwareSupplier(CustomSlotSupplier): - def __init__(self, downstream: Downstream, poll_interval_ms: int = 100) -> None: - self.downstream = downstream - self.poll_interval_ms = poll_interval_ms - logger.info( - "DownstreamAwareSupplier ready: downstream=%s poll_interval_ms=%d", - downstream.name, - poll_interval_ms, - ) +class PoolSlotSupplier(CustomSlotSupplier): + """Hands out slots only when the backing pool has a free connection.""" + + def __init__(self, connection_pool: FakeDatabaseConnectionPool) -> None: + self.connection_pool = connection_pool + logger.info("PoolSlotSupplier ready: connection_pool=%s", connection_pool.name) async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit: - """block downstream until it has capacity to get incremented and then grant a slot.""" + """Block until the pool has capacity, then grant a slot.""" + await asyncio.to_thread(self.connection_pool.acquire) slot_id = next(_slot_id_gen) - while not self.downstream.increment(): - await asyncio.sleep(self.poll_interval_ms / 1000.0) self._log("reserve", slot_id, "ready to poll") return _Permit(slot_id) def try_reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit | None: - """Eager path: can i run this activity right now?""" - if self.downstream.increment(): + """Eager path: try to claim a slot without blocking.""" + if 
self.connection_pool.acquire(blocking=False): slot_id = next(_slot_id_gen) self._log("reserve", slot_id, "eager dispatch") return _Permit(slot_id) return None def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None: - """A task arrived for a reserved slot""" slot_id = getattr(ctx.permit, "slot_id", "?") self._log("used", slot_id, "activity running") def release_slot(self, ctx: SlotReleaseContext) -> None: - """Return the slot to the downstream.""" slot_id = getattr(ctx.permit, "slot_id", "?") detail = "no task arrived" if ctx.slot_info is None else "activity done" - self.downstream.decrement() + self.connection_pool.release() self._log("release", slot_id, detail) def _log(self, event: str, slot_id, note: str) -> None: - count = f"{self.downstream.currently_connected}/{self.downstream.allowed_connections}" + count = f"{self.connection_pool.in_use}/{self.connection_pool.allowed_connections}" logger.info(f"{event:<8} {count:>5} {note}") diff --git a/custom_worker_tuner/worker.py b/custom_worker_tuner/worker.py index 7340fcf7..39e00726 100644 --- a/custom_worker_tuner/worker.py +++ b/custom_worker_tuner/worker.py @@ -7,13 +7,12 @@ from temporalio.envconfig import ClientConfig from temporalio.worker import FixedSizeSlotSupplier, Worker, WorkerTuner -from custom_worker_tuner.downstream import Downstream +from custom_worker_tuner.db_pool import FakeDatabaseConnectionPool from custom_worker_tuner.shared import TASK_QUEUE, RunBatch, do_work -from custom_worker_tuner.supplier import DownstreamAwareSupplier +from custom_worker_tuner.supplier import PoolSlotSupplier -CAPACITY = 10 # number of connections allowed at a time -POLL_INTERVAL_MS = 500 -LOG_LEVEL = "INFO" # flip to "DEBUG" to see every increment/decrement +CAPACITY = 10 # number of pool connections (and concurrent activities) +LOG_LEVEL = "INFO" async def main() -> None: @@ -27,8 +26,8 @@ async def main() -> None: config.setdefault("target_host", "localhost:7233") client = await Client.connect(**config) - 
downstream = Downstream(allowed_connections=CAPACITY, name="db") - supplier = DownstreamAwareSupplier(downstream, poll_interval_ms=POLL_INTERVAL_MS) + pool = FakeDatabaseConnectionPool(allowed_connections=CAPACITY, name="db") + supplier = PoolSlotSupplier(pool) tuner = WorkerTuner.create_composite( workflow_supplier=FixedSizeSlotSupplier(100), activity_supplier=supplier, @@ -44,7 +43,7 @@ async def main() -> None: tuner=tuner, ) - print(f"\nworker started — capacity={CAPACITY}, poll={POLL_INTERVAL_MS}ms\n") + print(f"\nworker started — capacity={CAPACITY}\n") print("TIME EVENT COUNT DETAIL") print("─" * 60) await worker.run() From cc615bf004016373dff91de1845bef4aba0cd9bb Mon Sep 17 00:00:00 2001 From: Deepika Awasthi Date: Fri, 29 May 2026 13:36:40 -0700 Subject: [PATCH 3/4] Address review feedback --- custom_worker_tuner/README.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/custom_worker_tuner/README.md b/custom_worker_tuner/README.md index da2afed6..c8c5a435 100644 --- a/custom_worker_tuner/README.md +++ b/custom_worker_tuner/README.md @@ -4,17 +4,19 @@ A `CustomSlotSupplier` is a sample that lets you gate slot grants on whatever yo This sample gates on a fake DB pool: the worker only polls for a new activity when the pool has a free connection. +**Note:** This sample is illustrative only. It shouldn't be used for production grade use-cases. + ## What this sample is -db_pool.py - A static-capacity counter. Pretends to be a DB pool. Two methods: increment() (claim a slot, returns False if full), decrement() (release) -supplier.py - The custom slot supplier. On reserve_slot it polls downstream.increment() until it succeeds. On release_slot it calls downstream.decrement() +db_pool.py - A fixed-capacity fake pool backed by a `BoundedSemaphore`. Two methods: `acquire(blocking=True)` (claim a slot, returns False if full when non-blocking), `release()` (return a slot) +supplier.py - The custom slot supplier. 
`reserve_slot` blocks on `connection_pool.acquire()` until a slot is free; `try_reserve_slot` does the same non-blocking. `release_slot` calls `connection_pool.release()` shared.py - A RunBatch workflow that runs N do_work activities in parallel. The activity just sleeps -worker.py - Wires Downstream + DownstreamAwareSupplier into a WorkerTuner +worker.py - Wires `FakeDatabaseConnectionPool` + `PoolSlotSupplier` into a WorkerTuner starter.py - Drives load The flow: -When the downstream is at capacity, `reserve_slot` blocks until a -slot frees up. The excess work piles up on the Temporal server, not +When the pool is at capacity, `reserve_slot` blocks until a +connection frees up. The excess work piles up on the Temporal server, not inside the worker. ## Run @@ -63,8 +65,7 @@ we chose 10 because default there are 5 pollers for python sdk worker.py: -CAPACITY — downstream capacity (the gate) -POLL_INTERVAL_MS — how often the supplier rechecks when full +CAPACITY — pool capacity (the gate) starter.py: From 2a9505ab49e15f67b2ae6b6d97205dc95463e551 Mon Sep 17 00:00:00 2001 From: Deepika Awasthi Date: Fri, 29 May 2026 13:36:40 -0700 Subject: [PATCH 4/4] Address review feedback --- custom_worker_tuner/README.md | 58 +++++++++++++++++++++------------ custom_worker_tuner/db_pool.py | 30 +++++++++++++---- custom_worker_tuner/supplier.py | 28 +++++++++------- custom_worker_tuner/worker.py | 5 +-- 4 files changed, 81 insertions(+), 40 deletions(-) diff --git a/custom_worker_tuner/README.md b/custom_worker_tuner/README.md index c8c5a435..a03416c6 100644 --- a/custom_worker_tuner/README.md +++ b/custom_worker_tuner/README.md @@ -34,27 +34,43 @@ uv run custom_worker_tuner/starter.py # terminal 3 The worker prints one line per slot lifecycle event: ``` - -TIME EVENT SLOT COUNT DETAIL -──────────────────────────────────────────────────────────── -10:31:49.842 reserve #1 1/10 ready to poll -10:31:49.842 reserve #2 2/10 ready to poll -10:31:49.843 reserve #3 3/10 ready to poll 
-10:31:49.843 reserve #4 4/10 ready to poll -10:31:49.843 reserve #5 5/10 ready to poll -10:31:49.843 reserve #6 6/10 ready to poll -10:31:56.763 reserve #7 7/10 eager dispatch -10:31:56.763 reserve #8 8/10 eager dispatch -10:31:56.764 reserve #9 9/10 eager dispatch -10:31:56.766 reserve #10 10/10 eager dispatch -10:31:56.767 release #7 9/10 no task arrived -10:31:56.768 release #8 8/10 no task arrived -10:31:56.768 release #9 7/10 no task arrived -10:31:56.768 reserve #11 8/10 eager dispatch -10:31:56.768 reserve #12 9/10 eager dispatch -10:31:56.768 reserve #13 10/10 eager dispatch -10:31:56.771 used #1 10/10 activity running -10:31:56.771 release #10 9/10 no task arrived +TIME EVENT COUNT QUEUE DETAIL +(COUNT shows before→after / capacity; QUEUE = tasks parked waiting) +───────────────────────────────────────────────────────────────── +12:30:32.591 reserve 0→ 1/10 0 ready to poll +12:30:32.591 reserve 1→ 2/10 0 ready to poll +12:30:32.592 reserve 2→ 3/10 0 ready to poll +12:30:32.592 reserve 3→ 4/10 0 ready to poll +12:30:32.592 reserve 4→ 5/10 0 ready to poll +12:30:32.592 reserve 5→ 6/10 0 ready to poll +12:30:40.501 reserve 6→ 7/10 0 eager dispatch +12:30:40.502 reserve 7→ 8/10 0 eager dispatch +12:30:40.502 reserve 8→ 9/10 0 eager dispatch +12:30:40.505 release 9→ 8/10 0 no task arrived +12:30:40.506 release 8→ 7/10 0 no task arrived +12:30:40.506 release 7→ 6/10 0 no task arrived +12:30:40.510 used 6→ 6/10 0 activity running +12:30:40.510 reserve 6→ 7/10 0 eager dispatch +12:30:40.511 reserve 7→ 8/10 0 eager dispatch +12:30:40.511 reserve 8→ 9/10 0 eager dispatch +12:30:40.514 reserve 9→10/10 0 ready to poll +12:30:40.520 release 10→ 9/10 0 no task arrived +12:30:40.520 release 9→ 8/10 0 no task arrived +12:30:40.520 release 8→ 7/10 0 no task arrived +12:30:40.520 used 7→ 7/10 0 activity running +12:30:40.520 reserve 7→ 8/10 0 eager dispatch +12:30:40.520 reserve 8→ 9/10 0 eager dispatch +12:30:40.520 reserve 9→10/10 0 eager dispatch +12:30:40.525 release 
10→10/10 0 no task arrived +12:30:40.525 release 10→ 9/10 0 no task arrived +12:30:40.525 release 9→ 8/10 0 no task arrived +12:30:40.528 reserve 7→ 8/10 0 ready to poll +12:30:40.530 used 8→ 8/10 0 activity running +12:30:40.535 reserve 8→ 9/10 0 eager dispatch +12:30:40.537 reserve 9→10/10 0 eager dispatch +12:30:40.539 used 10→10/10 1 activity running +12:30:40.540 used 10→10/10 1 activity running +12:30:40.541 used 10→10/10 1 activity running ``` Under load, with more activities than capacity, COUNT pins at diff --git a/custom_worker_tuner/db_pool.py b/custom_worker_tuner/db_pool.py index f427e488..15a196c6 100644 --- a/custom_worker_tuner/db_pool.py +++ b/custom_worker_tuner/db_pool.py @@ -1,27 +1,37 @@ from __future__ import annotations +import asyncio import logging -import threading logger = logging.getLogger(__name__) class FakeDatabaseConnectionPool: - """Pretend connection pool with a fixed capacity, backed by a Semaphore.""" + """Pretend connection pool with a fixed capacity, backed by an asyncio.Semaphore.""" def __init__(self, allowed_connections: int, name: str = "db") -> None: self.allowed_connections = allowed_connections self.name = name - self._connection_pool = threading.BoundedSemaphore(allowed_connections) + self._connection_pool = asyncio.Semaphore(allowed_connections) logger.info( "FakeDatabaseConnectionPool ready: name=%s allowed_connections=%d", name, allowed_connections, ) - def acquire(self, blocking: bool = True) -> bool: - """Claim a connection. 
When blocking, waits until one is free.""" - return self._connection_pool.acquire(blocking=blocking) + async def acquire(self) -> None: + """Claim a connection, awaiting until one is free.""" + await self._connection_pool.acquire() + + def try_acquire(self) -> bool: + """Non-blocking claim, try_reserve_slot will call this + if the pool is full - it will return false + if it is not full - total pool connections - 1 and slot granted to activity + """ + if self._connection_pool.locked(): + return False + self._connection_pool._value -= 1 + return True def release(self) -> None: """Return a connection to the pool.""" @@ -31,3 +41,11 @@ def release(self) -> None: def in_use(self) -> int: """Derived from the semaphore — single source of truth.""" return self.allowed_connections - self._connection_pool._value + + @property + def queued(self) -> int: + """How many tasks are parked waiting for a free slot.""" + waiters = self._connection_pool._waiters + if not waiters: + return 0 + return sum(1 for w in waiters if not w.done()) diff --git a/custom_worker_tuner/supplier.py b/custom_worker_tuner/supplier.py index 8625f583..fa8961c1 100644 --- a/custom_worker_tuner/supplier.py +++ b/custom_worker_tuner/supplier.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio import itertools import logging @@ -36,29 +35,36 @@ def __init__(self, connection_pool: FakeDatabaseConnectionPool) -> None: async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit: """Block until the pool has capacity, then grant a slot.""" - await asyncio.to_thread(self.connection_pool.acquire) + await self.connection_pool.acquire() + after = self.connection_pool.in_use slot_id = next(_slot_id_gen) - self._log("reserve", slot_id, "ready to poll") + self._log("reserve", slot_id, "ready to poll", after - 1, after) return _Permit(slot_id) def try_reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit | None: """Eager path: try to claim a slot without blocking.""" - if 
self.connection_pool.acquire(blocking=False): + if self.connection_pool.try_acquire(): + after = self.connection_pool.in_use slot_id = next(_slot_id_gen) - self._log("reserve", slot_id, "eager dispatch") + self._log("reserve", slot_id, "eager dispatch", after - 1, after) return _Permit(slot_id) return None def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None: slot_id = getattr(ctx.permit, "slot_id", "?") - self._log("used", slot_id, "activity running") + in_use = self.connection_pool.in_use + self._log("used", slot_id, "activity running", in_use, in_use) def release_slot(self, ctx: SlotReleaseContext) -> None: slot_id = getattr(ctx.permit, "slot_id", "?") detail = "no task arrived" if ctx.slot_info is None else "activity done" + before = self.connection_pool.in_use self.connection_pool.release() - self._log("release", slot_id, detail) - - def _log(self, event: str, slot_id, note: str) -> None: - count = f"{self.connection_pool.in_use}/{self.connection_pool.allowed_connections}" - logger.info(f"{event:<8} {count:>5} {note}") + after = self.connection_pool.in_use + self._log("release", slot_id, detail, before, after) + + def _log(self, event: str, slot_id, note: str, before: int, after: int) -> None: + cap = self.connection_pool.allowed_connections + count = f"{before:>2}→{after:>2}/{cap}" + queued = self.connection_pool.queued + logger.info(f"{event:<8} {count} {queued:>5} {note}") diff --git a/custom_worker_tuner/worker.py b/custom_worker_tuner/worker.py index 39e00726..1a0d5669 100644 --- a/custom_worker_tuner/worker.py +++ b/custom_worker_tuner/worker.py @@ -44,8 +44,9 @@ async def main() -> None: ) print(f"\nworker started — capacity={CAPACITY}\n") - print("TIME EVENT COUNT DETAIL") - print("─" * 60) + print("TIME EVENT COUNT QUEUE DETAIL") + print("(COUNT shows before→after / capacity; QUEUE = tasks parked waiting)") + print("─" * 65) await worker.run()