Skip to content

Subscribers

A subscriber binds a handler to one subscription. The #[subscriber] macro is the ergonomic way to declare one; this guide covers the handler contract, the macro forms, and how handlers are mounted. Grouping handlers into modules is covered in Routing, and how payloads are decoded in Codecs.

The handler contract

A handler is an async fn whose first parameter is a reference to the decoded payload:

use ruststream::runtime::HandlerResult;
use ruststream::subscriber;

#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
    println!("got order {}", order.id);
    HandlerResult::Ack
}

The macro turns the function into a value named after it (here handle) that implements the mounting contract. You pass that value to include.

Accepting the context

Declare an optional second parameter, &mut Context, to read headers, the subscription name, and shared state, or to publish from inside the handler:

#[subscriber("orders")]
async fn with_context(order: &Order, ctx: &mut Context<'_>) -> HandlerResult {
    if let Some(id) = ctx.headers().correlation_id() {
        println!("order {} correlates to {id}", order.id);
    }
    HandlerResult::Ack
}

The macro resolves the context type itself, so the Context name needs no import when it appears only in #[subscriber] signatures. The full context surface - the headers working copy, state access, named publishers - is covered in Context and state.

Acking

The return type is anything that converts into a [Settle] (the settlement unit: an outcome plus an optional post-settle continuation):

Return value Result
HandlerResult::ack() (or HandlerResult::Ack) acknowledge; the broker removes the message
HandlerResult::retry() nack with requeue (redeliver later)
HandlerResult::retry_after(delay) nack asking for redelivery no sooner than delay
HandlerResult::drop() nack without requeue (discard or dead-letter)
() always Ack
Result<(), E> Ack on Ok, drop on Err
Result<HandlerResult, E> the inner outcome on Ok, drop on Err
Settle (any of the above .and_after(..)) settle by the outcome, then run the continuation

On the message itself, ack consumes self, so the type system prevents acking twice.

Post-settle continuations

HandlerResult::ack().and_after(fut) attaches a continuation that runs after the message is settled, without gating the ack decision or affecting redelivery - a non-critical notification, slow follow-up work, a cache warm-up. Any outcome works (drop().and_after(..) is valid; the neutral reading is "after settle"):

/// Ack the order, then fire a non-critical follow-up once it is acknowledged. The continuation is
/// at-most-once: if it is lost or panics, the already-acked order is not redelivered.
#[subscriber("orders")]
async fn handle(order: &Order) -> Settle {
    let id = order.id;
    HandlerResult::ack().and_after(async move {
        println!("order {id} acked; notifying downstream");
    })
}

The continuation runs on a tracked task set that a graceful shutdown drains (bounded by shutdown_timeout when set). It is at-most-once: the message is already settled, so a continuation that panics or is lost on a crash never redelivers it. Do not put work whose loss must redeliver the message in here; settle by outcome and let the broker retry instead.

In a batch each element settles individually, so the continuation rides per element - a capability the per-message context hook cannot offer:

/// Per-element settlement: id 0 retries with no continuation, every other order acks and schedules
/// its own follow-up. The continuation rides with the element, so a batch settles each message and
/// its side effect independently.
#[subscriber(batch("orders"))]
async fn handle_page(orders: &[Order]) -> Vec<Settle> {
    orders
        .iter()
        .map(|order| {
            if order.id == 0 {
                HandlerResult::retry().into()
            } else {
                let id = order.id;
                HandlerResult::ack().and_after(async move {
                    println!("order {id} acked in batch; following up");
                })
            }
        })
        .collect()
}

Batch publishing (batch(..) + publish(..)) settles all-or-nothing under one transaction, so per-element and_after does not compose there; it applies to plain batch and single forms only.

Delayed redelivery

retry_after covers the not-ready-yet case (a dependency has not arrived, an upstream is rate-limited), where an immediate redelivery would just spin:

/// The not-ready-yet case: the upstream has not settled this payment, so an immediate
/// redelivery would just spin. Ask the broker to redeliver no sooner than five seconds from now.
#[subscriber("payments")]
async fn reconcile(payment: &Payment) -> HandlerResult {
    if !payment.settled {
        return HandlerResult::retry_after(Duration::from_secs(5));
    }
    println!("payment {} settled", payment.id);
    HandlerResult::Ack
}

Under the hood, the runtime honours the delay:

  • A broker with native delayed redelivery (the memory broker re-delivers on a timer; a NATS JetStream broker could NAK with delay) hands off to the transport directly.
  • On a broker without native support, the runtime schedules a deferred re-publish of the message to its own source subject after delay, then drops the original. The re-published copy carries the framework retry-count header (RETRY_COUNT_HEADER) incremented; a handler can read it to cap redeliveries.

