Building Event-Driven Microservices with AWS SNS and SQS
Event-driven microservices on AWS use SNS (Simple Notification Service) and SQS (Simple Queue Service). After building production systems with these services, here’s how to architect them effectively.
AWS Messaging Services
SNS (Pub/Sub)
- Topics for broadcasting
- Multiple subscribers
- Push-based delivery
SQS (Queues)
- Message queues
- Pull-based delivery
- At-least-once delivery (standard queues may deliver duplicates, so consumers should be idempotent)
Architecture Pattern
Service A → SNS Topic → SQS Queues → Services B, C, D
SNS Topics
Create Topic
import boto3

# SNS client reused by the snippets below.
sns = boto3.client('sns')

# create_topic is idempotent: if the topic already exists, the existing
# topic's ARN is returned rather than raising an error.
topic_arn = sns.create_topic(Name='order-events')['TopicArn']
print(f"Topic ARN: {topic_arn}")
Publish Message
import json
from datetime import datetime, timezone

import boto3

# Default target topic; pass topic_arn explicitly for other environments.
ORDER_EVENTS_TOPIC_ARN = 'arn:aws:sns:us-east-1:123456789:order-events'


def publish_order_created(order_id, user_id, amount,
                          topic_arn=ORDER_EVENTS_TOPIC_ARN):
    """Publish an ``order.created`` event to SNS.

    Args:
        order_id: Identifier of the newly created order.
        user_id: Identifier of the user who placed the order.
        amount: Order total, serialized as-is into the JSON payload.
        topic_arn: Target SNS topic ARN (defaults to the order-events topic).

    Returns:
        The SNS ``MessageId`` assigned to the published message.
    """
    sns = boto3.client('sns')
    message = {
        'event': 'order.created',
        'order_id': order_id,
        'user_id': user_id,
        'amount': amount,
        # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated
        # (Python 3.12+) and yields a naive datetime that is easy to misuse.
        'timestamp': datetime.now(timezone.utc).isoformat(),
    }
    response = sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps(message),
        # Message attributes power SNS subscription filter policies
        # without subscribers having to parse the body.
        MessageAttributes={
            'event_type': {
                'DataType': 'String',
                'StringValue': 'order.created',
            }
        },
    )
    return response['MessageId']
SQS Queues
Create Queue
# SQS client reused by the snippets below.
sqs = boto3.client('sqs')

# Provision the work queue. Per the SQS API, attribute values are strings.
queue_response = sqs.create_queue(
    QueueName='order-processing',
    Attributes={
        'VisibilityTimeout': '300',             # 5 minutes to process a message
        'MessageRetentionPeriod': '1209600',    # retain messages up to 14 days
        'ReceiveMessageWaitTimeSeconds': '20',  # default to long polling
    },
)
queue_url = queue_response['QueueUrl']
Subscribe Queue to SNS
# Resolve the queue's ARN -- SNS subscriptions reference queues by ARN,
# not by URL.
queue_arn = sqs.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=['QueueArn'],
)['Attributes']['QueueArn']

# Attach the queue to the topic.
sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789:order-events',
    Protocol='sqs',
    Endpoint=queue_arn,
)

# The subscription alone is not enough: the queue's resource policy must
# also allow SNS to deliver. The ArnEquals condition scopes delivery to
# this one topic so arbitrary topics cannot write into the queue.
sqs_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "sns.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": queue_arn,
            "Condition": {
                "ArnEquals": {
                    "aws:SourceArn": "arn:aws:sns:us-east-1:123456789:order-events"
                }
            },
        }
    ],
}
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={'Policy': json.dumps(sqs_policy)},
)
Consumer Implementation
Poll Messages
import json
import time  # required by the back-off below; missing in the original

import boto3
from botocore.exceptions import ClientError

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/order-processing'


