Building a Queue Worker System with Beanstalk
Beanstalkd is a simple, fast work queue that’s perfect for background job processing. After building queue systems handling millions of jobs, I’ve learned that simplicity often beats complexity. Here’s how to build a production-ready queue worker system with Beanstalkd.
Why Beanstalkd?
Beanstalkd offers:
- Simplicity: No complex setup, just works
- Speed: Very fast, handles thousands of jobs/second
- Persistence: Jobs survive restarts
- Priority: Built-in priority support
- Delays: Schedule jobs for later execution
Installation
# Using Docker (exposes the default beanstalkd port 11300)
docker run -d --name beanstalkd \
    -p 11300:11300 \
    schickling/beanstalkd

# Or install directly
apt-get install beanstalkd
Basic Concepts
- Tube: Like a queue (e.g., “emails”, “images”)
- Job: A unit of work
- Producer: Puts jobs into tubes
- Worker: Reserves and processes jobs
- Priority: Lower number = higher priority
- Delay: Time before job becomes ready
- TTR: Time To Run - the maximum time a worker has to finish a reserved job before beanstalkd releases it back to the ready queue
Producer Implementation
Python Producer (using the beanstalkc client — note that beanstalkc only supports Python 2; on Python 3 use a compatible client such as greenstalk or pystalkd)
from beanstalkc import Connection
import json
import time
class JobProducer:
    """Producer that puts JSON-encoded jobs into beanstalkd tubes.

    Can be used as a context manager so the connection is always closed:
        with JobProducer() as producer:
            producer.put_job('emails', {...})
    """

    def __init__(self, host='localhost', port=11300):
        self.conn = Connection(host, port)

    def put_job(self, tube, data, priority=1024, delay=0, ttr=300):
        """
        Put a job into a tube and return the new job id.

        priority: Lower = higher priority (default 1024)
        delay: Seconds before job becomes ready
        ttr: Time To Run in seconds (worker's processing deadline)
        """
        self.conn.use(tube)
        job_data = json.dumps(data)
        job_id = self.conn.put(job_data, priority=priority, delay=delay, ttr=ttr)
        return job_id

    def close(self):
        """Close the underlying beanstalkd connection."""
        self.conn.close()

    # Context-manager protocol (backward-compatible addition): guarantees
    # close() runs even if the body raises.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False
# Usage
producer = JobProducer()

# Send email job: priority 512 beats the 1024 default, so this is
# reserved before normal-priority jobs in the same tube.
producer.put_job(
    'emails',
    {
        'to': 'user@example.com',
        'subject': 'Welcome',
        'template': 'welcome'
    },
    priority=512,  # Higher priority
    ttr=60  # 60 seconds to process
)

# Schedule job for later: delayed jobs stay invisible to workers
# until the delay elapses.
producer.put_job(
    'reports',
    {'report_type': 'daily'},
    delay=3600  # Run in 1 hour
)
PHP Producer
<?php
use Pheanstalk\Pheanstalk;
class JobProducer
{
    /** @var Pheanstalk Connection to the beanstalkd server. */
    private $queue;

    public function __construct($host = 'localhost', $port = 11300)
    {
        $this->queue = Pheanstalk::create($host, $port);
    }

    /**
     * Put a JSON-encoded job into a tube and return the created job.
     *
     * @param string $tube     Tube (queue) name
     * @param mixed  $data     Payload, serialized with json_encode
     * @param int    $priority Lower number = higher priority
     * @param int    $delay    Seconds before the job becomes ready
     * @param int    $ttr      Seconds a worker has to finish the job
     */
    public function putJob($tube, $data, $priority = 1024, $delay = 0, $ttr = 300)
    {
        $this->queue->useTube($tube);

        return $this->queue->put(json_encode($data), $priority, $delay, $ttr);
    }
}
// Usage
$producer = new JobProducer();

