Building distributed systems that stay consistent under load is one of the hardest problems in backend engineering. Event-driven architecture with Redis Streams gives you a durable, ordered message log that beats simple pub/sub when reliability matters. In this guide we'll walk through a production-grade implementation.
Why Redis Streams Over Pub/Sub
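Traditional Redis Pub/Sub is fire-and-forget. If a consumer is offline when a message arrives, that message is gone. Redis Streams changes this fundamentally. Each message is kept in the stream until you trim or delete it, carries a unique ID (auto-generated by default), and can be read through consumer groups, which deliver each message to one consumer in the group and track it until it is acknowledged. The resulting guarantee is at-least-once per group, not exactly-once: a message can be delivered again after a crash or a reclaim, so handlers should be idempotent.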
Key advantages in a microservice context:
- Durability: Messages survive consumer restarts
- Replayability: New services can read from the beginning of the stream
- Consumer Groups: Multiple instances of the same service share work, with each message delivered to only one of them at a time
- Pending Entry List (PEL): Unacknowledged messages are tracked so they can be redelivered
Designing the Stream Schema
Before writing code, design your stream naming convention. A common pattern is service:entity:action, for example:
orders:order:created
orders:order:fulfilled
payments:payment:processed
inventory:stock:updated
This scoping makes it trivial to subscribe specific consumers to relevant events without filtering noise.
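One way to keep the convention consistent across services is to build and parse keys through a small helper instead of concatenating strings by hand. The sketch below is illustrative only: buildStreamKey and parseStreamKey are hypothetical names, not part of Redis or ioredis, and the rule that segments may not contain a colon is an assumption made so that keys stay parseable.

```typescript
// Hypothetical helpers (not part of Redis or ioredis) for the
// service:entity:action naming convention.
type StreamKeyParts = { service: string; entity: string; action: string };

function buildStreamKey({ service, entity, action }: StreamKeyParts): string {
  const parts = [service, entity, action];
  for (const part of parts) {
    // Assumption: reject empty segments and embedded colons so keys stay parseable.
    if (part.length === 0 || part.includes(':')) {
      throw new Error(`Invalid stream key segment: "${part}"`);
    }
  }
  return parts.join(':');
}

function parseStreamKey(key: string): StreamKeyParts {
  const parts = key.split(':');
  if (parts.length !== 3 || parts.some((p) => p.length === 0)) {
    throw new Error(`Expected service:entity:action, got "${key}"`);
  }
  const [service, entity, action] = parts as [string, string, string];
  return { service, entity, action };
}
```

For example, buildStreamKey({ service: 'orders', entity: 'order', action: 'created' }) yields the key orders:order:created used throughout this guide.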
Producing Events from a Node.js Service
Use ioredis for stream operations. The XADD command appends a message to the stream:
import Redis from 'ioredis';

// Shape assumed from the fields used below; adjust to your domain model
interface Order {
  id: string;
  userId: string;
  totalAmount: number;
}

const redis = new Redis({ host: process.env.REDIS_HOST, port: 6379 });

async function emitOrderCreated(order: Order) {
  const streamKey = 'orders:order:created';
  const messageId = await redis.xadd(
    streamKey,
    '*', // auto-generate ID
    'orderId', order.id,
    'userId', order.userId,
    'amount', String(order.totalAmount), // stream field values are strings
    'payload', JSON.stringify(order)
  );
  console.log('Event emitted:', messageId);
}
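The * tells Redis to auto-generate an ID of the form <millisecond timestamp>-<sequence number>, such as 1694012345678-0. You can also supply an explicit ID, but it must be greater than the last ID in the stream, otherwise XADD returns an error. That makes explicit IDs useful for rejecting a retried write only when your IDs increase monotonically.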
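Because a stream ID has the form <milliseconds>-<sequence>, it can be split and compared on the client, for instance to log when an event was written or to check that an explicit ID would be greater than one you already hold. The following is a minimal sketch: parseStreamId and compareStreamIds are hypothetical helper names, and the code assumes both parts fit in a JavaScript number, which holds for millisecond timestamps but not for arbitrarily large 64-bit sequence values.

```typescript
// Hypothetical helpers for working with Redis stream IDs on the client.
interface StreamId {
  timestampMs: number; // milliseconds part of the ID
  sequence: number;    // counter that distinguishes entries within one millisecond
}

function parseStreamId(id: string): StreamId {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) throw new Error(`Not a stream ID: "${id}"`);
  // Assumption: both parts are small enough to be represented exactly as a number.
  return { timestampMs: Number(match[1]), sequence: Number(match[2]) };
}

// Returns a negative number if a < b, zero if equal, positive if a > b.
function compareStreamIds(a: string, b: string): number {
  const x = parseStreamId(a);
  const y = parseStreamId(b);
  return x.timestampMs - y.timestampMs || x.sequence - y.sequence;
}
```

A producer that generates its own IDs could use compareStreamIds to confirm a candidate ID is larger than the last one it wrote before calling XADD.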
Creating Consumer Groups
Consumer groups allow multiple worker instances to share a stream without processing the same message twice. Create the group once at startup:
async function ensureConsumerGroup(stream: string, group: string) {
  try {
    // '$' means start from new messages; '0' would replay from the beginning
    await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM');
  } catch (err: any) {
    // BUSYGROUP means the group already exists, which is safe to ignore
    if (!err.message.includes('BUSYGROUP')) throw err;
  }
}
Consuming Messages Reliably
The consumer reads with XREADGROUP and acknowledges with XACK only after successful processing:
async function startConsumer(stream: string, group: string, consumerId: string) {
  await ensureConsumerGroup(stream, group);
  while (true) {
    // Reply shape: [[streamName, [[id, [field, value, ...]], ...]], ...], or null on timeout
    const results = (await redis.xreadgroup(
      'GROUP', group, consumerId,
      'COUNT', 10,
      'BLOCK', 2000, // block up to 2s waiting for messages
      'STREAMS', stream, '>' // '>' means never-delivered messages only
    )) as [string, [string, string[]][]][] | null;
    if (!results) continue;
    for (const [, messages] of results) {
      for (const [id, fields] of messages) {
        // parseFields is a helper that turns the flat field array into an object
        const payload = parseFields(fields);
        try {
          await processOrder(payload);
          await redis.xack(stream, group, id);
        } catch (err) {
          console.error('Processing failed, message stays in PEL:', id, err);
          // Do NOT xack: the message remains in the pending list for retry
        }
      }
    }
  }
}
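Notice the critical detail: we only call XACK after the business logic succeeds. If processing throws, the message stays in the Pending Entry List. It is not redelivered automatically: a read with > returns only never-delivered messages. To retry it, the same consumer has to call XREADGROUP with the ID 0 instead of >, which returns its own pending entries, or another consumer has to take it over with XCLAIM.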
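The consumer above calls parseFields, which this guide does not define. Redis returns each stream entry's fields as a flat array of alternating names and values, so a minimal version, offered here as a sketch and not as the original implementation, could look like this:

```typescript
// Hypothetical helper: converts the flat [field, value, field, value, ...]
// array returned for a stream entry into a plain object.
function parseFields(fields: string[]): Record<string, string> {
  if (fields.length % 2 !== 0) {
    throw new Error('Expected an even number of field/value items');
  }
  const result: Record<string, string> = {};
  for (let i = 0; i < fields.length; i += 2) {
    result[fields[i] as string] = fields[i + 1] as string;
  }
  return result;
}

// Usage with fields shaped like those written by emitOrderCreated:
const exampleFields = parseFields(['orderId', 'o-1', 'userId', 'u-9', 'amount', '49.90']);
// exampleFields.orderId is 'o-1'; exampleFields.amount is the string '49.90'
```

All values come back as strings, so numeric fields such as amount need explicit conversion, and the payload field would need JSON.parse before use.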
Handling Stale Pending Messages
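A separate "reaper" routine should periodically check the PEL for messages that have been idle longer than your timeout threshold. Use XCLAIM to reassign them to an active consumer. XCLAIM returns the claimed entries, and the claiming consumer still has to process and acknowledge them: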
async function reclaimStaleMessages(stream: string, group: string, consumerId: string) {
  // Extended XPENDING form: each row is [id, consumer, idleMs, deliveryCount]
  const pendingInfo = (await redis.xpending(stream, group, '-', '+', 20)) as
    [string, string, number, number][];
  const staleThresholdMs = 30_000;
  for (const entry of pendingInfo) {
    const [msgId, , idleTime, deliveryCount] = entry;
    if (idleTime > staleThresholdMs) {
      if (deliveryCount > 5) {
        // Dead-letter: copy the entry to a separate error stream, then ack it
        const fields = await redis.xrange(stream, msgId, msgId);
        if (fields.length) {
          await redis.xadd('dead-letter:orders', '*', ...fields[0][1]);
        }
        await redis.xack(stream, group, msgId);
      } else {
        // XCLAIM only transfers ownership; the returned entries still need processing
        await redis.xclaim(stream, group, consumerId, staleThresholdMs, msgId);
      }
    }
  }
}
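The branching inside the reaper can be isolated into a pure function, which makes the retry policy easy to unit-test without a Redis connection. This is a sketch under assumptions: decideReaperAction, ReaperPolicy, and ReaperAction are hypothetical names, and the thresholds in the usage note mirror the 30-second and 5-delivery values used above.

```typescript
// One row of the extended XPENDING reply: [id, consumer, idleMs, deliveryCount].
type PendingEntry = [id: string, consumer: string, idleMs: number, deliveryCount: number];

type ReaperAction = 'skip' | 'claim' | 'dead-letter';

// Hypothetical policy object holding the two thresholds the reaper uses.
interface ReaperPolicy {
  staleThresholdMs: number; // entries idle longer than this are considered stuck
  maxDeliveries: number;    // entries delivered more often than this are dead-lettered
}

function decideReaperAction(entry: PendingEntry, policy: ReaperPolicy): ReaperAction {
  const [, , idleMs, deliveryCount] = entry;
  // Not idle long enough yet: its current owner may still be working on it.
  if (idleMs <= policy.staleThresholdMs) return 'skip';
  // Stale and retried too often: route to the dead-letter stream.
  // Stale but within the retry budget: claim it for an active consumer.
  return deliveryCount > policy.maxDeliveries ? 'dead-letter' : 'claim';
}
```

With { staleThresholdMs: 30_000, maxDeliveries: 5 }, an entry idle for 45 seconds and delivered twice is claimed, while the same entry delivered six times is dead-lettered.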
Stream Trimming
Without trimming, streams grow forever. Use MAXLEN with an approximate trim for performance:
// Trim to ~100,000 entries (approximate for performance)
await redis.xtrim(stream, 'MAXLEN', '~', 100_000);
Alternatively, set MAXLEN directly in your XADD command so trimming is atomic with the write.
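To illustrate the combined form, the sketch below builds the argument list for an XADD that carries its own approximate MAXLEN. The helper name buildCappedXaddArgs is hypothetical; the argument order (key, MAXLEN, ~, threshold, ID, then field/value pairs) follows the XADD command syntax.

```typescript
// Hypothetical helper: builds the argument list for an XADD that also caps the
// stream at roughly maxLen entries, so the write and the trim are one command.
function buildCappedXaddArgs(
  stream: string,
  maxLen: number,
  fields: Record<string, string>
): (string | number)[] {
  if (!Number.isInteger(maxLen) || maxLen < 0) {
    throw new Error('maxLen must be a non-negative integer');
  }
  // '~' requests approximate trimming; '*' asks Redis to generate the ID.
  const args: (string | number)[] = [stream, 'MAXLEN', '~', maxLen, '*'];
  for (const name of Object.keys(fields)) {
    args.push(name, fields[name] as string);
  }
  return args;
}

const cappedArgs = buildCappedXaddArgs('orders:order:created', 100_000, { orderId: 'o-1' });
// cappedArgs: ['orders:order:created', 'MAXLEN', '~', 100000, '*', 'orderId', 'o-1']
```

With ioredis the same command can be written directly as redis.xadd(stream, 'MAXLEN', '~', 100_000, '*', 'orderId', order.id), which avoids a separate XTRIM call on every write.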
Observability and Monitoring
Track stream health with these Redis commands in your monitoring loop:
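- XLEN stream-key: total messages in the stream
- XPENDING stream-key group: summary form; the first element of the reply is the count of pending (unacknowledged) messages
- XINFO GROUPS stream-key: per-group pending count, last-delivered-id, and (on Redis 7.0 and later) lag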
Alert when the pending count exceeds a threshold or when lag (last-delivered-id vs latest-id) grows. These are your leading indicators of consumer problems before they cascade.
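A monitoring loop needs to turn the raw XINFO GROUPS reply into numbers it can compare against thresholds. The sketch below assumes each group arrives as a flat array of alternating field names and values, and that the lag field is present (Redis 7.0 and later) but may be null when Redis cannot compute it. parseGroupInfo, needsAlert, and the threshold values are hypothetical.

```typescript
type RedisValue = string | number | null;

interface GroupHealth {
  name: string;
  pending: number;    // entries delivered but not yet acknowledged
  lag: number | null; // entries not yet delivered; null when unavailable
}

// Converts one group's flat [field, value, ...] reply into a typed object.
function parseGroupInfo(flat: RedisValue[]): GroupHealth {
  const map = new Map<string, RedisValue>();
  for (let i = 0; i + 1 < flat.length; i += 2) {
    map.set(String(flat[i]), flat[i + 1] ?? null);
  }
  const lag = map.get('lag');
  return {
    name: String(map.get('name') ?? ''),
    pending: Number(map.get('pending') ?? 0),
    lag: lag === null || lag === undefined ? null : Number(lag),
  };
}

// True when either indicator crosses its threshold; a null lag never triggers.
function needsAlert(group: GroupHealth, maxPending: number, maxLag: number): boolean {
  return group.pending > maxPending || (group.lag !== null && group.lag > maxLag);
}
```

In a real loop, the reply from XINFO GROUPS would be mapped through parseGroupInfo and each result passed to needsAlert with thresholds tuned to your normal traffic.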
Deployment Considerations
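In production, run Redis with appendonly yes (AOF persistence) or use Redis Cluster with replication to reduce the risk of data loss on node failure. Neither removes it entirely: with the default appendfsync everysec setting roughly the last second of writes can be lost, and replication is asynchronous. For extremely high throughput, partition your streams by entity ID. Each stream key lives on a single shard, so it takes several partitioned keys to spread load across shards.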
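Partitioning by entity ID can be as simple as hashing the ID and appending the resulting partition index to the stream key, so that all events for one entity land in the same stream and keep their relative order. The sketch below is one possible scheme, not a Redis feature: partitionedStreamKey is a hypothetical name, the key suffix format is an assumption, and the hash is an FNV-1a-style function computed over UTF-16 code units.

```typescript
// FNV-1a-style 32-bit hash over the string's UTF-16 code units.
function fnv1a32(input: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // FNV prime, 32-bit multiply
  }
  return hash >>> 0; // force an unsigned 32-bit result
}

// Hypothetical helper: maps an entity ID to one of `partitions` stream keys.
function partitionedStreamKey(baseKey: string, entityId: string, partitions: number): string {
  if (!Number.isInteger(partitions) || partitions < 1) {
    throw new Error('partitions must be a positive integer');
  }
  const index = fnv1a32(entityId) % partitions;
  return `${baseKey}:${index}`;
}
```

Producers and consumers must agree on the partition count. Changing it later remaps entity IDs to different streams, so per-entity ordering only holds within one configuration.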
Consumer worker processes should run under PM2 or Kubernetes with auto-restart on crash. Messages that were delivered but not acknowledged stay in the PEL after a crash, so they can be reclaimed by the reaper or re-read by the restarted worker. This gives you at-least-once delivery semantics by design, which also means handlers must tolerate processing the same message more than once.
Conclusion
Redis Streams fill the reliability gap that pub/sub leaves open. By combining consumer groups with explicit acknowledgment, stale-message reclaiming, and dead-letter routing, you get a message bus that handles real-world failures gracefully. The patterns above are production-tested at TechYantram across platforms processing hundreds of thousands of events per day.