
The future of AI isn't just about making individual agents smarter - it's about making them work together. Multi-agent systems (MAS) represent a paradigm shift from monolithic AI to distributed, collaborative intelligence. Let's explore how to architect and implement systems where multiple AI agents coordinate to solve complex problems.
A multi-agent system consists of multiple autonomous agents that interact, coordinate, and collaborate to achieve individual or collective goals. Unlike single-agent systems, MAS can:
A central orchestrator manages agent interactions:
1from typing import List, Dict, Any 2from dataclasses import dataclass 3from enum import Enum 4 5class AgentStatus(Enum): 6 IDLE = "idle" 7 WORKING = "working" 8 WAITING = "waiting" 9 COMPLETED = "completed" 10 FAILED = "failed" 11 12@dataclass 13class Agent: 14 id: str 15 role: str 16 capabilities: List[str] 17 status: AgentStatus = AgentStatus.IDLE 18 current_task: Optional[str] = None 19 20class Orchestrator: 21 def __init__(self): 22 self.agents: Dict[str, Agent] = {} 23 self.task_queue: List[Task] = [] 24 self.completed_tasks: List[Task] = [] 25 26 def register_agent(self, agent: Agent): 27 """Register an agent with the orchestrator""" 28 self.agents[agent.id] = agent 29 print(f"Agent {agent.id} registered with role: {agent.role}") 30 31 def assign_task(self, task: Task) -> Optional[Agent]: 32 """Assign task to most suitable available agent""" 33 # Find agents with required capabilities 34 capable_agents = [ 35 agent for agent in self.agents.values() 36 if agent.status == AgentStatus.IDLE 37 and any(cap in agent.capabilities for cap in task.required_capabilities) 38 ] 39 40 if not capable_agents: 41 self.task_queue.append(task) 42 return None 43 44 # Select best agent (simple: first available) 45 selected_agent = capable_agents[0] 46 selected_agent.status = AgentStatus.WORKING 47 selected_agent.current_task = task.id 48 49 return selected_agent 50 51 def coordinate(self, task: Task): 52 """Coordinate multiple agents for complex task""" 53 # Break task into subtasks 54 subtasks = self.decompose_task(task) 55 56 # Assign subtasks to agents 57 assignments = {} 58 for subtask in subtasks: 59 agent = self.assign_task(subtask) 60 if agent: 61 assignments[subtask.id] = agent.id 62 63 # Monitor execution 64 results = self.monitor_execution(assignments) 65 66 # Aggregate results 67 return self.aggregate_results(results) 68 69 def decompose_task(self, task: Task) -> List[Task]: 70 """Break complex task into subtasks""" 71 # Use LLM to decompose task 72 decomposition_prompt = f""" 73 Task: {task.description} 74 75 Break this task into smaller, independent subtasks that can be 76 executed by different specialized agents. 77 78 For each subtask, specify: 79 - Description 80 - Required capabilities 81 - Dependencies on other subtasks 82 83 Return as structured JSON. 84 """ 85 86 # Get decomposition from LLM 87 # Parse and create subtask objects 88 pass 89 90 def monitor_execution(self, assignments: Dict[str, str]) -> Dict[str, Any]: 91 """Monitor agent execution""" 92 results = {} 93 while assignments: 94 for task_id, agent_id in list(assignments.items()): 95 agent = self.agents[agent_id] 96 97 if agent.status == AgentStatus.COMPLETED: 98 results[task_id] = agent.get_result() 99 assignments.pop(task_id) 100 agent.status = AgentStatus.IDLE 101 102 elif agent.status == AgentStatus.FAILED: 103 # Handle failure - retry or reassign 104 self.handle_failure(task_id, agent_id) 105 assignments.pop(task_id) 106 107 return results 108
Agents communicate directly without central coordination:
1import asyncio 2from typing import Set 3 4class Message: 5 def __init__(self, sender: str, receiver: str, content: Any, msg_type: str): 6 self.sender = sender 7 self.receiver = receiver 8 self.content = content 9 self.type = msg_type 10 self.timestamp = datetime.now() 11 12class P2PAgent: 13 def __init__(self, agent_id: str, role: str): 14 self.id = agent_id 15 self.role = role 16 self.peers: Set[str] = set() 17 self.inbox: asyncio.Queue = asyncio.Queue() 18 self.knowledge_base: Dict[str, Any] = {} 19 20 def connect_to_peer(self, peer_id: str): 21 """Establish connection with another agent""" 22 self.peers.add(peer_id) 23 self.broadcast_message({ 24 "type": "peer_connected", 25 "peer_id": self.id 26 }) 27 28 async def send_message(self, receiver: str, content: Any, msg_type: str): 29 """Send message to specific agent""" 30 message = Message(self.id, receiver, content, msg_type) 31 # Route message to receiver 32 await self.route_message(message) 33 34 async def broadcast_message(self, content: Any): 35 """Broadcast message to all peers""" 36 for peer_id in self.peers: 37 await self.send_message(peer_id, content, "broadcast") 38 39 async def process_messages(self): 40 """Process incoming messages""" 41 while True: 42 message = await self.inbox.get() 43 await self.handle_message(message) 44 45 async def handle_message(self, message: Message): 46 """Handle different message types""" 47 if message.type == "request": 48 response = await self.process_request(message.content) 49 await self.send_message( 50 message.sender, 51 response, 52 "response" 53 ) 54 55 elif message.type == "share_knowledge": 56 self.update_knowledge(message.content) 57 58 elif message.type == "collaboration_request": 59 await self.handle_collaboration(message) 60 61 async def collaborate(self, task: str, required_roles: List[str]): 62 """Initiate collaboration with peers""" 63 # Find suitable peers 64 collaborators = await self.find_collaborators(required_roles) 65 66 # Propose collaboration 67 collaboration_id = f"collab_{uuid.uuid4()}" 68 proposals = [] 69 70 for peer_id in collaborators: 71 proposal = await self.send_message( 72 peer_id, 73 { 74 "collaboration_id": collaboration_id, 75 "task": task, 76 "initiator": self.id 77 }, 78 "collaboration_request" 79 ) 80 proposals.append(proposal) 81 82 # Wait for responses 83 responses = await self.collect_responses(collaboration_id) 84 85 # Form team 86 team = [peer for peer, accepted in responses.items() if accepted] 87 88 return CollaborationSession(collaboration_id, team, task) 89 90 async def find_collaborators(self, required_roles: List[str]) -> List[str]: 91 """Find peers with required capabilities""" 92 suitable_peers = [] 93 94 for peer_id in self.peers: 95 # Query peer capabilities 96 response = await self.send_message( 97 peer_id, 98 {"query": "capabilities"}, 99 "request" 100 ) 101 102 if any(role in response.get("roles", []) for role in required_roles): 103 suitable_peers.append(peer_id) 104 105 return suitable_peers 106
Agents organized in hierarchical structure:
1class HierarchicalAgent: 2 def __init__(self, agent_id: str, role: str, level: int): 3 self.id = agent_id 4 self.role = role 5 self.level = level # 0 = worker, 1 = manager, 2 = director 6 self.subordinates: List[HierarchicalAgent] = [] 7 self.manager: Optional[HierarchicalAgent] = None 8 9 def assign_subordinate(self, agent: 'HierarchicalAgent'): 10 """Add agent as subordinate""" 11 self.subordinates.append(agent) 12 agent.manager = self 13 14 def delegate_task(self, task: Task): 15 """Delegate task to subordinates""" 16 if not self.subordinates: 17 # Execute task directly 18 return self.execute(task) 19 20 # Decompose and assign to subordinates 21 subtasks = self.decompose_task(task) 22 results = [] 23 24 for subtask, subordinate in zip(subtasks, self.subordinates): 25 result = subordinate.delegate_task(subtask) 26 results.append(result) 27 28 # Aggregate results 29 return self.aggregate_subordinate_results(results) 30 31 def report_to_manager(self, report: Dict[str, Any]): 32 """Report progress to manager""" 33 if self.manager: 34 self.manager.receive_report(self.id, report) 35 36 def receive_report(self, subordinate_id: str, report: Dict[str, Any]): 37 """Receive report from subordinate""" 38 # Process report 39 if report.get("needs_help"): 40 self.provide_assistance(subordinate_id, report["issue"]) 41 42 # Aggregate and report upward if needed 43 if self.should_escalate(report): 44 self.report_to_manager({ 45 "from": subordinate_id, 46 "issue": report, 47 "escalated_by": self.id 48 }) 49
1from enum import Enum 2from dataclasses import dataclass 3 4class MessageType(Enum): 5 REQUEST = "request" 6 RESPONSE = "response" 7 INFORM = "inform" 8 QUERY = "query" 9 PROPOSE = "propose" 10 ACCEPT = "accept" 11 REJECT = "reject" 12 13@dataclass 14class AgentMessage: 15 sender: str 16 receiver: str 17 message_type: MessageType 18 content: Dict[str, Any] 19 conversation_id: str 20 timestamp: datetime 21 22class MessageBus: 23 def __init__(self): 24 self.queues: Dict[str, asyncio.Queue] = {} 25 self.message_history: List[AgentMessage] = [] 26 27 def register_agent(self, agent_id: str): 28 """Register agent's message queue""" 29 self.queues[agent_id] = asyncio.Queue() 30 31 async def send(self, message: AgentMessage): 32 """Send message to recipient""" 33 if message.receiver not in self.queues: 34 raise ValueError(f"Agent {message.receiver} not registered") 35 36 await self.queues[message.receiver].put(message) 37 self.message_history.append(message) 38 39 async def receive(self, agent_id: str) -> AgentMessage: 40 """Receive message for agent""" 41 if agent_id not in self.queues: 42 raise ValueError(f"Agent {agent_id} not registered") 43 44 return await self.queues[agent_id].get() 45 46 def get_conversation(self, conversation_id: str) -> List[AgentMessage]: 47 """Retrieve full conversation""" 48 return [ 49 msg for msg in self.message_history 50 if msg.conversation_id == conversation_id 51 ] 52
Agents read from and write to shared knowledge space:
1from threading import Lock 2 3class Blackboard: 4 def __init__(self): 5 self.data: Dict[str, Any] = {} 6 self.locks: Dict[str, Lock] = {} 7 self.subscribers: Dict[str, List[callable]] = {} 8 9 def write(self, key: str, value: Any, agent_id: str): 10 """Write data to blackboard""" 11 if key not in self.locks: 12 self.locks[key] = Lock() 13 14 with self.locks[key]: 15 old_value = self.data.get(key) 16 self.data[key] = { 17 "value": value, 18 "written_by": agent_id, 19 "timestamp": datetime.now() 20 } 21 22 # Notify subscribers 23 self.notify_subscribers(key, value, old_value) 24 25 def read(self, key: str) -> Any: 26 """Read data from blackboard""" 27 return self.data.get(key, {}).get("value") 28 29 def subscribe(self, key: str, callback: callable): 30 """Subscribe to changes on a key""" 31 if key not in self.subscribers: 32 self.subscribers[key] = [] 33 self.subscribers[key].append(callback) 34 35 def notify_subscribers(self, key: str, new_value: Any, old_value: Any): 36 """Notify subscribers of changes""" 37 if key in self.subscribers: 38 for callback in self.subscribers[key]: 39 callback(key, new_value, old_value) 40 41class BlackboardAgent: 42 def __init__(self, agent_id: str, blackboard: Blackboard): 43 self.id = agent_id 44 self.blackboard = blackboard 45 46 def contribute_knowledge(self, key: str, value: Any): 47 """Write knowledge to blackboard""" 48 self.blackboard.write(key, value, self.id) 49 50 def access_knowledge(self, key: str) -> Any: 51 """Access shared knowledge""" 52 return self.blackboard.read(key) 53 54 def watch_for_changes(self, key: str): 55 """Subscribe to knowledge changes""" 56 self.blackboard.subscribe(key, self.on_knowledge_updated) 57 58 def on_knowledge_updated(self, key: str, new_value: Any, old_value: Any): 59 """Handle knowledge updates""" 60 print(f"Agent {self.id}: {key} updated from {old_value} to {new_value}") 61 # React to changes 62
1from collections import Counter 2 3class VotingSystem: 4 def __init__(self, agents: List[Agent]): 5 self.agents = agents 6 self.votes: Dict[str, Any] = {} 7 8 async def call_vote(self, proposal: Dict[str, Any]) -> Dict[str, Any]: 9 """Conduct vote among agents""" 10 self.votes = {} 11 12 # Collect votes 13 vote_tasks = [ 14 self.get_agent_vote(agent, proposal) 15 for agent in self.agents 16 ] 17 await asyncio.gather(*vote_tasks) 18 19 # Tally votes 20 return self.tally_votes() 21 22 async def get_agent_vote(self, agent: Agent, proposal: Dict[str, Any]): 23 """Get individual agent's vote""" 24 vote = await agent.vote_on_proposal(proposal) 25 self.votes[agent.id] = vote 26 27 def tally_votes(self) -> Dict[str, Any]: 28 """Count votes and determine outcome""" 29 vote_counts = Counter(self.votes.values()) 30 total_votes = len(self.votes) 31 32 majority_threshold = total_votes // 2 + 1 33 winning_vote = vote_counts.most_common(1)[0] 34 35 return { 36 "outcome": winning_vote[0], 37 "vote_count": winning_vote[1], 38 "is_majority": winning_vote[1] >= majority_threshold, 39 "detailed_votes": dict(vote_counts) 40 } 41 42class VotingAgent: 43 async def vote_on_proposal(self, proposal: Dict[str, Any]) -> str: 44 """Agent evaluates proposal and votes""" 45 # Analyze proposal 46 analysis = await self.analyze_proposal(proposal) 47 48 # Make decision based on agent's goals and constraints 49 if analysis["benefits"] > analysis["costs"]: 50 return "approve" 51 elif analysis["benefits"] < analysis["costs"]: 52 return "reject" 53 else: 54 return "abstain" 55
1class ConsensusAgent: 2 def __init__(self, agent_id: str, initial_position: Any): 3 self.id = agent_id 4 self.position = initial_position 5 self.confidence = 1.0 6 7 async def negotiate(self, other_agents: List['ConsensusAgent'], rounds: int = 5): 8 """Negotiate with other agents to reach consensus""" 9 for round_num in range(rounds): 10 # Share position 11 positions = [agent.position for agent in other_agents] + [self.position] 12 13 # Evaluate positions 14 evaluation = await self.evaluate_positions(positions) 15 16 # Update position if convinced 17 if evaluation["should_update"]: 18 self.position = evaluation["new_position"] 19 self.confidence = evaluation["confidence"] 20 21 # Check for consensus 22 if self.check_consensus(other_agents): 23 return True 24 25 return False 26 27 async def evaluate_positions(self, positions: List[Any]) -> Dict[str, Any]: 28 """Evaluate other agents' positions""" 29 prompt = f""" 30 My current position: {self.position} 31 Other positions: {positions} 32 33 Should I update my position based on the arguments presented? 34 Consider: 35 1. Strength of arguments 36 2. Evidence provided 37 3. Consensus building 38 39 Return: should_update (bool), new_position (if updating), confidence (0-1) 40 """ 41 42 # Get evaluation from LLM 43 # Return structured decision 44 pass 45 46 def check_consensus(self, other_agents: List['ConsensusAgent']) -> bool: 47 """Check if consensus is reached""" 48 positions = [agent.position for agent in other_agents] + [self.position] 49 # Check if all positions are similar enough 50 return len(set(positions)) == 1 51
1class ContractNetManager: 2 async def announce_task(self, task: Task, agents: List[Agent]): 3 """Announce task and collect bids""" 4 # Announce task 5 bids = await self.collect_bids(task, agents) 6 7 # Evaluate bids 8 winner = self.select_winner(bids) 9 10 # Award contract 11 if winner: 12 await self.award_contract(winner, task) 13 return winner 14 15 return None 16 17 async def collect_bids(self, task: Task, agents: List[Agent]) -> List[Bid]: 18 """Collect bids from capable agents""" 19 bids = [] 20 21 for agent in agents: 22 if agent.can_perform(task): 23 bid = await agent.submit_bid(task) 24 if bid: 25 bids.append(bid) 26 27 return bids 28 29 def select_winner(self, bids: List[Bid]) -> Optional[Agent]: 30 """Select best bid""" 31 if not bids: 32 return None 33 34 # Evaluate bids (cost, quality, time) 35 scored_bids = [ 36 (bid, self.score_bid(bid)) 37 for bid in bids 38 ] 39 40 winner = max(scored_bids, key=lambda x: x[1]) 41 return winner[0].agent 42 43 def score_bid(self, bid: Bid) -> float: 44 """Score bid based on multiple criteria""" 45 # Normalize and weight different factors 46 cost_score = 1.0 / (bid.cost + 1) # Lower cost is better 47 quality_score = bid.quality # Higher quality is better 48 time_score = 1.0 / (bid.estimated_time + 1) # Faster is better 49 50 return ( 51 0.4 * cost_score + 52 0.4 * quality_score + 53 0.2 * time_score 54 ) 55 56class BiddingAgent: 57 async def submit_bid(self, task: Task) -> Optional[Bid]: 58 """Evaluate task and submit bid""" 59 # Assess capability 60 capability = self.assess_capability(task) 61 62 if capability < 0.5: # Not capable enough 63 return None 64 65 # Estimate cost and time 66 cost = self.estimate_cost(task) 67 time = self.estimate_time(task) 68 69 return Bid( 70 agent=self, 71 task=task, 72 cost=cost, 73 quality=capability, 74 estimated_time=time 75 ) 76
1class CoalitionAgent: 2 def __init__(self, agent_id: str, capabilities: List[str]): 3 self.id = agent_id 4 self.capabilities = capabilities 5 self.current_coalition: Optional[Coalition] = None 6 7 async def form_coalition(self, task: Task, available_agents: List['CoalitionAgent']): 8 """Form coalition to accomplish task""" 9 required_capabilities = task.required_capabilities 10 11 # Find complementary agents 12 coalition_members = self.find_complementary_agents( 13 available_agents, 14 required_capabilities 15 ) 16 17 # Propose coalition 18 coalition = Coalition(task, coalition_members) 19 20 # Get agreement from all members 21 if await coalition.get_consensus(): 22 self.current_coalition = coalition 23 return coalition 24 25 return None 26 27 def find_complementary_agents( 28 self, 29 agents: List['CoalitionAgent'], 30 required_capabilities: List[str] 31 ) -> List['CoalitionAgent']: 32 """Find agents that together cover required capabilities""" 33 selected = [self] 34 covered_capabilities = set(self.capabilities) 35 36 for agent in agents: 37 if agent.id == self.id: 38 continue 39 40 # Check if agent adds new capabilities 41 new_capabilities = set(agent.capabilities) - covered_capabilities 42 43 if new_capabilities and len(selected) < 10: # Max coalition size 44 selected.append(agent) 45 covered_capabilities.update(agent.capabilities) 46 47 # Check if all capabilities covered 48 if all(cap in covered_capabilities for cap in required_capabilities): 49 break 50 51 return selected if covered_capabilities.issuperset(required_capabilities) else [] 52 53class Coalition: 54 def __init__(self, task: Task, members: List[CoalitionAgent]): 55 self.task = task 56 self.members = members 57 self.agreements: Dict[str, bool] = {} 58 59 async def get_consensus(self) -> bool: 60 """Get agreement from all coalition members""" 61 for member in self.members: 62 agreed = await member.agree_to_coalition(self) 63 self.agreements[member.id] = agreed 64 65 return all(self.agreements.values()) 66 67 async def execute_task(self): 68 """Execute task collaboratively""" 69 # Assign subtasks to members 70 assignments = self.assign_subtasks() 71 72 # Execute in parallel 73 results = await asyncio.gather(*[ 74 member.execute_subtask(subtask) 75 for member, subtask in assignments.items() 76 ]) 77 78 # Aggregate results 79 return self.aggregate_results(results) 80
1class SelfOrganizingSystem: 2 def __init__(self, agents: List[Agent]): 3 self.agents = agents 4 self.environment = Environment() 5 6 def simulate(self, steps: int): 7 """Simulate system to observe emergent behavior""" 8 for step in range(steps): 9 # Each agent observes environment 10 for agent in self.agents: 11 observations = agent.observe(self.environment) 12 13 # Agent makes local decision 14 action = agent.decide(observations) 15 16 # Agent acts on environment 17 agent.act(action, self.environment) 18 19 # Environment updates 20 self.environment.update() 21 22 # Analyze emergent patterns 23 self.analyze_patterns(step) 24 25 def analyze_patterns(self, step: int): 26 """Detect emergent patterns in agent behavior""" 27 # Analyze agent positions, states, interactions 28 patterns = { 29 "clusters": self.detect_clusters(), 30 "coordination_level": self.measure_coordination(), 31 "efficiency": self.measure_efficiency() 32 } 33 34 print(f"Step {step}: {patterns}") 35 36 def detect_clusters(self) -> int: 37 """Detect agent clustering""" 38 # Implementation depends on agent properties 39 pass 40 41 def measure_coordination(self) -> float: 42 """Measure how well agents coordinate""" 43 # Analyze agent interactions and outcomes 44 pass 45
1# Scientific research collaboration 2research_system = MultiAgentSystem() 3research_system.add_agents([ 4 ResearchAgent("literature_reviewer", "Literature Review"), 5 ResearchAgent("data_analyst", "Data Analysis"), 6 ResearchAgent("hypothesis_generator", "Hypothesis Generation"), 7 ResearchAgent("experiment_designer", "Experiment Design"), 8 ResearchAgent("writer", "Paper Writing") 9]) 10 11result = await research_system.collaborate_on_task( 12 "Investigate the effects of XYZ on ABC" 13) 14
1# Supply chain optimization 2supply_chain = MultiAgentSystem() 3supply_chain.add_agents([ 4 SupplierAgent("supplier_1"), 5 ManufacturerAgent("manufacturer_1"), 6 DistributorAgent("distributor_1"), 7 RetailerAgent("retailer_1"), 8 LogisticsAgent("logistics_1") 9]) 10 11# Agents negotiate and coordinate automatically 12supply_chain.optimize_operations() 13
1# Traffic management system 2city_system = MultiAgentSystem() 3city_system.add_agents([ 4 TrafficLightAgent(intersection_id) for intersection_id in intersections 5] + [ 6 EmergencyVehicleAgent(vehicle_id) for vehicle_id in emergency_vehicles 7]) 8 9# Agents coordinate to optimize traffic flow 10city_system.run_real_time_optimization() 11
Multi-agent systems represent the cutting edge of AI architecture. By distributing intelligence across specialized agents that can communicate, coordinate, and collaborate, we can:
The key to successful multi-agent systems is thoughtful design of:
Start simple, with 2-3 agents, master the coordination patterns, then scale to more complex systems. The future of AI is collaborative, distributed, and emergent.
Build your multi-agent system today. Start with a clear problem, design specialized agents, implement communication protocols, and watch emergent intelligence unfold.
An in-depth look at how AI agents are revolutionizing software development and enterprise operations in 2025.

Fortan Pireva
November 8, 2025

Master CrewAI to build collaborative multi-agent systems where specialized AI agents work together to solve complex tasks.

Fortan Pireva
November 28, 2024

A comprehensive guide to the latest advancements in LLMs , including key architectures, techniques, and applications.

Fortan Pireva
May 27, 2024