def process_messages():
    """Poll the queue forever, dispatching each event to handle_event().

    A message is deleted only after its handler succeeds; on failure it
    stays on the queue, becomes visible again once the visibility timeout
    expires, and gets another delivery attempt (eventually redriven to a
    DLQ if a redrive policy is configured).
    """
    while True:
        try:
            # Long polling (WaitTimeSeconds=20) blocks until messages
            # arrive, avoiding a stream of empty (billed) responses.
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20,
                MessageAttributeNames=['All'],
            )
            for message in response.get('Messages', []):
                try:
                    # SNS wraps the published payload: the SQS body is an
                    # SNS envelope whose 'Message' field holds our JSON.
                    sns_envelope = json.loads(message['Body'])
                    event_data = json.loads(sns_envelope['Message'])
                    handle_event(event_data)
                    # Delete only after successful processing.
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=message['ReceiptHandle'],
                    )
                except Exception as e:
                    # Leave the message on the queue; it becomes visible
                    # again after the visibility timeout.
                    print(f"Error processing message: {e}")
        except ClientError as e:
            # AWS-side error: back off briefly, then resume polling.
            print(f"AWS error: {e}")
            time.sleep(5)
def handle_event(event_data):
    """Route a decoded event payload to the handler for its 'event' field."""
    event_type = event_data['event']
    if event_type == 'order.created':
        process_order_created(event_data)
        return
    if event_type == 'order.cancelled':
        process_order_cancelled(event_data)
        return
    # Unrecognized events are logged and dropped, not treated as errors.
    print(f"Unknown event type: {event_type}")
Dead Letter Queues
Setup DLQ
# Create DLQ
dlq_response = sqs.create_queue(
QueueName='order-processing-dlq'
)
dlq_arn = sqs.get_queue_attributes(
QueueUrl=dlq_response['QueueUrl'],
AttributeNames=['QueueArn']
)['Attributes']['QueueArn']
# Configure main queue with DLQ
sqs.set_queue_attributes(
QueueUrl=queue_url,
Attributes={
'RedrivePolicy': json.dumps({
'deadLetterTargetArn': dlq_arn,
'maxReceiveCount': 3 # Move to DLQ after 3 failures
})
}
)
Process DLQ
def process_dlq():
    """Drain the dead-letter queue, logging each failed message.

    Messages are logged for manual review and then deleted. To retry a
    message instead, republish it to the main queue before deleting.
    """
    dlq_url = 'https://sqs.us-east-1.amazonaws.com/123456789/order-processing-dlq'
    while True:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10,
            # Long polling: without this the original loop busy-spins,
            # issuing a constant stream of empty (billed) receive calls.
            WaitTimeSeconds=20,
        )
        for message in response.get('Messages', []):
            # Log for manual review.
            log_failed_message(message)
            # Or retry after fixing the underlying issue:
            # republish_to_main_queue(message)
            sqs.delete_message(
                QueueUrl=dlq_url,
                ReceiptHandle=message['ReceiptHandle'],
            )
