Publishing and replies¶
There are two ways to publish: return a reply from a handler, or publish explicitly from inside a handler through a named publisher. Both run through the publish pipeline.
Replying from a handler¶
Name a reply destination with publish(..) and return the reply value. The runtime encodes it and
sends it:
use ruststream::subscriber;
#[subscriber("requests", publish("responses"))]
async fn respond(req: &Request) -> Response {
println!("responding to request {}", req.id);
Response { ok: true }
}
Mount it with include_publishing, handing it a [TypedPublisher] that carries the broker
connection and the reply codec (TypedPublisher::new uses the default codec; name one with
TypedPublisher::with_codec). include_publishing reuses that codec to decode the request too:
use ruststream::runtime::TypedPublisher;
RustStream::new(info).with_broker(broker, |b| {
let replies = TypedPublisher::new(b.broker().publisher());
b.include_publishing(respond, replies);
});
The TypedPublisher's codec encodes the reply, and include_publishing reuses it to decode the
incoming request. To decode the request with a different codec, set a scope default with
with_broker_codec (or Router::with_codec on a router chain); see
Codecs.
Controlling the acknowledgement¶
A plain reply form always publishes and acks. Return Result<Reply, HandlerResult> instead to
take control: Ok(reply) publishes and acks, Err(result) publishes nothing and the dispatcher
acts on the returned HandlerResult (HandlerResult::drop() to dead-letter,
HandlerResult::retry() to ask for redelivery):
// `Ok` publishes the reply and acks; `Err` publishes nothing and the dispatcher acts on the
// returned HandlerResult (here: drop the malformed request instead of replying).
#[subscriber("validated-requests", publish("responses"))]
async fn validate(req: &Request) -> Result<Response, HandlerResult> {
if req.id == 0 {
return Err(HandlerResult::drop());
}
Ok(Response { ok: true })
}
The Result form is detected from the written signature, so spell it out (a type alias hiding the
Result is treated as a plain reply type). Like any handler, a publishing handler may declare an
optional second &mut Context parameter to read app state or publish manually.
If the reply publish itself fails (broker rejected it, connection lost), the incoming message is
nacked with requeue = true: the broker redelivers it instead of the reply being silently lost.
Make publishing handlers idempotent under redelivery.
Publishing from inside a handler¶
To publish to a destination other than a single reply (fan-out, side effects, routing to a different broker), register a named publisher on the application and resolve it from the context.
// register at build time
let app = RustStream::new(info)
.publisher("egress", egress_publisher)
.with_broker(broker, |b| b.include(forward));
use ruststream::codec::{Codec, JsonCodec};
use ruststream::runtime::{HandlerResult, Outgoing};
#[subscriber("ingress")]
async fn forward(event: &Event, ctx: &mut Context<'_>) -> HandlerResult {
if let Some(publisher) = ctx.publisher("egress") {
let payload = JsonCodec.encode(event).expect("serializable");
let out = Outgoing::new("egress", payload);
if publisher.publish(out).await.is_err() {
return HandlerResult::retry();
}
}
HandlerResult::Ack
}
Handlers that publish must own their context
A closure handler cannot return a future that borrows &mut Context. Use a #[subscriber]
handler (as above) or a struct handler with async fn handle, both of which own the borrow
across awaits.
The publish pipeline¶
Two kinds of transform run before a message leaves the process, and they compose:
- Static
PublishLayeron aTypedPublisher, added with.layer(..). Zero-cost, per-destination transforms (an envelope, a fixed content type). They run first, closest to the value. - Dynamic publish middleware on the application, added with
.publish_layer(..). Cross-cutting concerns (publish metrics, a dead-letter wrapper) applied to every published message. They run outside the static layers, then the message is sent.
A static PublishLayer implements apply(&mut Outgoing<'_>):
/// A static, per-publisher transform: stamps an envelope header on every outgoing message.
struct EnvelopeLayer;
impl PublishLayer for EnvelopeLayer {
fn apply(&self, out: &mut Outgoing<'_>) {
out.headers_mut().insert("x-envelope", b"1".to_vec());
}
}
A dynamic middleware implements PublishMiddleware with an around/next signature, so it can
short-circuit, retry, or observe:
/// A dynamic, app-wide middleware: observes every publish, then passes it on.
struct AuditPublish;
impl PublishMiddleware for AuditPublish {
fn on_publish<'a>(
&'a self,
out: &'a mut Outgoing<'a>,
next: PublishNext<'a>,
) -> Pin<
Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>,
> {
Box::pin(async move {
println!("publishing to {}", out.name());
next.run(out).await
})
}
}
Both levels compose on the application:
RustStream::new(AppInfo::new("publishing", "0.1.0"))
// dynamic, app-wide: wraps every published message
.publish_layer(AuditPublish)
// a named publisher, resolvable from any handler's context
.publisher("egress", egress)
.with_broker(broker, |b| {
// static, per-publisher: composed onto this TypedPublisher at compile time
let replies = TypedPublisher::new(b.broker().publisher()).layer(EnvelopeLayer);
b.include_publishing(respond, replies);
let validated = TypedPublisher::new(b.broker().publisher());
b.include_publishing(validate, validated);
b.include(forward);
// .transactional() exists only because MemoryPublisher implements
// TransactionalPublisher; without it, each reply publishes independently.
let confirmations = TypedPublisher::new(b.broker().publisher()).transactional();
b.include_batch_publishing(confirm, confirmations);
})
Manual publishes through ctx.publisher(..) run through the dynamic pipeline (the static layer is a
property of a specific TypedPublisher). The full program is
examples/publishing.rs.
Batch replies and transactions¶
A #[subscriber(batch(..), publish(..))] handler consumes a whole decoded batch and returns the
replies for it - the consume-transform-produce pattern. Ok(replies) publishes every reply to
the reply name and acks the batch; Err(result) publishes nothing and settles the whole batch
with result (all-or-nothing: selective per-element outcomes do not compose with a
transaction):
/// Confirms a whole page of orders; the replies become visible atomically on commit.
#[subscriber(batch("orders"), publish("confirmations"))]
async fn confirm(orders: &[Event]) -> Result<Vec<Event>, HandlerResult> {
if orders.is_empty() {
return Err(HandlerResult::drop()); // nothing published, whole batch settled
}
Ok(orders.iter().map(|o| Event { id: o.id }).collect())
}
Mount it with include_batch_publishing, handing it the reply wiring:
// .transactional() exists only because MemoryPublisher implements
// TransactionalPublisher; without it, each reply publishes independently.
let confirmations = TypedPublisher::new(b.broker().publisher()).transactional();
b.include_batch_publishing(confirm, confirmations);
With a plain TypedPublisher, each reply publishes independently; a mid-batch failure retries
the whole batch, so the earlier replies may be published again on redelivery (at-least-once).
Calling .transactional() on the TypedPublisher switches the wiring to one broker transaction
per batch: the runtime begins a transaction, publishes every reply, commits, and only then acks
the incoming batch; any failure aborts, so replies are never half-visible. The method exists
only when the underlying publisher implements the TransactionalPublisher capability - for
brokers without transactions the compiler rejects it. The single-message include_publishing
forms keep taking a plain TypedPublisher: a one-message transaction adds broker round-trips
for no atomicity gain.
Batch publishing¶
There is no direct batch-publish API on Publisher. For most brokers (NATS, Kafka) the client
already coalesces writes, so a per-message publish loop achieves the same throughput. Where a
broker has a genuine pipeline primitive (Redis), the broker crate exposes it as a broker-specific
capability.