// Priority 512 (above the 1024 default), no delay, 60-second TTR.
$producer->putJob('emails', [
    'to' => 'user@example.com',
    'subject' => 'Welcome',
    'template' => 'welcome'
], 512, 0, 60);
Worker Implementation
Python Worker
from beanstalkc import Connection
import json
import signal
import sys
class QueueWorker:
    """Worker that reserves jobs from a beanstalkd tube and processes them.

    Installs SIGTERM/SIGINT handlers so the run loop exits gracefully
    between jobs instead of dying mid-processing.
    """

    def __init__(self, host='localhost', port=11300):
        self.conn = Connection(host, port)
        self.running = True
        self.setup_signal_handlers()

    def setup_signal_handlers(self):
        """Register graceful-shutdown handlers for SIGTERM and SIGINT."""
        signal.signal(signal.SIGTERM, self.handle_shutdown)
        signal.signal(signal.SIGINT, self.handle_shutdown)

    def handle_shutdown(self, signum, frame):
        """Signal handler: flag the run loop to stop after the current job."""
        print("Shutting down gracefully...")
        self.running = False

    def watch_tube(self, tube):
        """Watch a tube for jobs and stop watching the default tube."""
        self.conn.watch(tube)
        self.conn.ignore('default')  # Ignore default tube

    def reserve_job(self, timeout=1):
        """Reserve a job; return None on timeout or transient error.

        The short timeout keeps the run loop responsive to shutdown
        signals.  Was a bare `except:` -- that also swallowed SystemExit
        and KeyboardInterrupt; narrow to Exception and log the error.
        """
        try:
            job = self.conn.reserve(timeout=timeout)
            return job
        except Exception as e:
            print(f"Reserve failed: {e}")
            return None

    def process_job(self, job, handler):
        """Decode the job body, run the handler, delete on success.

        On any decode/handler error the job is buried for manual
        inspection (rather than released, which would hot-loop retries).
        """
        try:
            data = json.loads(job.body)
            handler(data)
            job.delete()
            return True
        except Exception as e:
            print(f"Error processing job: {e}")
            # Bury job for manual inspection
            job.bury()
            return False

    def run(self, tube, handler):
        """Run the worker loop until a shutdown signal is received."""
        self.watch_tube(tube)
        print(f"Worker started, watching tube: {tube}")
        while self.running:
            job = self.reserve_job(timeout=1)
            if job:
                self.process_job(job, handler)
            # Allow signal handling
            if not self.running:
                break
        print("Worker stopped")
# Email handler
def send_email_handler(data):
    """Handle one 'emails' job: send the email described by *data*."""
    print(f"Sending email to {data['to']}")
    # Actual email sending logic
    # NOTE(review): send_email is not defined in this article -- presumably
    # provided by the application's mail module; confirm before running.
    send_email(data['to'], data['subject'], data['template'])

# Run worker (blocks until SIGTERM/SIGINT)
worker = QueueWorker()
worker.run('emails', send_email_handler)
PHP Worker
<?php
use Pheanstalk\Pheanstalk;
class QueueWorker
{
    /** @var Pheanstalk Connection to the beanstalkd server. */
    private $pheanstalk;

    /** @var bool Loop flag cleared by the shutdown signal handler. */
    private $running = true;

    public function __construct($host = 'localhost', $port = 11300)
    {
        $this->pheanstalk = Pheanstalk::create($host, $port);
        // Handle shutdown signals so the loop exits between jobs.
        pcntl_signal(SIGTERM, [$this, 'handleShutdown']);
        pcntl_signal(SIGINT, [$this, 'handleShutdown']);
    }

    public function handleShutdown($signal)
    {
        echo "Shutting down gracefully...\n";
        $this->running = false;
    }

    public function watchTube($tube)
    {
        $this->pheanstalk->watch($tube);
        $this->pheanstalk->ignore('default');
    }

    public function run($tube, $handler)
    {
        $this->watchTube($tube);
        echo "Worker started, watching tube: {$tube}\n";
        while ($this->running) {
            // Deliver pending signals; required because we do not use
            // declare(ticks=1).
            pcntl_signal_dispatch();
            $job = $this->pheanstalk
                ->reserveWithTimeout(1); // 1 second timeout
            if ($job) {
                $this->processJob($job, $handler);
            }
        }
        echo "Worker stopped\n";
    }

    private function processJob($job, $handler)
    {
        try {
            $data = json_decode($job->getData(), true);
            $handler($data);
            $this->pheanstalk->delete($job);
        } catch (\Throwable $e) {
            // Catch Throwable, not Exception: on PHP 7+ a TypeError or other
            // Error thrown by the handler would otherwise escape, kill the
            // worker, and leave the job reserved until its TTR expires.
            echo "Error processing job: {$e->getMessage()}\n";
            $this->pheanstalk->bury($job);
        }
    }
}
// Handler function: receives the decoded job payload array.
function sendEmailHandler($data)
{
    echo "Sending email to {$data['to']}\n";
    // Actual email sending logic
}

