Skip to content

Pub/Sub

Pub/Sub is fire-and-forget: no durability, no consumer groups, no ack (ack / nack report Unsupported). A RedisPubSub descriptor selects the channel and mode. Classic broadcasts cluster-wide and supports patterns:

// Classic broadcast subscription that re-publishes each event to an `audit` channel. The macro
// `publish("audit")` form publishes the handler's return value through the publisher wired at mount.
#[subscriber(RedisPubSub::new("events"), publish("audit"))]
async fn on_event(event: &Event) -> Event {
    println!("event: {}", event.kind);
    Event {
        kind: event.kind.clone(),
    }
}

Sharded delivery (SSUBSCRIBE, Redis 7+) stays slot-local so it scales across a cluster, at the cost of patterns. Enable it per subscription with .mode(PubSubMode::Sharded):

// Sharded subscription (`SSUBSCRIBE`): on a cluster this stays slot-local and scales. It belongs on
// a cluster broker (below), and pairs with a sharded publisher
// (`broker.pubsub_publisher().mode(Sharded)`).
#[subscriber(RedisPubSub::new("events").mode(PubSubMode::Sharded))]
async fn on_event_sharded(event: &Event) -> HandlerResult {
    println!("sharded event: {}", event.kind);
    HandlerResult::Ack
}

Because RustStream is multi-broker, one service can run classic Pub/Sub on a standalone server and sharded Pub/Sub on a cluster at the same time - each handler mounts on its own broker:

#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("events", "0.1.0"))
        .with_broker(RedisBroker::standalone("redis://localhost:6379"), |b| {
            // `publish("audit")` sends through this Pub/Sub publisher (PUBLISH), not the default
            // stream publisher (XADD).
            let audit = TypedPublisher::new(b.broker().pubsub_publisher());
            b.include_publishing(on_event, audit);
        })
        .with_broker(RedisBroker::cluster(["redis://localhost:7000"]), |b| {
            b.include(on_event_sharded);
        })
}

To publish, mount the handler with include_publishing and a broker.pubsub_publisher() (add .mode(PubSubMode::Sharded) to match a sharded subscriber). The classic handler above uses the macro publish("audit") form, so its return value goes out through that Pub/Sub publisher - not the default stream publisher:

#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("events", "0.1.0"))
        .with_broker(RedisBroker::standalone("redis://localhost:6379"), |b| {
            // `publish("audit")` sends through this Pub/Sub publisher (PUBLISH), not the default
            // stream publisher (XADD).
            let audit = TypedPublisher::new(b.broker().pubsub_publisher());
            b.include_publishing(on_event, audit);
        })
        .with_broker(RedisBroker::cluster(["redis://localhost:7000"]), |b| {
            b.include(on_event_sharded);
        })
}

Headers travel in a frame around the payload: a lossless binary frame by default, or - when you set a codec on both the publisher and the subscriber (.codec(JsonCodec)) - a readable codec-serialized {headers, payload} envelope (so the wire value is legible JSON in tools like RedisInsight). A raw value an external client published is delivered as the payload with empty headers.