Writing a broker¶
A broker is an independent crate that implements the core traits. It depends on ruststream with
default features off, so it pulls in the trait surface and runtime without the bundled JSON codec or
any other broker.
This page is the contract. Implement the required traits, expose your own Config, add capability
traits for the features your broker supports, and prove the result with the
conformance harness. For a complete implementation on a real client, see the
worked NATS example.
The required traits¶
Broker¶
The broker is pure lifecycle: connect and shut down. It carries no subscriber or publisher type, so a single application can mix broker kinds.
pub trait Broker: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;

    async fn connect(&self) -> Result<(), Self::Error>;
    async fn shutdown(&self) -> Result<(), Self::Error>;
}
shutdown must never block or panic; do all fallible teardown here and return a Result.
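As a rough illustration of the lifecycle contract, the sketch below implements the trait for a hypothetical in-memory broker. The trait is copied locally so the block compiles on its own; `MemoryBroker`, `NotConnected`, and the tiny `block_on` helper are assumptions made for this example and are not part of ruststream.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait from this page, so the sketch is self-contained.
#[allow(async_fn_in_trait)]
pub trait Broker: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;

    async fn connect(&self) -> Result<(), Self::Error>;
    async fn shutdown(&self) -> Result<(), Self::Error>;
}

// Hypothetical error type for the in-memory broker.
#[derive(Debug)]
pub struct NotConnected;

impl std::fmt::Display for NotConnected {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("broker is not connected")
    }
}

impl std::error::Error for NotConnected {}

// Hypothetical broker that only tracks whether it is connected.
#[derive(Default)]
pub struct MemoryBroker {
    connected: AtomicBool,
}

impl MemoryBroker {
    pub fn is_connected(&self) -> bool {
        self.connected.load(Ordering::SeqCst)
    }
}

impl Broker for MemoryBroker {
    type Error = NotConnected;

    async fn connect(&self) -> Result<(), Self::Error> {
        self.connected.store(true, Ordering::SeqCst);
        Ok(())
    }

    async fn shutdown(&self) -> Result<(), Self::Error> {
        // A teardown failure is reported through the Result, not a panic.
        if self.connected.swap(false, Ordering::SeqCst) {
            Ok(())
        } else {
            Err(NotConnected)
        }
    }
}

// Minimal executor for the example; a real application would use its runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}
```

Calling `block_on(broker.connect())` and then `block_on(broker.shutdown())` walks the whole lifecycle; shutting down a broker that was never connected returns `Err(NotConnected)` instead of panicking.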
Subscribe¶
Implement Subscribe to support subscribing by name. This is what #[subscriber("name")] uses.
pub trait Subscribe: Broker {
    type Subscriber: Subscriber;

    async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error>;
}
Subscriber¶
A subscriber is a Stream of incoming messages. Back-pressure comes for free: a stream is pull-based, so the next message is requested only when the consumer polls for it.
pub trait Subscriber: Send {
    type Message: IncomingMessage;
    type Error: std::error::Error + Send + Sync + 'static;

    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_;
}
stream takes &mut self, so any state buffered between polls lives behind the mutable borrow,
which keeps it cancel-safe.
IncomingMessage¶
A delivered message exposes its payload and headers, and is settled by acking or nacking it. ack and
nack both take self by value, so settling the same message twice is a compile error.
pub trait IncomingMessage: Send + Sync {
    fn payload(&self) -> &[u8];
    fn headers(&self) -> &Headers;

    async fn ack(self) -> Result<(), AckError>;
    async fn nack(self, requeue: bool) -> Result<(), AckError>;

    // Defaulted: a plain nack(true). Override when the transport has native
    // delayed redelivery (JetStream NAK with delay); handlers reach it through
    // HandlerResult::retry_after.
    async fn nack_after(self, delay: Duration) -> Result<(), AckError>;

    // Defaulted: None. Override (with the Partitioned capability) to feed the
    // runtime's keyed worker lanes, workers(n, by_key).
    fn partition_key(&self) -> Option<&[u8]>;
}
The two defaulted methods are how optional broker behaviour degrades gracefully: a broker that
overrides neither still works with every runtime feature, with retry_after falling back to an
immediate requeue and keyed lanes rotating keyless messages.
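To make the fallback behaviour concrete, here is a simplified sketch of the settlement half of the trait. It is an assumption-laden stand-in: the methods are synchronous rather than async, `AckError` is a placeholder for the crate's real type, and `MemoryMessage` and `Settled` are hypothetical names invented for the example.

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

// Placeholder for the crate's AckError; the real definition is not shown here.
#[derive(Debug, PartialEq)]
pub struct AckError;

// Simplified, synchronous stand-in for the settlement methods of IncomingMessage.
pub trait IncomingMessage: Sized {
    fn ack(self) -> Result<(), AckError>;
    fn nack(self, requeue: bool) -> Result<(), AckError>;

    // Defaulted: a plain nack(true), i.e. an immediate requeue.
    fn nack_after(self, _delay: Duration) -> Result<(), AckError> {
        self.nack(true)
    }

    // Defaulted: no partition key.
    fn partition_key(&self) -> Option<&[u8]> {
        None
    }
}

// Hypothetical record of how a message was settled.
#[derive(Debug, Clone, PartialEq)]
pub enum Settled {
    Ack,
    Nack { requeue: bool },
}

// Hypothetical message that writes its settlement into a shared log.
pub struct MemoryMessage {
    pub log: Arc<Mutex<Vec<Settled>>>,
}

// Overrides neither defaulted method.
impl IncomingMessage for MemoryMessage {
    fn ack(self) -> Result<(), AckError> {
        self.log.lock().unwrap().push(Settled::Ack);
        Ok(())
    }

    fn nack(self, requeue: bool) -> Result<(), AckError> {
        self.log.lock().unwrap().push(Settled::Nack { requeue });
        Ok(())
    }
}
```

With this impl, `msg.nack_after(Duration::from_secs(5))` records `Settled::Nack { requeue: true }`, and a later `msg.ack()` on the same binding is rejected by the compiler as a use of a moved value.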
Publisher¶
pub trait Publisher: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;

    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error>;
}
OutgoingMessage borrows its name and payload, so publishing does not force an allocation.
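A minimal sketch of a publisher over a borrowed message might look as follows. The field layout of `OutgoingMessage` is an assumption (this page only states that name and payload are borrowed), and `MemoryPublisher` and `block_on` are hypothetical helpers for the example.

```rust
use std::convert::Infallible;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Assumed shape: two borrowed fields, so building a message copies nothing.
pub struct OutgoingMessage<'a> {
    pub name: &'a str,
    pub payload: &'a [u8],
}

// Local copy of the trait from this page.
#[allow(async_fn_in_trait)]
pub trait Publisher: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;

    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error>;
}

// Hypothetical publisher that records what was sent.
#[derive(Default)]
pub struct MemoryPublisher {
    sent: Mutex<Vec<(String, Vec<u8>)>>,
}

impl MemoryPublisher {
    pub fn sent(&self) -> Vec<(String, Vec<u8>)> {
        self.sent.lock().unwrap().clone()
    }
}

impl Publisher for MemoryPublisher {
    type Error = Infallible;

    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
        // Copying is this recorder's choice; a network transport could write
        // the borrowed bytes straight to its socket instead.
        self.sent
            .lock()
            .unwrap()
            .push((msg.name.to_owned(), msg.payload.to_vec()));
        Ok(())
    }
}

// Minimal executor for the example; a real application would use its runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}
```

The caller keeps ownership of the payload buffer: the message only holds references to it for the duration of the `publish` call.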
Subscription sources¶
Subscribe covers the by-name case. When a subscription needs broker-specific options (a consumer
group, a durable name, a delivery policy), expose a descriptor type that implements
SubscriptionSource:
pub trait SubscriptionSource<B: Broker> {
    type Subscriber: Subscriber;

    fn name(&self) -> &str;
    fn subscribe(self, broker: &B) -> impl Future<Output = Result<Self::Subscriber, B::Error>> + Send;
}
Give the descriptor an associated constructor (OrdersStream::new(..)) rather than a free function,
so users can name it directly in the decorator: #[subscriber(OrdersStream::new("orders", "workers"))].
The macro reads the type out of the constructor call, and also accepts a builder chain on it
(#[subscriber(OrdersStream::new("orders").durable("workers"))]) as long as each method returns
Self. Because type Subscriber lives on the source, one broker can offer several subscription
kinds (pub/sub versus streams) with different subscriber types - or, as the
NATS example does, serve them all from one descriptor that branches internally.
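The constructor-plus-builder shape described above could be sketched like this. Only the descriptor itself is shown, not the `SubscriptionSource` impl; the fields and the `durable_name` accessor are assumptions made for illustration.

```rust
// Hypothetical descriptor; the field set is an assumption for this example.
#[derive(Debug, Clone, PartialEq)]
pub struct OrdersStream {
    name: String,
    durable: Option<String>,
}

impl OrdersStream {
    // Associated constructor, so the type can be read from the call site.
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_owned(),
            durable: None,
        }
    }

    // Builder step: takes and returns Self, so calls can be chained.
    pub fn durable(mut self, durable: &str) -> Self {
        self.durable = Some(durable.to_owned());
        self
    }

    // Mirrors SubscriptionSource::name.
    pub fn name(&self) -> &str {
        &self.name
    }

    // Hypothetical accessor for the optional durable name.
    pub fn durable_name(&self) -> Option<&str> {
        self.durable.as_deref()
    }
}
```

`OrdersStream::new("orders").durable("workers")` then evaluates to a single `OrdersStream` value, which is the property the builder chain in the decorator relies on.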
Capability traits¶
Implement only the capabilities your broker supports; none are part of the mandatory interface.
| Trait | For brokers that support |
|---|---|
| BatchSubscriber | receiving messages in batches |
| TransactionalPublisher | begin / commit / abort around publishes |
| RequestReply | native request-reply (NATS yes, Kafka no) |
| Partitioned | a partition key on outgoing messages |
| DescribeServer | reporting a ServerSpec for AsyncAPI |
Config and defaults¶
Your crate owns its Config. The core carries no broker-specific config, which is what keeps an
upstream change scoped to one broker crate. If any config field has no sane default, do not implement
Default for Config; force the user to set that field explicitly rather than shipping a default that
might break later.
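One way to follow this rule, sketched under assumptions: the required field goes through the constructor and only the optional fields get defaults. The `url` and `connect_timeout` fields and the 5-second value are hypothetical.

```rust
use std::time::Duration;

// Hypothetical config. There is deliberately no Default impl, because
// `url` has no sane default.
#[derive(Debug, Clone, PartialEq)]
pub struct Config {
    pub url: String,
    pub connect_timeout: Duration,
}

impl Config {
    // The required field is a constructor argument, so it cannot be omitted.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            // Illustrative default for an optional setting.
            connect_timeout: Duration::from_secs(5),
        }
    }

    // Optional settings are overridden through chained setters.
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }
}
```

A user writes `Config::new("nats://localhost:4222")` and overrides the rest only when needed; `Config::default()` does not compile, which is the point.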
Errors¶
Use thiserror for a single crate-level error enum, with one variant per error source. Mark public
error enums #[non_exhaustive]. Never use anyhow in a library crate.
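For orientation, the sketch below writes out by hand roughly what a thiserror derive with `#[error(...)]` and `#[from]` attributes would generate, so that it compiles without any dependency. The `BrokerError` name, its variants, and the messages are assumptions for this example.

```rust
use std::error::Error as StdError;
use std::fmt;
use std::io;

// Hypothetical crate-level error enum, one variant per error source.
#[derive(Debug)]
#[non_exhaustive]
pub enum BrokerError {
    // Failure in the underlying transport.
    Io(io::Error),
    // The server sent something the client could not interpret.
    Protocol(String),
}

impl fmt::Display for BrokerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BrokerError::Io(_) => f.write_str("transport I/O failed"),
            BrokerError::Protocol(detail) => write!(f, "protocol violation: {detail}"),
        }
    }
}

impl StdError for BrokerError {
    // Expose the wrapped error so callers can walk the cause chain.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            BrokerError::Io(err) => Some(err),
            BrokerError::Protocol(_) => None,
        }
    }
}

// Lets `?` convert io::Error into the crate error.
impl From<io::Error> for BrokerError {
    fn from(err: io::Error) -> Self {
        BrokerError::Io(err)
    }
}
```

Because the enum is `#[non_exhaustive]`, downstream crates must include a wildcard arm when matching on it, which leaves room to add variants without a breaking change.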
Test support¶
Ship a TestClient under a testing feature so users can unit-test handlers against your broker
in-process. The test client does core routing only: it dispatches published messages to matching
subscribers and treats ack/nack as effectively a no-op. Do not simulate broker-specific semantics
(durable cursors, redelivery timers, offsets, dead-letter routing) in it; those are verified end to
end against a real server. See Testing for the user-facing side, and
Conformance to prove the implementation.
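As a very rough picture of "core routing only", the sketch below dispatches each published payload to the subscribers registered under the same name and does nothing else. This is not the crate's TestClient API: the type, its methods, and the use of std channels are all assumptions chosen to keep the example dependency-free.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical in-process router: name -> subscriber channels.
#[derive(Default)]
pub struct TestClient {
    routes: HashMap<String, Vec<Sender<Vec<u8>>>>,
}

impl TestClient {
    // Register a subscriber under `name` and hand back its receiving end.
    pub fn subscribe(&mut self, name: &str) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        self.routes.entry(name.to_owned()).or_default().push(tx);
        rx
    }

    // Deliver `payload` to every live subscriber of `name` and return how
    // many received it. No cursors, redelivery, or dead-lettering here.
    pub fn publish(&mut self, name: &str, payload: &[u8]) -> usize {
        let Some(subscribers) = self.routes.get_mut(name) else {
            return 0;
        };
        // A failed send means the receiver was dropped; forget that subscriber.
        subscribers.retain(|tx| tx.send(payload.to_vec()).is_ok());
        subscribers.len()
    }
}
```

Publishing to `"orders"` reaches only the `"orders"` receivers, and publishing to a name with no subscribers returns 0; anything beyond that belongs in tests against a real server.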