Skip to content

Testing

RustStream services are tested at two levels:

  1. In-process tests drive your real handlers, middleware, and codecs through an in-memory broker - no server, no docker, no network. This is the default test path and it covers handler logic end to end: decode, dispatch, ack, and any replies your handlers publish.
  2. Integration tests run against a real broker, gated behind an environment variable, and cover the semantics only a real server has (durable consumers, redelivery timers, partitions).

What the test client does and does not do

An in-memory test broker is a handler-stub dispatcher: publishing fans the message out to the subscribers whose subject matches, runs your handler, and treats ack/nack as broker-side no-ops (a nack with requeue redelivers the same payload to the same subscriber). It does not model JetStream durable cursors, ack_wait redelivery, max_ack_pending, retention, Kafka offsets or consumer groups, or RabbitMQ exchanges and dead-letter routing. Those are real-broker concerns; test them in the integration suite.

The TestClient contract

Every in-memory test transport implements the TestClient trait from ruststream::testing:

Method Purpose
start() starts a fresh, isolated in-memory broker
broker() the broker handle, for registering with a RustStream app
publish(name, payload) publishes as if from an external producer
subscribe(name) opens a raw subscription, for asserting on deliveries directly
publisher() a bound publisher, for publishing with headers or in a loop
expect_published(name, count, timeout) waits until count messages were published to name
shutdown() tears the in-memory broker down

expect_published resolves as soon as count messages have been observed on name; when the timeout elapses first it returns the messages observed so far, so assert on the returned messages, not just on Ok. It records all publishes - the ones your test sends and the ones your handlers send - which is what makes reply assertions one-liners.

Two implementations ship today:

  • MemoryBroker (the memory feature of ruststream) implements TestClient itself - exact name matching, broker-agnostic.
  • ruststream-nats ships NatsTestClient / NatsTestBroker under its testing feature - real NATS subject matching (* and > wildcards), request-reply, header propagation.

Testing handlers with MemoryBroker

The handlers, middleware, and codecs under test are the production ones; only the broker is swapped. The test starts the in-memory broker, runs the app on it, publishes a message as an external producer would, and asserts on what the handler published back.

The handler under test (in a real service it lives in your handler module and the test imports it):

use ruststream::subscriber;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize)]
struct Order {
    id: u64,
    quantity: u32,
}

#[derive(Debug, Serialize)]
struct Confirmation {
    id: u64,
    accepted: bool,
}

#[subscriber("orders", publish("confirmations"))]
async fn confirm(order: &Order) -> Confirmation {
    Confirmation {
        id: order.id,
        accepted: order.quantity > 0,
    }
}

The test:

