Message Queue Patterns: Fan-out, Priority Queues, and Dead Letter Queues
Message queues enable reliable asynchronous processing. After implementing various queue patterns in production, here are the patterns that work.
Fan-Out Pattern
One Producer, Multiple Consumers
// Producer
const sns = require('aws-sdk').SNS();
const sqs = require('aws-sdk').SQS();
// Publish to SNS topic
await sns.publish({
TopicArn: 'arn:aws:sns:us-east-1:123456789:order-events',
Message: JSON.stringify({
event: 'order.created',
orderId: '123',
userId: '456',
total: 99.99
})
}).promise();
// Multiple SQS queues subscribe to topic
// - order-processing queue
// - email-notification queue
// - analytics-tracking queue
Implementation
// Subscribe queues to topic
const queues = [
'order-processing',
'email-notification',
'analytics-tracking'
];
for (const queueName of queues) {
const queueUrl = await sqs.getQueueUrl({ QueueName: queueName }).promise();
const queueArn = await sqs.getQueueAttributes({
QueueUrl: queueUrl.QueueUrl,
AttributeNames: ['QueueArn']
}).promise();
await sns.subscribe({
TopicArn: topicArn,
Protocol: 'sqs',
Endpoint: queueArn.Attributes.QueueArn
}).promise();
}
Priority Queues
RabbitMQ Priority
const amqp = require('amqplib');
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Declare priority queue
await channel.assertQueue('priority-queue', {
arguments: {
'x-max-priority': 10
}
});
// Publish with priority
await channel.sendToQueue('priority-queue', Buffer.from(JSON.stringify({
task: 'process-payment',
priority: 9 // High priority
})), {
priority: 9
});
// Consumer
channel.consume('priority-queue', (msg) => {
const task = JSON.parse(msg.content.toString());
console.log('Processing:', task);
channel.ack(msg);
}, { noAck: false });
SQS FIFO Priority
// Use message group ID for ordering
// Use message deduplication ID for deduplication
await sqs.sendMessage({
QueueUrl: fifoQueueUrl,
MessageBody: JSON.stringify({
task: 'process-payment',
priority: 'high'
}),
MessageGroupId: 'payments', // Same group = ordered
MessageDeduplicationId: `payment-${orderId}`, // Deduplication
MessageAttributes: {
Priority: {
DataType: 'String',
StringValue: 'high'
}
}
}).promise();
Dead Letter Queues
Setup DLQ
// Create DLQ
const dlqResponse = await sqs.createQueue({
QueueName: 'order-processing-dlq'
}).promise();
const dlqArn = await sqs.getQueueAttributes({
QueueUrl: dlqResponse.QueueUrl,
AttributeNames: ['QueueArn']
}).promise();
// Configure main queue with DLQ
await sqs.setQueueAttributes({
QueueUrl: mainQueueUrl,
Attributes: {
RedrivePolicy: JSON.stringify({
deadLetterTargetArn: dlqArn.Attributes.QueueArn,
maxReceiveCount: 3 // Move to DLQ after 3 failures
})
}
}).promise();
Process DLQ
async function processDLQ() {
while (true) {
const messages = await sqs.receiveMessage({
QueueUrl: dlqQueueUrl,
MaxNumberOfMessages: 10
}).promise();
if (!messages.Messages) {
await sleep(5000);
continue;
}
for (const message of messages.Messages) {
// Log for manual review
console.error('Failed message:', message.Body);
// Or retry after fixing issue
// await republishToMainQueue(message);
await sqs.deleteMessage({
QueueUrl: dlqQueueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
}
}
}
Message Ordering
FIFO Queue
// SQS FIFO queue maintains order within message group
await sqs.sendMessage({
QueueUrl: fifoQueueUrl,
MessageBody: JSON.stringify({ step: 1 }),
MessageGroupId: 'order-123' // Same group = ordered
}).promise();
await sqs.sendMessage({
QueueUrl: fifoQueueUrl,
MessageBody: JSON.stringify({ step: 2 }),
MessageGroupId: 'order-123' // Processed after step 1
}).promise();
RabbitMQ Single Active Consumer
// Only one consumer processes messages
await channel.assertQueue('ordered-queue', {
arguments: {
'x-single-active-consumer': true
}
});
Retry Patterns
Exponential Backoff
async function processMessage(message, retryCount = 0) {
const maxRetries = 3;
const baseDelay = 1000; // 1 second
try {
await processOrder(message);
return; // Success
} catch (error) {
if (retryCount >= maxRetries) {
// Move to DLQ
await sendToDLQ(message, error);
return;
}
// Exponential backoff
const delay = baseDelay * Math.pow(2, retryCount);
await sleep(delay);
// Retry
return processMessage(message, retryCount + 1);
}
}
SQS Visibility Timeout
// Extend visibility timeout for long-running tasks
await sqs.changeMessageVisibility({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 600 // 10 minutes
}).promise();
Batch Processing
Batch Messages
// Send batch
const entries = messages.map((msg, index) => ({
Id: `msg-${index}`,
MessageBody: JSON.stringify(msg)
}));
await sqs.sendMessageBatch({
QueueUrl: queueUrl,
Entries: entries
}).promise();
// Receive batch
const response = await sqs.receiveMessage({
QueueUrl: queueUrl,
MaxNumberOfMessages: 10 // Batch size
}).promise();
// Process batch
for (const message of response.Messages) {
await processMessage(message);
await sqs.deleteMessage({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
}
Message Filtering
SNS Message Filtering
// Subscribe with filter
await sns.subscribe({
TopicArn: topicArn,
Protocol: 'sqs',
Endpoint: queueArn,
Attributes: {
FilterPolicy: JSON.stringify({
event_type: ['order.created', 'order.updated'],
priority: ['high']
})
}
}).promise();
RabbitMQ Headers Exchange
// Publish with headers
await channel.publish('headers-exchange', '', Buffer.from(message), {
headers: {
event_type: 'order.created',
priority: 'high'
}
});
// Bind queue with headers
await channel.bindQueue('order-queue', 'headers-exchange', '', {
'x-match': 'all', // All headers must match
event_type: 'order.created',
priority: 'high'
});
Best Practices
- Use DLQs - Handle failed messages
- Implement retries - With exponential backoff
- Set visibility timeout - Based on processing time
- Use batching - Reduce API calls
- Monitor queue depth - Alert on backlog
- Use message attributes - For filtering
- Idempotent processing - Handle duplicates
- Order when needed - Use FIFO queues
Conclusion
Message queue patterns enable:
- Reliable processing
- Scalable architecture
- Error handling
- Flexible routing
Use fan-out for broadcasting, priority queues for ordering, and DLQs for failures. The patterns shown here handle production workloads.
Message queue patterns from October 2019, covering production patterns.