Skip to content

Testing

The testing feature ships RedisTestBroker / RedisTestClient, a handler-stub dispatcher that routes by exact stream key with no server. It reproduces routing, ack/nack, and headers, and passes the framework's conformance suite. It does not simulate consumer-group cursors, XAUTOCLAIM redelivery, trimming, or dead-letter routing - exercise those against a real Redis server (see the crate's integration_fred tests and docker-compose.test.yml).

[dev-dependencies]
ruststream-fred = { version = "0.4", features = ["testing"] }

Unit-testing a handler

Because a #[subscriber] handler is wired through a RustStream app, the most realistic in-process test builds the same app around a RedisTestBroker and drives publishes through the test client. The service runs until the test signals shutdown.

Business-logic test

A real handler validates input, persists valid messages through a repository connector, and drops invalid ones. The handler has no knowledge of the test harness.

/// A repository connector. In production this would wrap a real database client;
/// the test uses the same connector with an in-memory store so the handler stays test-agnostic.
#[derive(Clone, Default)]
struct PaymentRepository {
    payments: Arc<Mutex<Vec<Payment>>>,
}

impl PaymentRepository {
    async fn save(&self, payment: Payment) {
        self.payments.lock().await.push(payment);
    }

    async fn count(&self) -> usize {
        self.payments.lock().await.len()
    }

    async fn contains(&self, id: u64) -> bool {
        self.payments.lock().await.iter().any(|p| p.id == id)
    }
}
/// A real production handler: validate the message, persist it, or drop it on validation failure.
#[subscriber(
    RedisStream::new("payments")
        .group("workers")
)]
async fn process_payment(payment: &Payment, ctx: &mut Context<'_>) -> HandlerResult {
    if payment.amount == 0 {
        // Invalid message: do not requeue, drop it.
        return HandlerResult::drop();
    }

    ctx.state()
        .get::<PaymentRepository>()
        .expect("payment repository")
        .save(payment.clone())
        .await;

    HandlerResult::ack()
}

The test publishes a valid payment and an invalid payment, then asserts that only the valid one was saved:

let client = RedisTestClient::start().await?;
let broker = client.broker().clone();
let repository = PaymentRepository::default();

let app = RustStream::new(AppInfo::new("test", "0.1.0"))
    .insert_state(repository.clone())
    .with_broker(broker, |b| {
        b.include(process_payment);
    });

let task = tokio::spawn(async move {
    app.run_until(tokio::time::sleep(Duration::from_millis(500)))
        .await
});

tokio::time::sleep(Duration::from_millis(50)).await;

// Valid payment is saved.
client
    .publish("payments", br#"{"id":1,"user_id":42,"amount":100}"#)
    .await?;
// Invalid payment (amount == 0) is dropped.
client
    .publish("payments", br#"{"id":2,"user_id":42,"amount":0}"#)
    .await?;

// Wait until the valid payment is persisted.
let deadline = Duration::from_secs(2);
let start = std::time::Instant::now();
while !repository.contains(1).await && start.elapsed() < deadline {
    tokio::time::sleep(Duration::from_millis(10)).await;
}

assert!(repository.contains(1).await, "valid payment was not saved");
assert!(
    !repository.contains(2).await,
    "invalid payment should have been dropped"
);
assert_eq!(repository.count().await, 1);

task.await??;

In your own crate you usually copy the test body into a #[tokio::test] inside a #[cfg(test)] module:

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn valid_payment_is_saved_and_invalid_is_dropped() {
        let client = RedisTestClient::start().await.unwrap();
        let broker = client.broker().clone();
        let repository = PaymentRepository::default();

        let app = RustStream::new(AppInfo::new("test", "0.1.0"))
            .insert_state(repository.clone())
            .with_broker(broker, |b| {
                b.include(process_payment);
            });

        let task = tokio::spawn(async move {
            app.run_until(tokio::time::sleep(Duration::from_millis(500)))
                .await
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        client
            .publish("payments", br#"{"id":1,"user_id":42,"amount":100}"#)
            .await
            .unwrap();
        client
            .publish("payments", br#"{"id":2,"user_id":42,"amount":0}"#)
            .await
            .unwrap();

        let deadline = Duration::from_secs(2);
        let start = std::time::Instant::now();
        while !repository.contains(1).await && start.elapsed() < deadline {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        assert!(repository.contains(1).await);
        assert!(!repository.contains(2).await);
        assert_eq!(repository.count().await, 1);

        task.await.unwrap().unwrap();
    }
}

Transport-specific examples

#[subscriber(
    RedisStream::new("events")
        .group("workers")
)]
async fn handle_stream_event(payment: &Payment) -> HandlerResult {
    println!("stream event {}", payment.id);
    HandlerResult::Ack
}
let client = RedisTestClient::start().await?;
let broker = client.broker().clone();

let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker, |b| {
    b.include(handle_stream_event);
});

let task = tokio::spawn(async move {
    app.run_until(tokio::time::sleep(Duration::from_millis(500)))
        .await
});

tokio::time::sleep(Duration::from_millis(50)).await;
client
    .publish("events", br#"{"id":1,"user_id":42,"amount":100}"#)
    .await?;
task.await??;
#[subscriber(
    RedisList::new("jobs")
        .reliable()
)]
async fn handle_list_job(payment: &Payment) -> HandlerResult {
    println!("list job {}", payment.id);
    HandlerResult::Ack
}
let client = RedisTestClient::start().await?;
let broker = client.broker().clone();

let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker, |b| {
    b.include(handle_list_job);
});

let task = tokio::spawn(async move {
    app.run_until(tokio::time::sleep(Duration::from_millis(500)))
        .await
});

tokio::time::sleep(Duration::from_millis(50)).await;
client
    .publish("jobs", br#"{"id":1,"user_id":42,"amount":100}"#)
    .await?;
task.await??;
#[subscriber(RedisPubSub::new("notifications"))]
async fn handle_pubsub_notification(payment: &Payment) -> HandlerResult {
    println!("pubsub notification {}", payment.id);
    HandlerResult::Ack
}
let client = RedisTestClient::start().await?;
let broker = client.broker().clone();

let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker, |b| {
    b.include(handle_pubsub_notification);
});

let task = tokio::spawn(async move {
    app.run_until(tokio::time::sleep(Duration::from_millis(500)))
        .await
});

tokio::time::sleep(Duration::from_millis(50)).await;
client
    .publish("notifications", br#"{"id":1,"user_id":42,"amount":100}"#)
    .await?;
task.await??;

Conformance suite

Run the framework's full conformance suite against the stub broker:

// The framework's conformance suite exercises routing, ack/nack, headers,
// and requeue against the in-memory test client - no Redis server required.
harness::run_suite(RedisTestClient::start).await;