use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, RustStream, TypedPublisher};
use ruststream::testing::TestClient;
use tokio::sync::Notify;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn confirms_valid_orders() {
    let client = MemoryBroker::start().await.expect("start test broker");

    // The app under test: production wiring, in-memory broker. Cloning the broker
    // shares its state, so the client observes everything the app does.
    let ready = Arc::new(Notify::new());
    let on_ready = Arc::clone(&ready);
    let app = RustStream::new(AppInfo::new("orders-test", "0.0.0"))
        .after_startup(move |_state| async move {
            on_ready.notify_one();
            Ok::<_, Infallible>(())
        })
        .with_broker(client.broker().clone(), |b| {
            let replies = TypedPublisher::new(b.broker().publisher());
            b.include_publishing(confirm, replies);
        });

    let stop = Arc::new(Notify::new());
    let stop_signal = Arc::clone(&stop);
    let service = tokio::spawn(app.run_until(async move { stop_signal.notified().await }));

    // Subscriptions are not buffered: wait for after_startup (it runs once they are open),
    // then publish as an external producer.
    ready.notified().await;
    client
        .publish("orders", br#"{"id":1,"quantity":2}"#)
        .await
        .expect("publish");

    // The handler decoded the order and published a confirmation.
    let replies = client
        .expect_published("confirmations", 1, Duration::from_secs(1))
        .await
        .expect("expect_published");
    assert_eq!(replies.len(), 1, "expected one confirmation");
    let confirmation: serde_json::Value =
        serde_json::from_slice(replies[0].payload()).expect("valid JSON reply");
    assert_eq!(confirmation["id"], 1);
    assert_eq!(confirmation["accepted"], true);

    stop.notify_one();
    service.await.expect("join").expect("clean shutdown");
}

This test runs in this repository's CI

The code above is embedded from tests/doc_testing_memory.rs, which cargo test --all-features runs on every change - the example cannot silently rot.

Three details carry the pattern:

  • Readiness. The in-memory broker does not buffer: a message published before the subscription opens is lost. after_startup runs once every subscription is open, so a Notify signalled there is the cheapest reliable "handlers are live" gate (no sleeps, no polling).
  • Shutdown. run_until resolves when the supplied future does, so the test owns the service's lifetime; service.await then surfaces any startup or shutdown error.
  • Assertions. For a handler that publishes, assert through expect_published. For a handler with side effects only, assert on its observable effect - a value pushed to a channel, a counter, a stubbed repository inserted via shared state.

Testing handlers against in-memory NATS

For a NATS service, test against the NATS-flavoured test broker instead, so subject semantics are the real ones: orders.* matches one token, orders.> matches the tail, queue-group names are accepted, and request-reply works. Enable the broker crate's testing feature in your dev-dependencies (see the ruststream-nats documentation for the dependency line and current version).

The same pattern as above, with a wildcard subscriber that audits every order event:

use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use ruststream::runtime::{AppInfo, RustStream, TypedPublisher};
use ruststream::subscriber;
use ruststream::testing::TestClient;
use ruststream_nats::testing::NatsTestClient;
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;

#[derive(Debug, Deserialize)]
struct Event {
    id: u64,
}

#[derive(Debug, Serialize)]
struct AuditRecord {
    id: u64,
}

#[subscriber("orders.*", publish("audit"))]
async fn audit(event: &Event) -> AuditRecord {
    AuditRecord { id: event.id }
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wildcard_subscriber_audits_all_order_events() {
    let client = NatsTestClient::start().await.expect("start test broker");

    let ready = Arc::new(Notify::new());
    let on_ready = Arc::clone(&ready);
    let app = RustStream::new(AppInfo::new("audit-test", "0.0.0"))
        .after_startup(move |_state| async move {
            on_ready.notify_one();
            Ok::<_, Infallible>(())
        })
        .with_broker(client.broker().clone(), |b| {
            let records = TypedPublisher::new(b.broker().publisher());
            b.include_publishing(audit, records);
        });

    let stop = Arc::new(Notify::new());
    let stop_signal = Arc::clone(&stop);
    let service = tokio::spawn(app.run_until(async move { stop_signal.notified().await }));

    ready.notified().await;
    client
        .publish("orders.created", br#"{"id":1}"#)
        .await
        .expect("publish");
    client
        .publish("orders.updated", br#"{"id":2}"#)
        .await
        .expect("publish");

    // Both subjects matched "orders.*", so the handler ran twice and audited both.
    let records = client
        .expect_published("audit", 2, Duration::from_secs(1))
        .await
        .expect("expect_published");
    assert_eq!(records.len(), 2, "expected two audit records");

    stop.notify_one();
    service.await.expect("join").expect("clean shutdown");
}

The test broker also implements RequestReply, so a handler that responds to requests is testable in-process: publish with publisher().request(..) and assert on the reply.

Handlers declared with a JetStream descriptor

A descriptor like SubscribeOptions::new("orders.*").jetstream("ORDERS").durable("workers") names a real JetStream consumer; it implements SubscriptionSource for the real NatsBroker only, because durable names and ack waits have no in-process meaning. To unit-test that handler's logic, mount the same definition on the test broker with an explicit by-name source - include_on overrides the macro's source (the codec resolves the same way as for include):

use ruststream::Name;

.with_broker(client.broker().clone(), |b| {
    b.include_on(Name::new("orders.created"), handle);
})

What that mount does not cover - durable resume, ack_wait redelivery, max_ack_pending - is exactly what the integration suite is for.

Integration tests against a real broker

Behaviour that depends on real broker semantics belongs in a separate suite gated behind an environment variable, so the default cargo test stays fast and offline:

tests/integration_nats.rs
fn test_url() -> Option<String> {
    std::env::var("NATS_TEST_URL").ok()
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn durable_consumer_resumes_after_restart() {
    let Some(url) = test_url() else {
        eprintln!("skipping: set NATS_TEST_URL to run");
        return;
    };
    // connect NatsBroker::new(url), drive the real JetStream consumer ...
}

Run it explicitly against a live server:

docker run -d -p 4222:4222 nats:latest -js
NATS_TEST_URL=nats://127.0.0.1:4222 cargo test --test integration_nats

This mirrors faststream's with_real=True split: handler logic on the in-memory path, broker semantics on the real one. Keep both suites over the same handler modules so the production code has a single source of truth.

For broker authors

If you are implementing a broker, ship a TestClient under a testing feature and prove it with the conformance harness: run_suite checks the routing surface of the test client, lifecycle checks the lazy-startup contract against the real transport. See Conformance.