Subscribers¶
A subscriber binds a handler to one subscription. The #[subscriber] macro is the ergonomic way to
declare one; this guide covers the handler contract, the macro forms, and how handlers are mounted.
Grouping handlers into modules is covered in Routing, and how payloads are decoded in
Codecs.
The handler contract¶
A handler is an async fn whose first parameter is a reference to the decoded payload:
use ruststream::runtime::HandlerResult;
use ruststream::subscriber;

#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
    println!("got order {}", order.id);
    HandlerResult::Ack
}
The macro turns the function into a value named after it (here handle) that implements the
mounting contract. You pass that value to include.
Accepting the context¶
Declare an optional second parameter, &mut Context, to read headers, the subscription name, and
shared state, or to publish from inside the handler:
#[subscriber("orders")]
async fn with_context(order: &Order, ctx: &mut Context<'_>) -> HandlerResult {
    if let Some(id) = ctx.headers().correlation_id() {
        println!("order {} correlates to {id}", order.id);
    }
    HandlerResult::Ack
}
The macro resolves the context type itself, so the Context name needs no import when it appears
only in #[subscriber] signatures. The full context surface - the headers working copy, state
access, named publishers - is covered in Context and state.
Acking¶
The return type is anything that converts into a Settle (the settlement unit: an outcome plus
an optional post-settle continuation):
| Return value | Result |
|---|---|
| HandlerResult::ack() (or HandlerResult::Ack) | acknowledge; the broker removes the message |
| HandlerResult::retry() | nack with requeue (redeliver later) |
| HandlerResult::retry_after(delay) | nack asking for redelivery no sooner than delay |
| HandlerResult::drop() | nack without requeue (discard or dead-letter) |
| () | always Ack |
| Result<(), E> | Ack on Ok, drop on Err |
| Result<HandlerResult, E> | the inner outcome on Ok, drop on Err |
| Settle (any of the above .and_after(..)) | settle by the outcome, then run the continuation |
On the message itself, ack consumes self, so the type system prevents acking twice.
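As a rough illustration of the conversion table above, the sketch below models "anything that converts into an outcome" with a small trait. Everything in it is a stand-in written for this example: the Outcome enum and the IntoOutcome trait are hypothetical and are not the crate's HandlerResult or Settle types.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the outcome a handler settles with.
#[derive(Debug, Clone, PartialEq)]
enum Outcome {
    Ack,
    Retry,
    RetryAfter(Duration),
    Drop,
}

/// Hypothetical conversion trait: every supported return type maps to an outcome.
trait IntoOutcome {
    fn into_outcome(self) -> Outcome;
}

impl IntoOutcome for Outcome {
    fn into_outcome(self) -> Outcome {
        self
    }
}

/// `()` always acknowledges.
impl IntoOutcome for () {
    fn into_outcome(self) -> Outcome {
        Outcome::Ack
    }
}

/// `Result<(), E>`: ack on Ok, drop on Err.
impl<E> IntoOutcome for Result<(), E> {
    fn into_outcome(self) -> Outcome {
        match self {
            Ok(()) => Outcome::Ack,
            Err(_) => Outcome::Drop,
        }
    }
}

/// `Result<Outcome, E>`: the inner outcome on Ok, drop on Err.
impl<E> IntoOutcome for Result<Outcome, E> {
    fn into_outcome(self) -> Outcome {
        self.unwrap_or(Outcome::Drop)
    }
}

fn main() {
    let failed: Result<(), String> = Err("boom".to_string());
    let delayed: Result<Outcome, String> = Ok(Outcome::RetryAfter(Duration::from_secs(5)));
    println!("{:?}", ().into_outcome());
    println!("{:?}", failed.into_outcome());
    println!("{:?}", delayed.into_outcome());
    println!("{:?}", Outcome::Retry.into_outcome());
}
```

The two Result impls do not overlap because () and Outcome are distinct types, which is what lets a handler pick whichever return shape reads best.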
Post-settle continuations¶
HandlerResult::ack().and_after(fut) attaches a continuation that runs after the message is
settled, without gating the ack decision or affecting redelivery - a non-critical notification,
slow follow-up work, a cache warm-up. Any outcome works (drop().and_after(..) is valid; the
neutral reading is "after settle"):
/// Ack the order, then fire a non-critical follow-up once it is acknowledged. The continuation is
/// at-most-once: if it is lost or panics, the already-acked order is not redelivered.
#[subscriber("orders")]
async fn handle(order: &Order) -> Settle {
    let id = order.id;
    HandlerResult::ack().and_after(async move {
        println!("order {id} acked; notifying downstream");
    })
}
The continuation runs on a tracked task set that a graceful shutdown drains (bounded by
shutdown_timeout when set). It is at-most-once: the message is already settled, so a continuation
that panics or is lost in a crash never causes a redelivery. Work whose loss must trigger a
redelivery does not belong here; settle with the matching outcome and let the broker retry instead.
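The ordering guarantee can be pictured with a small model: the outcome is fixed first, and only then does the follow-up run. This is a sketch under simplifying assumptions, not the runtime's implementation: the Settle struct and settle function here are hypothetical, and the continuation is a plain closure rather than a future so the example needs only the standard library.

```rust
use std::cell::RefCell;
use std::rc::Rc;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Ack,
    Drop,
}

/// Hypothetical settlement unit: an outcome plus an optional follow-up.
struct Settle {
    outcome: Outcome,
    after: Option<Box<dyn FnOnce()>>,
}

impl Settle {
    fn new(outcome: Outcome) -> Self {
        Settle { outcome, after: None }
    }

    /// Attaches a follow-up that runs once the outcome has been applied.
    fn and_after(mut self, f: impl FnOnce() + 'static) -> Self {
        self.after = Some(Box::new(f));
        self
    }
}

/// Applies the outcome first, then runs the follow-up. The outcome is already
/// recorded when the follow-up starts, so the follow-up cannot change it.
fn settle(unit: Settle, log: &Rc<RefCell<Vec<String>>>) -> Outcome {
    let Settle { outcome, after } = unit;
    log.borrow_mut().push(format!("settled: {outcome:?}"));
    if let Some(follow_up) = after {
        follow_up();
    }
    outcome
}

fn main() {
    let log = Rc::new(RefCell::new(Vec::new()));
    let follow_up_log = Rc::clone(&log);

    let acked = Settle::new(Outcome::Ack).and_after(move || {
        follow_up_log.borrow_mut().push("follow-up ran".to_string());
    });
    settle(acked, &log);
    settle(Settle::new(Outcome::Drop), &log);

    println!("{:?}", log.borrow());
}
```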
In a batch each element settles individually, so the continuation rides per element - a capability the per-message context hook cannot offer:
/// Per-element settlement: id 0 retries with no continuation, every other order acks and schedules
/// its own follow-up. The continuation rides with the element, so a batch settles each message and
/// its side effect independently.
#[subscriber(batch("orders"))]
async fn handle_page(orders: &[Order]) -> Vec<Settle> {
    orders
        .iter()
        .map(|order| {
            if order.id == 0 {
                HandlerResult::retry().into()
            } else {
                let id = order.id;
                HandlerResult::ack().and_after(async move {
                    println!("order {id} acked in batch; following up");
                })
            }
        })
        .collect()
}
Batch publishing (batch(..) + publish(..)) settles all-or-nothing under one transaction, so
per-element and_after does not compose there; it applies to plain batch and single forms only.
Delayed redelivery¶
retry_after covers the not-ready-yet case (a dependency has not arrived, an upstream is
rate-limited), where an immediate redelivery would just spin:
/// The not-ready-yet case: the upstream has not settled this payment, so an immediate
/// redelivery would just spin. Ask the broker to redeliver no sooner than five seconds from now.
#[subscriber("payments")]
async fn reconcile(payment: &Payment) -> HandlerResult {
    if !payment.settled {
        return HandlerResult::retry_after(Duration::from_secs(5));
    }
    println!("payment {} settled", payment.id);
    HandlerResult::Ack
}
Under the hood, the runtime honours the delay:
- A broker with native delayed redelivery (the memory broker re-delivers on a timer; a NATS JetStream broker could NAK with delay) hands off to the transport directly.
- On a broker without native support, the runtime schedules a deferred re-publish of the message to its own source subject after delay, then drops the original. The re-published copy carries the framework retry-count header (RETRY_COUNT_HEADER) incremented; a handler can read it to cap redeliveries.
  Opt in per scope with BrokerScope::retry_via(publisher) (the publisher must target the same broker). Without a publisher the delay is dropped and the message is requeued immediately. The deferred re-publish is at-most-once over the delay window: if the process exits before the timer fires the copy is lost.
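The retry-count bookkeeping on the fallback path might look roughly like the sketch below. It is an illustration only: headers are modelled as a plain string map, the header name "x-retry-count" is a made-up placeholder for whatever RETRY_COUNT_HEADER actually holds, and the helper functions are hypothetical.

```rust
use std::collections::HashMap;

/// Placeholder header name (an assumption); the crate exposes the real one
/// as RETRY_COUNT_HEADER.
const RETRY_COUNT: &str = "x-retry-count";

type Headers = HashMap<String, String>;

/// Reads the retry count, treating a missing or malformed header as zero.
fn retry_count(headers: &Headers) -> u32 {
    headers
        .get(RETRY_COUNT)
        .and_then(|value| value.parse().ok())
        .unwrap_or(0)
}

/// Builds the headers for the re-published copy: same headers, count plus one.
fn republished_headers(original: &Headers) -> Headers {
    let mut copy = original.clone();
    let next = retry_count(original).saturating_add(1);
    copy.insert(RETRY_COUNT.to_string(), next.to_string());
    copy
}

/// Handler-side cap: keep asking for redelivery until the limit is reached.
fn should_retry(headers: &Headers, max_retries: u32) -> bool {
    retry_count(headers) < max_retries
}

fn main() {
    let mut headers = Headers::new();
    for _ in 0..3 {
        headers = republished_headers(&headers);
    }
    println!("retries so far: {}", retry_count(&headers));
    println!("retry again with a cap of 3? {}", should_retry(&headers, 3));
}
```

A handler that checks the count before returning a delayed retry can switch to dropping (or dead-lettering) once the cap is hit, instead of cycling the same message indefinitely.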
The batch_retry_after form composes with
selective batch outcomes: a Vec<HandlerResult> carries
per-element delays, so pending entries back off without holding up the rest of the page:
/// Selective outcomes carry per-element delays: settled payments ack immediately, pending ones
/// come back in thirty seconds without holding up the rest of the page.
#[subscriber(batch("payments"))]
async fn reconcile_page(payments: &[Payment]) -> Vec<HandlerResult> {
    payments
        .iter()
        .map(|payment| {
            if payment.settled {
                HandlerResult::Ack
            } else {
                HandlerResult::retry_after(Duration::from_secs(30))
            }
        })
        .collect()
}
Choosing the subscription source¶
By name¶
#[subscriber("orders")] subscribes by name. It works with any broker that implements the
Subscribe capability, which covers the common case.
Broker-specific descriptors¶
When a subscription needs broker-specific options (a consumer group, a durable name, a delivery policy), the broker crate exposes a descriptor type. Use its constructor directly in the decorator:
#[subscriber(OrdersStream::new("orders", "workers"))]
async fn handle(order: &Order) -> HandlerResult {
    HandlerResult::Ack
}
The macro reads the descriptor type out of the constructor call, so the compiler checks the
descriptor against the broker it is mounted on. A descriptor is any type that implements
SubscriptionSource<B>; see Broker authors.
The source may also be a builder chain on that constructor, so fluent options stay inline. For example, a broker that ships an options builder lets a handler name a specific stream and consumer right in the decorator:
#[subscriber(StreamOptions::new("orders").durable("audit"))]
async fn handle(order: &Order) -> HandlerResult {
    HandlerResult::Ack
}
The macro follows the chain down to the base Type::new(..) to name the source type, so each method
in the chain must return Self. Free functions are rejected, since their type is not visible to the
macro.
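To make the "each method must return Self" requirement concrete, here is a minimal sketch of a descriptor whose fluent options keep the expression's type equal to the type named at the base of the chain. The struct is hypothetical and written for this example: it borrows the StreamOptions name from the snippet above, but its fields and the max_in_flight option are invented and do not describe any real broker crate.

```rust
/// Hypothetical descriptor with fluent options.
#[derive(Debug, Clone, PartialEq)]
struct StreamOptions {
    stream: String,
    durable: Option<String>,
    max_in_flight: usize,
}

impl StreamOptions {
    /// Base constructor: the only place the type name is spelled out.
    fn new(stream: &str) -> Self {
        StreamOptions {
            stream: stream.to_string(),
            durable: None,
            max_in_flight: 1,
        }
    }

    /// Takes `self` by value and returns `Self`, so the chain's type never changes.
    fn durable(mut self, name: &str) -> Self {
        self.durable = Some(name.to_string());
        self
    }

    /// Invented option for illustration; same by-value, return-Self shape.
    fn max_in_flight(mut self, n: usize) -> Self {
        self.max_in_flight = n;
        self
    }
}

fn main() {
    // Every link returns StreamOptions, so the whole expression has the type
    // that appears in `StreamOptions::new(..)`.
    let source: StreamOptions = StreamOptions::new("orders")
        .durable("audit")
        .max_in_flight(32);
    println!("{source:?}");
}
```

A method that returned a different type (say, a finished "built" value) would make the expression's type differ from the one a reader of the base constructor call can see, which is presumably why such chains are not accepted.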
Mounting handlers¶
Inside with_broker, mount a definition with include.
include decodes the payload with the codec resolved from the most specific level you set - per
handler, per scope, or the feature-selected default. See
where the codec comes from.
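The "most specific level wins" rule is a plain fallback chain. The sketch below shows one way to express it, assuming each level is an optional override; the Codec enum and resolve function are hypothetical stand-ins, not the crate's codec types.

```rust
/// Hypothetical codec identifiers, used only to show the resolution order.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Codec {
    Json,
    MsgPack,
    Protobuf,
}

/// Picks the most specific codec that was set: handler, then scope, then default.
fn resolve(handler: Option<Codec>, scope: Option<Codec>, default: Codec) -> Codec {
    handler.or(scope).unwrap_or(default)
}

fn main() {
    // Nothing overridden: the default applies.
    println!("{:?}", resolve(None, None, Codec::Json));
    // A scope-level choice beats the default.
    println!("{:?}", resolve(None, Some(Codec::MsgPack), Codec::Json));
    // A handler-level choice beats both.
    println!("{:?}", resolve(Some(Codec::Protobuf), Some(Codec::MsgPack), Codec::Json));
}
```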
To group handlers per module and mount them all at once, collect them into a Router; see
Routing.
Batch subscribers¶
Wrapping the source in batch(..) switches the handler to whole-batch consumption: it takes the
decoded batch as a slice and runs once per batch the broker delivers - one database round-trip,
one bulk API call.
/// Settles a whole page of orders in one go.
#[subscriber(batch("orders"))]
async fn settle(orders: &[Order]) -> HandlerResult {
    println!("settling {} orders", orders.len());
    HandlerResult::Ack
}
Mount it with include_batch (the batch counterpart of include).
The source's subscriber must implement the BatchSubscriber capability. Brokers whose clients
batch natively (Kafka poll, JetStream pull consumers) expose it directly, and batch sizing lives
in their subscription options; the in-memory broker batches natively too. For any other source,
the Buffered adapter buffers single deliveries client-side, closing a batch by size or by a
deadline after its first delivery:
// Client-side batching for sources without native batches: close a batch at 128 deliveries or
// 20 ms after its first one. The macro recovers the source type from the constructor path, so
// the generic parameter is spelled out.
#[subscriber(batch(Buffered::<Name>::new(Name::new("orders"))
    .max_size(128)
    .max_wait(Duration::from_millis(20))))]
async fn drain(orders: &[Order]) -> HandlerResult {
    println!("draining {} orders", orders.len());
    HandlerResult::Ack
}
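The "close by size or by a deadline after the first delivery" rule can be sketched as a small state machine. This is a simplified model, not the Buffered adapter itself: the Batcher type and its methods are hypothetical, and time is passed in as an offset so the behaviour is deterministic instead of depending on a real clock.

```rust
use std::time::Duration;

/// Hypothetical client-side batcher. `now` values are offsets from an arbitrary start.
struct Batcher<T> {
    max_size: usize,
    max_wait: Duration,
    buf: Vec<T>,
    /// Arrival time of the first delivery in the currently open batch.
    opened_at: Option<Duration>,
}

impl<T> Batcher<T> {
    fn new(max_size: usize, max_wait: Duration) -> Self {
        Batcher { max_size, max_wait, buf: Vec::new(), opened_at: None }
    }

    /// Adds a delivery; returns the batch if this delivery filled it.
    fn push(&mut self, item: T, now: Duration) -> Option<Vec<T>> {
        if self.buf.is_empty() {
            // The deadline is measured from the first delivery, not the latest one.
            self.opened_at = Some(now);
        }
        self.buf.push(item);
        if self.buf.len() >= self.max_size {
            self.close()
        } else {
            None
        }
    }

    /// Called on a timer tick; returns the batch if its deadline has passed.
    fn tick(&mut self, now: Duration) -> Option<Vec<T>> {
        match self.opened_at {
            Some(opened) if now >= opened + self.max_wait => self.close(),
            _ => None,
        }
    }

    /// Hands out the buffered deliveries and resets the deadline.
    fn close(&mut self) -> Option<Vec<T>> {
        self.opened_at = None;
        if self.buf.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buf))
        }
    }
}

fn main() {
    let ms = Duration::from_millis;
    let mut batcher = Batcher::new(3, ms(20));

    // Two deliveries arrive, then the deadline (20 ms after the first) passes.
    batcher.push("a", ms(0));
    batcher.push("b", ms(5));
    println!("tick at 19 ms: {:?}", batcher.tick(ms(19)));
    println!("tick at 20 ms: {:?}", batcher.tick(ms(20)));

    // Three quick deliveries close the next batch by size.
    batcher.push("c", ms(30));
    batcher.push("d", ms(31));
    println!("third push: {:?}", batcher.push("e", ms(32)));
}
```

Measuring the deadline from the first delivery bounds the latency any single message can pick up from batching, whereas a deadline reset on every delivery could keep a batch open indefinitely under steady traffic.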
The semantics differ from single-message handlers in a few ways:
- Elements that fail to decode are nacked individually (per the decode-failure policy) and never reach the handler; the rest arrive as one slice.
- The returned value settles the batch. A single HandlerResult (or () / Result<_, E>) settles every message uniformly: Ack acks them all, retry() requeues them all.
- Per-message headers are not accessible in the &[T] form, and the context starts with empty headers.
- App-global and router middleware wrap per-message handlers and do not apply to batch registrations.
Selective acknowledgement¶
A common case is partial readiness: some messages of the page are processed, others are not
ready yet and should be redelivered without retrying the ones that succeeded. Return
Vec<HandlerResult> to settle element i of the slice with outcome i:
/// Retries only the entries that are not ready yet; the rest of the page settles.
#[subscriber(batch("orders"))]
async fn reconcile(orders: &[Order]) -> Vec<HandlerResult> {
    orders
        .iter()
        .map(|order| {
            if order.id == 0 {
                HandlerResult::retry()
            } else {
                HandlerResult::Ack
            }
        })
        .collect()
}
Broker semantics are exactly those of per-message nack(requeue = true): brokers with
per-message redelivery honour selective retry natively; a positional broker degrades the same
way it does for a single-message nack (the crate of that broker documents it). Returning a
vector whose length does not match the batch is a bug in the handler: the unmatched remainder is
retried (an extra redelivery beats a silently lost message) and the mismatch is logged.
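The length-mismatch rule amounts to padding a short outcome vector with retries. The sketch below shows that idea in isolation; the Outcome enum and align function are hypothetical, and ignoring surplus outcomes when the vector is too long is an assumption of this example, since the text above only specifies the too-short case.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Ack,
    Retry,
}

/// Pairs outcome i with batch element i.
///
/// A vector that is too short is padded with Retry, so an unmatched message is
/// redelivered rather than silently lost. Surplus outcomes are discarded
/// (an assumption of this sketch). The boolean reports whether the lengths
/// matched, which is where a runtime would log the mismatch.
fn align(batch_len: usize, mut outcomes: Vec<Outcome>) -> (Vec<Outcome>, bool) {
    let matched = outcomes.len() == batch_len;
    outcomes.resize(batch_len, Outcome::Retry);
    (outcomes, matched)
}

fn main() {
    // A handler bug: three messages in the batch, only one outcome returned.
    let (aligned, matched) = align(3, vec![Outcome::Ack]);
    println!("aligned = {aligned:?}, lengths matched = {matched}");
}
```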
Worker pools¶
The dispatch loop is sequential per subscriber: one delivery is handled and settled before the
next is pulled, so one slow handler stalls the whole subscription. A workers(n) clause
processes up to n deliveries of this subscriber concurrently, each in its own task on the
multi-thread runtime:
/// Up to 16 orders processed concurrently; global order is lost by design.
#[subscriber("orders", workers(16))]
async fn fan_out(order: &Order) -> HandlerResult {
    println!("processing order {}", order.id);
    HandlerResult::Ack
}
Back-pressure holds: the stream is not polled while n deliveries are in flight, which plays
well with broker-side limits like JetStream max_ack_pending. Global processing order is lost
by design - if any ordering matters, either stay sequential or use keyed lanes:
/// 16 lanes keyed by the message's partition key: per-key order is preserved.
#[subscriber("orders", workers(16, by_key))]
async fn per_customer(order: &Order) -> HandlerResult {
    println!("processing order {}", order.id);
    HandlerResult::Ack
}
workers(n, by_key) runs n sequential lanes. A delivery goes to the lane its partition key
hashes to, so messages sharing a key never overlap or reorder - the in-process analogue of
Kafka partition semantics. The key comes from the broker message's partition_key() (brokers
whose messages implement the Partitioned capability expose it; the in-memory broker reads the
partition-key header). Messages without a key rotate over the lanes. by_key applies to
single-message subscribers; batch forms take a plain workers(n) pool of batches.
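Lane selection of this kind can be sketched as "hash the key, take it modulo the lane count, and rotate when there is no key". The code below is an illustration under assumptions: the Lanes type is hypothetical, and the standard library's DefaultHasher stands in for whatever hash the runtime actually uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical lane picker for `n` sequential lanes.
struct Lanes {
    count: usize,
    /// Next lane to use for a message that carries no partition key.
    next_keyless: usize,
}

impl Lanes {
    fn new(count: usize) -> Self {
        assert!(count > 0, "at least one lane is required");
        Lanes { count, next_keyless: 0 }
    }

    /// Returns the lane index for a delivery.
    fn pick(&mut self, key: Option<&str>) -> usize {
        match key {
            // Same key, same hash, same lane: messages sharing a key are
            // handled one after another and never overlap.
            Some(k) => {
                let mut hasher = DefaultHasher::new();
                k.hash(&mut hasher);
                (hasher.finish() % self.count as u64) as usize
            }
            // Keyless messages rotate over the lanes.
            None => {
                let lane = self.next_keyless;
                self.next_keyless = (self.next_keyless + 1) % self.count;
                lane
            }
        }
    }
}

fn main() {
    let mut lanes = Lanes::new(16);
    let first = lanes.pick(Some("customer-7"));
    let second = lanes.pick(Some("customer-7"));
    println!("customer-7 lanes: {first} and {second}");
    println!("keyless lanes: {} then {}", lanes.pick(None), lanes.pick(None));
}
```

Two different keys may hash to the same lane; that only costs some parallelism and does not affect per-key ordering.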
On shutdown, the subscriber stops pulling new deliveries and in-flight workers drain under the
app's shutdown_timeout.
Composition rules¶
The subscriber features compose; these are the rules at each intersection, each pinned by an integration test.
| Combination | Rule |
|---|---|
| workers(n) × batch(..) | The pool holds up to n batches in flight. by_key does not apply to batch forms: lanes order single messages per key, and the macro rejects the combination at compile time. |
| retry() / retry_after × workers(n) | Retried deliveries re-enter the pool and complete like any other delivery. |
| retry() / retry_after × workers(n, by_key) | Retries complete, but per-key ordering across a retry is not promised: a requeued message rejoins the stream from the back. If a key's messages must stay ordered even through failures, the handler has to absorb the failure instead of nacking. |
| .transactional() × workers(n) | One transaction per batch, exactly as in the sequential loop. Concurrent batches run concurrent, independent transactions; each stays atomic (commit-then-ack per batch). |
| Buffered × workers(n) | Batches still close by max_size / max_wait only; the pool bounds how many closed batches are processed at once and never affects batch boundaries. |
| publish(..) × workers(n) | Replies are produced concurrently, so reply order across deliveries is not promised. A failed reply publish retries only its own delivery. |
| middleware × batch(..) | App-global and router layers wrap per-message handlers and do not apply to batch registrations (a per-message layer cannot wrap a whole-batch handler). |
Macro or manual¶
#[subscriber] is sugar over a generic API. The macro generates a typed handler and its metadata;
you can write the same registration by hand with typed (which decodes the payload), a closure or
struct handler, and HandlerMetadata. Both forms below register the same handler.
use ruststream::Name;
use ruststream::codec::JsonCodec;
use ruststream::runtime::{Context, HandlerMetadata, HandlerResult, typed};

// inside with_broker(...):
b.subscribe(
    Name::new("orders"),
    typed(JsonCodec, |_order: &Order, _ctx: &mut Context| async {
        HandlerResult::Ack
    }),
    HandlerMetadata::typed::<Order>("orders"),
);
Reach for the manual form when a handler needs state the macro cannot express (a struct handler with fields), or to set a non-default decode-failure policy. Otherwise the macro is less to maintain.
Publishers¶
A handler that produces a reply is a publisher. See Publishing and replies.