Tutorial: build your first service

This tutorial builds an orders service from scratch, explaining each piece. It uses the in-memory broker so there is nothing external to run; swapping in a real broker is a one-line change covered at the end.

1. Create the crate

cargo new orders-service
cd orders-service

Cargo.toml
[package]
name = "orders-service"
version = "0.1.0"
edition = "2024"

[dependencies]
ruststream = { version = "0.4", features = ["macros", "memory", "json", "asyncapi"] }
serde = { version = "1", features = ["derive"] }

2. Define a message and a handler

A handler is an async fn whose first parameter is the decoded payload. The #[subscriber] macro turns it into a mountable definition named after the function.

src/orders.rs
use ruststream::runtime::HandlerResult;
use ruststream::subscriber;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize)]
pub(crate) struct Order {
    pub(crate) id: u64,
    pub(crate) quantity: u32,
}

#[subscriber("orders")]
pub(crate) async fn handle(order: &Order) -> HandlerResult {
    println!("order {} x{}", order.id, order.quantity);
    HandlerResult::Ack
}

A handler returns a HandlerResult: Ack, or a nack that drops or requeues the message. Returning () or Result<(), E> also works: both convert into a HandlerResult (Ok acks, Err drops).
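
The conversion rule above can be modelled in plain Rust. This is a minimal sketch, not the library's implementation: Outcome, its variants, and describe are hypothetical names, and treating a bare () as an ack is an assumption of the sketch.

```rust
// Hypothetical stand-in for the library's result type; the real
// HandlerResult may have different variants or carry extra data.
#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Drop,
    Requeue,
}

// Assumption: a handler that returns `()` is treated as an ack.
impl From<()> for Outcome {
    fn from(_: ()) -> Self {
        Outcome::Ack
    }
}

// `Ok(())` acks; any `Err` drops the message.
impl<E> From<Result<(), E>> for Outcome {
    fn from(result: Result<(), E>) -> Self {
        match result {
            Ok(()) => Outcome::Ack,
            Err(_) => Outcome::Drop,
        }
    }
}

// Human-readable label for each outcome.
fn describe(outcome: &Outcome) -> &'static str {
    match outcome {
        Outcome::Ack => "acknowledged",
        Outcome::Drop => "dropped",
        Outcome::Requeue => "requeued",
    }
}

fn main() {
    let unit: Outcome = ().into();
    let ok: Outcome = Ok::<(), String>(()).into();
    let err: Outcome = Err::<(), String>("bad payload".to_string()).into();
    for outcome in [unit, ok, err, Outcome::Requeue] {
        println!("{}", describe(&outcome));
    }
}
```

In this model, requeueing is only reachable by returning the result type explicitly, which matches the text: the shorthand return types cover ack and drop.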

3. Wire it into an app

src/main.rs
mod orders;

use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, RustStream};

use crate::orders::handle;

#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("orders-service", "0.1.0"))
        .with_broker(MemoryBroker::new(), |b| b.include(handle))
}

Because #[subscriber] turns handle into a value named after the function, you import it and pass it to include directly.

Codec defaults

include decodes with the default codec - json if enabled, otherwise cbor, otherwise msgpack - so it needs no codec argument. To decode with a different one everywhere, set it once with with_broker_codec(broker, codec, |b| ...). See Codecs for the full resolution rules.
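
The fallback order can be pictured as a priority search. This is a minimal sketch, assuming the enabled codecs are available as a runtime list; the real crate presumably resolves this from Cargo features at compile time. Codec, default_codec, and resolve are hypothetical names.

```rust
// Hypothetical codec identifiers for this sketch only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Codec {
    Json,
    Cbor,
    MsgPack,
}

// Returns the first enabled codec in the documented priority order:
// json, then cbor, then msgpack.
fn default_codec(enabled: &[Codec]) -> Option<Codec> {
    [Codec::Json, Codec::Cbor, Codec::MsgPack]
        .into_iter()
        .find(|candidate| enabled.contains(candidate))
}

// An explicitly chosen codec wins over the default.
fn resolve(explicit: Option<Codec>, enabled: &[Codec]) -> Option<Codec> {
    explicit.or_else(|| default_codec(enabled))
}

fn main() {
    println!("{:?}", default_codec(&[Codec::MsgPack, Codec::Json]));
    println!("{:?}", default_codec(&[Codec::MsgPack, Codec::Cbor]));
    println!("{:?}", resolve(Some(Codec::Cbor), &[Codec::Json]));
}
```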

Run it:

cargo run -- run

4. Reply to messages

To publish a reply, return the reply value and name the destination with publish(..):

src/orders.rs
#[derive(Debug, Serialize)]
pub(crate) struct Confirmation {
    pub(crate) id: u64,
    pub(crate) accepted: bool,
}

#[subscriber("orders", publish("confirmations"))]
pub(crate) async fn confirm(order: &Order) -> Confirmation {
    Confirmation {
        id: order.id,
        accepted: order.quantity > 0,
    }
}

Mount it with a publisher that carries the reply codec:

use ruststream::runtime::TypedPublisher;

// inside with_broker(...), with `confirm` imported from the orders module
let replies = TypedPublisher::new(b.broker().publisher());
b.include_publishing(confirm, replies);

See Publishing & replies for the full picture, including publishing from inside a handler.
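
Conceptually, a handler with publish(..) is called and its return value is sent to the named destination. The sketch below models that flow with the standard library only. ToyBroker and dispatch are hypothetical names, the code is synchronous, and it skips encoding; it does not show how the runtime is actually implemented.

```rust
use std::collections::{HashMap, VecDeque};

#[derive(Debug)]
struct Order {
    id: u64,
    quantity: u32,
}

#[derive(Debug, PartialEq)]
struct Confirmation {
    id: u64,
    accepted: bool,
}

// Same decision logic as the `confirm` handler in this step.
fn confirm(order: &Order) -> Confirmation {
    Confirmation {
        id: order.id,
        accepted: order.quantity > 0,
    }
}

// Toy broker: one FIFO queue of replies per destination name.
#[derive(Default)]
struct ToyBroker {
    queues: HashMap<String, VecDeque<Confirmation>>,
}

impl ToyBroker {
    fn publish(&mut self, destination: &str, reply: Confirmation) {
        self.queues
            .entry(destination.to_string())
            .or_default()
            .push_back(reply);
    }
}

// Call the handler, then publish whatever it returned.
fn dispatch(broker: &mut ToyBroker, order: &Order) {
    let reply = confirm(order);
    broker.publish("confirmations", reply);
}

fn main() {
    let mut broker = ToyBroker::default();
    dispatch(&mut broker, &Order { id: 1, quantity: 2 });
    dispatch(&mut broker, &Order { id: 2, quantity: 0 });
    println!("{:?}", broker.queues.get("confirmations"));
}
```

Keeping the handler a plain function from input to reply leaves it free of broker details, which is why it can be exercised without any transport.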

5. Organize with a router

As handlers grow, keep them in their own module and collect them into a Router:

src/routes.rs
use ruststream::memory::MemoryBroker;
use ruststream::runtime::{Router, RouterDef, TypedPublisher};

use crate::orders;

pub(crate) fn orders(broker: &MemoryBroker) -> impl RouterDef<MemoryBroker> + use<> {
    let replies = TypedPublisher::new(broker.publisher());
    Router::new()
        .include_publishing(orders::confirm, replies)
        .include(orders::handle)
}

src/main.rs
mod orders;
mod routes;

use ruststream::memory::MemoryBroker;
use ruststream::runtime::{AppInfo, RustStream};

#[ruststream::app]
fn app() -> RustStream {
    RustStream::new(AppInfo::new("orders-service", "0.1.0")).with_broker(MemoryBroker::new(), |b| {
        let router = routes::orders(b.broker());
        b.include_router(router);
    })
}

6. Inspect the AsyncAPI document

cargo run -- asyncapi gen --yaml

Every subscriber becomes a channel and a receive operation; payload types that derive schemars::JsonSchema also contribute schemas. See AsyncAPI.

7. Swap in a real broker

Nothing above is tied to the in-memory broker. The handlers, router, and codecs are unchanged; only the broker construction differs. Add the broker crate as a dependency, then swap the import and the with_broker line. With the in-memory broker:

use ruststream::memory::MemoryBroker;

.with_broker(MemoryBroker::new(), |b| {
    let router = routes::orders(b.broker());
    b.include_router(router);
})

With NATS:

use ruststream_nats::NatsBroker;

.with_broker(NatsBroker::new("nats://localhost:4222"), |b| {
    let router = routes::orders(b.broker());
    b.include_router(router);
})

If a helper such as routes::orders names MemoryBroker in its signature, as in step 5, change that type to match the new broker.

Each broker crate documents its own Config. Subscriptions that need broker-specific options (consumer groups, durable names) use that broker's descriptor in the decorator; see broker-specific descriptors. The available brokers are listed under Brokers.

The complete service is a compiled example

Every snippet on this page is embedded from examples/tutorial in the repository, which CI builds on every change. Run it yourself with cargo run --example tutorial --features macros,memory,json -- run.

Next steps

  • Middleware - cross-cutting logic around handlers.
  • Lifespan - shared state and startup/shutdown hooks.
  • Testing - test the handlers you just wrote, in-process.
  • Metrics - Prometheus counters and histograms.