Failure policy
Two things can go wrong around your handler logic: the handler body can panic, and an incoming
payload can fail to decode before the handler ever runs. RustStream handles both with one
vocabulary of policies, set per subscriber with the `on_failure(..)` clause. The two failures get
different defaults because they mean different things.
Defaults
With no clause, a subscriber uses the built-in defaults:
- `panic = fail_fast`: a panic is an internal bug. The runtime logs a loud error naming the subscription, then starts a graceful shutdown (it cancels the shutdown token and runs the shutdown hooks) and makes `run` return `Err`, giving a non-zero exit. An orchestrator restarts the service and the operator finds the cause in the logs, instead of facing a subscriber that silently stopped consuming or an invisible redelivery loop.
- `decode = drop`: a decode failure is usually bad external input. Dropping the one bad message (a nack without requeue) keeps a single malformed payload from taking the consumer down; on an untrusted topic, failing fast here would be a poison-message or denial-of-service footgun.
```rust
/// No clause: the defaults apply. A panic in the body fails fast (a loud error, then a graceful
/// shutdown so an orchestrator restarts the service); a payload that cannot decode is dropped.
#[subscriber("orders")]
async fn process(order: &Order) -> HandlerResult {
    println!("processing order {}", order.id);
    HandlerResult::Ack
}
```
Setting a policy
`on_failure(panic = .., decode = ..)` overrides either default. Both keys are optional; an omitted
key keeps its default:
```rust
/// An untrusted topic: a handler bug should still take the service down (fail fast), but a
/// malformed message must not, so decode failures requeue instead of dropping or failing.
#[subscriber("ingest", on_failure(panic = fail_fast, decode = retry))]
async fn ingest(order: &Order) -> HandlerResult {
    println!("ingesting order {}", order.id);
    HandlerResult::Ack
}
```
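To make the defaults and the override rule concrete, here is a minimal model in plain Rust. It is a sketch under assumptions, not RustStream's actual types: `FailurePolicy` and `Policies` are hypothetical names, and Rust's struct-update syntax stands in for the rule that an omitted key in `on_failure(..)` keeps its default.

```rust
use std::time::Duration;

/// Hypothetical model of the policy values (not RustStream's actual type).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FailurePolicy {
    FailFast,
    Drop,
    Retry,
    RetryAfter(Duration),
    Skip,
}

/// Hypothetical per-subscriber settings: one policy per failure kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Policies {
    panic: FailurePolicy,
    decode: FailurePolicy,
}

impl Default for Policies {
    // The built-in defaults: a panic fails fast, an undecodable payload is dropped.
    fn default() -> Self {
        Policies {
            panic: FailurePolicy::FailFast,
            decode: FailurePolicy::Drop,
        }
    }
}

fn main() {
    // No clause, as in `process`: both defaults apply.
    let orders = Policies::default();

    // Only `decode` is overridden; the omitted `panic` field keeps its default.
    // This matches the `ingest` example, since fail_fast is already the panic default.
    let ingest = Policies {
        decode: FailurePolicy::Retry,
        ..Policies::default()
    };

    println!("orders: {:?}", orders);
    println!("ingest: {:?}", ingest);
}
```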
The policy values are:
| Value | Effect |
|---|---|
| `fail_fast` | Log, start a graceful shutdown, and make `run` return `Err`. |
| `drop` | Drop the message (nack without requeue). |
| `retry` | Requeue the message (nack with requeue). |
| `retry_after(<dur>)` | Requeue after a delay (see the delayed-redelivery section in Subscribers). |
| `skip` | Acknowledge the failed message to move past it. This is not success: the message is gone, unprocessed. |
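The table can be read as a mapping from each policy to what happens to the failed message. The sketch below encodes that mapping as a single `match`; `FailurePolicy`, `Action`, and `action_for` are hypothetical names, and `RequeueAfter` is a simplification of the delayed redelivery described in Subscribers, not RustStream's mechanism.

```rust
use std::time::Duration;

/// Hypothetical policy values, one per row of the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FailurePolicy {
    FailFast,
    Drop,
    Retry,
    RetryAfter(Duration),
    Skip,
}

/// Hypothetical actions a runtime could take for a failed message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    /// Log, start a graceful shutdown, and make `run` return `Err`.
    Shutdown,
    /// Negative acknowledgement; `requeue` decides whether the broker redelivers.
    Nack { requeue: bool },
    /// Requeue, but only after the given delay.
    RequeueAfter(Duration),
    /// Acknowledge: the broker forgets the message even though it was never processed.
    Ack,
}

/// Map a policy to the action taken on the failed message.
fn action_for(policy: FailurePolicy) -> Action {
    match policy {
        FailurePolicy::FailFast => Action::Shutdown,
        FailurePolicy::Drop => Action::Nack { requeue: false },
        FailurePolicy::Retry => Action::Nack { requeue: true },
        FailurePolicy::RetryAfter(delay) => Action::RequeueAfter(delay),
        FailurePolicy::Skip => Action::Ack,
    }
}

fn main() {
    let policies = [
        FailurePolicy::FailFast,
        FailurePolicy::Drop,
        FailurePolicy::Retry,
        FailurePolicy::RetryAfter(Duration::from_secs(5)),
        FailurePolicy::Skip,
    ];
    for &policy in policies.iter() {
        println!("{:?} -> {:?}", policy, action_for(policy));
    }
}
```

Note that `drop` and `skip` both remove the message from the queue, but through opposite settlements: a nack versus an ack.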
`skip` is the deliberate poison-message escape hatch: it acknowledges a message that cannot be
processed so the consumer advances past it, rather than nacking it (`drop`) or requeueing it (`retry`).
```rust
/// A poison-tolerant consumer: move past anything that cannot be processed. A panic acks the
/// offending message and keeps consuming; a decode failure does the same.
#[subscriber("audit", on_failure(panic = skip, decode = skip))]
async fn audit(order: &Order) -> HandlerResult {
    println!("auditing order {}", order.id);
    HandlerResult::Ack
}
```
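The difference between `retry` and `skip` on a payload that can never decode shows up in a toy in-memory queue. This is a simulation, not RustStream code: the queue, `DecodePolicy`, `Report`, `consume`, and the delivery budget are all hypothetical, and "decoding" here just means parsing the payload as a `u64`. The toy requeues at the back of the queue (real brokers differ in where a requeued message lands), models only decode failures, and has no broker-side routing such as dead-lettering, so `drop` and `skip` behave identically in it.

```rust
use std::collections::VecDeque;

/// Hypothetical decode policies for the toy consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DecodePolicy {
    Drop,
    Retry,
    Skip,
}

/// What happened by the time the queue drained or the delivery budget ran out.
#[derive(Debug, PartialEq, Eq)]
struct Report {
    processed: Vec<u64>,
    deliveries: usize,
    drained: bool,
}

/// Toy consumer: each payload must parse as a u64; a failure is settled by `policy`.
fn consume(payloads: &[&str], policy: DecodePolicy, max_deliveries: usize) -> Report {
    let mut queue: VecDeque<String> = payloads.iter().map(|s| s.to_string()).collect();
    let mut processed = Vec::new();
    let mut deliveries = 0;
    while deliveries < max_deliveries {
        let msg = match queue.pop_front() {
            Some(m) => m,
            None => break,
        };
        deliveries += 1;
        match msg.parse::<u64>() {
            // Decoded: the handler runs and the message is acked.
            Ok(id) => processed.push(id),
            Err(_) => match policy {
                // Nack without requeue: the message is discarded.
                DecodePolicy::Drop => {}
                // Ack: the message is also gone, unprocessed.
                DecodePolicy::Skip => {}
                // Nack with requeue: the same bad payload comes back later.
                DecodePolicy::Retry => queue.push_back(msg),
            },
        }
    }
    Report { processed, deliveries, drained: queue.is_empty() }
}

fn main() {
    let payloads = ["1", "oops", "2"];
    for &policy in [DecodePolicy::Drop, DecodePolicy::Retry, DecodePolicy::Skip].iter() {
        println!("{:?}: {:?}", policy, consume(&payloads, policy, 10));
    }
}
```

With `retry`, the bad payload is redelivered until the 10-delivery budget runs out and the queue never drains; with `skip`, it is delivered once and the queue drains after three deliveries.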
How it behaves
- A panic is caught (`catch_unwind`), so a panicking handler never kills the dispatch loop. Under `fail_fast` the message is left unsettled, so a broker with redelivery hands it back after the restart; under the other policies it is settled and the subscriber keeps consuming. Catching only works under the unwinding panic strategy; with `panic = "abort"` in the Cargo profile, the process is already gone.
- A decode failure surfaces as a `Result`, so no unwinding is involved; the `decode` policy settles the message directly. The same `on_decode_failure` policy can also be set when building a handler by hand through the typed adapter (see Codecs).
- On the batch path, the `decode` policy applies to each element's decode independently, and the `panic` policy applies to a panic in the batch handler as a whole. Per-element panic handling is out of scope.
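The panic-versus-decode split described above can be sketched with `std::panic::catch_unwind`. This is a synchronous simplification under assumptions: RustStream's handlers are async and its dispatch loop is not shown here, so `decode`, `handler`, `dispatch`, and `Outcome` are hypothetical. Decoding returns a `Result` handled with a plain `match`; only the handler call is wrapped in `catch_unwind`, which, as noted above, requires the unwinding panic strategy.

```rust
use std::num::ParseIntError;
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical outcomes of one dispatch step.
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Handled(u64),
    DecodeFailed,
    Panicked,
}

/// Decoding returns a Result: nothing unwinds, the caller inspects the error.
fn decode(payload: &str) -> Result<u64, ParseIntError> {
    payload.parse::<u64>()
}

/// A handler with a bug: it panics on order id 0.
fn handler(id: u64) -> u64 {
    if id == 0 {
        panic!("handler bug: order id 0");
    }
    id * 10
}

/// One dispatch step: decode first, then run the handler inside catch_unwind
/// so a panic becomes a value instead of ending the loop.
fn dispatch(payload: &str) -> Outcome {
    let id = match decode(payload) {
        Ok(id) => id,
        // Here the `decode` policy would settle the message.
        Err(_) => return Outcome::DecodeFailed,
    };
    match panic::catch_unwind(AssertUnwindSafe(|| handler(id))) {
        Ok(value) => Outcome::Handled(value),
        // Here the `panic` policy would settle the message (or fail fast).
        Err(_) => Outcome::Panicked,
    }
}

fn main() {
    // Silence the default panic message so the demo output stays readable.
    panic::set_hook(Box::new(|_| {}));
    for payload in ["7", "0", "x"].iter() {
        println!("{} -> {:?}", payload, dispatch(payload));
    }
}
```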
The full example is in `examples/failure_policy.rs`.