Context and state
Everything a handler can reach besides its payload arrives through two objects with different lifetimes:
| Level | Type | Lives for | Holds |
|---|---|---|---|
| Application | State | the whole service | shared resources: pools, clients, configuration |
| Delivery | Context | one message | the channel name, a headers working copy, a per-delivery Extensions type-map, access to State and named publishers |
State is filled once, at startup. A Context is built fresh for every delivery and threaded as
&mut through the middleware chain into the handler, so middleware and the handler observe (and
can enrich) the same per-message view.
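The two lifetimes can be pictured with plain standard-library types. What follows is a minimal sketch under stated assumptions, not RustStream's implementation: SketchState, SketchContext, stamp, handler and dispatch are hypothetical names invented for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for application state: built once, then only read.
struct SketchState {
    service_name: String,
}

/// Hypothetical stand-in for a per-delivery context. It borrows the state
/// and owns everything that is scoped to a single message.
struct SketchContext<'a> {
    state: &'a SketchState,
    channel: &'a str,
    headers: HashMap<String, Vec<u8>>,
}

/// A "middleware" step: stamps a header onto the per-delivery view.
fn stamp(ctx: &mut SketchContext<'_>) {
    ctx.headers
        .insert("x-seen-by".to_string(), b"middleware".to_vec());
}

/// The "handler": observes what the middleware added plus the shared state.
fn handler(payload: &str, ctx: &mut SketchContext<'_>) -> String {
    let stamped = ctx.headers.contains_key("x-seen-by");
    format!(
        "{}:{}:{}:{}",
        ctx.state.service_name, ctx.channel, payload, stamped
    )
}

/// Builds a fresh context for every delivery and threads it as `&mut`
/// through the middleware step into the handler.
fn dispatch(state: &SketchState, channel: &str, payloads: &[&str]) -> Vec<String> {
    payloads
        .iter()
        .map(|payload| {
            let mut ctx = SketchContext {
                state,
                channel,
                headers: HashMap::new(),
            };
            stamp(&mut ctx);
            handler(payload, &mut ctx)
        })
        .collect()
}
```

The state outlives every context that borrows it, while each context (and its headers map) is dropped as soon as its delivery has been handled.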
Application level: State
State is a type-map: one value per type, Send + Sync. Two ways to fill it:
- At build time with RustStream::insert_state(value) - for values that need no async work (parsed configuration, a constructed client).
- In an on_startup hook with state.insert(value) - for resources that need await to create (a database pool). See Lifespan for the hook contract.
Inserting a second value of the same type replaces the first. Handlers read it through the context
with ctx.state().get::<T>(), which returns Option<&T> - None only if nothing of that type was
inserted. The state is shared behind an Arc once the service runs, so handlers get cheap shared
references, not copies; interior mutability (an AtomicU64, a mutex-guarded map) is the tool when
a shared value must change at runtime.
/// Shared configuration: inserted once at build time, read by every handler.
#[derive(Debug)]
struct AppConfig {
    reject_zero_ids: bool,
}
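The type-map behaviour described above (one value per type, replacement on a second insert, Option<&T> on lookup, interior mutability for values that change) can be sketched with the standard library alone. This is an illustration under assumptions, not the library's State: TypeMap, Processed, build_state and record_one are hypothetical names.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical type-map: at most one value per concrete type.
#[derive(Default)]
struct TypeMap {
    slots: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl TypeMap {
    /// Stores `value`, replacing any earlier value of the same type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.slots.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Returns the stored value of type `T`, or `None` if none was inserted.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.slots
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

struct AppConfig {
    reject_zero_ids: bool,
}

/// A shared value that must change at runtime: mutated through `&self`
/// thanks to interior mutability.
#[derive(Default)]
struct Processed(AtomicU64);

/// Fills the map once, then freezes it behind an `Arc` for sharing.
fn build_state() -> Arc<TypeMap> {
    let mut state = TypeMap::default();
    state.insert(AppConfig { reject_zero_ids: false });
    state.insert(AppConfig { reject_zero_ids: true }); // replaces the first
    state.insert(Processed::default());
    Arc::new(state)
}

/// Bumps the shared counter through a shared reference and returns the new total.
fn record_one(state: &TypeMap) -> u64 {
    let counter = state.get::<Processed>().expect("inserted in build_state");
    counter.0.fetch_add(1, Ordering::Relaxed) + 1
}
```

Cloning the Arc hands out another handle to the same map, so every holder sees the same counter; nothing is copied.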
Delivery level: Context
A #[subscriber] handler opts in by declaring a second parameter after the payload; omit it when
the handler needs nothing but the message. The macro resolves the type itself, so Context needs
no import when it appears only in handler signatures:
#[subscriber("orders")]
async fn handle(order: &Order, ctx: &mut Context<'_>) -> HandlerResult {
    // 1. The channel the message arrived on.
    println!("received on {}", ctx.name());

    // 2. The headers working copy - including what middleware added on the way in.
    if let Some(id) = ctx.headers().get("x-request-id") {
        println!("request {}", String::from_utf8_lossy(id));
    }

    // 3. App-level shared state, reached through state().
    let config = ctx
        .state()
        .get::<AppConfig>()
        .expect("config inserted at build time");
    if config.reject_zero_ids && order.id == 0 {
        return HandlerResult::drop();
    }

    // 4. A post-settle hook: fires after the broker has acked this message, off the delivery
    //    path, so slow follow-up work never gates the ack or the next delivery. At-most-once:
    //    a lost hook does not redeliver.
    let id = order.id;
    ctx.after_ack(async move {
        println!("order {id} acked; sending the confirmation");
    });

    // 5. Per-delivery extensions: a value scoped to this one delivery (set by middleware or a
    //    broker, then read by the handler). Distinct from the shared app state above.
    ctx.insert(order.id);
    if let Some(seen) = ctx.get::<u64>() {
        println!("processing order {seen}");
    }

    HandlerResult::Ack
}
What the context exposes:
| Method | Returns | Purpose |
|---|---|---|
| name() | &str | the channel / subject the message arrived on |
| headers() | &Headers | the working copy of the message headers |
| headers_mut() | &mut Headers | the same copy, for middleware to enrich |
| state() | &State | shared application state (then .get::<T>()) |
| get::<T>() | Option<&T> | a per-delivery extension |
| insert::<T>(v) | () | write a per-delivery extension |
| publisher(name) | Option<ScopedPublisher> | a named publisher |
| after(outcome).then(fut) | () | a post-settle hook gated on the settlement outcome |
| after_ack(fut) / after_settle(fut) | () | post-settle hook sugar (after an ack / after any settlement) |
Closure handlers (the manual typed(codec, |msg, ctx| ...) form) always take the context as their
second argument.
Per-delivery extensions
Beside the shared State, the context carries an Extensions type-map scoped to one delivery:
ctx.insert::<T>(value) writes it, ctx.get::<T>() reads it back, and it is dropped when the
handler returns, so one delivery's values never leak into the next. Use it for data that belongs to
a single message rather than the whole service - a correlation id, an authenticated user a layer
resolved, a tracing span.
// 5. Per-delivery extensions: a value scoped to this one delivery (set by middleware or a
// broker, then read by the handler). Distinct from the shared app state above.
ctx.insert(order.id);
if let Some(seen) = ctx.get::<u64>() {
println!("processing order {seen}");
}
A broker can also seed the map: implementing IncomingMessage::extensions lets it hand the handler
typed per-delivery values (native delivery metadata, a commit token, a reply-to handle) without
serializing them into the byte-only headers. The runtime moves the broker's map into the context
before the handler runs, and those values stay reachable through the publish pipeline, so a
transactional publisher can read a broker-supplied commit token at commit time.
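How a broker might seed typed per-delivery values can be sketched as follows. Everything here is an assumption made for illustration: the Incoming trait and its extensions signature are hypothetical and not the real IncomingMessage API, and Extensions, CommitToken, LogMessage, PlainMessage and offset_seen_by_handler are invented names.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical per-delivery type-map.
#[derive(Default)]
struct Extensions {
    slots: HashMap<TypeId, Box<dyn Any + Send>>,
}

impl Extensions {
    fn insert<T: Any + Send>(&mut self, value: T) {
        self.slots.insert(TypeId::of::<T>(), Box::new(value));
    }

    fn get<T: Any + Send>(&self) -> Option<&T> {
        self.slots
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

/// Typed metadata that would be awkward to squeeze into byte-only headers.
#[derive(Debug, PartialEq)]
struct CommitToken {
    partition: u32,
    offset: u64,
}

/// Hypothetical incoming-message trait: a broker may seed extensions.
trait Incoming {
    fn payload(&self) -> &[u8];

    /// Default: the broker has no typed metadata to hand over.
    fn extensions(&mut self) -> Extensions {
        Extensions::default()
    }
}

struct LogMessage {
    body: Vec<u8>,
    partition: u32,
    offset: u64,
}

impl Incoming for LogMessage {
    fn payload(&self) -> &[u8] {
        &self.body
    }

    fn extensions(&mut self) -> Extensions {
        let mut ext = Extensions::default();
        ext.insert(CommitToken {
            partition: self.partition,
            offset: self.offset,
        });
        ext
    }
}

/// A broker message that keeps the default (empty) extensions.
struct PlainMessage(Vec<u8>);

impl Incoming for PlainMessage {
    fn payload(&self) -> &[u8] {
        &self.0
    }
}

/// The "runtime": moves the broker's map into the per-delivery view and lets
/// the handler read it. The map is dropped when this function returns.
fn offset_seen_by_handler(mut msg: impl Incoming) -> Option<u64> {
    let ext = msg.extensions(); // moved as typed values, never serialized
    ext.get::<CommitToken>().map(|token| token.offset)
}
```

Because the token travels as a typed value, the reader needs no parsing step and gets None, rather than a malformed header, when the broker supplies nothing.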
The headers working copy
ctx.headers() is not the broker message itself: each delivery clones the incoming headers into a
working copy that lives in the context. That makes it a scratchpad for the dispatch chain -
middleware earlier in the chain can stamp values onto it with headers_mut(), and the handler
reads the enriched result:
// The request counter below needs the standard atomics.
use std::sync::atomic::{AtomicU64, Ordering};

/// A layer that stamps a request id onto the context headers before the handler runs.
#[derive(Clone)]
struct RequestId;

struct WithRequestId<H>(H);

impl<H> Layer<H> for RequestId {
    type Handler = WithRequestId<H>;

    fn layer(&self, inner: H) -> WithRequestId<H> {
        WithRequestId(inner)
    }
}

static NEXT_REQUEST: AtomicU64 = AtomicU64::new(1);

impl<M: Send + Sync, H: Handler<M>> Handler<M> for WithRequestId<H> {
    async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> Settle {
        if ctx.headers().get("x-request-id").is_none() {
            let id = format!("req-{}", NEXT_REQUEST.fetch_add(1, Ordering::Relaxed));
            ctx.headers_mut().insert("x-request-id", id.into_bytes());
        }
        self.0.handle(msg, ctx).await
    }
}
Mounted globally, the layer runs before every handler, so handle above always finds
x-request-id:
#[ruststream::app]
fn app() -> RustStream<Stack<RequestId, Identity>> {
    RustStream::new(AppInfo::new("context", "0.1.0"))
        .insert_state(AppConfig {
            reject_zero_ids: true,
        })
        .layer(RequestId)
        .with_broker(MemoryBroker::new(), |b| b.include(handle))
}
Two boundaries to keep in mind:
- Mutations stay within the delivery: the broker message and other subscribers' deliveries are untouched.
- Outgoing messages do not inherit the copy. Replies and manual publishes start from fresh headers; attach outgoing metadata in the publish pipeline (a PublishLayer or PublishMiddleware) instead.
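Both boundaries follow from the copy being a clone. The sketch below models that with a plain HashMap; it is an illustration under assumptions rather than the library's Headers type, and BrokerMessage, Delivery, new_reply_headers and stamp_request_id are hypothetical names.

```rust
use std::collections::HashMap;

/// Assumed header shape for this sketch: string keys, byte values.
type Headers = HashMap<String, Vec<u8>>;

/// Hypothetical broker message: its headers are only ever read.
struct BrokerMessage {
    headers: Headers,
}

/// Hypothetical per-delivery view holding a clone of the incoming headers.
struct Delivery {
    headers: Headers,
}

impl Delivery {
    /// Each delivery gets its own working copy.
    fn from_message(msg: &BrokerMessage) -> Self {
        Delivery {
            headers: msg.headers.clone(),
        }
    }

    /// A reply starts from fresh headers; nothing is inherited from the copy.
    fn new_reply_headers(&self) -> Headers {
        Headers::new()
    }
}

/// Middleware-style enrichment: stamps an id unless one is already present.
fn stamp_request_id(delivery: &mut Delivery, id: &str) {
    delivery
        .headers
        .entry("x-request-id".to_string())
        .or_insert_with(|| id.as_bytes().to_vec());
}
```

Stamping one delivery leaves the broker message and any sibling delivery built from it unchanged, which is the isolation the first bullet describes.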
Publishing through the context
ctx.publisher("name") resolves a publisher registered with RustStream::publisher(name, p) and
returns None when nothing is registered under that name. Sends through it run the application's
dynamic publish middleware - the same chain as a macro reply - so a manual publish is not a hole
in the pipeline:
#[subscriber("ingress")]
async fn forward(event: &Event, ctx: &mut Context<'_>) -> HandlerResult {
    if let Some(publisher) = ctx.publisher("egress") {
        let payload = JsonCodec.encode(event).expect("serializable");
        let out = Outgoing::new("egress", payload);
        if publisher.publish(out).await.is_err() {
            return HandlerResult::retry();
        }
    }
    HandlerResult::Ack
}
A closure handler cannot return a future that borrows the context across .await; publish from a
#[subscriber] handler or a struct handler with async fn handle, both of which own the borrow.
See Publishing.
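The lookup-by-name behaviour and the shared publish chain can be modelled in a few lines. This is a synchronous sketch under assumptions, not the library's publisher types: Publishers, Scoped, PublishStep, add_trace and this Outgoing struct are hypothetical, and the real API is async.

```rust
use std::collections::HashMap;

/// Hypothetical outgoing message for this sketch.
#[derive(Debug, Clone, PartialEq)]
struct Outgoing {
    channel: String,
    headers: HashMap<String, String>,
    payload: Vec<u8>,
}

/// A publish middleware step: may rewrite the outgoing message.
type PublishStep = fn(Outgoing) -> Outgoing;

/// Hypothetical registry of named publishers sharing one middleware chain.
#[derive(Default)]
struct Publishers {
    targets: HashMap<String, String>, // publisher name -> channel
    chain: Vec<PublishStep>,
    sent: Vec<Outgoing>, // stands in for the broker
}

/// A handle scoped to one named publisher.
struct Scoped<'a> {
    channel: String,
    registry: &'a mut Publishers,
}

impl Publishers {
    fn register(&mut self, name: &str, channel: &str) {
        self.targets.insert(name.to_string(), channel.to_string());
    }

    /// Returns `None` when nothing is registered under `name`.
    fn publisher(&mut self, name: &str) -> Option<Scoped<'_>> {
        let channel = self.targets.get(name)?.clone();
        Some(Scoped {
            channel,
            registry: self,
        })
    }
}

impl Scoped<'_> {
    /// Every send runs the whole chain, so a manual publish cannot bypass it.
    fn publish(&mut self, payload: &[u8]) {
        let mut out = Outgoing {
            channel: self.channel.clone(),
            headers: HashMap::new(),
            payload: payload.to_vec(),
        };
        for step in &self.registry.chain {
            out = step(out);
        }
        self.registry.sent.push(out);
    }
}

/// Example step: attaches outgoing metadata in the publish pipeline.
fn add_trace(mut out: Outgoing) -> Outgoing {
    out.headers.insert("x-trace".to_string(), "on".to_string());
    out
}
```

Routing every send through the registry is one way to keep manual publishes inside the pipeline: the handle has no path to the broker that skips the chain.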
Post-settle hooks
Sometimes a handler needs a side effect to fire after the message has been settled - a non-critical notification, slow follow-up work - without it gating the ack decision or affecting redelivery. Register one on the context:
#[subscriber("orders")]
async fn handle(order: &Order, ctx: &mut Context<'_>) -> HandlerResult {
    // A post-settle hook: fires after the broker has acked this message, off the delivery
    // path, so slow follow-up work never gates the ack or the next delivery. At-most-once:
    // a lost hook does not redeliver.
    let id = order.id;
    ctx.after_ack(async move {
        println!("order {id} acked; sending the confirmation");
    });
    HandlerResult::Ack
}
The handler above registers a hook with ctx.after_ack(..): the continuation runs only once the
broker has acked the message, off the delivery path, so it never delays the ack or the next delivery.
Three forms, all additive:
- ctx.after(outcome).then(fut) - runs only if the message settles by outcome, matched by kind. The four kinds are distinct: Ack, drop() (nack, no requeue), retry() (nack, requeue), and retry_after() (matched regardless of the delay). Drop and retry are separate mechanics, so a hook gated on drop() does not fire on a retry() settlement, and vice versa.
- ctx.after_ack(fut) - sugar for ctx.after(HandlerResult::Ack).then(fut).
- ctx.after_settle(fut) - runs after the message settles, whatever the outcome.
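Matching "by kind" can be sketched with std::mem::discriminant, which compares enum variants while ignoring their payloads. The Outcome and Gate enums and the fired function are hypothetical stand-ins, assumed here only to illustrate the rule; they are not the library's types.

```rust
use std::mem::discriminant;
use std::time::Duration;

/// Hypothetical settlement outcomes, mirroring the four kinds named above.
#[derive(Debug, Clone, Copy)]
enum Outcome {
    Ack,
    Drop,
    Retry,
    RetryAfter(Duration),
}

/// When a hook is allowed to run.
enum Gate {
    /// Only when the message settles with this kind of outcome.
    On(Outcome),
    /// After any settlement.
    AnySettlement,
}

impl Gate {
    /// Matches by kind only: `discriminant` ignores the `RetryAfter` delay.
    fn matches(&self, settled: &Outcome) -> bool {
        match self {
            Gate::AnySettlement => true,
            Gate::On(wanted) => discriminant(wanted) == discriminant(settled),
        }
    }
}

/// Returns the labels of the hooks that would fire, in registration order.
fn fired<'a>(hooks: &[(Gate, &'a str)], settled: Outcome) -> Vec<&'a str> {
    hooks
        .iter()
        .filter(|(gate, _)| gate.matches(&settled))
        .map(|(_, label)| *label)
        .collect()
}
```

With this rule a hook gated on RetryAfter with a one-second delay also fires for a thirty-second retry, while Drop and Retry never match each other.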
Multiple registrations accumulate and every matching one runs. The semantics are at-most-once:
the message is already settled before any hook runs, so a hook that panics, or that is lost when the
process crashes, never causes a redelivery. A graceful shutdown drains in-flight hooks (bounded by
shutdown_timeout); an aborted shutdown may drop them.
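The at-most-once ordering (settle first, then run hooks, with a failing hook contained) can be illustrated synchronously. This is a sketch under assumptions: the Delivery type and its methods are hypothetical, and std::panic::catch_unwind merely stands in for whatever isolation the real runtime uses for hook futures.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// A boxed one-shot hook.
type Hook = Box<dyn FnOnce() + Send>;

/// Hypothetical delivery record: counts how often the broker was told to settle.
#[derive(Default)]
struct Delivery {
    settle_calls: u32,
    hooks: Vec<Hook>,
}

impl Delivery {
    /// Registrations accumulate; none replaces another.
    fn after_settle(&mut self, hook: impl FnOnce() + Send + 'static) {
        self.hooks.push(Box::new(hook));
    }

    /// Settles first, then runs every accumulated hook. A panicking hook is
    /// contained and never feeds back into the settlement. Returns how many
    /// hooks completed without panicking.
    fn settle_then_run_hooks(&mut self) -> usize {
        self.settle_calls += 1; // the message is settled before any hook runs
        let mut completed = 0;
        for hook in self.hooks.drain(..) {
            if catch_unwind(AssertUnwindSafe(hook)).is_ok() {
                completed += 1;
            }
        }
        completed
    }
}
```

Because the settlement is recorded before the loop starts, a hook that panics can only reduce the completed count; it cannot trigger a second settlement or a redelivery.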
On the batch path a Context is one per batch, so a hook runs after the whole batch has settled.
Because a batch has per-element outcomes, the outcome gate is ill-defined there: only
after_settle hooks fire (the gated after(..) / after_ack forms are ignored on a batch).
Context in middleware
Every middleware form receives the same &mut Context the handler will see, which is what makes
the enrichment pattern work:
- A static layer's Handler::handle(&self, msg, ctx) - as in the example above.
- A dynamic DynMiddleware::handle(&self, input, ctx, next) - inspect or enrich, then next.run(input, ctx).
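The dynamic form's "enrich, then call next" shape can be sketched as a small chain. The trait and types below are hypothetical and synchronous, assumed only for illustration; the real DynMiddleware signature, its input type and its return type may differ.

```rust
use std::collections::HashMap;

/// Hypothetical per-delivery context: just a headers working copy here.
#[derive(Default)]
struct Ctx {
    headers: HashMap<String, String>,
}

/// The handler at the end of the chain.
type Handler = fn(&str, &mut Ctx) -> String;

/// Hypothetical dynamic middleware: inspect or enrich, then call `next.run`.
trait Middleware {
    fn handle(&self, input: &str, ctx: &mut Ctx, next: Next<'_>) -> String;
}

/// The rest of the chain: remaining middleware plus the final handler.
struct Next<'a> {
    rest: &'a [Box<dyn Middleware>],
    handler: Handler,
}

impl Next<'_> {
    /// Passes the same `&mut Ctx` to the next middleware, or to the handler
    /// once the chain is exhausted.
    fn run(self, input: &str, ctx: &mut Ctx) -> String {
        match self.rest.split_first() {
            Some((head, tail)) => head.handle(
                input,
                ctx,
                Next {
                    rest: tail,
                    handler: self.handler,
                },
            ),
            None => (self.handler)(input, ctx),
        }
    }
}

/// Enriches the context before handing over to the rest of the chain.
struct RequestId;

impl Middleware for RequestId {
    fn handle(&self, input: &str, ctx: &mut Ctx, next: Next<'_>) -> String {
        ctx.headers
            .entry("x-request-id".to_string())
            .or_insert_with(|| "req-1".to_string());
        next.run(input, ctx)
    }
}

/// Reads whatever the middleware left on the context.
fn final_handler(input: &str, ctx: &mut Ctx) -> String {
    let id = ctx
        .headers
        .get("x-request-id")
        .map(String::as_str)
        .unwrap_or("none");
    format!("{input} handled with {id}")
}

/// Builds a fresh context and runs the chain for one delivery.
fn dispatch(chain: &[Box<dyn Middleware>], input: &str) -> String {
    let mut ctx = Ctx::default();
    Next {
        rest: chain,
        handler: final_handler,
    }
    .run(input, &mut ctx)
}
```

Because every link receives the same mutable reference, an enrichment made early in the chain is visible to everything after it, including the handler.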
The middleware forms themselves are covered in Middleware. The full program for this page is
examples/context.rs.