// Run worker (blocks until SIGTERM/SIGINT)
$worker = new QueueWorker();
$worker->run('emails', 'sendEmailHandler');
Advanced Patterns
Priority-Based Processing
class PriorityWorker(JobProducer):
    """Producer that maps symbolic priority levels to beanstalkd priorities.

    Fix: this must extend JobProducer (which defines put_job), not
    QueueWorker -- the original subclassed the worker and would raise
    AttributeError on self.put_job.
    """

    # Lower number = higher priority in beanstalkd.
    PRIORITIES = {
        'critical': 1,
        'high': 256,
        'normal': 1024,
        'low': 2048,
    }

    def put_priority_job(self, tube, data, priority_level='normal'):
        """Put a job using a symbolic level; unknown levels fall back to normal."""
        priority = self.PRIORITIES.get(priority_level, 1024)
        return self.put_job(tube, data, priority=priority)
# Usage
producer = PriorityWorker()
# NOTE(review): email_data is not defined in this snippet -- illustrative only.
producer.put_priority_job('emails', email_data, 'critical')  # Processed first
producer.put_priority_job('emails', email_data, 'low')  # Processed last
Delayed Jobs
class ScheduledJobProducer(JobProducer):
    """Producer helpers for delayed (scheduled) jobs."""

    def schedule_job(self, tube, data, delay_seconds):
        """Schedule a job to become ready after delay_seconds."""
        return self.put_job(tube, data, delay=delay_seconds)

    def schedule_at(self, tube, data, run_at_timestamp):
        """Schedule a job to become ready at a given UNIX timestamp.

        Timestamps already in the past are clamped to "run immediately".
        """
        remaining = run_at_timestamp - time.time()
        if remaining < 0:
            remaining = 0
        return self.put_job(tube, data, delay=int(remaining))
# Usage
# NOTE(review): report_data is not defined in this snippet -- illustrative only.
producer = ScheduledJobProducer()

# Run in 1 hour
producer.schedule_job('reports', report_data, delay_seconds=3600)

# Run at specific time
import datetime
run_time = datetime.datetime(2016, 12, 25, 9, 0, 0).timestamp()
producer.schedule_at('reports', report_data, run_time)
Job Retries
class RetryWorker(QueueWorker):
    """Worker that retries failed jobs with linear backoff, then buries.

    Fixes two bugs in the original:
    1. It stored a retry counter inside the decoded body and called
       job.release() -- but release() re-queues the *original* body, so
       the counter never incremented and jobs retried forever.
       Beanstalkd already counts releases per job, so we read
       job.stats()['releases'] instead.
    2. If json.loads failed, the except block referenced `data` and
       `retry_count` before assignment, raising NameError.
    """

    def process_job(self, job, handler, max_retries=3):
        """Run handler on the job; retry up to max_retries times, then bury."""
        try:
            data = json.loads(job.body)
        except ValueError as e:
            # A malformed payload can never succeed -- bury immediately.
            print(f"Undecodable job body: {e}")
            job.bury()
            return False

        # beanstalkd tracks how many times this job has been released.
        retry_count = job.stats()['releases']
        if retry_count >= max_retries:
            print(f"Job failed after {max_retries} retries")
            job.bury()
            return False

        try:
            handler(data)
        except Exception as e:
            print(f"Error: {e}, retrying...")
            # Release back to the queue with a growing delay (backoff).
            job.release(delay=60 * (retry_count + 1))
            return False

        job.delete()
        return True
Multiple Tubes
class MultiTubeWorker(QueueWorker):
    """Worker that watches several tubes and dispatches by tube name."""

    def run_multiple(self, tubes, handlers):
        """Watch every tube in *tubes* and dispatch jobs via *handlers*.

        handlers maps tube name -> callable(data).  Jobs from a tube with
        no registered handler are released with a delay: the original
        released them immediately, so this same worker re-reserved the
        same job in a tight busy-loop.
        """
        for tube in tubes:
            self.conn.watch(tube)
        self.conn.ignore('default')
        print(f"Watching tubes: {', '.join(tubes)}")
        while self.running:
            job = self.reserve_job(timeout=1)
            if job:
                tube = job.stats()['tube']
                handler = handlers.get(tube)
                if handler:
                    self.process_job(job, handler)
                else:
                    print(f"No handler for tube {tube}, releasing with delay")
                    job.release(delay=60)
            if not self.running:
                break
# Usage
# Map each tube name to its handler callable.
# NOTE(review): only send_email_handler is defined in this article; the
# image and report handlers are illustrative.
handlers = {
    'emails': send_email_handler,
    'images': process_image_handler,
    'reports': generate_report_handler
}
worker = MultiTubeWorker()
worker.run_multiple(['emails', 'images', 'reports'], handlers)
Monitoring and Management
Job Statistics
class QueueMonitor:
    """Read-only statistics helper for beanstalkd tubes and the server."""

    def __init__(self, host='localhost', port=11300):
        self.conn = Connection(host, port)

    def get_tube_stats(self, tube):
        """Return a normalized dict of per-tube job counters.

        Fix: the original also called watch(tube), which is unnecessary --
        stats-tube is a read-only command and watch only affects which
        tubes reserve draws from -- and left the monitor's connection
        watching every tube it ever inspected.
        """
        stats = self.conn.stats_tube(tube)
        return {
            'name': stats['name'],
            'current_jobs_ready': stats['current-jobs-ready'],
            'current_jobs_urgent': stats['current-jobs-urgent'],
            'current_jobs_reserved': stats['current-jobs-reserved'],
            'current_jobs_delayed': stats['current-jobs-delayed'],
            'current_jobs_buried': stats['current-jobs-buried'],
            'total_jobs': stats['total-jobs'],
        }

    def get_server_stats(self):
        """Return server-wide statistics (beanstalkd `stats` command)."""
        return self.conn.stats()

    def list_tubes(self):
        """Return the names of all tubes on the server."""
        return self.conn.list_tubes()
# Usage
monitor = QueueMonitor()
stats = monitor.get_tube_stats('emails')
print(f"Ready jobs: {stats['current_jobs_ready']}")
print(f"Buried jobs: {stats['current_jobs_buried']}")
Buried Job Management
class BuriedJobManager:
    """Inspect and recover buried jobs in a tube."""

    def __init__(self, host='localhost', port=11300):
        self.conn = Connection(host, port)

    def list_buried_jobs(self, tube):
        """Kick every buried job in *tube* back to ready, returning their info.

        Fix: peek-buried operates on the currently *used* tube, so we must
        call use(tube) here -- the original called watch(tube), which only
        affects reserve, so it always peeked the 'default' tube.

        Note: kicking each job is what advances the loop (peek-buried
        always returns the first buried job), so this method both lists
        and un-buries.
        """
        self.conn.use(tube)
        buried_jobs = []
        while True:
            try:
                job = self.conn.peek_buried()
                if not job:
                    break
                buried_jobs.append({
                    'id': job.jid,
                    'body': job.body,
                })
                job.kick()  # Move to ready so the next peek sees a new job
            except Exception:
                # Was a bare except; stop scanning on any error.
                break
        return buried_jobs

    def kick_buried_jobs(self, tube, count=1):
        """Kick up to *count* buried jobs in *tube* back to the ready queue."""
        self.conn.use(tube)
        return self.conn.kick(count)
# Usage
manager = BuriedJobManager()
buried = manager.list_buried_jobs('emails')  # also kicks each job back to ready
for job in buried:
    print(f"Buried job {job['id']}: {job['body']}")

# Kick jobs back
manager.kick_buried_jobs('emails', count=10)
Production Deployment
Supervisor Configuration
; Supervisor program definition for the beanstalk worker.
[program:beanstalk-worker]
command=/usr/bin/python /app/worker.py
directory=/app
user=www-data
autostart=true
autorestart=true
stderr_logfile=/var/log/beanstalk-worker.err.log
stdout_logfile=/var/log/beanstalk-worker.out.log
; Run 4 worker processes; process_name must include process_num
; whenever numprocs > 1.
numprocs=4
process_name=%(program_name)s_%(process_num)02d
Systemd Service
[Unit]
Description=Beanstalk Queue Worker
# Start only after networking and the beanstalkd server are up.
After=network.target beanstalkd.service

[Service]
Type=simple
User=www-data
WorkingDirectory=/app
ExecStart=/usr/bin/python /app/worker.py
# Restart the worker on any exit, waiting 10 seconds between attempts.
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
Best Practices
- Set appropriate TTR - Give workers enough time
- Handle failures gracefully - Bury or retry failed jobs
- Monitor buried jobs - Alert on high buried count
- Use priorities wisely - Don’t overuse high priority
- Scale workers horizontally - Multiple workers per tube
- Log job processing - Track success/failure rates
- Use delays for retries - Exponential backoff
- Clean up old jobs - Monitor job age
Conclusion
Beanstalkd provides a simple, fast queue system:
- Easy to set up and use
- Handles high throughput
- Built-in priority and delays
- Reliable job processing
Start with basic producers and workers, then add retries, monitoring, and scaling as needed. The simplicity of Beanstalkd makes it perfect for most queue use cases.
Beanstalkd queue patterns from November 2016, using beanstalkd 1.10.