Pub/Sub
Pub/Sub is fire-and-forget: there is no durability, no consumer groups, and no acknowledgement
(ack / nack report Unsupported). A RedisPubSub descriptor selects the channel and the delivery
mode. Classic mode broadcasts cluster-wide and supports patterns:
// Classic broadcast subscription that re-publishes each event to an `audit` channel. The macro
// `publish("audit")` form publishes the handler's return value through the publisher wired at mount.
#[subscriber(RedisPubSub::new("events"), publish("audit"))]
async fn on_event(event: &Event) -> Event {
    println!("event: {}", event.kind);
    Event {
        kind: event.kind.clone(),
    }
}
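To make the fire-and-forget semantics concrete, here is a minimal in-process model using only the standard library. It is not RustStream or Redis code: `Hub`, `subscribe`, and `publish` are hypothetical names chosen for illustration. The sketch assumes the same behavior Redis Pub/Sub has, namely that a message reaches only the subscribers connected at publish time and is never stored for later.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical toy broker: one broadcast channel, no persistence, no acks.
struct Hub {
    subscribers: Vec<Sender<String>>,
}

impl Hub {
    fn new() -> Self {
        Hub { subscribers: Vec::new() }
    }

    /// Register a subscriber. It only sees messages published from now on.
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Deliver to every live subscriber and return how many received the message.
    /// Subscribers whose receiver was dropped are pruned. Nothing is stored,
    /// so a message published to zero subscribers is simply gone.
    fn publish(&mut self, message: &str) -> usize {
        self.subscribers
            .retain(|tx| tx.send(message.to_string()).is_ok());
        self.subscribers.len()
    }
}
```

Publishing before anyone subscribes returns 0 and the message is lost; a subscriber that joins afterwards starts with an empty queue. When that loss is not acceptable, a durable stream is the better fit than Pub/Sub.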
Sharded delivery (SSUBSCRIBE, Redis 7+) stays slot-local so it scales across a cluster, at the cost
of patterns. Enable it per subscription with .mode(PubSubMode::Sharded):
// Sharded subscription (`SSUBSCRIBE`): on a cluster this stays slot-local and scales. It belongs on
// a cluster broker (below), and pairs with a sharded publisher
// (`broker.pubsub_publisher().mode(Sharded)`).
#[subscriber(RedisPubSub::new("events").mode(PubSubMode::Sharded))]
async fn on_event_sharded(event: &Event) -> HandlerResult {
    println!("sharded event: {}", event.kind);
    HandlerResult::Ack
}
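"Slot-local" can be made concrete: Redis Cluster assigns a shard channel to a hash slot with the same algorithm it uses for keys (CRC16/XMODEM of the name, modulo 16384, honoring `{...}` hash tags). The following standalone sketch reproduces that calculation; the function names are illustrative and are not part of RustStream's API.

```rust
/// CRC16/XMODEM (polynomial 0x1021, initial value 0), the variant Redis Cluster uses.
fn crc16_xmodem(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 {
                (crc << 1) ^ 0x1021
            } else {
                crc << 1
            };
        }
    }
    crc
}

/// Return the part of the name that is hashed: the text between the first `{`
/// and the next `}` if that text is non-empty, otherwise the whole name.
fn hash_tag(name: &[u8]) -> &[u8] {
    if let Some(open) = name.iter().position(|&b| b == b'{') {
        if let Some(len) = name[open + 1..].iter().position(|&b| b == b'}') {
            if len > 0 {
                return &name[open + 1..open + 1 + len];
            }
        }
    }
    name
}

/// Hash slot (0..16384) that a shard channel maps to.
fn channel_slot(channel: &str) -> u16 {
    crc16_xmodem(hash_tag(channel.as_bytes())) % 16384
}
```

Channels that share a hash tag, such as `{events}.orders` and `{events}.audit`, land in the same slot and are therefore served by the same shard. This is also why patterns are unavailable in sharded mode: a pattern could match channels spread across many slots.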
Because RustStream is multi-broker, one service can run classic Pub/Sub on a standalone server and
sharded Pub/Sub on a cluster at the same time; each handler mounts on its own broker:
#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("events", "0.1.0"))
        .with_broker(RedisBroker::standalone("redis://localhost:6379"), |b| {
            // `publish("audit")` sends through this Pub/Sub publisher (PUBLISH), not the default
            // stream publisher (XADD).
            let audit = TypedPublisher::new(b.broker().pubsub_publisher());
            b.include_publishing(on_event, audit);
        })
        .with_broker(RedisBroker::cluster(["redis://localhost:7000"]), |b| {
            b.include(on_event_sharded);
        })
}
To publish, mount the handler with include_publishing and a broker.pubsub_publisher() (add
.mode(PubSubMode::Sharded) to match a sharded subscriber). The classic handler above uses the macro
publish("audit") form, so its return value goes out through that Pub/Sub publisher - not the default
stream publisher:
#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("events", "0.1.0"))
        .with_broker(RedisBroker::standalone("redis://localhost:6379"), |b| {
            // `publish("audit")` sends through this Pub/Sub publisher (PUBLISH), not the default
            // stream publisher (XADD).
            let audit = TypedPublisher::new(b.broker().pubsub_publisher());
            b.include_publishing(on_event, audit);
        })
        .with_broker(RedisBroker::cluster(["redis://localhost:7000"]), |b| {
            b.include(on_event_sharded);
        })
}
Headers travel in a frame around the payload: a lossless binary frame by default, or - when you set a
codec on both the publisher and the subscriber (.codec(JsonCodec)) - a readable codec-serialized
{headers, payload} envelope (so the wire value is legible JSON in tools like RedisInsight). A raw
value an external client published is delivered as the payload with empty headers.
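The framing idea, including the fallback for raw values, can be sketched as follows. This is not RustStream's actual wire format, which is not specified here: the `RSF1` marker, the length-prefixed layout, and the `Message`, `encode`, and `decode` names are all assumptions made for illustration. The point is only that a decoder can recognize its own frames and treat anything else as a bare payload with empty headers.

```rust
// Hypothetical frame marker; the real binary frame may look entirely different.
const MAGIC: &[u8] = b"RSF1";

#[derive(Debug, PartialEq)]
struct Message {
    headers: Vec<(String, String)>,
    payload: Vec<u8>,
}

/// Append a big-endian u32 length followed by the bytes themselves.
fn put_chunk(out: &mut Vec<u8>, chunk: &[u8]) {
    out.extend_from_slice(&(chunk.len() as u32).to_be_bytes());
    out.extend_from_slice(chunk);
}

/// Frame layout (assumed): MAGIC, header count, (key, value) chunks, then the payload.
fn encode(msg: &Message) -> Vec<u8> {
    let mut out = MAGIC.to_vec();
    out.extend_from_slice(&(msg.headers.len() as u32).to_be_bytes());
    for (key, value) in &msg.headers {
        put_chunk(&mut out, key.as_bytes());
        put_chunk(&mut out, value.as_bytes());
    }
    out.extend_from_slice(&msg.payload);
    out
}

/// Read a big-endian u32 and advance the input past it.
fn take_u32(input: &mut &[u8]) -> Option<u32> {
    if input.len() < 4 {
        return None;
    }
    let (head, rest) = input.split_at(4);
    *input = rest;
    Some(u32::from_be_bytes([head[0], head[1], head[2], head[3]]))
}

/// Read one length-prefixed UTF-8 string and advance the input past it.
fn take_string(input: &mut &[u8]) -> Option<String> {
    let len = take_u32(input)? as usize;
    if input.len() < len {
        return None;
    }
    let (head, rest) = input.split_at(len);
    *input = rest;
    String::from_utf8(head.to_vec()).ok()
}

/// Parse a frame; returns None if the bytes are not a well-formed frame.
fn try_decode(bytes: &[u8]) -> Option<Message> {
    if !bytes.starts_with(MAGIC) {
        return None;
    }
    let mut rest = &bytes[MAGIC.len()..];
    let count = take_u32(&mut rest)?;
    let mut headers = Vec::new();
    for _ in 0..count {
        let key = take_string(&mut rest)?;
        let value = take_string(&mut rest)?;
        headers.push((key, value));
    }
    Some(Message { headers, payload: rest.to_vec() })
}

/// Anything that is not a valid frame is delivered as a raw payload with empty headers.
fn decode(bytes: &[u8]) -> Message {
    try_decode(bytes).unwrap_or_else(|| Message {
        headers: Vec::new(),
        payload: bytes.to_vec(),
    })
}
```

A framed message round-trips through `encode` and `decode` with its headers intact, while a value published by an external client (for example with `redis-cli PUBLISH`) comes back as the payload alone. A codec-based envelope trades this compactness for a wire value that is readable in inspection tools.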