Metrics¶
The metrics feature collects Prometheus metrics for consumed and published messages. It is built on
the prometheus crate directly and exposes the data in the Prometheus exposition format.
Wiring it up¶
Create a Metrics, install its consume and publish layers, and keep the handle to export later:
let metrics = Metrics::new()?;
let broker = MemoryBroker::new();
let ingest = broker.publisher();
let app = RustStream::new(AppInfo::new("orders", "0.1.0"))
.layer(metrics.consume_layer())
.publish_layer(metrics.publish_layer())
.with_broker(broker, |b| {
let replies = TypedPublisher::new(b.broker().publisher());
b.include_publishing(confirm, replies);
});
consume_layer records every handled message; publish_layer records every published message. To
collect into an existing registry instead of a fresh one, use Metrics::with_registry(registry).
Metrics emitted¶
| Metric | Type | Labels |
|---|---|---|
ruststream_messages_consumed_total |
counter | name, status |
ruststream_consume_duration_seconds |
histogram | name |
ruststream_messages_published_total |
counter | name, status |
name is the subscription or destination name; status is the outcome (ack or nack for
consume; ok or error for publish).
Exporting¶
export renders the current values in the Prometheus exposition format:
Hosting is your responsibility, as with AsyncAPI: serve export() from a /metrics route in your
own HTTP stack, or push it to a gateway. metrics.registry() returns the underlying
prometheus::Registry if you want to register your own collectors alongside RustStream's or use an
existing exporter.
A complete server¶
The metrics_http
example serves /metrics with axum and publishes orders through
a /orders route, so an HTTP client drives the counters. Run it with
cargo run --example metrics_http --features macros,memory,metrics, then:
curl -X POST http://127.0.0.1:8080/orders -d '{"id":1,"quantity":3}'
curl http://127.0.0.1:8080/metrics
//! Serve Prometheus metrics over HTTP and drive them from an HTTP client.
//!
//! ```text
//! cargo run --example metrics_http --features macros,memory,metrics
//! ```
//!
//! Publish an order, then read the metrics:
//!
//! ```text
//! curl -X POST http://127.0.0.1:8080/orders -d '{"id":1,"quantity":3}'
//! curl http://127.0.0.1:8080/metrics
//! ```
//!
//! Each published order is consumed (incrementing the consume counter) and replied to on
//! `confirmations` through the metrics publish layer (incrementing the publish counter).
use std::future::pending;
use std::sync::Arc;
use axum::Router;
use axum::body::Bytes;
use axum::extract::State;
use axum::routing::{get, post};
use ruststream::memory::{MemoryBroker, MemoryPublisher};
use ruststream::metrics::Metrics;
use ruststream::runtime::{AppInfo, RustStream, TypedPublisher};
use ruststream::{OutgoingMessage, Publisher, subscriber};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize)]
struct Order {
id: u64,
quantity: u32,
}
#[derive(Debug, Serialize)]
struct Confirmation {
id: u64,
accepted: bool,
}
#[subscriber("orders", publish("confirmations"))]
async fn confirm(order: &Order) -> Confirmation {
Confirmation {
id: order.id,
accepted: order.quantity > 0,
}
}
struct AppState {
metrics: Metrics,
ingest: MemoryPublisher,
}
async fn publish_order(State(state): State<Arc<AppState>>, body: Bytes) -> &'static str {
let _ = state
.ingest
.publish(OutgoingMessage::new("orders", body.as_ref()))
.await;
"published\n"
}
async fn serve_metrics(State(state): State<Arc<AppState>>) -> String {
state.metrics.export().unwrap_or_default()
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let metrics = Metrics::new()?;
let broker = MemoryBroker::new();
let ingest = broker.publisher();
let app = RustStream::new(AppInfo::new("orders", "0.1.0"))
.layer(metrics.consume_layer())
.publish_layer(metrics.publish_layer())
.with_broker(broker, |b| {
let replies = TypedPublisher::new(b.broker().publisher());
b.include_publishing(confirm, replies);
});
// Run the service in the background; it shares the metric collectors with the HTTP state.
tokio::spawn(async move {
let _ = app.run_until(pending::<()>()).await;
});
let state = Arc::new(AppState { metrics, ingest });
let router = Router::new()
.route("/orders", post(publish_order))
.route("/metrics", get(serve_metrics))
.with_state(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
println!("metrics on http://127.0.0.1:8080/metrics");
axum::serve(listener, router).await?;
Ok(())
}