Skip to content

Writing a broker

A broker is an independent crate that implements the core traits. It depends on ruststream with default features off, so it pulls in the trait surface and runtime without the bundled JSON codec or any other broker:

[dependencies]
ruststream = { version = "0.4", default-features = false }

This page is the contract. Implement the required traits, expose your own Config, add capability traits for the features your broker supports, and prove the result with the conformance harness. For a complete implementation on a real client, see the worked NATS example.

The required traits

Broker

The broker is pure lifecycle: connect and shut down. It carries no subscriber or publisher type, so a single application can mix broker kinds.

pub trait Broker: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;
    async fn connect(&self) -> Result<(), Self::Error>;
    async fn shutdown(&self) -> Result<(), Self::Error>;
}

shutdown must never block or panic; do all fallible teardown here and return a Result.

Subscribe

Implement Subscribe to support subscribing by name. This is what #[subscriber("name")] uses.

pub trait Subscribe: Broker {
    type Subscriber: Subscriber;
    async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error>;
}

Subscriber

A subscriber is a Stream of incoming messages. Back-pressure comes for free from the stream.

pub trait Subscriber: Send {
    type Message: IncomingMessage;
    type Error: std::error::Error + Send + Sync + 'static;
    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_;
}

stream takes &mut self, so any state buffered between polls lives behind the mutable borrow, which keeps it cancel-safe.

IncomingMessage

A delivered message exposes its payload and headers, and is acked or nacked. Ack consumes self, so double-ack is a compile error.

pub trait IncomingMessage: Send + Sync {
    fn payload(&self) -> &[u8];
    fn headers(&self) -> &Headers;
    async fn ack(self) -> Result<(), AckError>;
    async fn nack(self, requeue: bool) -> Result<(), AckError>;

    // Defaulted: a plain nack(true). Override when the transport has native
    // delayed redelivery (JetStream NAK with delay); handlers reach it through
    // HandlerResult::retry_after.
    async fn nack_after(self, delay: Duration) -> Result<(), AckError>;

    // Defaulted: None. Override (with the Partitioned capability) to feed the
    // runtime's keyed worker lanes, workers(n, by_key).
    fn partition_key(&self) -> Option<&[u8]>;
}

The two defaulted methods are how optional broker behaviour degrades gracefully: a broker that overrides neither still works with every runtime feature, with retry_after falling back to an immediate requeue and keyed lanes rotating keyless messages.

Publisher

pub trait Publisher: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;
    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error>;
}

OutgoingMessage borrows its name and payload, so publishing does not force an allocation.

Subscription sources

Subscribe covers the by-name case. When a subscription needs broker-specific options (a consumer group, a durable name, a delivery policy), expose a descriptor type that implements SubscriptionSource:

pub trait SubscriptionSource<B: Broker> {
    type Subscriber: Subscriber;
    fn name(&self) -> &str;
    fn subscribe(self, broker: &B) -> impl Future<Output = Result<Self::Subscriber, B::Error>> + Send;
}

Give the descriptor an associated constructor (OrdersStream::new(..)) rather than a free function, so users can name it directly in the decorator: #[subscriber(OrdersStream::new("orders", "workers"))]. The macro reads the type out of the constructor call, and also accepts a builder chain on it (#[subscriber(OrdersStream::new("orders").durable("workers"))]) as long as each method returns Self. Because type Subscriber lives on the source, one broker can offer several subscription kinds (pub/sub versus streams) with different subscriber types - or, as the NATS example does, serve them all from one descriptor that branches internally.

Capability traits

Implement only the capabilities your broker supports; none are part of the mandatory interface.

Trait For brokers that support
BatchSubscriber receiving messages in batches
TransactionalPublisher begin / commit / abort around publishes
RequestReply native request-reply (NATS yes, Kafka no)
Partitioned a partition key on outgoing messages
DescribeServer reporting a ServerSpec for AsyncAPI

Config and defaults

Your crate owns its Config. The core carries no broker-specific config, which is what keeps an upstream change scoped to one broker crate. If a config field has no sane default, do not implement Default for it; force the user to set it explicitly rather than shipping a default that might break later.

Errors

Use thiserror for a single crate-level error enum, with variants by source. Mark public error enums #[non_exhaustive]. Never use anyhow in a library crate.

Test support

Ship a TestClient under a testing feature so users can unit-test handlers against your broker in-process. The test client does core routing only: it dispatches published messages to matching subscribers and treats ack/nack as effectively a no-op. Do not simulate broker-specific semantics (durable cursors, redelivery timers, offsets, dead-letter routing) in it; those are verified end to end against a real server. See Testing for the user-facing side, and Conformance to prove the implementation.