From 00677f2e670f8d386eaa0db1e872778b2d3947b6 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Thu, 9 Apr 2026 19:16:34 +0900 Subject: [PATCH 1/3] Fix cfworkers README examples Update the Cloudflare Workers README snippet to use the current federation API. The example now imports createFederation, calls federation.fetch(), and passes context data and a Message value to processQueuedTask(). AI-assisted with Codex for drafting and editing the documentation changes; reviewed in the workspace. Co-Authored-By: OpenAI Codex --- packages/cfworkers/README.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/cfworkers/README.md b/packages/cfworkers/README.md index a9743337d..760c3091f 100644 --- a/packages/cfworkers/README.md +++ b/packages/cfworkers/README.md @@ -16,7 +16,7 @@ implementations for [Cloudflare Workers]: - [`WorkersMessageQueue`] ~~~~ typescript -import type { Federation } from "@fedify/fedify"; +import { createFederation, type Message } from "@fedify/fedify"; import { WorkersKvStore, WorkersMessageQueue } from "@fedify/cfworkers"; export default { @@ -26,22 +26,25 @@ export default { queue: new WorkersMessageQueue(env.QUEUE_BINDING), // ... other options }); - - return federation.handle(request, { contextData: env }); + + return federation.fetch(request, { contextData: env }); }, - + async queue(batch, env, ctx) { const federation = createFederation({ kv: new WorkersKvStore(env.KV_BINDING), queue: new WorkersMessageQueue(env.QUEUE_BINDING), // ... other options }); - + for (const message of batch.messages) { - await federation.processQueuedTask(message.body); + await federation.processQueuedTask( + env, + message.body as unknown as Message, + ); } } -} satisfies ExportedHandler<{ +} satisfies ExportedHandler<{ KV_BINDING: KVNamespace; QUEUE_BINDING: Queue; }>; From d827be402383f1808e9c6fd3756e72580ee4787d Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Thu, 9 Apr 2026 19:16:48 +0900 Subject: [PATCH 2/3] Fix deployment doc mismatches Correct the Cloudflare Workers deployment example to match the processQueuedTask() signature and repair the broken InProcessMessageQueue anchor in the same guide. AI-assisted with Codex for drafting and editing the documentation changes; reviewed in the workspace. Co-Authored-By: OpenAI Codex --- docs/manual/deploy.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/manual/deploy.md b/docs/manual/deploy.md index dadcecc33..623ae430f 100644 --- a/docs/manual/deploy.md +++ b/docs/manual/deploy.md @@ -163,7 +163,7 @@ on your infrastructure: Development : Use [`MemoryKvStore`](./kv.md#memorykvstore) and - [`InProcessMessageQueue`](./mq.md#inprocessmessagequeu) for quick setup. + [`InProcessMessageQueue`](./mq.md#inprocessmessagequeue) for quick setup. Production : Consider [`PostgresKvStore`](./kv.md#postgreskvstore) and @@ -291,8 +291,8 @@ export default { for (const message of batch.messages) { try { await federation.processQueuedTask( - message.body as unknown as Message, env, + message.body as unknown as Message, ); message.ack(); } catch (error) { From 85fc4b8b2f856a815ece15008be0f7c6e8344573 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Thu, 9 Apr 2026 19:17:40 +0900 Subject: [PATCH 3/3] Document Workers ordering key handling Explain the extra queue consumer step needed for ordering keys on Cloudflare Workers. The manual, deployment guide, and package README now point users to orderingKv and WorkersMessageQueue.processMessage(). AI-assisted with Codex for drafting and editing the documentation changes; reviewed in the workspace. Co-Authored-By: OpenAI Codex --- docs/manual/deploy.md | 7 +++++ docs/manual/mq.md | 50 ++++++++++++++++++++++++++++++++++++ packages/cfworkers/README.md | 8 ++++++ 3 files changed, 65 insertions(+) diff --git a/docs/manual/deploy.md b/docs/manual/deploy.md index 623ae430f..468301ae0 100644 --- a/docs/manual/deploy.md +++ b/docs/manual/deploy.md @@ -303,6 +303,13 @@ export default { }; ~~~~ +If you use queue ordering keys on Cloudflare Workers, instantiate +`WorkersMessageQueue` with an `orderingKv` namespace and call +`WorkersMessageQueue.processMessage()` before +`Federation.processQueuedTask()`. See the +[*`WorkersMessageQueue`* section](./mq.md#workersmessagequeue-cloudflare-workers-only) +for a complete example and caveats about best-effort ordering. + ### Example deployment For a complete working example, see the [Cloudflare Workers example] diff --git a/docs/manual/mq.md b/docs/manual/mq.md index ba2f77dee..f736254b3 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -604,6 +604,56 @@ export default { > process the messages. The `queue()` method is the only way to consume > messages from the queue in Cloudflare Workers. +> [!NOTE] +> If you use `~MessageQueueEnqueueOptions.orderingKey` with +> `WorkersMessageQueue`, you also need to provide a KV namespace for ordering +> locks and pass each raw queue message through +> `~WorkersMessageQueue.processMessage()` before calling +> `Federation.processQueuedTask()`. Otherwise, the ordering key is embedded in +> the message, but not enforced when the worker consumes it. +> +> ~~~~ typescript +> import { createFederationBuilder, type Message } from "@fedify/fedify"; +> import { WorkersKvStore, WorkersMessageQueue } from "@fedify/cfworkers"; +> +> type Env = { +> KV_NAMESPACE: KVNamespace; +> QUEUE_BINDING: Queue; +> ORDERING_KV: KVNamespace; +> }; +> +> const builder = createFederationBuilder(); +> +> export default { +> async queue(batch: MessageBatch, env: Env): Promise { +> const queue = new WorkersMessageQueue(env.QUEUE_BINDING, { +> orderingKv: env.ORDERING_KV, +> }); +> const federation = await builder.build({ +> kv: new WorkersKvStore(env.KV_NAMESPACE), +> queue, +> }); +> +> for (const message of batch.messages) { +> const result = await queue.processMessage(message.body); +> if (!result.shouldProcess) { +> message.retry(); +> continue; +> } +> try { +> await federation.processQueuedTask( +> env, +> result.message as Message, +> ); +> message.ack(); +> } finally { +> await result.release?.(); +> } +> } +> }, +> }; +> ~~~~ + [Cloudflare Workers]: https://workers.cloudflare.com/ [Cloudflare Queues]: https://developers.cloudflare.com/queues/ diff --git a/packages/cfworkers/README.md b/packages/cfworkers/README.md index 760c3091f..c3f004778 100644 --- a/packages/cfworkers/README.md +++ b/packages/cfworkers/README.md @@ -104,6 +104,14 @@ management. > process the messages. The `queue()` method is the only way to consume > messages from the queue in Cloudflare Workers. +> [!NOTE] +> If you use `orderingKey` when enqueueing messages, construct +> `WorkersMessageQueue` with an `orderingKv` namespace and pass each raw queue +> message through `WorkersMessageQueue.processMessage()` before calling +> `Federation.processQueuedTask()`. This acquires and releases the best-effort +> ordering lock for that key. You can also customize the lock behavior with +> the `orderingKeyPrefix` and `orderingLockTtl` options. + [Cloudflare Queues]: https://developers.cloudflare.com/queues/