Transactions¶
On standalone and sentinel the stream publisher is transactional (begin_transaction buffers,
commit flushes the whole batch in publish order through a single fred pipeline, abort
discards). The idiomatic way to use it is a batch-publishing handler wired with a .transactional()
publisher: every reply is committed atomically.
// A batch-publishing handler: each reply in the returned Vec is published to `processed`, all
// committed atomically when the transactional publisher commits.
#[subscriber(batch("orders"), publish("processed"))]
async fn process(orders: &[Order]) -> Result<Vec<Order>, HandlerResult> {
if orders.is_empty() {
return Err(HandlerResult::drop());
}
Ok(orders.iter().map(|o| Order { id: o.id }).collect())
}
// .transactional() uses RedisPublisher's TransactionalPublisher impl: the batch's replies
// are buffered and flushed as one pipeline on commit.
let processed = TypedPublisher::new(b.broker().publisher()).transactional();
b.include_batch_publishing(process, processed);
Cluster does not offer this (buffered keys may hash to different nodes), so the transaction returns an error there.