Publishing and replies

There are two ways to publish: return a reply from a handler, or publish explicitly from inside a handler through a named publisher. Both run through the publish pipeline.

Replying from a handler

Name a reply destination with publish(..) and return the reply value. The runtime encodes it and sends it:

use ruststream::subscriber;

#[subscriber("requests", publish("responses"))]
async fn respond(req: &Request) -> Response {
    println!("responding to request {}", req.id);
    Response { ok: true }
}

Mount it with include_publishing, handing it a TypedPublisher that carries the broker connection and the reply codec. TypedPublisher::new uses the default codec; TypedPublisher::with_codec names one explicitly:

use ruststream::runtime::TypedPublisher;

RustStream::new(info).with_broker(broker, |b| {
    let replies = TypedPublisher::new(b.broker().publisher());
    b.include_publishing(respond, replies);
});

The TypedPublisher's codec encodes the reply, and include_publishing reuses it to decode the incoming request. To decode the request with a different codec, set a scope default with with_broker_codec (or Router::with_codec on a router chain); see Codecs.

Controlling the acknowledgement

The plain reply form always publishes and acks. To take control, return Result<Reply, HandlerResult> instead: Ok(reply) publishes and acks, while Err(result) publishes nothing and the dispatcher acts on the returned HandlerResult (HandlerResult::drop() to dead-letter, HandlerResult::retry() to ask for redelivery):

// `Ok` publishes the reply and acks; `Err` publishes nothing and the dispatcher acts on the
// returned HandlerResult (here: drop the malformed request instead of replying).
#[subscriber("validated-requests", publish("responses"))]
async fn validate(req: &Request) -> Result<Response, HandlerResult> {
    if req.id == 0 {
        return Err(HandlerResult::drop());
    }
    Ok(Response { ok: true })
}

The Result form is detected from the written signature, so spell it out (a type alias hiding the Result is treated as a plain reply type). Like any handler, a publishing handler may declare an optional second &mut Context parameter to read app state or publish manually.

If the reply publish itself fails (broker rejected it, connection lost), the incoming message is nacked with requeue = true: the broker redelivers it instead of the reply being silently lost. Make publishing handlers idempotent under redelivery.
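The three outcomes described in this section (reply published, handler-chosen result, failed reply publish) can be modelled without the library. The sketch below is only an illustration: Outcome, Settlement and settle are hypothetical stand-ins, not ruststream types, and the real dispatcher is asynchronous.

```rust
// Hypothetical stand-ins for illustration; these are NOT the ruststream types.
#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Drop,
    Retry,
}

// What happens to the incoming message once the handler has returned.
#[derive(Debug, PartialEq)]
enum Settlement {
    Ack,
    DeadLetter,
    Redeliver,
    NackRequeue,
}

/// Models the rules above. `publish` stands in for sending the reply to the broker.
fn settle<R>(
    handler_result: Result<R, Outcome>,
    publish: impl FnOnce(R) -> Result<(), String>,
) -> Settlement {
    match handler_result {
        // Ok(reply): publish first, and ack only if the publish succeeded.
        Ok(reply) => match publish(reply) {
            Ok(()) => Settlement::Ack,
            // The reply was not sent: requeue so the request is redelivered.
            Err(_) => Settlement::NackRequeue,
        },
        // Err(result): nothing is published; the handler's choice decides.
        Err(Outcome::Ack) => Settlement::Ack,
        Err(Outcome::Drop) => Settlement::DeadLetter,
        Err(Outcome::Retry) => Settlement::Redeliver,
    }
}

fn main() {
    let ok = settle(Ok("pong"), |_reply: &str| Ok(()));
    let lost = settle(Ok("pong"), |_reply: &str| Err("connection lost".to_string()));
    let dropped = settle(Err(Outcome::Drop), |_reply: &str| Ok(()));
    println!("{ok:?} {lost:?} {dropped:?}");
}
```

In this model the NackRequeue branch is the one that makes idempotency matter: the handler has already run once, and it runs again when the message is redelivered.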

Publishing from inside a handler

To publish to a destination other than a single reply (fan-out, side effects, routing to a different broker), register a named publisher on the application and resolve it from the context.

// register at build time
let app = RustStream::new(info)
    .publisher("egress", egress_publisher)
    .with_broker(broker, |b| b.include(forward));

use ruststream::codec::{Codec, JsonCodec};
use ruststream::runtime::{HandlerResult, Outgoing};

#[subscriber("ingress")]
async fn forward(event: &Event, ctx: &mut Context<'_>) -> HandlerResult {
    if let Some(publisher) = ctx.publisher("egress") {
        let payload = JsonCodec.encode(event).expect("serializable");
        let out = Outgoing::new("egress", payload);
        if publisher.publish(out).await.is_err() {
            return HandlerResult::retry();
        }
    }
    HandlerResult::Ack
}

Handlers that publish must own their context

A closure handler cannot return a future that borrows &mut Context. Use a #[subscriber] handler (as above) or a struct handler with async fn handle, both of which own the borrow across awaits.

The publish pipeline

Two kinds of transform run before a message leaves the process, and they compose:

  • Static PublishLayer on a TypedPublisher, added with .layer(..). Zero-cost, per-destination transforms (an envelope, a fixed content type). They run first, closest to the value.
  • Dynamic publish middleware on the application, added with .publish_layer(..). Cross-cutting concerns (publish metrics, a dead-letter wrapper) applied to every published message. They run outside the static layers, then the message is sent.

A static PublishLayer implements apply(&mut Outgoing<'_>):

/// A static, per-publisher transform: stamps an envelope header on every outgoing message.
struct EnvelopeLayer;

impl PublishLayer for EnvelopeLayer {
    fn apply(&self, out: &mut Outgoing<'_>) {
        out.headers_mut().insert("x-envelope", b"1".to_vec());
    }
}

A dynamic middleware implements PublishMiddleware with an around/next signature, so it can short-circuit, retry, or observe:

/// A dynamic, app-wide middleware: observes every publish, then passes it on.
struct AuditPublish;

impl PublishMiddleware for AuditPublish {
    fn on_publish<'a>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        next: PublishNext<'a>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>,
    > {
        Box::pin(async move {
            println!("publishing to {}", out.name());
            next.run(out).await
        })
    }
}

Both levels compose on the application:

RustStream::new(AppInfo::new("publishing", "0.1.0"))
    // dynamic, app-wide: wraps every published message
    .publish_layer(AuditPublish)
    // a named publisher, resolvable from any handler's context
    .publisher("egress", egress)
    .with_broker(broker, |b| {
        // static, per-publisher: composed onto this TypedPublisher at compile time
        let replies = TypedPublisher::new(b.broker().publisher()).layer(EnvelopeLayer);
        b.include_publishing(respond, replies);
        let validated = TypedPublisher::new(b.broker().publisher());
        b.include_publishing(validate, validated);
        b.include(forward);
        // .transactional() exists only because MemoryPublisher implements
        // TransactionalPublisher; without it, each reply publishes independently.
        let confirmations = TypedPublisher::new(b.broker().publisher()).transactional();
        b.include_batch_publishing(confirm, confirmations);
    })

Manual publishes through ctx.publisher(..) run through the dynamic pipeline only: a static layer is a property of a specific TypedPublisher, so it does not apply to them. The full program is examples/publishing.rs.
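To make the ordering concrete, here is a minimal synchronous model of the two paths. Everything in it is an assumption made for illustration: Outgoing, Layer, Middleware, publish_typed and publish_manual are hypothetical stand-ins with simplified shapes, not the ruststream traits (which are asynchronous).

```rust
// Hypothetical, synchronous stand-ins; the real traits are async and differ in shape.
#[derive(Debug, Default)]
struct Outgoing {
    name: String,
    headers: Vec<(String, String)>,
}

/// Stand-in for a static, per-publisher layer.
trait Layer {
    fn apply(&self, out: &mut Outgoing);
}

/// Stand-in for an app-wide middleware that wraps the rest of the chain.
trait Middleware {
    fn on_publish(&self, out: &mut Outgoing, next: &mut dyn FnMut(&mut Outgoing));
}

struct Envelope;
impl Layer for Envelope {
    fn apply(&self, out: &mut Outgoing) {
        out.headers.push(("x-envelope".into(), "1".into()));
    }
}

struct Audit;
impl Middleware for Audit {
    fn on_publish(&self, out: &mut Outgoing, next: &mut dyn FnMut(&mut Outgoing)) {
        out.headers.push(("x-audited".into(), "1".into()));
        next(out); // pass the message on to the send step
    }
}

/// Typed path: static layers first, then the dynamic middleware, then the send.
fn publish_typed(
    mut out: Outgoing,
    layers: &[&dyn Layer],
    mw: &dyn Middleware,
    wire: &mut Vec<Outgoing>,
) {
    for layer in layers {
        layer.apply(&mut out);
    }
    publish_manual(out, mw, wire);
}

/// Manual path: no typed publisher is involved, so only the middleware runs.
fn publish_manual(mut out: Outgoing, mw: &dyn Middleware, wire: &mut Vec<Outgoing>) {
    // `wire` plays the broker; the closure is the final "send" step.
    mw.on_publish(&mut out, &mut |o: &mut Outgoing| wire.push(std::mem::take(o)));
}

fn main() {
    let mut wire = Vec::new();
    let reply = Outgoing { name: "responses".into(), ..Default::default() };
    publish_typed(reply, &[&Envelope], &Audit, &mut wire);
    let manual = Outgoing { name: "egress".into(), ..Default::default() };
    publish_manual(manual, &Audit, &mut wire);
    for out in &wire {
        println!("{} -> {:?}", out.name, out.headers);
    }
}
```

In this model the typed reply carries both headers, envelope first, while the manual publish carries only the audit header.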

Batch replies and transactions

A #[subscriber(batch(..), publish(..))] handler consumes a whole decoded batch and returns the replies for it (the consume-transform-produce pattern). Ok(replies) publishes every reply to the reply name and acks the batch; Err(result) publishes nothing and settles the whole batch with result. The outcome is all-or-nothing, because selective per-element outcomes do not compose with a transaction:

/// Confirms a whole page of orders; the replies become visible atomically on commit.
#[subscriber(batch("orders"), publish("confirmations"))]
async fn confirm(orders: &[Event]) -> Result<Vec<Event>, HandlerResult> {
    if orders.is_empty() {
        return Err(HandlerResult::drop()); // nothing published, whole batch settled
    }
    Ok(orders.iter().map(|o| Event { id: o.id }).collect())
}

Mount it with include_batch_publishing, handing it the reply wiring:

// .transactional() exists only because MemoryPublisher implements
// TransactionalPublisher; without it, each reply publishes independently.
let confirmations = TypedPublisher::new(b.broker().publisher()).transactional();
b.include_batch_publishing(confirm, confirmations);

With a plain TypedPublisher, each reply publishes independently; a mid-batch failure retries the whole batch, so the earlier replies may be published again on redelivery (at-least-once). Calling .transactional() on the TypedPublisher switches the wiring to one broker transaction per batch: the runtime begins a transaction, publishes every reply, commits, and only then acks the incoming batch; any failure aborts, so replies are never half-visible. The method exists only when the underlying publisher implements the TransactionalPublisher capability - for brokers without transactions the compiler rejects it. The single-message include_publishing forms keep taking a plain TypedPublisher: a one-message transaction adds broker round-trips for no atomicity gain.
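The difference between the two wirings shows up when a publish fails mid-batch. The following sketch models it with a hypothetical in-memory Broker; the type and both functions are assumptions made for illustration, not part of ruststream, and a real broker transaction is of course not a local buffer.

```rust
// Hypothetical in-memory broker for illustration; not a ruststream type.
#[derive(Default)]
struct Broker {
    visible: Vec<String>,    // what downstream consumers can see
    fail_on: Option<String>, // payload whose publish is rejected
}

impl Broker {
    fn publish(&mut self, msg: &str) -> Result<(), String> {
        if self.fail_on.as_deref() == Some(msg) {
            return Err(format!("rejected {msg}"));
        }
        self.visible.push(msg.to_string());
        Ok(())
    }
}

/// Plain wiring: each reply is visible as soon as it is published.
fn publish_each(broker: &mut Broker, replies: &[&str]) -> Result<(), String> {
    for r in replies {
        broker.publish(r)?; // earlier replies stay visible if this fails
    }
    Ok(())
}

/// Transactional wiring: stage every reply, make them visible only on commit.
fn publish_in_transaction(broker: &mut Broker, replies: &[&str]) -> Result<(), String> {
    let mut staged = Broker { fail_on: broker.fail_on.clone(), ..Default::default() };
    for r in replies {
        staged.publish(r)?; // abort: `staged` is dropped, nothing becomes visible
    }
    broker.visible.append(&mut staged.visible); // commit
    Ok(())
}

fn main() {
    let replies = ["a", "b", "c"];

    let mut plain = Broker { fail_on: Some("b".into()), ..Default::default() };
    let _ = publish_each(&mut plain, &replies); // fails at "b"
    plain.fail_on = None;
    let _ = publish_each(&mut plain, &replies); // redelivery publishes "a" again
    println!("plain: {:?}", plain.visible);

    let mut tx = Broker { fail_on: Some("b".into()), ..Default::default() };
    let _ = publish_in_transaction(&mut tx, &replies); // aborts, nothing visible
    tx.fail_on = None;
    let _ = publish_in_transaction(&mut tx, &replies); // redelivery commits once
    println!("transactional: {:?}", tx.visible);
}
```

In the plain run the first reply ends up visible twice after redelivery, which is the at-least-once behaviour described above; in the transactional run each reply is visible exactly once.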

Batch publishing

There is no direct batch-publish API on Publisher. For most brokers (NATS, Kafka) the client already coalesces writes, so a per-message publish loop achieves the same throughput. Where a broker has a genuine pipeline primitive (Redis), the broker crate exposes it as a broker-specific capability.