Redis Streams

A #[subscriber("key")] handler binds to a Redis stream key. The broker always reads a stream through a consumer group, so the bare-string form needs a broker-wide default group, set with .default_group:

use ruststream::runtime::{AppInfo, HandlerResult, RustStream};
use ruststream::subscriber;
use ruststream_fred::RedisBroker;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Order {
    id: u64,
}

#[subscriber("orders")]
async fn handle(order: &Order) -> HandlerResult {
    println!("got order {}", order.id);
    HandlerResult::Ack
}

Wire it onto the broker; the with_broker / include part is identical to every other broker.

#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("orders", "0.1.0")).with_broker(
        RedisBroker::standalone("redis://localhost:6379").default_group("workers"),
        |b| {
            b.include(handle);
        },
    )
}

Payload and headers travel as stream entry fields: the body under a reserved field and each header under a h: prefix, so a round-trip through XADD / XREADGROUP preserves both.
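The field layout can be sketched with the standard library alone. This is a minimal illustration, not the broker's actual codec: the reserved body field name is not given here, so BODY_FIELD ("body") is an assumed placeholder, and encode_entry / decode_entry are hypothetical helpers.

```rust
use std::collections::BTreeMap;

// Assumed placeholder: the text only says the body lives under "a reserved field".
const BODY_FIELD: &str = "body";
// Headers are stored under this prefix, as described above.
const HEADER_PREFIX: &str = "h:";

/// Flatten a message into the field/value pairs an XADD call would carry.
fn encode_entry(body: &[u8], headers: &BTreeMap<String, String>) -> Vec<(String, Vec<u8>)> {
    let mut fields = Vec::with_capacity(headers.len() + 1);
    fields.push((BODY_FIELD.to_owned(), body.to_vec()));
    for (name, value) in headers {
        fields.push((format!("{}{}", HEADER_PREFIX, name), value.as_bytes().to_vec()));
    }
    fields
}

/// Rebuild body and headers from the fields an XREADGROUP reply would return.
/// Fields that are neither the body nor `h:`-prefixed are ignored.
fn decode_entry(fields: &[(String, Vec<u8>)]) -> (Vec<u8>, BTreeMap<String, String>) {
    let mut body = Vec::new();
    let mut headers = BTreeMap::new();
    for (field, value) in fields {
        if field.as_str() == BODY_FIELD {
            body = value.clone();
        } else if let Some(name) = field.strip_prefix(HEADER_PREFIX) {
            headers.insert(name.to_owned(), String::from_utf8_lossy(value).into_owned());
        }
    }
    (body, headers)
}
```

Because headers are namespaced by the prefix, a header can never collide with the body field, which is what lets both survive the round-trip.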

Read modes: fresh tail vs reclaim

The read mode is chosen by constructor rather than by a runtime flag, because the two modes return disjoint sets of messages:

  • RedisStream::new(key) reads fresh entries off the tail (XREADGROUP >). This is the normal worker.
  • RedisStream::reclaim(key, min_idle) reclaims entries another consumer fetched but never acked (XAUTOCLAIM, idle at least min_idle). This is crash recovery, run alongside a RedisStream::new subscriber on the same group ("two handlers per group").

min_idle has no default and must exceed the longest legitimate handler runtime: set it too low and a healthy consumer's in-flight message gets reclaimed and processed twice.
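The effect of the threshold can be modelled without Redis. The sketch below is an assumption-laden illustration: Pending and reclaimable are hypothetical names, and the filter only mimics the "idle at least min_idle" rule stated above.

```rust
use std::time::Duration;

/// A pending entry as the group's pending list might describe it (hypothetical shape).
struct Pending {
    id: &'static str,
    /// Time since the entry was last delivered to a consumer.
    idle: Duration,
}

/// Ids a reclaim pass would take over: those idle for at least `min_idle`.
fn reclaimable(pending: &[Pending], min_idle: Duration) -> Vec<&'static str> {
    pending
        .iter()
        .filter(|p| p.idle >= min_idle)
        .map(|p| p.id)
        .collect()
}
```

With one entry idle for 5 seconds (a handler still working) and one idle for 120 seconds (its owner crashed), a 30-second min_idle selects only the second. A 2-second min_idle selects both, so the healthy consumer's message would be processed twice.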

A descriptor can sit directly in the #[subscriber(...)] decorator. The fresh-tail worker:

// The descriptor sits directly in the decorator: a fresh-tail consumer on the `workers` group.
#[subscriber(RedisStream::new("orders").group("workers"))]
async fn handle(order: &Order) -> HandlerResult {
    println!("processing order {}", order.id);
    HandlerResult::Ack
}

The recovery handler on the same group, reclaiming entries idle for over 30 seconds:

use std::time::Duration;

// A recovery handler for the same group: reclaims entries left pending for over 30s.
#[subscriber(RedisStream::reclaim("orders", Duration::from_secs(30)).group("workers"))]
async fn recover(order: &Order) -> HandlerResult {
    println!("recovering order {}", order.id);
    HandlerResult::Ack
}

Acknowledgement

Settlement follows the republish-retry model:

  • ack -> XACK (remove from the pending list).
  • nack(requeue = true) -> re-append a copy to the same stream, then XACK the original. The copy is reprocessed by the normal RedisStream::new consumer. This is at-least-once: a crash between the XADD and the XACK leaves a duplicate.
  • nack(requeue = false) -> XACK to drop.
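The mapping above can be written out as a small table in code. This is only a sketch of the command order: Settlement and settle are hypothetical names, and the command strings are illustrative text rather than what the broker sends on the wire.

```rust
/// The settlement outcomes listed above.
enum Settlement {
    Ack,
    Nack { requeue: bool },
}

/// Redis commands issued for one entry, in the order they would run.
fn settle(stream: &str, group: &str, id: &str, outcome: Settlement) -> Vec<String> {
    let xack = format!("XACK {} {} {}", stream, group, id);
    match outcome {
        // Both a plain ack and a drop only remove the entry from the pending list.
        Settlement::Ack | Settlement::Nack { requeue: false } => vec![xack],
        // Requeue appends the copy first, then acks the original. A crash between
        // the two commands leaves both on the stream, hence at-least-once.
        Settlement::Nack { requeue: true } => vec![
            format!("XADD {} * <original fields>", stream),
            xack,
        ],
    }
}
```

Running XADD before XACK is the safe order: the reverse would risk losing the message if the process died between the two commands.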

Delayed retry

A handler can ask for a delayed redelivery (HandlerResult::retry_after(delay)), for example to back off after a transient failure. Redis Streams have no native per-message delay, so by default the runtime falls back to an in-process timer that republishes the message after the delay. This is at-most-once over that window: a crash before the timer fires loses the deferred copy.

For a crash-safe alternative, opt a subscription into a durable ZSET delay queue. It is off by default and you name the ZSET key explicitly (the key has no sane default):

// On a transient failure the handler asks for a delayed retry. The delay queue is the named ZSET,
// so the redelivery is durable: it survives a crash between the failure and the retry firing.
#[subscriber(
    RedisStream::new("orders")
        .group("workers")
        .delayed_retry(DelayedRetry::DurableZset { key: "orders.delayed".to_owned(), ttl: None })
)]
async fn handle_order(order: &Order) -> HandlerResult {
    if order.id == 0 {
        // Park the message in the ZSET for 30s instead of blocking the worker or busy-requeuing.
        return HandlerResult::retry_after(Duration::from_secs(30));
    }
    println!("processed order {}", order.id);
    HandlerResult::Ack
}

A delayed delivery is ZADDed to the named ZSET with a fire_at score, then the original is XACKed; a sweeper folded into the subscription's read loop moves due entries back onto the stream with XADD, so the retry survives a restart. The sweeper's granularity is the read block interval, and the retry-count header is incremented on each pass. An optional TTL on the ZSET key cleans up an abandoned queue, but it must exceed the longest scheduled delay or pending entries are dropped before they fire. Scores are wall-clock epoch milliseconds, so keep clocks synced (NTP).
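The schedule-then-sweep cycle can be modelled in memory to make the timing concrete. This is a sketch under stated assumptions, not the runtime's code: DelayQueue, schedule, and sweep are hypothetical names, a BTreeSet stands in for the ZSET, and the caller passes the clock in as epoch milliseconds.

```rust
use std::collections::BTreeSet;

/// In-memory stand-in for the ZSET: members ordered by (fire_at_ms, payload).
#[derive(Default)]
struct DelayQueue {
    entries: BTreeSet<(u64, String)>,
}

impl DelayQueue {
    /// Models ZADD: park `payload` with a score of `now_ms + delay_ms`.
    fn schedule(&mut self, payload: &str, now_ms: u64, delay_ms: u64) {
        self.entries.insert((now_ms + delay_ms, payload.to_owned()));
    }

    /// Models one sweeper pass: remove and return every entry whose score is due.
    /// A real sweeper would XADD each returned payload back onto the stream.
    fn sweep(&mut self, now_ms: u64) -> Vec<String> {
        // Everything at or after (now_ms + 1, "") is not yet due and stays queued.
        let still_pending = self.entries.split_off(&(now_ms + 1, String::new()));
        let due = std::mem::replace(&mut self.entries, still_pending);
        due.into_iter().map(|(_, payload)| payload).collect()
    }
}
```

An entry scheduled at 1 000 ms with a 5 000 ms delay is not returned by a sweep at 5 999 ms but is returned by one at 6 000 ms. In the real loop sweeps happen only once per read block interval, so the actual redelivery can lag the score by up to that interval.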