Failure policy

Two things can go wrong outside a handler's normal return path: the handler body can panic, and an incoming payload can fail to decode before the handler ever runs. RustStream settles both through one vocabulary, set per subscriber with the on_failure(..) clause, but with different defaults, because the two failures mean different things.

Defaults

With no clause, a subscriber uses the built-in defaults:

  • panic = fail_fast: a panic is an internal bug. The runtime logs a loud error naming the subscription, starts a graceful shutdown (it cancels the shutdown token and runs the shutdown hooks), and makes run return Err, giving a non-zero exit. An orchestrator then restarts the service and the operator is led straight to the logs, rather than being left with a subscriber that silently stopped consuming or an invisible redelivery loop.
  • decode = drop: a decode failure is usually bad external input. Dropping the one bad message (a nack without requeue) keeps a single malformed payload from taking the consumer down, which on an untrusted topic would be a poison-message or denial-of-service footgun.

/// No clause: the defaults apply. A panic in the body fails fast (a loud error, then a graceful
/// shutdown so an orchestrator restarts the service); a payload that cannot decode is dropped.
#[subscriber("orders")]
async fn process(order: &Order) -> HandlerResult {
    println!("processing order {}", order.id);
    HandlerResult::Ack
}

Setting a policy

on_failure(panic = .., decode = ..) overrides either key (both are optional; an omitted key keeps its default):

/// An untrusted topic: a handler bug should still take the service down (fail fast), but a
/// malformed message must not, so decode failures requeue instead of dropping or failing.
#[subscriber("ingest", on_failure(panic = fail_fast, decode = retry))]
async fn ingest(order: &Order) -> HandlerResult {
    println!("ingesting order {}", order.id);
    HandlerResult::Ack
}

The policy values are:

Value               Effect
fail_fast           Log, start a graceful shutdown, and make run return Err.
drop                Drop the message (nack without requeue).
retry               Requeue the message (nack with requeue).
retry_after(<dur>)  Requeue after a delay (see the delayed-redelivery section in Subscribers).
skip                Acknowledge the failed message to move past it. Not success: the message is gone, unprocessed.

skip is the deliberate poison-message escape hatch: it advances past a message that cannot be processed rather than dropping or retrying it.

/// A poison-tolerant consumer: move past anything that cannot be processed. A panic acks the
/// offending message and keeps consuming; a decode failure does the same.
#[subscriber("audit", on_failure(panic = skip, decode = skip))]
async fn audit(order: &Order) -> HandlerResult {
    println!("auditing order {}", order.id);
    HandlerResult::Ack
}
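
To make the vocabulary concrete, the policy values and the two defaults can be modelled as plain data. This is only an illustrative sketch, not RustStream's internals: the names Policy, Settlement, FailurePolicy, and settle are hypothetical, and modelling retry_after as a nack carrying a delay is an assumption about how delayed redelivery might be represented.

```rust
use std::time::Duration;

/// Hypothetical mirror of the policy values listed in the table above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    FailFast,
    Drop,
    Retry,
    RetryAfter(Duration),
    Skip,
}

/// Hypothetical description of what happens to the failed message.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Settlement {
    /// Leave the message unsettled and begin a graceful shutdown.
    ShutDown,
    /// Negative acknowledgement; `requeue` asks the broker to redeliver.
    /// `delay` is an assumed way to represent retry_after.
    Nack { requeue: bool, delay: Option<Duration> },
    /// Acknowledge the message even though it was not processed.
    Ack,
}

/// One policy per failure kind, as in on_failure(panic = .., decode = ..).
#[derive(Debug, Clone, Copy, PartialEq)]
struct FailurePolicy {
    panic: Policy,
    decode: Policy,
}

impl Default for FailurePolicy {
    /// The documented defaults: panic = fail_fast, decode = drop.
    fn default() -> Self {
        FailurePolicy { panic: Policy::FailFast, decode: Policy::Drop }
    }
}

/// Map a policy value to the settlement described in the table.
fn settle(policy: Policy) -> Settlement {
    match policy {
        Policy::FailFast => Settlement::ShutDown,
        Policy::Drop => Settlement::Nack { requeue: false, delay: None },
        Policy::Retry => Settlement::Nack { requeue: true, delay: None },
        Policy::RetryAfter(d) => Settlement::Nack { requeue: true, delay: Some(d) },
        Policy::Skip => Settlement::Ack,
    }
}
```

In this model, overriding a single key while keeping the other default corresponds to struct update syntax, for example FailurePolicy { decode: Policy::Retry, ..FailurePolicy::default() }. The sketch also shows how drop and skip differ: both move past the message, but drop is a nack without requeue while skip is an ack.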

How it behaves

  • A panic is caught (catch_unwind), so a panicking handler never kills the dispatch loop. Under fail_fast the message is left unsettled, so a broker with redelivery hands it back after the restart; under the other policies it is settled and the subscriber keeps consuming. Catching only applies under an unwinding panic profile; with panic = "abort" the process is already gone.
  • A decode failure surfaces as a Result, so no unwinding is involved; the decode policy settles the message directly. The same on_decode_failure policy can also be set when building a handler by hand through the typed adapter (see Codecs).
  • On the batch path the policy applies per batch decode (each element decodes independently) and to a panic in the batch handler. Per-element panic handling is out of scope.
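
The difference between the two failure paths can be sketched with the standard library alone. The example below is a simplified, synchronous illustration under stated assumptions, not the runtime's actual dispatch code: Outcome, decode, and dispatch are hypothetical names, the "codec" just parses an order id from text, and real handlers are async, which this sketch does not model. It shows a decode failure arriving as an ordinary Result with no unwinding, and a handler panic being caught with std::panic::catch_unwind so the caller keeps running.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Hypothetical result of dispatching one message.
#[derive(Debug, PartialEq)]
enum Outcome {
    Completed,
    Panicked(String),
    DecodeFailed(String),
}

/// Hypothetical stand-in for a codec: parse the payload as an order id.
fn decode(payload: &str) -> Result<u64, String> {
    payload.trim().parse::<u64>().map_err(|e| e.to_string())
}

/// Decode the payload, then run the handler with panics contained.
fn dispatch<F>(payload: &str, handler: F) -> Outcome
where
    F: FnOnce(u64),
{
    // A decode failure is a plain Err value: no unwinding is involved.
    let id = match decode(payload) {
        Ok(id) => id,
        Err(e) => return Outcome::DecodeFailed(e),
    };

    // A panic in the handler unwinds to here instead of killing the caller.
    // This only works when the build uses the unwinding panic strategy.
    match catch_unwind(AssertUnwindSafe(|| handler(id))) {
        Ok(()) => Outcome::Completed,
        Err(cause) => {
            // Panic payloads are usually a &str or a String.
            let msg = cause
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| cause.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "non-string panic payload".to_string());
            Outcome::Panicked(msg)
        }
    }
}
```

A caller would then look up the panic or decode policy for the returned Outcome and settle the message accordingly. Note that the default panic hook still prints the panic message to stderr before catch_unwind returns.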

This is the full example: examples/failure_policy.rs.