Skip to content

Codecs and serialization

A codec turns wire bytes into your typed payload and back. It is a separate seam from the broker: the pipeline on the consume side is bytes -> Codec -> typed payload -> handler, and the publish side runs it in reverse. Codecs are compile-time types, so decoding has no dynamic dispatch.

Built-in codecs

Codec Feature Pulls in Wire format
JsonCodec json (default) serde_json JSON
MsgpackCodec msgpack rmp-serde MessagePack
CborCodec cbor ciborium CBOR

Codec features are strictly additive; enable as many as you need. Message types only need to derive serde::Deserialize (and Serialize for replies).

The default codec

DefaultCodec is a feature-selected alias: json if enabled, otherwise cbor, otherwise msgpack. It is what include(def) and TypedPublisher::new(publisher) use when nothing names a codec, which is why neither takes a codec argument. It exists only when at least one codec feature is enabled; with no codec features, only the explicit-codec methods are available.

Where the decode codec comes from

The decode codec is fixed at compile time. include takes no codec argument; it resolves one from the most specific level you set, from narrowest to widest:

Per handler

Override a single mounting:

router.with_codec(CborCodec).include(handle);

// name the codec for this one handler by mounting it through a router
b.include_router(Router::new().with_codec(JsonCodec).include(handle));

Per scope

Set one codec for every handler in a with_broker scope:

use ruststream::codec::CborCodec;

RustStream::new(info)
    .with_broker_codec(MemoryBroker::new(), CborCodec, |b| {
        b.include(handle); // decodes with CborCodec
        b.include(audit); // also CborCodec
    })

Default

When nothing above names a codec, include uses DefaultCodec.

The publish side

Publishers mirror the same rules: TypedPublisher::new(publisher) encodes replies with the default codec, and TypedPublisher::with_codec(publisher, codec) names one. include_publishing(def, publisher) reuses the publisher's codec to decode the incoming request, so one mounting names one codec. The request and reply formats can still differ: set the decode codec on the scope (with_broker_codec) or on the router chain (Router::with_codec) and keep the reply codec on the TypedPublisher.

There is no per-message-type codec (no associated codec on a message trait): the codec is a property of the mounting, not of the type.

Decode failures

When decoding fails, the message is dropped by default (a nack without requeue). The policy is configurable on the typed adapter when you build handlers by hand: typed(codec, handler) returns a Typed wrapper whose on_decode_failure accepts a FailurePolicy (Drop, Retry, RetryAfter(..), Skip, or FailFast). On a macro handler the same policy is set with the on_failure(decode = ..) clause (see the failure-policy guide).

use ruststream::runtime::{FailurePolicy, typed};

// inside with_broker(...):
let strict = typed(JsonCodec, |_order: &Order, _ctx: &mut Context| async {
    HandlerResult::Ack
})
.on_decode_failure(FailurePolicy::Retry);
b.handle(
    b.broker().subscribe("orders"),
    strict,
    HandlerMetadata::typed::<Order>("orders"),
);

Retry with care: a payload that can never decode will redeliver forever unless the broker has a dead-letter or max-deliveries policy. The codec examples above are examples/codecs.rs.

Custom codecs

A codec is any type implementing the Codec trait, so you can supply your own (a schema-registry envelope, an encrypting wrapper) and pass it anywhere a built-in codec goes: with_broker_codec, Router::with_codec, or TypedPublisher::with_codec.