Consume from a Queue

A Cloudflare Queue gives you reliable, at-least-once delivery between Workers. The Worker that sends messages uses Cloudflare.QueueBinding.bind(...) from the Queue producer binding — covered in the resource docs. This tutorial focuses on the consumer side: receiving batches with the Effect-style Cloudflare.messages(queue).subscribe(...) API.

subscribe(...) does both halves in one call: it registers a runtime queue listener on the Worker, and it auto-creates the Cloudflare.QueueConsumer resource that tells Cloudflare to dispatch messages from the queue to this Worker. No separate deploy-time wiring is needed.

By the end you’ll have a Worker that:

  • Sends a JSON message via POST /queue/send.
  • Receives the message in a queue handler registered on the same Worker, persists the body to R2, and acks the batch.
  • Reads the persisted body via GET /queue/result/:id.

Both resources are plain yield* calls — no special config.

alchemy.run.ts
import * as Alchemy from "alchemy";
import * as Cloudflare from "alchemy/Cloudflare";
import * as Effect from "effect/Effect";
export const Queue = Cloudflare.Queue("Queue");
export const Bucket = Cloudflare.R2Bucket("Bucket");

The Queue’s name is generated from the stack/stage/id. The Bucket will store each consumed message at /queue/<id> so the integ test can read it back.
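Since both the consumer and the integ test must agree on that key layout, it can help to capture it in one place. A minimal sketch; `queueResultKey` is an illustrative helper name, not part of the alchemy API:

```typescript
// Hypothetical helper mirroring this tutorial's key layout: each
// consumed message is persisted at `/queue/<id>` in the bucket, and
// GET /queue/result/:id reconstructs the same key from the id.
const queueResultKey = (id: string): string => `/queue/${id}`;
```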

In the Worker init phase, yield the queue resource and ask QueueBinding for a typed sender. The sender exposes send / sendBatch that round-trip through Cloudflare’s runtime.

src/Api.ts
import * as Cloudflare from "alchemy/Cloudflare";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import { Bucket } from "./Bucket.ts";
import { Queue } from "./Queue.ts";
export default Cloudflare.Worker(
  "Api",
  { main: import.meta.path },
  Effect.gen(function* () {
    const bucket = yield* Cloudflare.R2Bucket.bind(Bucket);
    const queueResource = yield* Queue;
    const queue = yield* Cloudflare.QueueBinding.bind(queueResource);
    return {
      fetch: Effect.gen(function* () {
        return new Response("ok");
      }),
    };
  }).pipe(
    Effect.provide(
      Layer.mergeAll(
        Cloudflare.QueueBindingLive,
        Cloudflare.R2BucketBindingLive,
      ),
    ),
  ),
);
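If you need to enqueue many messages at once, the sender's sendBatch is the natural fit, but a single batch call is capped by Cloudflare Queues (100 messages at the time of writing; check the current limits). A sketch of splitting a large list into batch-sized chunks; `chunk` is a local helper, not part of the alchemy API:

```typescript
// Split a message list into sendBatch-sized chunks. With the typed
// sender from QueueBinding.bind, usage would look roughly like:
//   for (const batch of chunk(messages, 100)) {
//     yield* queue.sendBatch(batch);
//   }
const chunk = <A>(items: ReadonlyArray<A>, size: number): A[][] => {
  const out: A[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
};
```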

queueResource (the resolved Queue resource) is what you pass to messages(...) next — it’s the same handle, not a new one.

Cloudflare.messages(queueResource).subscribe(handler) registers a queue event listener on the Worker. The handler receives a Stream.Stream<Message<Body>> — one stream per batch — and is expected to return Effect.Effect<void>.

import * as Stream from "effect/Stream";

interface QueueMessageBody {
  id: string;
  text: string;
  sentAt: number;
}

Effect.gen(function* () {
  const bucket = yield* Cloudflare.R2Bucket.bind(Bucket);
  const queueResource = yield* Queue;
  const queue = yield* Cloudflare.QueueBinding.bind(queueResource);

  yield* Cloudflare.messages<QueueMessageBody>(queueResource).subscribe(
    (stream) =>
      Stream.runForEach(stream, (msg) =>
        bucket
          .put(`/queue/${msg.body.id}`, JSON.stringify(msg.body), {
            httpMetadata: { contentType: "application/json" },
          })
          .pipe(Effect.asVoid),
      ),
  );

Acking is automatic: if the handler succeeds, every message in the batch is ack()ed; if it fails, every message is retry()ed and Cloudflare applies the consumer’s maxRetries and retryDelay before dead-lettering. For finer control, call msg.ack() / msg.retry() per message inside the handler.
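The point of per-message control is that one poisoned message should not force the whole batch back onto the queue. A sketch of that pattern in plain TypeScript, with a minimal local `Msg` interface standing in for alchemy's message type (an assumed shape, not the real API):

```typescript
// Assumed minimal message shape for illustration only.
interface Msg<Body> {
  readonly body: Body;
  ack(): void;
  retry(): void;
}

// Persist each message individually: ack the successes, retry only
// the failures, so one bad message doesn't requeue its whole batch.
const handleEach = async <Body>(
  messages: ReadonlyArray<Msg<Body>>,
  persist: (body: Body) => Promise<void>,
): Promise<{ acked: number; retried: number }> => {
  let acked = 0;
  let retried = 0;
  for (const msg of messages) {
    try {
      await persist(msg.body);
      msg.ack();
      acked++;
    } catch {
      msg.retry();
      retried++;
    }
  }
  return { acked, retried };
};
```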

messages(...).subscribe(...) is a Context.Service call — QueueEventSourceLive is the layer that registers the listener with the Worker’s runtime context. Add it to the layer stack alongside the other binding lives.

}).pipe(
  Effect.provide(
    Layer.mergeAll(
      Cloudflare.QueueEventSourceLive,
      Cloudflare.QueueBindingLive,
      Cloudflare.R2BucketBindingLive,
    ),
  ),
),

Without the live layer, the subscribe call fails at deploy with Service not found: Cloudflare.Queue.QueueEventSource.

POST /queue/send enqueues a message and returns the generated id. The integ test uses the id to poll for the consumed result.

return {
  fetch: Effect.gen(function* () {
    const request = yield* HttpServerRequest;
    if (request.url === "/queue/send" && request.method === "POST") {
      const text = yield* request.text;
      const msg: QueueMessageBody = {
        id: crypto.randomUUID(),
        text,
        sentAt: Date.now(),
      };
      yield* queue.send(msg).pipe(Effect.orDie);
      return yield* HttpServerResponse.json({ sent: msg }, { status: 202 });
    }
    return HttpServerResponse.text("Not Found", { status: 404 });
  }),
};

GET /queue/result/:id reads /queue/<id> from the bucket. The consumer runs asynchronously, so the test polls this route with a short backoff until the object appears.

if (request.url.startsWith("/queue/result/") && request.method === "GET") {
  const id = request.url.split("/queue/result/")[1];
  return yield* bucket.get(`/queue/${id}`).pipe(
    Effect.flatMap((object) =>
      object === null
        ? Effect.succeed(HttpServerResponse.text("not yet", { status: 404 }))
        : object.text().pipe(
            Effect.map((body) =>
              HttpServerResponse.text(body, {
                headers: { "content-type": "application/json" },
              }),
            ),
          ),
    ),
    Effect.catchTag("R2Error", (e) =>
      Effect.succeed(HttpServerResponse.text(e.message, { status: 500 })),
    ),
  );
}
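The client side of that poll can be a small retry loop. A sketch of what the integ test might do, with the HTTP call injected as `fetchOnce` so the shape stays test-friendly; the real test would pass a function wrapping fetch() against the deployed Worker URL. All names here are illustrative, not part of alchemy:

```typescript
// Poll GET /queue/result/:id until the consumer has persisted the
// object, with a short fixed delay between attempts; give up after
// `attempts` tries so a broken consumer fails the test quickly.
const pollResult = async (
  fetchOnce: () => Promise<{ status: number; body: string }>,
  attempts = 10,
  delayMs = 250,
): Promise<string> => {
  for (let i = 0; i < attempts; i++) {
    const res = await fetchOnce();
    if (res.status === 200) return res.body;
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  throw new Error("queue consumer did not persist the message in time");
};
```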

What’s the difference vs. a native queue() handler?

Cloudflare’s runtime delivers queue events to a queue(batch, env) export on the worker module. You can write that directly — see examples/cloudflare-worker-async for the plain async-handler shape:

export default {
  async queue(batch, env) {
    for (const msg of batch.messages) {
      await env.Bucket.put(`/queue/${msg.body.id}`, ...);
      msg.ack();
    }
  },
};

Cloudflare.messages(queue).subscribe(...) is the same primitive on the Effect side — the Worker bundle’s runtime context routes the dispatch to the registered listener — but you get Effect.gen composition, typed errors, automatic batch ack/retry, and the same surface as AWS.SQS.messages(queue).subscribe(...).