Opt in per scope with BrokerScope::retry_via(publisher) (the publisher must target the same broker). Without a publisher the delay is dropped and the message is requeued immediately. The deferred re-publish is at-most-once over the delay window: if the process exits before the timer fires the copy is lost.

The batch_retry_after form composes with selective batch outcomes: a Vec<HandlerResult> carries per-element delays, so pending entries back off without holding up the rest of the page:

/// Selective outcomes carry per-element delays: settled payments ack immediately, pending ones
/// come back in thirty seconds without holding up the rest of the page.
#[subscriber(batch("payments"))]
async fn reconcile_page(payments: &[Payment]) -> Vec<HandlerResult> {
    payments
        .iter()
        .map(|payment| {
            if payment.settled {
                HandlerResult::Ack
            } else {
                HandlerResult::retry_after(Duration::from_secs(30))
            }
        })
        .collect()
}

Choosing the subscription source

By name

#[subscriber("orders")] subscribes by name. It works with any broker that implements the Subscribe capability, which covers the common case.

Broker-specific descriptors

When a subscription needs broker-specific options (a consumer group, a durable name, a delivery policy), the broker crate exposes a descriptor type. Use its constructor directly in the decorator:

#[subscriber(OrdersStream::new("orders", "workers"))]
async fn handle(order: &Order) -> HandlerResult {
    HandlerResult::Ack
}

The macro reads the descriptor type out of the constructor call, so the compiler checks the descriptor against the broker it is mounted on. A descriptor is any type that implements SubscriptionSource<B>; see Broker authors.

The source may also be a builder chain on that constructor, so fluent options stay inline. For example, a broker that ships an options builder lets a handler name a specific stream and consumer right in the decorator:

#[subscriber(StreamOptions::new("orders").durable("audit"))]
async fn handle(order: &Order) -> HandlerResult {
    HandlerResult::Ack
}

The macro follows the chain down to the base Type::new(..) to name the source type, so each method in the chain must return Self. Free functions are rejected, since their type is not visible to the macro.

Mounting handlers

Inside with_broker, mount a definition with include:

RustStream::new(info).with_broker(broker, |b| {
    b.include(handle);
});

include decodes the payload with the codec resolved from the most specific level you set - per handler, per scope, or the feature-selected default. See where the codec comes from.

To group handlers per module and mount them all at once, collect them into a Router; see Routing.

Batch subscribers

Wrapping the source in batch(..) switches the handler to whole-batch consumption: it takes the decoded batch as a slice and runs once per batch the broker delivers - one database round-trip, one bulk API call.

/// Settles a whole page of orders in one go.
#[subscriber(batch("orders"))]
async fn settle(orders: &[Order]) -> HandlerResult {
    println!("settling {} orders", orders.len());
    HandlerResult::Ack
}

Mount it with include_batch (the batch counterpart of include):

b.include_batch(settle);

The source's subscriber must implement the BatchSubscriber capability. Brokers whose clients batch natively (Kafka poll, JetStream pull consumers) expose it directly, and batch sizing lives in their subscription options; the in-memory broker batches natively too. For any other source, the Buffered adapter buffers single deliveries client-side, closing a batch by size or by a deadline after its first delivery:

// Client-side batching for sources without native batches: close a batch at 128 deliveries or
// 20 ms after its first one. The macro recovers the source type from the constructor path, so
// the generic parameter is spelled out.
#[subscriber(batch(Buffered::<Name>::new(Name::new("orders"))
    .max_size(128)
    .max_wait(Duration::from_millis(20))))]
async fn drain(orders: &[Order]) -> HandlerResult {
    println!("draining {} orders", orders.len());
    HandlerResult::Ack
}

The semantics differ from single-message handlers in a few ways:

  • Elements that fail to decode are nacked individually (per the decode-failure policy) and never reach the handler; the rest arrive as one slice.
  • The returned value settles the batch. A single HandlerResult (or () / Result<_, E>) settles every message uniformly: Ack acks them all, retry() requeues them all.
  • Per-message headers are not accessible in the &[T] form, and the context starts with empty headers.
  • App-global and router middleware wrap per-message handlers and do not apply to batch registrations.

Selective acknowledgement

A common case is partial readiness: some messages of the page are processed, others are not ready yet and should be redelivered without retrying the ones that succeeded. Return Vec<HandlerResult> to settle element i of the slice with outcome i:

