Memory
The memory feature provides MemoryBroker, an in-process broker. It needs no external service,
which makes it ideal for examples, unit tests, and prototypes; the CLI scaffold uses it by default
so a fresh project runs with zero dependencies.
Semantics
- Exact name matching. A subscription to `orders` receives messages published to `orders`; there is no wildcard or pattern matching (those are broker-specific; the NATS test broker has real subject matching).
- Fan-out. Every subscriber of a name receives every message published to it after the subscription opened; messages published earlier are not buffered.
- Ack is a no-op; `nack(requeue: true)` redelivers the same payload to the same subscriber.
- Cheap to clone. Clones share state, so a clone held by a test observes everything the app publishes.
MemoryBroker does core routing only; it does not emulate any real broker's delivery semantics (durable cursors, redelivery timers, partitions, dead-letter routing).
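The routing rules above (exact names, fan-out, no buffering for late subscribers, clones sharing state) can be illustrated with a small standard-library model. This is only a sketch: `ToyBroker` and its methods are hypothetical names invented for this illustration and are not the ruststream `MemoryBroker` API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Toy stand-in for an in-process broker (hypothetical, NOT ruststream's type).
#[derive(Clone, Default)]
pub struct ToyBroker {
    // The map lives behind an Arc, so every clone sees the same subscriptions.
    routes: Arc<Mutex<HashMap<String, Vec<Sender<String>>>>>,
}

impl ToyBroker {
    /// Open a subscription. Only messages published afterwards are delivered.
    pub fn subscribe(&self, name: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.routes
            .lock()
            .unwrap()
            .entry(name.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// Deliver `payload` to every live subscriber of exactly `name`
    /// (plain string equality, no wildcards). Returns the delivery count.
    pub fn publish(&self, name: &str, payload: &str) -> usize {
        let mut routes = self.routes.lock().unwrap();
        match routes.get_mut(name) {
            Some(subs) => {
                // Forget subscribers whose receiving end has been dropped.
                subs.retain(|tx| tx.send(payload.to_string()).is_ok());
                subs.len()
            }
            // Nobody is subscribed: the message is dropped, not buffered.
            None => 0,
        }
    }
}
```

In this model, a message published before any `subscribe` call reaches nobody, and a subscription to `orders.eu` never sees traffic for `orders`.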
Capabilities
Every capability trait has a native implementation - a first-class feature of the broker's own in-process semantics, not a simulation of another broker's:
- Request / reply. `broker.requester()` returns a `MemoryRequester` whose `request` publishes with a unique in-process inbox in the `reply-to` header and resolves on the first message delivered there. A responder reads `reply-to` from the request and publishes its reply to that name. Requests nobody answers fail with `RequestError::Timeout`.
- Batches. `MemorySubscriber` implements `BatchSubscriber`: a batch is the first awaited delivery plus everything already buffered, capped by `set_batch_limit` (default 64). Partial batches ship immediately, so no deadline timer is involved.
- Transactions. `MemoryPublisher` implements `TransactionalPublisher`: publishes between `begin_transaction` and `commit` are buffered and fan out together in publish order; `abort` discards them. Clones of a publisher handle do not share its transaction.
- Partition keys. `MemoryMessage` implements `Partitioned`, reading the key from the well-known `partition-key` header (`memory::PARTITION_KEY_HEADER`).
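The batch rule in the list above (wait for one delivery, then take whatever is already buffered, up to a cap) can be sketched over a plain `std::sync::mpsc` channel. The function `next_batch` is a hypothetical helper written for this illustration; it is not how `MemorySubscriber` is implemented and not part of the ruststream API.

```rust
use std::sync::mpsc::{Receiver, RecvError};

/// Block until one message arrives, then drain what is already queued,
/// stopping at `limit` messages. Returns an error only if the channel is
/// closed and empty before the first message arrives.
pub fn next_batch<T>(rx: &Receiver<T>, limit: usize) -> Result<Vec<T>, RecvError> {
    // A batch always contains at least the awaited message.
    let limit = limit.max(1);
    let mut batch = vec![rx.recv()?];
    while batch.len() < limit {
        match rx.try_recv() {
            Ok(msg) => batch.push(msg),
            // Nothing else is buffered right now: ship the partial batch
            // immediately instead of waiting on a deadline timer.
            Err(_) => break,
        }
    }
    Ok(batch)
}
```

Because the drain loop never blocks, batch size depends only on how much was buffered at the moment of the call, which is why no timer is needed.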
DescribeServer stays deliberately unimplemented: the in-memory broker has no network
coordinates, and that asymmetry is part of the contract documentation.
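The transaction capability listed earlier in this section (publishes are held between begin and commit, released in publish order, discarded on abort, and not shared with clones) can be modelled in a few lines. `ToyPublisher` is a hypothetical type made up for this sketch; only the method names `begin_transaction`, `commit` and `abort` come from the text, and their real signatures are not shown here.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy publisher with an optional open transaction (hypothetical type).
pub struct ToyPublisher {
    out: Sender<String>,
    // `Some(buffer)` while a transaction is open, `None` otherwise.
    pending: Option<Vec<String>>,
}

impl ToyPublisher {
    pub fn new() -> (Self, Receiver<String>) {
        let (out, rx) = channel();
        (Self { out, pending: None }, rx)
    }

    pub fn begin_transaction(&mut self) {
        self.pending = Some(Vec::new());
    }

    /// Buffer the payload if a transaction is open, otherwise send it now.
    pub fn publish(&mut self, payload: &str) {
        match self.pending.as_mut() {
            Some(buffer) => buffer.push(payload.to_string()),
            None => {
                let _ = self.out.send(payload.to_string());
            }
        }
    }

    /// Release the buffered publishes together, in publish order.
    pub fn commit(&mut self) {
        for msg in self.pending.take().unwrap_or_default() {
            let _ = self.out.send(msg);
        }
    }

    /// Discard the buffered publishes.
    pub fn abort(&mut self) {
        self.pending = None;
    }
}

impl Clone for ToyPublisher {
    // A clone shares the destination but starts with no open transaction.
    fn clone(&self) -> Self {
        Self { out: self.out.clone(), pending: None }
    }
}
```

In this model a clone made while the original has an open transaction publishes immediately, which mirrors the rule that clones do not share a transaction.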
Subscription source
MemoryBroker implements Subscribe, so #[subscriber("orders")] works directly. Its descriptor
type is MemorySource - it carries no extra options (the in-memory broker has none) but keeps the
descriptor form uniform across brokers. From the `routed_service` example:
```rust
use ruststream::memory::MemorySource;

#[subscriber(MemorySource::new("orders"), publish("confirmations"))]
pub(crate) async fn confirm(
    order: &Order,
    ctx: &mut Context<'_>,
) -> Result<Confirmation, HandlerResult> {
    let repo = ctx
        .state()
        .get::<Repository>()
        .expect("repository set in on_startup");
    tracing::debug!(
        order = order.id,
        customer = %order.customer,
        item = %order.item,
        "confirming order"
    );
    match repo.record_order(order.id).await {
        Ok(()) => Ok(Confirmation {
            order_id: order.id,
            accepted: order.quantity > 0,
        }),
        Err(e) if e.is_transient() => {
            tracing::warn!(order = order.id, "store busy, asking for redelivery");
            Err(HandlerResult::retry())
        }
        Err(e) => {
            tracing::error!(order = order.id, error = %e, "dropping order");
            Err(HandlerResult::drop())
        }
    }
}
```
As a test client
MemoryBroker implements the TestClient trait itself: MemoryBroker::start() gives a client
whose publish and expect_published drive and observe the broker from the outside. See
Testing for the full pattern.