Middleware
Middleware wraps handlers with cross-cutting logic: tracing, metrics, auth, retries. RustStream has
two middleware scopes, both built on the same Layer machinery, applied at different points in the
dispatch path.
Middleware scopes
The two scopes compose: the application stack is the outer one, a router's own stack sits inside it.
Application scope. Add a layer to the whole application with RustStream::layer, before
with_broker. Every handler registered after it is wrapped - both handlers registered directly on
a broker scope and handlers a router brings in via include_router:
#[ruststream::app]
fn app() -> RustStream<Stack<LogLayer, Identity>> {
    RustStream::new(AppInfo::new("app-scope", "0.1.0"))
        // wraps every handler registered below, directly or through a router
        .layer(LogLayer)
        .with_broker(MemoryBroker::new(), |b| {
            b.include(orders); // wrapped by LogLayer
            b.include(shipments); // wrapped by LogLayer
            // Mounted through a router: also wrapped by the app stack.
            b.include_router(Router::new().include(audit));
        })
}
Router scope. Give a router its own middleware with Router::layer, which wraps every handler
on that router when it is mounted (see Routing). Handlers mounted
directly on the broker scope stay outside it:
fn routes() -> impl RouterDef<MemoryBroker> {
    // wraps every handler on this router when it is mounted
    Router::new()
        .layer(LogLayer)
        .include(orders) // wrapped by LogLayer
        .include(shipments) // wrapped by LogLayer
}

#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("router-scope", "0.1.0")).with_broker(MemoryBroker::new(), |b| {
        b.include_router(routes());
        b.include(audit); // directly on the scope: outside the router's stack
    })
}
The two programs are middleware_app_scope.rs and middleware_router_scope.rs; LogLayer is the
hand-written layer from the next section, and the built-in layers::TracingLayer mounts the same way.
The first layer added is the outermost. Both stacks are static: zero runtime dispatch cost, and
the type grows as you call layer.
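The ordering rule can be illustrated with a small, self-contained sketch. It does not use the RustStream crate: the Handler, Layer, Identity, Stack and App types below are simplified, synchronous stand-ins written for this illustration, and Tag, Echo, dispatch and trace are hypothetical names. It assumes each call to layer nests the new layer inside the existing stack, which matches the Stack<LogLayer, Identity> type shown above.

```rust
// Simplified, synchronous stand-ins; NOT the RustStream API.
trait Handler {
    fn handle(&self, msg: &str, log: &mut Vec<String>);
}

trait Layer<H> {
    type Handler;
    fn layer(&self, inner: H) -> Self::Handler;
}

/// No-op layer: returns the handler unchanged.
struct Identity;
impl<H> Layer<H> for Identity {
    type Handler = H;
    fn layer(&self, inner: H) -> H {
        inner
    }
}

/// Applies `Inner` to the handler first, then wraps the result with `Outer`.
struct Stack<Inner, Outer>(Inner, Outer);
impl<H, Inner, Outer> Layer<H> for Stack<Inner, Outer>
where
    Inner: Layer<H>,
    Outer: Layer<<Inner as Layer<H>>::Handler>,
{
    type Handler = <Outer as Layer<<Inner as Layer<H>>::Handler>>::Handler;
    fn layer(&self, inner: H) -> Self::Handler {
        self.1.layer(self.0.layer(inner))
    }
}

/// Hypothetical layer that records when it is entered and left.
struct Tag(&'static str);
struct Tagged<H>(&'static str, H);
impl<H> Layer<H> for Tag {
    type Handler = Tagged<H>;
    fn layer(&self, inner: H) -> Tagged<H> {
        Tagged(self.0, inner)
    }
}
impl<H: Handler> Handler for Tagged<H> {
    fn handle(&self, msg: &str, log: &mut Vec<String>) {
        log.push(format!("-> {}", self.0));
        self.1.handle(msg, log);
        log.push(format!("<- {}", self.0));
    }
}

struct Echo;
impl Handler for Echo {
    fn handle(&self, msg: &str, log: &mut Vec<String>) {
        log.push(format!("handle {msg}"));
    }
}

/// Stand-in for the application builder: the stack type grows on every `layer` call.
struct App<S>(S);
impl App<Identity> {
    fn new() -> Self {
        App(Identity)
    }
}
impl<S> App<S> {
    fn layer<L>(self, layer: L) -> App<Stack<L, S>> {
        App(Stack(layer, self.0))
    }
    fn dispatch<H>(&self, handler: H, msg: &str) -> Vec<String>
    where
        S: Layer<H>,
        <S as Layer<H>>::Handler: Handler,
    {
        let wrapped = self.0.layer(handler);
        let mut log = Vec::new();
        wrapped.handle(msg, &mut log);
        log
    }
}

/// Builds App<Stack<Tag, Stack<Tag, Identity>>> and records the call order.
fn trace() -> Vec<String> {
    App::new().layer(Tag("first")).layer(Tag("second")).dispatch(Echo, "m")
}
```

In this model the layer added first is entered first and left last, and no trait object or allocation is involved in building the chain.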
Reaching router handlers requires a BlanketLayer
A router hides its handlers' concrete types, so a layer that wraps them (the app stack at
include_router, or Router::layer) must implement BlanketLayer - one generic method that
wraps any handler. The bundled layers implement it; for a custom layer it is a few lines next
to its Layer impl (see LogLayer in the examples above).
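To make the idea of "one generic method that wraps any handler" concrete, here is a hedged, self-contained sketch. The trait shape is a guess made for illustration (the method name wrap and the associated type Wrapped are hypothetical, and the Handler trait is a synchronous stand-in); the real BlanketLayer signature may differ.

```rust
// Simplified stand-in; NOT the RustStream API.
trait Handler {
    fn handle(&self, msg: &str) -> String;
}

/// Hypothetical shape: a single generic method that accepts a handler of any type.
trait BlanketLayer {
    type Wrapped<H: Handler>: Handler;
    fn wrap<H: Handler>(&self, inner: H) -> Self::Wrapped<H>;
}

struct LogLayer;
struct Logged<H>(H);

impl<H: Handler> Handler for Logged<H> {
    fn handle(&self, msg: &str) -> String {
        format!("log({})", self.0.handle(msg))
    }
}

impl BlanketLayer for LogLayer {
    type Wrapped<H: Handler> = Logged<H>;
    fn wrap<H: Handler>(&self, inner: H) -> Logged<H> {
        Logged(inner)
    }
}

struct Orders;
impl Handler for Orders {
    fn handle(&self, msg: &str) -> String {
        format!("orders:{msg}")
    }
}

struct Shipments;
impl Handler for Shipments {
    fn handle(&self, msg: &str) -> String {
        format!("shipments:{msg}")
    }
}

/// A mount point that only knows "some blanket layer", yet wraps
/// handlers of two unrelated concrete types with the same layer value.
fn mount<L: BlanketLayer>(layer: &L) -> Vec<String> {
    vec![
        layer.wrap(Orders).handle("a"),
        layer.wrap(Shipments).handle("b"),
    ]
}

fn demo() -> Vec<String> {
    mount(&LogLayer)
}
```

The point of the sketch is the bound on mount: a single L: BlanketLayer covers every handler type, whereas a Layer<H> bound would have to name each H.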
Writing a layer
A layer transforms one handler into another. Implement Layer<H>:
use ruststream::runtime::{Context, Handler, HandlerResult, Layer};

#[derive(Clone)]
struct LogLayer;

struct Logged<H>(H);

impl<H> Layer<H> for LogLayer {
    type Handler = Logged<H>;

    fn layer(&self, inner: H) -> Logged<H> {
        Logged(inner)
    }
}

impl<M: Send + Sync, H: Handler<M>> Handler<M> for Logged<H> {
    async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> Settle {
        println!("-> {}", ctx.name());
        let settle = self.0.handle(msg, ctx).await;
        println!("<- {}", ctx.name());
        settle
    }
}
Identity is the no-op layer (the default application stack), and Stack<Inner, Outer> composes two
layers. The ctx here is the same per-delivery Context the handler receives, so a layer can enrich
the working copy of the headers before the handler reads it.
Per-handler middleware
To wrap a single handler instead of the whole application, use HandlerExt::with. This is the right
tool when only some handlers need a layer, and it composes with the application stack.
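A per-handler wrap can be sketched as an extension trait. The code below is a self-contained illustration, not the RustStream API: Handler and Layer are simplified, synchronous stand-ins, and the assumption is that with takes a layer and returns the wrapped handler. Orders, Shipments and demo are hypothetical names.

```rust
// Simplified stand-ins; NOT the RustStream API.
trait Handler {
    fn handle(&self, msg: &str) -> String;
}

trait Layer<H> {
    type Handler;
    fn layer(&self, inner: H) -> Self::Handler;
}

/// Extension trait: gives every handler a `.with(layer)` method.
trait HandlerExt: Handler + Sized {
    fn with<L: Layer<Self>>(self, layer: L) -> L::Handler {
        layer.layer(self)
    }
}
impl<H: Handler> HandlerExt for H {}

struct LogLayer;
struct Logged<H>(H);

impl<H> Layer<H> for LogLayer {
    type Handler = Logged<H>;
    fn layer(&self, inner: H) -> Logged<H> {
        Logged(inner)
    }
}

impl<H: Handler> Handler for Logged<H> {
    fn handle(&self, msg: &str) -> String {
        format!("log({})", self.0.handle(msg))
    }
}

struct Orders;
impl Handler for Orders {
    fn handle(&self, msg: &str) -> String {
        format!("orders:{msg}")
    }
}

struct Shipments;
impl Handler for Shipments {
    fn handle(&self, msg: &str) -> String {
        format!("shipments:{msg}")
    }
}

/// Only `Orders` is wrapped; `Shipments` runs without the layer.
fn demo() -> (String, String) {
    let wrapped = Orders.with(LogLayer);
    (wrapped.handle("a"), Shipments.handle("b"))
}
```

Because with returns a concrete Logged<Orders>, the wrapped handler is still a static type and can be registered like any other handler.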
Why middleware is static by default
The layers above are resolved at compile time: with/layer build a concrete, nested handler type
(Logged<Typed<..>>), and Handler::handle returns an impl Future whose type is known. The
compiler monomorphizes the whole chain into one state machine and inlines across the layer
boundaries, so a static layer adds no dispatch cost and no allocation - it is a zero-cost
abstraction.
Making every middleware dynamic (dyn) would throw that away. Handler::handle is an async fn in
trait, so its future is an anonymous impl Future - and a trait with an impl Trait return is not
object-safe. To store middleware behind dyn, the future has to be boxed (Pin<Box<dyn Future>>):
one heap allocation per layer per message, and the call can no longer be inlined or specialized
across the dyn boundary. Because the optimizer cannot see through a boxed dyn future, paying that
cost on every handler - when the chain is almost always known at compile time - would be the wrong
default.
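The difference in representation can be made visible with a short, self-contained sketch. It uses only the standard library; Echo, Logged and DynHandler are hypothetical stand-ins, not RustStream types. The static chain is a nested generic type, while the dyn-compatible variant has to name its future type and therefore boxes it.

```rust
use std::future::Future;
use std::mem::size_of;
use std::pin::Pin;

/// Static flavour: each layer is a plain generic wrapper struct.
#[allow(dead_code)]
struct Echo;
#[allow(dead_code)]
struct Logged<H>(H);

/// Dyn-compatible flavour: an `async fn` here would make the trait unusable
/// as `dyn DynHandler`, so the method returns a boxed future instead.
#[allow(dead_code)]
trait DynHandler {
    fn handle<'a>(&'a self, msg: &'a str) -> Pin<Box<dyn Future<Output = usize> + 'a>>;
}

impl DynHandler for Echo {
    fn handle<'a>(&'a self, msg: &'a str) -> Pin<Box<dyn Future<Output = usize> + 'a>> {
        // One heap allocation per call.
        Box::pin(async move { msg.len() })
    }
}

/// Returns (size of the nested static chain, size of a boxed trait object).
fn sizes() -> (usize, usize) {
    (
        size_of::<Logged<Logged<Echo>>>(),
        size_of::<Box<dyn DynHandler>>(),
    )
}
```

With stateless layers the nested static type occupies no memory at all, while the trait object is a data pointer plus a vtable pointer, and every call through it goes via that vtable and allocates its future.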
Dynamic middleware
When the chain genuinely is decided at runtime (layers toggled by config, or held behind dyn), opt
into the dynamic stack for exactly those handlers: DynStack, DynMiddleware, and Next. A
DynMiddleware has an around/next signature - it inspects the input and context, then either calls
next.run(..) to continue or short-circuits with its own result. To stay object-safe, it returns an
explicitly boxed future:
use std::future::Future;
use std::pin::Pin;

use ruststream::runtime::{Context, DynMiddleware, HandlerResult, Next};

struct Audit {
    service: String,
}

impl<I: Send + Sync> DynMiddleware<I> for Audit {
    fn handle<'a>(
        &'a self,
        input: &'a I,
        ctx: &'a mut Context<'_>,
        next: Next<'a, I>,
    ) -> Pin<Box<dyn Future<Output = Settle> + Send + 'a>> {
        Box::pin(async move {
            println!("[{}] handling {}", self.service, ctx.name());
            next.run(input, ctx).await
        })
    }
}
Only the list is dynamic. Build it at runtime, freeze it into a DynStack, and the result is an
ordinary static Layer - compose it into the application stack with layer, exactly like a
hand-written one. The rest of the dispatch chain stays static; the boxing cost is paid only inside
the stack:
use std::sync::Arc;

use ruststream::memory::MemoryMessage;
use ruststream::runtime::DynStack;

// The chain is decided at runtime...
let mut middleware: Vec<Arc<dyn DynMiddleware<MemoryMessage>>> = Vec::new();
if audit_enabled {
    middleware.push(Arc::new(Audit {
        service: "orders".to_owned(),
    }));
}
let stack = DynStack::new(middleware); // empty list -> a no-op layer

// ...but the frozen DynStack is an ordinary static Layer: compose it into the
// application stack like any other (HandlerExt::with works too, per handler).
RustStream::new(info)
    .layer(LogLayer)
    .layer(stack)
    .with_broker(MemoryBroker::new(), |b| {
        b.include(handle);
        b.include(returns);
    })
The full program, with the chain toggled by an environment variable, is
examples/middleware.rs.
DynStack<I> is generic over the input it wraps. In the application stack it wraps the whole
decoding handler, so it is built over the broker's raw message type (DynStack<MemoryMessage>
above) and runs before decoding - a middleware generic over I, like Audit, works at either
level. To run on the decoded value instead, build a DynStack<Order> and apply it to the inner
typed handler with with (the manual registration form). Middleware in the same DynStack runs
in list order, outermost first. Each dynamic layer costs one boxed future per call, against zero
for the static layers, so keep the static chain as the default and reach for DynStack only where
runtime composition earns it.
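The around/next mechanics and the list-order rule can be sketched without the crate. The model below is synchronous and self-contained: DynMiddleware and Next are simplified stand-ins that only mirror the names used above, the return value is a plain bool instead of a settle result, and RejectEmpty, terminal and dispatch are hypothetical.

```rust
use std::sync::Arc;

/// Around-style middleware: inspect the input, then continue or short-circuit.
trait DynMiddleware {
    fn handle(&self, input: &str, log: &mut Vec<String>, next: Next<'_>) -> bool;
}

/// The remaining middleware plus the terminal handler.
struct Next<'a> {
    rest: &'a [Arc<dyn DynMiddleware>],
    terminal: &'a dyn Fn(&str, &mut Vec<String>) -> bool,
}

impl<'a> Next<'a> {
    fn run(self, input: &str, log: &mut Vec<String>) -> bool {
        match self.rest.split_first() {
            // Hand the tail of the list to the next middleware in line.
            Some((head, tail)) => {
                let next = Next { rest: tail, terminal: self.terminal };
                head.handle(input, log, next)
            }
            // List exhausted: call the wrapped handler.
            None => (self.terminal)(input, log),
        }
    }
}

struct Audit(&'static str);
impl DynMiddleware for Audit {
    fn handle(&self, input: &str, log: &mut Vec<String>, next: Next<'_>) -> bool {
        log.push(format!("[{}] handling {input}", self.0));
        next.run(input, log)
    }
}

/// Short-circuits on empty input without calling `next`.
struct RejectEmpty;
impl DynMiddleware for RejectEmpty {
    fn handle(&self, input: &str, log: &mut Vec<String>, next: Next<'_>) -> bool {
        if input.is_empty() {
            log.push("rejected".to_owned());
            return false;
        }
        next.run(input, log)
    }
}

/// Stand-in for the wrapped handler at the end of the chain.
fn terminal(input: &str, log: &mut Vec<String>) -> bool {
    log.push(format!("handled {input}"));
    true
}

/// Builds the list at runtime, then runs it in list order, outermost first.
fn dispatch(audit_enabled: bool, input: &str) -> (bool, Vec<String>) {
    let mut chain: Vec<Arc<dyn DynMiddleware>> = Vec::new();
    if audit_enabled {
        chain.push(Arc::new(Audit("orders")));
    }
    chain.push(Arc::new(RejectEmpty));

    let mut log = Vec::new();
    let ok = Next { rest: &chain, terminal: &terminal }.run(input, &mut log);
    (ok, log)
}
```

An empty list degenerates to calling the terminal handler directly, which corresponds to the no-op behaviour described for an empty DynStack.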
Publish-side middleware
The middleware above runs on the consume path (incoming messages). The publish path has its own pipeline; see Publishing and replies.