/// Retries only the entries that are not ready yet; the rest of the page settles.
#[subscriber(batch("orders"))]
async fn reconcile(orders: &[Order]) -> Vec<HandlerResult> {
    orders
        .iter()
        .map(|order| {
            if order.id == 0 {
                HandlerResult::retry()
            } else {
                HandlerResult::Ack
            }
        })
        .collect()
}

Broker semantics are exactly those of per-message nack(requeue = true): brokers with per-message redelivery honour selective retry natively; a positional broker degrades the same way it does for a single-message nack (the crate of that broker documents it). Returning a vector whose length does not match the batch is a bug in the handler: the unmatched remainder is retried (an extra redelivery beats a silently lost message) and the mismatch is logged.

Worker pools

The dispatch loop is sequential per subscriber: one delivery is handled and settled before the next is pulled, so one slow handler stalls the whole subscription. A workers(n) clause processes up to n deliveries of this subscriber concurrently, each in its own task on the multi-thread runtime:

/// Up to 16 orders processed concurrently; global order is lost by design.
#[subscriber("orders", workers(16))]
async fn fan_out(order: &Order) -> HandlerResult {
    println!("processing order {}", order.id);
    HandlerResult::Ack
}

Back-pressure holds: the stream is not polled while n deliveries are in flight, which plays well with broker-side limits like JetStream max_ack_pending. Global processing order is lost by design - if any ordering matters, either stay sequential or use keyed lanes:

/// 16 lanes keyed by the message's partition key: per-key order is preserved.
#[subscriber("orders", workers(16, by_key))]
async fn per_customer(order: &Order) -> HandlerResult {
    println!("processing order {}", order.id);
    HandlerResult::Ack
}

workers(n, by_key) runs n sequential lanes. A delivery goes to the lane its partition key hashes to, so messages sharing a key never overlap or reorder - the in-process analogue of Kafka partition semantics. The key comes from the broker message's partition_key() (brokers whose messages implement the Partitioned capability expose it; the in-memory broker reads the partition-key header). Messages without a key rotate over the lanes. by_key applies to single-message subscribers; batch forms take a plain workers(n) pool of batches.

On shutdown, the subscriber stops pulling new deliveries and in-flight workers drain under the app's shutdown_timeout.

Composition rules

The subscriber features compose; these are the rules at each intersection, each pinned by an integration test.

Combination Rule
workers(n) × batch(..) The pool holds up to n batches in flight. by_key does not apply to batch forms: lanes order single messages per key, and the macro rejects the combination at compile time.
retry() / retry_after × workers(n) Retried deliveries re-enter the pool and complete like any other delivery.
retry() / retry_after × workers(n, by_key) Retries complete, but per-key ordering across a retry is not promised: a requeued message rejoins the stream from the back. If a key's messages must stay ordered even through failures, the handler has to absorb the failure instead of nacking.
.transactional() × workers(n) One transaction per batch, exactly as in the sequential loop. Concurrent batches run concurrent, independent transactions; each stays atomic (commit-then-ack per batch).
Buffered × workers(n) Batches still close by max_size / max_wait only; the pool bounds how many closed batches are processed at once and never affects batch boundaries.
publish(..) × workers(n) Replies are produced concurrently, so reply order across deliveries is not promised. A failed reply publish retries only its own delivery.
middleware × batch(..) App-global and router layers wrap per-message handlers and do not apply to batch registrations (a per-message layer cannot wrap a whole-batch handler).

Macro or manual

#[subscriber] is sugar over a generic API. The macro generates a typed handler and its metadata; you can write the same registration by hand with typed (which decodes the payload), a closure or struct handler, and HandlerMetadata. Both forms below register the same handler.

use ruststream::subscriber;

#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
    println!("got order {}", order.id);
    HandlerResult::Ack
}

// inside with_broker(...):
b.include(handle);
use ruststream::Name;
use ruststream::codec::JsonCodec;
use ruststream::runtime::{Context, HandlerMetadata, HandlerResult, typed};

// inside with_broker(...):
b.subscribe(
    Name::new("orders"),
    typed(JsonCodec, |_order: &Order, _ctx: &mut Context| async {
        HandlerResult::Ack
    }),
    HandlerMetadata::typed::<Order>("orders"),
);

Reach for the manual form when a handler needs state the macro cannot express (a struct handler with fields), or to set a non-default decode-failure policy. Otherwise the macro is less to maintain.

Publishers

A handler that produces a reply is a publisher. See Publishing and replies.