Filtering Messages
SNS Message Filtering
# Subscribe with a filter policy: SNS delivers only messages whose
# 'event_type' message attribute matches one of the listed values;
# everything else is dropped before it ever reaches the queue.
filter_policy = {'event_type': ['order.created', 'order.updated']}
sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:123456789:order-events',
    Protocol='sqs',
    Endpoint=queue_arn,
    Attributes={'FilterPolicy': json.dumps(filter_policy)},
)
SQS Message Filtering
# SQS has no server-side message filtering: receive_message accepts no
# filter parameter ('MessageAttributeFilters' in the original snippet is
# not a real API parameter and raises ParamValidationError). Filtering
# belongs in SNS (subscription filter policies, above); if you must act
# on a subset after delivery, request the attribute and filter client-side.
response = sqs.receive_message(
    QueueUrl=queue_url,
    MessageAttributeNames=['event_type'],
)
order_created_messages = [
    m for m in response.get('Messages', [])
    if m.get('MessageAttributes', {})
        .get('event_type', {})
        .get('StringValue') == 'order.created'
]
Fan-Out Pattern
# One SNS topic
topic_arn = 'arn:aws:sns:us-east-1:123456789:order-events'
# Multiple SQS queues
queues = [
'order-processing',
'inventory-update',
'email-notification',
'analytics-tracking'
]
# Subscribe all queues
for queue_name in queues:
queue_url = sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
queue_arn = sqs.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=['QueueArn']
)['Attributes']['QueueArn']
sns.subscribe(
TopicArn=topic_arn,
Protocol='sqs',
Endpoint=queue_arn
)
Error Handling
Retry Logic
def process_with_retry(message, max_retries=3):
    """Run handle_event with exponential back-off on transient failures.

    Transient errors are retried (sleeping 1s, 2s, 4s, ... between
    attempts) up to max_retries total attempts; the last transient error
    is re-raised once the budget is exhausted. Permanent errors are
    logged and never retried.

    Returns True on success, False for a permanent error (or when
    max_retries is 0).
    """
    attempt = 0
    while attempt < max_retries:
        try:
            handle_event(message)
            return True
        except TransientError:
            attempt += 1
            if attempt >= max_retries:
                raise
            time.sleep(2 ** (attempt - 1))  # 1s, 2s, 4s, ...
        except PermanentError as e:
            # Retrying cannot help here; record it and move on.
            log_error(e)
            return False
    return False
Visibility Timeout
# Extend visibility timeout for long-running tasks
sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle'],
VisibilityTimeout=600 # 10 minutes
)
Monitoring
CloudWatch Metrics
import boto3
from datetime import datetime, timezone  # missing in the original snippet

cloudwatch = boto3.client('cloudwatch')


def publish_metric(metric_name, value, unit='Count'):
    """Emit one custom CloudWatch data point in the OrderProcessing namespace.

    Args:
        metric_name: Metric name within the namespace.
        value: Numeric value of the data point.
        unit: CloudWatch unit string (e.g. 'Count', 'Milliseconds').
    """
    cloudwatch.put_metric_data(
        Namespace='OrderProcessing',
        MetricData=[{
            'MetricName': metric_name,
            'Value': value,
            'Unit': unit,
            # Aware UTC timestamp; datetime.utcnow() is deprecated and
            # returns a naive datetime.
            'Timestamp': datetime.now(timezone.utc),
        }],
    )


# Track processing (duration_ms is measured by the caller around the
# handler invocation).
publish_metric('MessagesProcessed', 1)
publish_metric('ProcessingDuration', duration_ms, 'Milliseconds')
Alarms
# Alarm when the DLQ backs up. AWS/SQS metrics are emitted per queue, so
# the alarm must identify the queue via Dimensions -- without it the
# original alarm matched no data and sat in INSUFFICIENT_DATA forever.
cloudwatch.put_metric_alarm(
    AlarmName='HighDLQMessages',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=1,
    MetricName='ApproximateNumberOfMessagesVisible',
    Namespace='AWS/SQS',
    Dimensions=[{'Name': 'QueueName', 'Value': 'order-processing-dlq'}],
    Period=300,  # evaluate over 5-minute windows
    Statistic='Average',
    Threshold=100.0,
    ActionsEnabled=True,
    # Notify the alerts topic when the alarm fires.
    AlarmActions=['arn:aws:sns:us-east-1:123456789:alerts'],
)
Best Practices
- Use long polling - Reduce empty responses
- Set appropriate visibility timeout - Based on processing time
- Implement DLQ - Handle failed messages
- Use message attributes - For filtering and routing
- Monitor queue depth - Alert on backlog
- Batch operations - Process multiple messages
- Idempotent processing - Handle duplicates
- Use FIFO queues - When order matters
Conclusion
SNS + SQS enable:
- Decoupled microservices
- Reliable message delivery
- Scalable architecture
- AWS-native integration
Start with simple pub/sub, add filtering and DLQs as needed. The patterns shown here handle millions of messages in production.
This article on event-driven microservices with AWS SNS/SQS dates from March 2018 and covers production patterns.