Testing
RustStream services are tested at two levels:
- In-process tests drive your real handlers, middleware, and codecs through an in-memory broker - no server, no Docker, no network. This is the default test path and it covers handler logic end to end: decode, dispatch, ack, and any replies your handlers publish.
- Integration tests run against a real broker, gated behind an environment variable, and cover the semantics only a real server has (durable consumers, redelivery timers, partitions).
What the test client does and does not do
An in-memory test broker is a handler-stub dispatcher: publishing fans the message out to the
subscribers whose subject matches, runs your handler, and treats ack/nack as broker-side no-ops
(a nack with requeue redelivers the same payload to the same subscriber). It does not model
JetStream durable cursors, ack_wait redelivery, max_ack_pending, retention, Kafka offsets or
consumer groups, or RabbitMQ exchanges and dead-letter routing. Those are real-broker concerns;
test them in the integration suite.
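To make the scope of that dispatcher concrete, here is a minimal std-only sketch of the behaviour described above: exact-name fan-out, handlers run inline, and a nack-with-requeue that simply hands the same payload back to the same subscriber. ToyBroker, Outcome, and MAX_ATTEMPTS are hypothetical names invented for this illustration; this is not RustStream's implementation or API.

```rust
use std::collections::HashMap;

/// What a handler tells the broker after processing a delivery.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Outcome {
    Ack,
    NackRequeue,
}

/// A boxed handler that receives the raw payload.
type Handler = Box<dyn FnMut(&[u8]) -> Outcome>;

/// This sketch's own guard against an endless requeue loop (an assumption,
/// not something the documentation specifies).
const MAX_ATTEMPTS: usize = 3;

/// Toy broker: exact-name routing, no persistence, no cursors, no timers.
#[derive(Default)]
struct ToyBroker {
    subscribers: HashMap<String, Vec<Handler>>,
}

impl ToyBroker {
    fn subscribe(&mut self, name: &str, handler: Handler) {
        self.subscribers
            .entry(name.to_string())
            .or_default()
            .push(handler);
    }

    /// Fans `payload` out to every subscriber of `name` and returns the
    /// total number of handler invocations (redeliveries included).
    fn publish(&mut self, name: &str, payload: &[u8]) -> usize {
        let mut calls = 0;
        if let Some(handlers) = self.subscribers.get_mut(name) {
            for handler in handlers.iter_mut() {
                for _ in 0..MAX_ATTEMPTS {
                    calls += 1;
                    // Ack is a no-op; NackRequeue redelivers the same payload
                    // to the same subscriber.
                    if handler(payload) == Outcome::Ack {
                        break;
                    }
                }
            }
        }
        calls
    }
}
```

Nothing in this model survives a restart or waits on a timer, which is why durable cursors and ack_wait redelivery cannot be exercised against it. Publishing to a name with no subscriber returns 0: the message is simply dropped.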
The TestClient contract
Every in-memory test transport implements the TestClient trait from ruststream::testing:
| Method | Purpose |
|---|---|
| start() | starts a fresh, isolated in-memory broker |
| broker() | the broker handle, for registering with a RustStream app |
| publish(name, payload) | publishes as if from an external producer |
| subscribe(name) | opens a raw subscription, for asserting on deliveries directly |
| publisher() | a bound publisher, for publishing with headers or in a loop |
| expect_published(name, count, timeout) | waits until count messages were published to name |
| shutdown() | tears the in-memory broker down |
expect_published resolves as soon as count messages have been observed on name; when the
timeout elapses first it returns the messages observed so far, so assert on the returned
messages, not just on Ok. It records all publishes - the ones your test sends and the ones
your handlers send - which is what makes reply assertions one-liners.
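The "returns what was observed so far" behaviour is easy to misread, so here is a std-only sketch of the same waiting semantics. Recorder, record, and expect are hypothetical names for this illustration and the real expect_published may be implemented quite differently; the point is only that a timeout yields a partial result rather than an error.

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Records published payloads and lets a test wait for a given count.
#[derive(Default)]
struct Recorder {
    seen: Mutex<Vec<Vec<u8>>>,
    changed: Condvar,
}

impl Recorder {
    /// Called for every publish, whoever sent it.
    fn record(&self, payload: &[u8]) {
        self.seen.lock().unwrap().push(payload.to_vec());
        self.changed.notify_all();
    }

    /// Returns as soon as `count` payloads were recorded, or at the deadline
    /// with whatever was recorded by then (possibly fewer than `count`).
    fn expect(&self, count: usize, timeout: Duration) -> Vec<Vec<u8>> {
        let deadline = Instant::now() + timeout;
        let mut seen = self.seen.lock().unwrap();
        while seen.len() < count {
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                break; // timed out: fall through and return the partial list
            }
            let (guard, _timed_out) = self.changed.wait_timeout(seen, remaining).unwrap();
            seen = guard;
        }
        seen.clone()
    }
}
```

Because the timeout path still returns a value, a test that checks only for success would pass with zero messages; asserting on the length of the returned list is what catches a missing reply.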
Two implementations ship today:
- MemoryBroker (the memory feature of ruststream) implements TestClient itself - exact name matching, broker-agnostic.
- ruststream-nats ships NatsTestClient / NatsTestBroker under its testing feature - real NATS subject matching (* and > wildcards), request-reply, header propagation.
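The difference between exact name matching and NATS subject matching can be sketched in a few lines. The function below follows the standard NATS wildcard rules (* matches exactly one token, > matches one or more trailing tokens and must come last); subject_matches is a hypothetical helper written for this page, not part of ruststream-nats, and it does not validate subjects (empty tokens, for instance, are not rejected).

```rust
/// Returns true when `subject` matches `pattern` under NATS wildcard rules:
/// `*` matches exactly one token, `>` matches one or more trailing tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` needs at least one remaining subject token and must be
            // the last token of the pattern.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one token, whatever it is.
            (Some("*"), Some(_)) => {}
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => {}
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal token.
            _ => return false,
        }
    }
}
```

Under exact name matching only the last of these cases would ever succeed: a subscriber on orders.* would receive nothing published to orders.created.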
Testing handlers with MemoryBroker
The handlers, middleware, and codecs under test are the production ones; only the broker is swapped. The test starts the in-memory broker, runs the app on it, publishes a message as an external producer would, and asserts on what the handler published back.
The handler under test (in a real service it lives in your handler module and the test imports it):
    use ruststream::subscriber;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Deserialize)]
    struct Order {
        id: u64,
        quantity: u32,
    }

    #[derive(Debug, Serialize)]
    struct Confirmation {
        id: u64,
        accepted: bool,
    }

    #[subscriber("orders", publish("confirmations"))]
    async fn confirm(order: &Order) -> Confirmation {
        Confirmation {
            id: order.id,
            accepted: order.quantity > 0,
        }
    }
The test:
    use std::convert::Infallible;
    use std::sync::Arc;
    use std::time::Duration;

    use ruststream::memory::MemoryBroker;
    use ruststream::runtime::{AppInfo, RustStream, TypedPublisher};
    use ruststream::testing::TestClient;
    use tokio::sync::Notify;

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn confirms_valid_orders() {
        let client = MemoryBroker::start().await.expect("start test broker");

        // The app under test: production wiring, in-memory broker. Cloning the broker
        // shares its state, so the client observes everything the app does.
        let ready = Arc::new(Notify::new());
        let on_ready = Arc::clone(&ready);
        let app = RustStream::new(AppInfo::new("orders-test", "0.0.0"))
            .after_startup(move |_state| async move {
                on_ready.notify_one();
                Ok::<_, Infallible>(())
            })
            .with_broker(client.broker().clone(), |b| {
                let replies = TypedPublisher::new(b.broker().publisher());
                b.include_publishing(confirm, replies);
            });

        let stop = Arc::new(Notify::new());
        let stop_signal = Arc::clone(&stop);
        let service = tokio::spawn(app.run_until(async move { stop_signal.notified().await }));

        // Subscriptions are not buffered: wait for after_startup (it runs once they are open),
        // then publish as an external producer.
        ready.notified().await;
        client
            .publish("orders", br#"{"id":1,"quantity":2}"#)
            .await
            .expect("publish");

        // The handler decoded the order and published a confirmation.
        let replies = client
            .expect_published("confirmations", 1, Duration::from_secs(1))
            .await
            .expect("expect_published");
        assert_eq!(replies.len(), 1, "expected one confirmation");

        let confirmation: serde_json::Value =
            serde_json::from_slice(replies[0].payload()).expect("valid JSON reply");
        assert_eq!(confirmation["id"], 1);
        assert_eq!(confirmation["accepted"], true);

        stop.notify_one();
        service.await.expect("join").expect("clean shutdown");
    }
This test runs in this repository's CI
The code above is embedded from tests/doc_testing_memory.rs, which cargo test --all-features runs on every change - the example cannot silently rot.
Three details carry the pattern:
- Readiness. The in-memory broker does not buffer: a message published before the subscription opens is lost. after_startup runs once every subscription is open, so a Notify signalled there is the cheapest reliable "handlers are live" gate (no sleeps, no polling).
- Shutdown. run_until resolves when the supplied future does, so the test owns the service's lifetime; service.await then surfaces any startup or shutdown error.
- Assertions. For a handler that publishes, assert through expect_published. For a handler with side effects only, assert on its observable effect - a value pushed to a channel, a counter, a stubbed repository inserted via shared state.
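For the side-effect-only case, the stubbed-repository option might look like the sketch below. It is deliberately framework-free: OrderRepository, StubRepository, and store_order are hypothetical names, and how the repository reaches a real handler (shared application state) is left out because that wiring is not shown on this page.

```rust
use std::sync::Mutex;

/// The seam the handler depends on; production code would back it with a database.
trait OrderRepository: Send + Sync {
    fn save(&self, id: u64);
}

/// Test double that remembers every id it was asked to save.
#[derive(Default)]
struct StubRepository {
    saved: Mutex<Vec<u64>>,
}

impl OrderRepository for StubRepository {
    fn save(&self, id: u64) {
        self.saved.lock().unwrap().push(id);
    }
}

/// Hypothetical side-effect-only handler body: it publishes nothing and
/// stores only orders with a positive quantity.
fn store_order(repo: &dyn OrderRepository, id: u64, quantity: u32) {
    if quantity > 0 {
        repo.save(id);
    }
}
```

A test then drives the handler and inspects the stub afterwards, for example checking that saved contains exactly the ids of the valid orders. The Mutex keeps the stub usable from a multi-threaded test runtime.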
Testing handlers against in-memory NATS
For a NATS service, test against the NATS-flavoured test broker instead, so subject semantics are
the real ones: orders.* matches one token, orders.> matches the tail, queue-group names are
accepted, and request-reply works. Enable the broker crate's testing feature in your
dev-dependencies (see the ruststream-nats documentation
for the dependency line and current version).
The same pattern as above, with a wildcard subscriber that audits every order event:
    use std::convert::Infallible;
    use std::sync::Arc;
    use std::time::Duration;

    use ruststream::runtime::{AppInfo, RustStream, TypedPublisher};
    use ruststream::subscriber;
    use ruststream::testing::TestClient;
    use ruststream_nats::testing::NatsTestClient;
    use serde::{Deserialize, Serialize};
    use tokio::sync::Notify;

    #[derive(Debug, Deserialize)]
    struct Event {
        id: u64,
    }

    #[derive(Debug, Serialize)]
    struct AuditRecord {
        id: u64,
    }

    #[subscriber("orders.*", publish("audit"))]
    async fn audit(event: &Event) -> AuditRecord {
        AuditRecord { id: event.id }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn wildcard_subscriber_audits_all_order_events() {
        let client = NatsTestClient::start().await.expect("start test broker");

        let ready = Arc::new(Notify::new());
        let on_ready = Arc::clone(&ready);
        let app = RustStream::new(AppInfo::new("audit-test", "0.0.0"))
            .after_startup(move |_state| async move {
                on_ready.notify_one();
                Ok::<_, Infallible>(())
            })
            .with_broker(client.broker().clone(), |b| {
                let records = TypedPublisher::new(b.broker().publisher());
                b.include_publishing(audit, records);
            });

        let stop = Arc::new(Notify::new());
        let stop_signal = Arc::clone(&stop);
        let service = tokio::spawn(app.run_until(async move { stop_signal.notified().await }));

        ready.notified().await;
        client
            .publish("orders.created", br#"{"id":1}"#)
            .await
            .expect("publish");
        client
            .publish("orders.updated", br#"{"id":2}"#)
            .await
            .expect("publish");

        // Both subjects matched "orders.*", so the handler ran twice and audited both.
        let records = client
            .expect_published("audit", 2, Duration::from_secs(1))
            .await
            .expect("expect_published");
        assert_eq!(records.len(), 2, "expected two audit records");

        stop.notify_one();
        service.await.expect("join").expect("clean shutdown");
    }
The test broker also implements RequestReply, so a handler that responds to requests is testable
in-process: publish with publisher().request(..) and assert on the reply.
Handlers declared with a JetStream descriptor
A descriptor like SubscribeOptions::new("orders.*").jetstream("ORDERS").durable("workers") names
a real JetStream consumer; it implements SubscriptionSource for the real NatsBroker only,
because durable names and ack waits have no in-process meaning. To unit-test that handler's logic,
mount the same definition on the test broker with an explicit by-name source - include_on
overrides the macro's source (the codec resolves the same way as for include):
    use ruststream::Name;

    .with_broker(client.broker().clone(), |b| {
        b.include_on(Name::new("orders.created"), handle);
    })
What that mount does not cover - durable resume, ack_wait redelivery, max_ack_pending - is
exactly what the integration suite is for.
Integration tests against a real broker
Behaviour that depends on real broker semantics belongs in a separate suite gated behind an
environment variable, so the default cargo test stays fast and offline:
    fn test_url() -> Option<String> {
        std::env::var("NATS_TEST_URL").ok()
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn durable_consumer_resumes_after_restart() {
        let Some(url) = test_url() else {
            eprintln!("skipping: set NATS_TEST_URL to run");
            return;
        };
        // connect NatsBroker::new(url), drive the real JetStream consumer ...
    }
Run it explicitly against a live server:
    docker run -d -p 4222:4222 nats:latest -js
    NATS_TEST_URL=nats://127.0.0.1:4222 cargo test --test integration_nats
This mirrors FastStream's with_real=True split: handler logic on the in-memory path, broker
semantics on the real one. Run both suites against the same handler modules so the production
code remains the single source of truth.
For broker authors
If you are implementing a broker, ship a TestClient under a testing feature and prove it with
the conformance harness: run_suite checks the routing surface of the test client, and lifecycle
checks the lazy-startup contract against the real transport. See Conformance.