Queues & Run Loops
Use actor-local durable queues for serial run loops and request/response workflows.
What are queues?
- Realtime: messages are delivered to a live actor as soon as possible.
- Durable: messages are persisted and survive actor sleep/restart.
- Request/response: clients can wait for a queue completion response.
- Scalable: queues absorb large bursts and handle heavy backpressure safely.
- Local per actor: each actor instance has its own queue storage (scoped by actor key/id).
Queues are commonly referred to as “mailboxes” in other actor frameworks.
What are queues good for?
- Great for any task that changes actor state.
- Helps avoid race conditions by handling work in order.
- Makes complex behavior easier to organize.
Basic queue
This is the default pattern. Define queue names in queues, process them in run, and publish from the client with handle.send(...).
import { actor, queue, setup } from "rivetkit";
export const counter = actor({
state: { value: 0 },
queues: {
increment: queue<{ amount: number }>(),
},
run: async (c) => {
for await (const message of c.queue.iter()) {
c.state.value += message.body.amount;
}
},
});
export const registry = setup({ use: { counter } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.counter.getOrCreate(["main"]);
await handle.send("increment", { amount: 1 });
await handle.send("increment", { amount: 5 });
Completable messages
Use this when you want explicit completion/ack semantics but do not need to return data.
- If processing fails before
message.complete(), the message is not acknowledged. - Unacknowledged messages are retried, so mutation handlers should be idempotent.
status: "timedOut"means sender timeout elapsed beforemessage.complete(...).
import { actor, queue, setup } from "rivetkit";
export const counter = actor({
state: { value: 0 },
queues: {
increment: queue<{ amount: number }, undefined>(),
},
run: async (c) => {
for await (const message of c.queue.iter({ completable: true })) {
c.state.value += message.body.amount;
await message.complete();
}
},
});
export const registry = setup({ use: { counter } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.counter.getOrCreate(["main"]);
const result = await handle.send(
"increment",
{ amount: 5 },
{ wait: true, timeout: 5_000 },
);
if (result.status === "completed") {
console.log("applied");
} else if (result.status === "timedOut") {
console.log("timed out");
}
Request/reply pattern
Use this when the sender needs data back from queued work.
import { actor, queue, setup } from "rivetkit";
export const counter = actor({
state: { value: 0 },
queues: {
increment: queue<{ amount: number }, { value: number }>(),
},
run: async (c) => {
for await (const message of c.queue.iter({ completable: true })) {
c.state.value += message.body.amount;
await message.complete({ value: c.state.value });
}
},
});
export const registry = setup({ use: { counter } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.counter.getOrCreate(["main"]);
const result = await handle.send(
"increment",
{ amount: 5 },
{ wait: true, timeout: 5_000 },
);
if (result.status === "completed") {
console.log(result.response); // { value: 5 }
} else if (result.status === "timedOut") {
console.log("timed out");
}
Queue messages from within an actor
Queueing is useful from inside actor logic too, not just from clients.
- Use actions as entrypoints, then enqueue into the run loop to keep mutations serialized.
- You can also call
c.queue.send(...)from other parts ofrunwhen needed. c.queue.send(...)confirms durable enqueue. It does not wait for processing to finish.
import { actor, queue, setup } from "rivetkit";
export const counter = actor({
state: { value: 0 },
queues: {
mutate: queue<{ delta: number }>(),
},
run: async (c) => {
for await (const message of c.queue.iter()) {
c.state.value += message.body.delta;
}
},
actions: {
increment: async (c, delta: number) => {
await c.queue.send("mutate", { delta });
},
},
});
export const registry = setup({ use: { counter } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.counter.getOrCreate(["main"]);
await handle.increment(5);
await handle.increment(2);
Defining queue schemas
You can define queue types with queue<TMessage, TComplete>() or with schema objects. Schema objects support Standard Schema validators, including Zod.
import { actor, queue, setup } from "rivetkit";
import { z } from "zod";
export const worker = actor({
state: {},
queues: {
// Use generic queue typing when you want compile-time typing only.
foo: queue<{ id: string }, { ok: true }>(),
// Use schema objects when you want runtime validation for message and completion payloads.
bar: {
message: z.object({ id: z.string() }),
complete: z.object({ ok: z.boolean() }),
},
},
});
export const registry = setup({ use: { worker } });
Pull messages with next
Use next when you want to wait for queue messages.
- Waits until messages are available unless timeout is hit.
- Omit
timeoutto wait indefinitely.
import { actor, queue, setup } from "rivetkit";
export const queueWorker = actor({
state: {},
queues: {
jobs: queue<{ id: string }>(),
},
actions: {
pull: async (c) => {
const batch = await c.queue.next({
count: 10,
timeout: 1_000,
});
const oneWithoutTimeout = await c.queue.next({
count: 1,
});
return {
batchCount: batch.length,
oneWithoutTimeoutCount: oneWithoutTimeout.length,
};
},
},
});
export const registry = setup({ use: { queueWorker } });
Poll messages
Use tryNext when you need a non-blocking read.
- Returns immediately and never waits.
import { actor, queue, setup } from "rivetkit";
export const queueWorker = actor({
state: {},
queues: {
jobs: queue<{ id: string }>(),
},
actions: {
poll: async (c) => {
const immediate = await c.queue.tryNext({
count: 10,
});
return {
immediateCount: immediate.length,
};
},
},
});
export const registry = setup({ use: { queueWorker } });
Abort signals
Use signal when your receive loop needs external cancellation semantics in addition to actor shutdown behavior.
import { actor, queue, setup } from "rivetkit";
import { joinSignals } from "rivetkit/utils";
export const signalWorker = actor({
state: {},
createVars: () => ({
cancelController: new AbortController(),
}),
queues: {
jobs: queue<{ id: string }>(),
},
actions: {
cancelProcessing: async (c) => {
c.vars.cancelController.abort();
},
},
run: async (c) => {
while (!c.aborted) {
const signal = joinSignals(c.abortSignal, c.vars.cancelController.signal);
try {
const [message] = await c.queue.next({ signal });
if (!message) continue;
console.log("Processing job", message.body.id);
} catch (error) {
if (c.vars.cancelController.signal.aborted && !c.aborted) {
c.vars.cancelController = new AbortController();
continue;
}
throw error;
}
}
},
});
export const registry = setup({ use: { signalWorker } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.signalWorker.getOrCreate(["main"]);
await handle.send("jobs", { id: "job-1" });
await handle.cancelProcessing();
Multiple queues
Multiple queues let you separate message flows by purpose. By default, receive calls race across all queues when names is not specified. In this pattern, prompt messages run through a streaming loop while stop messages act as control signals on a separate receive path.
Use iter({ names: ["prompt"] }) as the main stream and next({ names: ["stop"] }) as a stop signal.
import { openai } from "@ai-sdk/openai";
import { generateText } from "ai";
import { actor, queue, setup } from "rivetkit";
import { joinSignals } from "rivetkit/utils";
export const agent = actor({
state: { running: false, messages: [] as string[] },
queues: {
prompt: queue<{ prompt: string }, undefined>(),
stop: queue<{ reason?: string }>(),
},
run: async (c) => {
for await (const promptMessage of c.queue.iter({ names: ["prompt"], completable: true })) {
// Create a stop controller for this prompt run.
const stopController = new AbortController();
const runSignal = joinSignals(c.abortSignal, stopController.signal);
// Watch for stop messages while generation is running.
const stopWatcher = c.queue
.next({ names: ["stop"], signal: runSignal })
.then(([stopMessage]) => {
if (stopMessage) stopController.abort();
})
.catch(() => {});
// Generate a response for the prompt.
c.state.running = true;
const { text } = await generateText({
model: openai("gpt-5"),
prompt: promptMessage.body.prompt,
abortSignal: runSignal,
}).finally(async () => {
stopController.abort();
c.state.running = false;
});
// Append each model response to actor state and acknowledge the prompt.
c.state.messages.push(text);
await promptMessage.complete();
}
},
});
export const registry = setup({ use: { agent } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.agent.getOrCreate(["main"]);
await handle.send("prompt", { prompt: "summarize latest logs" });
await handle.send("stop", { reason: "user canceled" });
Sleeping behavior
If an actor has a run handler, it does not sleep while that handler is actively doing work. It only can sleep when the run loop is blocked waiting for queue entries (for example inside iter(...) or next(...)).
This means you can run normal code in run without worrying about sleep interrupting it mid-call.
Debugging
GET /inspector/queue?limit=50returns queue size and pending message metadata.GET /inspector/summaryincludesqueueSizefor quick queue health checks.POST /queue/:namewithwait: trueis useful to verify completable/request-response behavior.- In non-dev mode, inspector endpoints require authorization.
Recommendations
- Actions are for getting data, queue entries are for mutating data.
- Implement connection auth in
onBeforeConnect. See Authentication. - Route most state changes through one queue loop so ordering stays predictable.
- If you need more complex multi-step run loops, consider using workflows.
- Use
c.abortedandc.abortSignalfor graceful shutdown and cancellation. - Add
timeoutwhen callers need bounded wait behavior. - Use
wait: trueonly when the caller actually needs a response.