Testing¶
The testing feature ships RedisTestBroker / RedisTestClient, a handler-stub dispatcher that
routes by exact stream key with no server. It reproduces routing, ack/nack, and headers, and passes
the framework's conformance suite. It does not simulate consumer-group cursors, XAUTOCLAIM
redelivery, trimming, or dead-letter routing - exercise those against a real Redis server (see the
crate's integration_fred tests and docker-compose.test.yml).
Unit-testing a handler¶
Because a #[subscriber] handler is wired through a RustStream app, the most realistic in-process
test builds the same app around a RedisTestBroker and drives publishes through the test client.
The service runs until the test signals shutdown.
Business-logic test¶
A real handler validates input, persists valid messages through a repository connector, and drops invalid ones. The handler has no knowledge of the test harness.
/// A repository connector. In production this would wrap a real database client;
/// the test uses the same connector with an in-memory store so the handler stays test-agnostic.
#[derive(Clone, Default)]
struct PaymentRepository {
payments: Arc<Mutex<Vec<Payment>>>,
}
impl PaymentRepository {
async fn save(&self, payment: Payment) {
self.payments.lock().await.push(payment);
}
async fn count(&self) -> usize {
self.payments.lock().await.len()
}
async fn contains(&self, id: u64) -> bool {
self.payments.lock().await.iter().any(|p| p.id == id)
}
}
/// A real production handler: validate the message, persist it, or drop it on validation failure.
#[subscriber(
RedisStream::new("payments")
.group("workers")
)]
async fn process_payment(payment: &Payment, ctx: &mut Context<'_>) -> HandlerResult {
if payment.amount == 0 {
// Invalid message: do not requeue, drop it.
return HandlerResult::drop();
}
ctx.state()
.get::<PaymentRepository>()
.expect("payment repository")
.save(payment.clone())
.await;
HandlerResult::ack()
}
The test publishes a valid payment and an invalid payment, then asserts that only the valid one was saved:
let client = RedisTestClient::start().await?;
let broker = client.broker().clone();
let repository = PaymentRepository::default();
let app = RustStream::new(AppInfo::new("test", "0.1.0"))
.insert_state(repository.clone())
.with_broker(broker, |b| {
b.include(process_payment);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
// Valid payment is saved.
client
.publish("payments", br#"{"id":1,"user_id":42,"amount":100}"#)
.await?;
// Invalid payment (amount == 0) is dropped.
client
.publish("payments", br#"{"id":2,"user_id":42,"amount":0}"#)
.await?;
// Wait until the valid payment is persisted.
let deadline = Duration::from_secs(2);
let start = std::time::Instant::now();
while !repository.contains(1).await && start.elapsed() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert!(repository.contains(1).await, "valid payment was not saved");
assert!(
!repository.contains(2).await,
"invalid payment should have been dropped"
);
assert_eq!(repository.count().await, 1);
task.await??;
In your own crate you usually copy the test body into a #[tokio::test] inside a #[cfg(test)]
module:
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn valid_payment_is_saved_and_invalid_is_dropped() {
let client = RedisTestClient::start().await.unwrap();
let broker = client.broker().clone();
let repository = PaymentRepository::default();
let app = RustStream::new(AppInfo::new("test", "0.1.0"))
.insert_state(repository.clone())
.with_broker(broker, |b| {
b.include(process_payment);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
client
.publish("payments", br#"{"id":1,"user_id":42,"amount":100}"#)
.await
.unwrap();
client
.publish("payments", br#"{"id":2,"user_id":42,"amount":0}"#)
.await
.unwrap();
let deadline = Duration::from_secs(2);
let start = std::time::Instant::now();
while !repository.contains(1).await && start.elapsed() < deadline {
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert!(repository.contains(1).await);
assert!(!repository.contains(2).await);
assert_eq!(repository.count().await, 1);
task.await.unwrap().unwrap();
}
}
Transport-specific examples¶
#[subscriber(
RedisStream::new("events")
.group("workers")
)]
async fn handle_stream_event(payment: &Payment) -> HandlerResult {
println!("stream event {}", payment.id);
HandlerResult::Ack
}
let client = RedisTestClient::start().await?;
let broker = client.broker().clone();
let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker, |b| {
b.include(handle_stream_event);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
client
.publish("events", br#"{"id":1,"user_id":42,"amount":100}"#)
.await?;
task.await??;
#[subscriber(
RedisList::new("jobs")
.reliable()
)]
async fn handle_list_job(payment: &Payment) -> HandlerResult {
println!("list job {}", payment.id);
HandlerResult::Ack
}
let client = RedisTestClient::start().await?;
let broker = client.broker().clone();
let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker, |b| {
b.include(handle_list_job);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
client
.publish("jobs", br#"{"id":1,"user_id":42,"amount":100}"#)
.await?;
task.await??;
#[subscriber(RedisPubSub::new("notifications"))]
async fn handle_pubsub_notification(payment: &Payment) -> HandlerResult {
println!("pubsub notification {}", payment.id);
HandlerResult::Ack
}
let client = RedisTestClient::start().await?;
let broker = client.broker().clone();
let app = RustStream::new(AppInfo::new("test", "0.1.0")).with_broker(broker, |b| {
b.include(handle_pubsub_notification);
});
let task = tokio::spawn(async move {
app.run_until(tokio::time::sleep(Duration::from_millis(500)))
.await
});
tokio::time::sleep(Duration::from_millis(50)).await;
client
.publish("notifications", br#"{"id":1,"user_id":42,"amount":100}"#)
.await?;
task.await??;
Conformance suite¶
Run the framework's full conformance suite against the stub broker: