Agentic AI Systems: Multi-Agent Architectures
Single AI agents struggle with complex tasks. They exceed context limits, conflate responsibilities, and become brittle monoliths. Multi-agent systems decompose complexity: specialized agents handle distinct concerns, coordinate through messages, and compose into robust systems.
I built a multi-agent code analysis system: one agent parsed code structure, another reasoned about architecture, a third suggested refactorings. Each was smaller, testable, and replaceable. The coordinator orchestrated their interaction. The result was more maintainable than a single “do everything” agent.
Multi-agent systems aren’t new—distributed AI has decades of research. But LLMs make them practical: agents can understand natural language instructions, reason about tasks, and collaborate without rigid protocols.
Why Multiple Agents?
- Separation of concerns - Parsing, reasoning, and execution are distinct skills. Separate agents, separate prompts, separate tests.
- Context management - LLMs have finite context. Multiple focused agents stay within limits.
- Specialization - Train/tune agents for specific domains (legal analysis, code review, data extraction).
- Fault isolation - If the code execution agent fails, the reasoning agent continues.
- Testability - Test each agent independently with unit tests.
- Scalability - Scale expensive agents (GPT-4) separately from cheap ones (Claude Haiku).
See AutoGen and LangGraph for framework approaches.
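Interchangeability is what makes the separation pay off: every specialist exposes the same tiny surface, so the coordinator never cares which one it is talking to. A minimal sketch of that shared interface (the `Agent` protocol name here is my own, not from any framework):

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Agent(Protocol):
    """Minimal interface every specialist implements."""

    async def execute(self, task: str) -> str:
        """Run one task and return a text result."""
        ...
```

Any class with a matching `execute` coroutine satisfies the protocol, which is all the coordinator patterns below rely on.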
Agent Roles and Patterns
1. Coordinator (Orchestrator)
Decomposes high-level goals into subtasks, assigns to specialists, aggregates results.
```python
import json
from typing import Dict, List

from anthropic import Anthropic


class Coordinator:
    """Orchestrate a multi-agent workflow."""

    def __init__(self, specialists: Dict[str, 'Agent']):
        self.client = Anthropic()
        self.specialists = specialists
        self.history = []

    async def process(self, task: str) -> str:
        """Break down a task and coordinate execution."""
        # Decompose task
        subtasks = await self.decompose(task)

        results = {}
        for subtask in subtasks:
            agent = self.specialists[subtask['agent']]
            # Execute subtask
            results[subtask['id']] = await agent.execute(subtask['task'])

        # Synthesize results
        return await self.synthesize(task, results)

    async def decompose(self, task: str) -> List[Dict]:
        """Decompose a task into subtasks."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            system="""You are a task coordinator. Break down complex tasks into subtasks.
Output a JSON array:
[
  {"id": "1", "agent": "search", "task": "Find relevant documentation"},
  {"id": "2", "agent": "code", "task": "Analyze code structure"}
]""",
            messages=[{"role": "user", "content": f"Break down this task:\n\n{task}"}]
        )
        return json.loads(response.content[0].text)

    async def synthesize(self, task: str, results: Dict) -> str:
        """Combine subtask results into a final answer."""
        context = "\n\n".join(f"Subtask {k}:\n{v}" for k, v in results.items())
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            system="You are a synthesizer. Combine subtask results into a coherent answer.",
            messages=[{
                "role": "user",
                "content": f"Original task: {task}\n\nSubtask results:\n{context}\n\n"
                           "Provide a comprehensive answer to the original task."
            }]
        )
        return response.content[0].text
```
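The loop in `process` runs subtasks one at a time. When subtasks don't depend on each other, they can be dispatched concurrently with `asyncio.gather` instead. A self-contained sketch using a stand-in echo agent (the `EchoAgent` and `run_concurrently` names are illustrative, not part of any API):

```python
import asyncio


class EchoAgent:
    """Stand-in specialist that just echoes its task."""

    async def execute(self, task: str) -> str:
        return f"done: {task}"


async def run_concurrently(specialists, subtasks):
    """Dispatch independent subtasks in parallel and key results by id."""
    coros = [specialists[st['agent']].execute(st['task']) for st in subtasks]
    outputs = await asyncio.gather(*coros)
    return {st['id']: out for st, out in zip(subtasks, outputs)}


results = asyncio.run(run_concurrently(
    {'search': EchoAgent(), 'code': EchoAgent()},
    [{'id': '1', 'agent': 'search', 'task': 'find docs'},
     {'id': '2', 'agent': 'code', 'task': 'analyze module'}],
))
```

For real workloads you would only parallelize subtasks the decomposition step marks as independent; a dependency between subtasks forces sequencing.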
2. Specialist Agents
Domain-specific agents with focused expertise:
```python
class SearchAgent:
    """Specialist for web/documentation search."""

    def __init__(self, search_api):
        self.client = Anthropic()
        self.search_api = search_api

    async def execute(self, task: str) -> str:
        """Execute a search task."""
        # Extract search query
        query = await self.extract_query(task)
        # Perform search
        results = await self.search_api.search(query)
        # Synthesize results
        return self.synthesize_results(results)

    async def extract_query(self, task: str) -> str:
        """Extract the search query from the task description."""
        response = self.client.messages.create(
            model="claude-3-haiku-20240307",  # Cheap model for extraction
            max_tokens=256,
            system="Extract the search query from the task. Return only the query text.",
            messages=[{"role": "user", "content": task}]
        )
        return response.content[0].text.strip()

    def synthesize_results(self, results) -> str:
        """Format raw search results as a single text block."""
        return "\n".join(str(r) for r in results)
```
```python
class CodeAgent:
    """Specialist for code analysis."""

    def __init__(self):
        self.client = Anthropic()

    async def execute(self, task: str) -> str:
        """Execute a code analysis task."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            system="""You are a code analysis expert. Analyze code for:
- Structure and architecture
- Potential bugs
- Performance issues
- Security vulnerabilities

Provide clear, actionable feedback.""",
            messages=[{"role": "user", "content": task}]
        )
        return response.content[0].text
```
```python
class ExecutionAgent:
    """Specialist for executing code/commands safely."""

    def __init__(self, sandbox):
        self.client = Anthropic()
        self.sandbox = sandbox

    async def execute(self, task: str) -> str:
        """Execute code in the sandbox."""
        # Parse code from task
        code = await self.extract_code(task)
        # Execute in sandbox
        result = await self.sandbox.run(code, timeout=30)
        return f"Execution result:\n{result.stdout}\n\nErrors:\n{result.stderr}"

    async def extract_code(self, task: str) -> str:
        """Pull the code snippet out of the task description."""
        response = self.client.messages.create(
            model="claude-3-haiku-20240307",
            max_tokens=1024,
            system="Extract the code to execute from the task. Return only the code.",
            messages=[{"role": "user", "content": task}]
        )
        return response.content[0].text.strip()
```
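The `sandbox` dependency above is deliberately abstract. For local development, a stand-in can be built on `subprocess` with a timeout; this sketch (the `SubprocessSandbox` and `RunResult` names are mine) is not real isolation and should not run untrusted code in production, where a container or gVisor-style sandbox belongs:

```python
import asyncio
import subprocess
import sys
from dataclasses import dataclass


@dataclass
class RunResult:
    stdout: str
    stderr: str
    returncode: int


class SubprocessSandbox:
    """Toy sandbox: runs Python code in a subprocess with a timeout.
    NOT real isolation -- use containers for untrusted code."""

    async def run(self, code: str, timeout: int = 30) -> RunResult:
        # subprocess.run is blocking, so push it onto a worker thread
        proc = await asyncio.to_thread(
            subprocess.run,
            [sys.executable, "-c", code],
            capture_output=True, text=True, timeout=timeout,
        )
        return RunResult(proc.stdout, proc.stderr, proc.returncode)
```

Because it satisfies the same `run(code, timeout=...)` shape the `ExecutionAgent` expects, it can be swapped for a hardened sandbox without touching agent code.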
3. Message Bus Pattern
For loose coupling and extensibility:
```python
import asyncio
import time
from typing import Callable, Dict, List


class MessageBus:
    """Pub/sub message bus for agent communication."""

    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}

    def subscribe(self, topic: str, handler: Callable):
        """Subscribe a handler to a topic."""
        self.subscribers.setdefault(topic, []).append(handler)

    async def publish(self, topic: str, message: Dict):
        """Publish a message to a topic."""
        if topic not in self.subscribers:
            return
        # Add metadata
        message['topic'] = topic
        message['timestamp'] = time.time()
        # Notify all subscribers concurrently
        tasks = [
            asyncio.create_task(handler(message))
            for handler in self.subscribers[topic]
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
```
```python
# Usage (assumes search_api and code_agent are already constructed,
# and that this runs inside an event loop)
bus = MessageBus()

# Subscribe agents
async def search_handler(message):
    results = await search_api.search(message['query'])
    await bus.publish('search_results', {'results': results})

async def code_handler(message):
    analysis = await code_agent.analyze(message['results'])
    await bus.publish('analysis_complete', {'analysis': analysis})

bus.subscribe('search_request', search_handler)
bus.subscribe('search_results', code_handler)

# Trigger workflow
await bus.publish('search_request', {'query': 'Flask security best practices'})
```
Production Architecture
```python
import asyncio
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Dict

logger = logging.getLogger(__name__)


class AgentStatus(Enum):
    IDLE = "idle"
    WORKING = "working"
    FAILED = "failed"


@dataclass
class AgentMetrics:
    """Track agent performance."""
    total_tasks: int = 0
    successful_tasks: int = 0
    failed_tasks: int = 0
    total_latency: float = 0.0
    total_cost: float = 0.0


class ProductionAgent:
    """Production-ready agent with monitoring."""

    def __init__(self, name: str, client):
        self.name = name
        self.client = client
        self.status = AgentStatus.IDLE
        self.metrics = AgentMetrics()

    async def execute(self, task: str) -> str:
        """Execute with monitoring and error handling."""
        self.status = AgentStatus.WORKING
        start_time = time.time()
        try:
            # Execute with retries
            result = await self._execute_with_retry(task, max_retries=3)
            # Update metrics
            self.metrics.successful_tasks += 1
            self.metrics.total_latency += time.time() - start_time
            self.status = AgentStatus.IDLE
            return result
        except Exception as e:
            self.metrics.failed_tasks += 1
            self.status = AgentStatus.FAILED
            logger.error(f"Agent {self.name} failed: {e}")
            # Re-raise for the coordinator to handle
            raise
        finally:
            self.metrics.total_tasks += 1

    async def _execute_with_retry(self, task: str, max_retries: int) -> str:
        """Execute with exponential backoff."""
        for attempt in range(max_retries):
            try:
                response = self.client.messages.create(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=2048,
                    messages=[{"role": "user", "content": task}]
                )
                # Track cost
                self.metrics.total_cost += self._calculate_cost(response)
                return response.content[0].text
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)

    def _calculate_cost(self, response) -> float:
        """Calculate API cost from token usage."""
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        # Claude Sonnet pricing (example rates; check current pricing)
        return input_tokens * 0.003 / 1000 + output_tokens * 0.015 / 1000

    def get_metrics(self) -> Dict:
        """Export metrics."""
        return {
            'agent': self.name,
            'status': self.status.value,
            'total_tasks': self.metrics.total_tasks,
            'success_rate': self.metrics.successful_tasks / max(self.metrics.total_tasks, 1),
            'avg_latency': self.metrics.total_latency / max(self.metrics.successful_tasks, 1),
            'total_cost': self.metrics.total_cost
        }
```
Observability and Monitoring
```python
import structlog
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Set up tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Add OTLP exporter
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_exporter))

# Structured logging
logger = structlog.get_logger()


class ObservableCoordinator(Coordinator):
    """Coordinator with full observability."""

    async def process(self, task: str) -> str:
        """Process with tracing and logging."""
        with tracer.start_as_current_span("coordinator.process") as span:
            span.set_attribute("task", task)
            logger.info("processing_task", task=task)
            try:
                # Decompose
                with tracer.start_as_current_span("coordinator.decompose"):
                    subtasks = await self.decompose(task)
                span.set_attribute("subtask_count", len(subtasks))
                logger.info("decomposed_task", subtasks=len(subtasks))

                # Execute
                results = {}
                for subtask in subtasks:
                    with tracer.start_as_current_span(f"agent.{subtask['agent']}"):
                        results[subtask['id']] = await self.specialists[subtask['agent']].execute(subtask['task'])

                # Synthesize
                with tracer.start_as_current_span("coordinator.synthesize"):
                    answer = await self.synthesize(task, results)

                logger.info("task_completed", task=task)
                return answer
            except Exception as e:
                logger.error("task_failed", task=task, error=str(e))
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR))
                raise
```
Testing Multi-Agent Systems
```python
import pytest
from unittest.mock import Mock, AsyncMock


@pytest.mark.asyncio
async def test_coordinator_decomposition():
    """Test task decomposition."""
    coordinator = Coordinator({})
    coordinator.client = Mock()
    # messages.create is a synchronous SDK call, so a plain Mock is correct here
    coordinator.client.messages.create = Mock(return_value=Mock(
        content=[Mock(text='[{"id": "1", "agent": "search", "task": "Search docs"}]')]
    ))

    subtasks = await coordinator.decompose("Find Flask security info")

    assert len(subtasks) == 1
    assert subtasks[0]['agent'] == 'search'


@pytest.mark.asyncio
async def test_agent_execution():
    """Test agent execution with mocked APIs."""
    agent = SearchAgent(Mock())
    agent.client = Mock()
    agent.client.messages.create = Mock(return_value=Mock(
        content=[Mock(text='Flask security')]
    ))
    # search_api.search IS awaited, so it needs an AsyncMock
    agent.search_api.search = AsyncMock(return_value=['result1', 'result2'])

    result = await agent.execute("Search for Flask security")

    assert result is not None
    agent.search_api.search.assert_called_once()


@pytest.mark.asyncio
async def test_message_bus():
    """Test pub/sub message bus."""
    bus = MessageBus()
    received = []

    async def handler(message):
        received.append(message)

    bus.subscribe('test', handler)
    # publish awaits its handlers, so results are in before it returns
    await bus.publish('test', {'data': 'test'})

    assert len(received) == 1
    assert received[0]['data'] == 'test'
```
Best Practices
- Design for failure - Agents will fail. Implement retries, circuit breakers, fallbacks.
- Keep agents focused - One agent, one responsibility. Don’t build god agents.
- Use structured outputs - JSON schemas, Pydantic models. Makes coordination reliable.
- Monitor everything - Latency, cost, success rate, per agent.
- Test independently - Unit test each agent with mocked dependencies.
- Version agents - Deploy different agent versions independently.
- Implement timeouts - Agents can hang. Set aggressive timeouts.
- Cache expensive operations - Search results, embeddings, analysis.
- Cost management - Track per-agent costs. Use cheaper models where possible.
- Security boundaries - Agents may have different trust levels. Enforce permissions.
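"Use structured outputs" applies directly to the coordinator's decompose step: validate the model's JSON before dispatching so a malformed response triggers a retry rather than a crash downstream. A stdlib-only sketch (a Pydantic model would work equally well; the `parse_subtasks` helper is hypothetical):

```python
import json

# Keys every subtask object must carry, per the coordinator's prompt
REQUIRED_KEYS = {"id", "agent", "task"}


def parse_subtasks(raw: str, known_agents: set) -> list:
    """Parse and validate the coordinator's subtask JSON.
    Raises ValueError on malformed output so the caller can retry."""
    # Models sometimes wrap JSON in a markdown fence; strip it first
    raw = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```")
    subtasks = json.loads(raw)
    if not isinstance(subtasks, list):
        raise ValueError("expected a JSON array of subtasks")
    for st in subtasks:
        missing = REQUIRED_KEYS - st.keys()
        if missing:
            raise ValueError(f"subtask missing keys: {missing}")
        if st["agent"] not in known_agents:
            raise ValueError(f"unknown agent: {st['agent']}")
    return subtasks
```

Checking `agent` against the registered specialists catches the common failure where the model invents an agent name the coordinator cannot route to.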
Conclusion
Multi-agent systems transform complex AI tasks into manageable, composable components. By decomposing responsibilities, you gain testability, fault isolation, and scalability—at the cost of coordination complexity.
The patterns are well-established: coordinators orchestrate, specialists execute, message buses decouple. The tooling is maturing: LangGraph, AutoGen, CrewAI provide frameworks. The economics work: scaling cheap and expensive agents independently optimizes cost.
Start simple: coordinator + 2-3 specialists. Add observability early. Measure everything. Iterate based on bottlenecks.
Multi-agent systems aren’t always the answer—sometimes a well-prompted single agent suffices. But for complex, multi-step tasks requiring different expertise, they’re the right architecture.
Further Resources:
- AutoGen Framework - Microsoft’s multi-agent framework
- LangGraph - LangChain’s graph-based agents
- CrewAI - Role-based multi-agent system
- Multi-Agent Systems Book - Academic foundation
- OpenTelemetry - Observability standard
- Anthropic Agent Patterns - Claude agent guidance
- Agent Protocol - Standardized agent communication
Agentic AI systems from November 2025, covering multi-agent architectures and production patterns.