Metrics

The metrics feature collects Prometheus metrics for consumed and published messages. It is built directly on the prometheus crate and exposes the data in the Prometheus exposition format. Enable the feature in Cargo.toml:

ruststream = { version = "0.4", features = ["macros", "memory", "metrics"] }

Wiring it up

Create a Metrics, install its consume and publish layers, and keep the handle to export later:

let metrics = Metrics::new()?;
let broker = MemoryBroker::new();
let ingest = broker.publisher();
let app = RustStream::new(AppInfo::new("orders", "0.1.0"))
    .layer(metrics.consume_layer())
    .publish_layer(metrics.publish_layer())
    .with_broker(broker, |b| {
        let replies = TypedPublisher::new(b.broker().publisher());
        b.include_publishing(confirm, replies);
    });

consume_layer records every handled message; publish_layer records every published message. To collect into an existing registry instead of a fresh one, use Metrics::with_registry(registry).

Metrics emitted

Metric                               Type       Labels
ruststream_messages_consumed_total   counter    name, status
ruststream_consume_duration_seconds  histogram  name
ruststream_messages_published_total  counter    name, status

name is the subscription or destination name; status is the outcome (ack or nack for consume; ok or error for publish).
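To make the label scheme concrete, here is a minimal standard-library sketch of a counter keyed by (name, status). CounterFamily and its methods are hypothetical names used only for illustration; this is not how RustStream or the prometheus crate stores its data, only a model of how one metric fans out into one series per label combination.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a labelled counter; not RustStream's implementation.
#[derive(Default)]
struct CounterFamily {
    // One running total per (name, status) label pair.
    series: BTreeMap<(String, String), u64>,
}

impl CounterFamily {
    /// Increment the series identified by the given label values.
    fn inc(&mut self, name: &str, status: &str) {
        *self
            .series
            .entry((name.to_owned(), status.to_owned()))
            .or_insert(0) += 1;
    }

    /// Current total for one label pair, or 0 if it was never incremented.
    fn get(&self, name: &str, status: &str) -> u64 {
        self.series
            .get(&(name.to_owned(), status.to_owned()))
            .copied()
            .unwrap_or(0)
    }

    /// Render one sample line per series, in the style of the exposition format.
    fn render(&self, metric: &str) -> String {
        self.series
            .iter()
            .map(|((name, status), value)| {
                format!("{metric}{{name=\"{name}\",status=\"{status}\"}} {value}\n")
            })
            .collect()
    }
}
```

In this model, two acknowledged messages and one rejected message on orders produce two separate series under the same metric name, one with status="ack" and one with status="nack".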

Exporting

export renders the current values in the Prometheus exposition format:

let body = metrics.export()?;

Hosting is your responsibility, as with AsyncAPI: serve export() from a /metrics route in your own HTTP stack, or push it to a gateway. metrics.registry() returns the underlying prometheus::Registry if you want to register your own collectors alongside RustStream's or use an existing exporter.
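In tests it can be useful to read a single sample back out of the text that export() returns. The helper below is a sketch using only the standard library; sample_value is a hypothetical name, not part of RustStream. It assumes sample lines of the form metric{label="value",...} value with no trailing timestamp, and it does not handle label values that contain commas or escaped quotes.

```rust
/// Return the value of the first sample of `metric` whose label set contains
/// every `key="value"` pair in `labels`. Comment lines (# HELP, # TYPE) are skipped.
fn sample_value(body: &str, metric: &str, labels: &[(&str, &str)]) -> Option<f64> {
    body.lines()
        .filter(|line| !line.starts_with('#'))
        .find_map(|line| {
            // Split "metric{labels} value" into the series part and the value part.
            let (series, value) = line.trim().rsplit_once(' ')?;
            let (name, label_text) = match series.split_once('{') {
                Some((name, rest)) => (name, rest.strip_suffix('}')?),
                None => (series, ""),
            };
            if name != metric {
                return None;
            }
            // Every requested pair must appear as an exact label entry.
            let all_match = labels.iter().all(|(k, v)| {
                let expected = format!("{k}=\"{v}\"");
                label_text.split(',').any(|pair| pair == expected)
            });
            if all_match {
                value.parse::<f64>().ok()
            } else {
                None
            }
        })
}
```

A test could then call sample_value(&metrics.export()?, "ruststream_messages_consumed_total", &[("name", "orders"), ("status", "ack")]) and compare the result with the number of messages it sent. For the histogram, Prometheus exposes derived series with the _bucket, _sum, and _count suffixes, so the sample to look up would be ruststream_consume_duration_seconds_count rather than the bare metric name.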

A complete server

The metrics_http example serves /metrics with axum and publishes orders through a /orders route, so an HTTP client drives the counters. Run it with cargo run --example metrics_http --features macros,memory,metrics, then:

curl -X POST http://127.0.0.1:8080/orders -d '{"id":1,"quantity":3}'
curl http://127.0.0.1:8080/metrics

The full source of the example:

//! Serve Prometheus metrics over HTTP and drive them from an HTTP client.
//!
//! ```text
//! cargo run --example metrics_http --features macros,memory,metrics
//! ```
//!
//! Publish an order, then read the metrics:
//!
//! ```text
//! curl -X POST http://127.0.0.1:8080/orders -d '{"id":1,"quantity":3}'
//! curl http://127.0.0.1:8080/metrics
//! ```
//!
//! Each published order is consumed (incrementing the consume counter) and replied to on
//! `confirmations` through the metrics publish layer (incrementing the publish counter).

use std::future::pending;
use std::sync::Arc;

use axum::Router;
use axum::body::Bytes;
use axum::extract::State;
use axum::routing::{get, post};
use ruststream::memory::{MemoryBroker, MemoryPublisher};
use ruststream::metrics::Metrics;
use ruststream::runtime::{AppInfo, RustStream, TypedPublisher};
use ruststream::{OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize)]
struct Order {
    id: u64,
    quantity: u32,
}

#[derive(Debug, Serialize)]
struct Confirmation {
    id: u64,
    accepted: bool,
}

#[subscriber("orders", publish("confirmations"))]
async fn confirm(order: &Order) -> Confirmation {
    Confirmation {
        id: order.id,
        accepted: order.quantity > 0,
    }
}

struct AppState {
    metrics: Metrics,
    ingest: MemoryPublisher,
}

async fn publish_order(State(state): State<Arc<AppState>>, body: Bytes) -> &'static str {
    // Publish errors are deliberately ignored to keep the example short.
    let _ = state
        .ingest
        .publish(OutgoingMessage::new("orders", body.as_ref()))
        .await;
    "published\n"
}

async fn serve_metrics(State(state): State<Arc<AppState>>) -> String {
    // Fall back to an empty body if the export fails.
    state.metrics.export().unwrap_or_default()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let metrics = Metrics::new()?;
    let broker = MemoryBroker::new();
    let ingest = broker.publisher();
    let app = RustStream::new(AppInfo::new("orders", "0.1.0"))
        .layer(metrics.consume_layer())
        .publish_layer(metrics.publish_layer())
        .with_broker(broker, |b| {
            let replies = TypedPublisher::new(b.broker().publisher());
            b.include_publishing(confirm, replies);
        });

    // Run the service in the background; it shares the metric collectors with the HTTP state.
    tokio::spawn(async move {
        let _ = app.run_until(pending::<()>()).await;
    });

    let state = Arc::new(AppState { metrics, ingest });
    let router = Router::new()
        .route("/orders", post(publish_order))
        .route("/metrics", get(serve_metrics))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
    println!("metrics on http://127.0.0.1:8080/metrics");
    axum::serve(listener, router).await?;
    Ok(())
}