_SH Log's
Back to Root
EST: 5 min read

Multi-Agent AI Systems: Architecture Patterns

Multi-agent AI systems — where multiple LLM agents collaborate on complex tasks — require careful orchestration design. Here are the patterns that work in production.

#ai-agents#systems#llm#architecture

Multi-agent AI systems — where multiple LLM-powered agents collaborate to complete complex tasks — are the next frontier in applied AI. A single agent handles one task; a multi-agent system handles tasks too complex for any single agent: long research tasks, parallel code generation, adversarial self-review. Here are the architecture patterns I've built and run in production.

Why multi-agent?

Single-agent limits:

  • Context window: one agent can only hold ~100k tokens of context
  • Quality: an agent can't effectively critique its own output
  • Parallelism: one agent is sequential
  • Specialization: a generalist agent is mediocre at everything

Multi-agent solutions:

  • Parallel execution: multiple agents work on independent sub-tasks simultaneously
  • Specialization: each agent is prompted as an expert in a narrow domain
  • Critique loops: one agent generates, another critiques, the first revises
  • Context distribution: spread a large problem across multiple agents

Pattern 1: Orchestrator + Workers

The most common pattern. One orchestrator agent decomposes the task and delegates; worker agents execute:

class OrchestratorAgent:
    def __init__(self, llm, workers: dict[str, WorkerAgent]):
        self.llm = llm
        self.workers = workers

    async def execute(self, task: str) -> str:
        # Decompose into subtasks
        plan = await self.llm.complete(f"""
        Decompose this task into subtasks for specialized agents.
        Available agents: {list(self.workers.keys())}
        Task: {task}
        Output JSON: {{"subtasks": [{{"agent": "...", "task": "..."}}]}}
        """)
        subtasks = json.loads(plan)["subtasks"]

        # Execute in parallel if independent
        results = await asyncio.gather(*[
            self.workers[st["agent"]].execute(st["task"])
            for st in subtasks
        ])

        # Synthesize
        return await self.llm.complete(
            f"Synthesize these results into a final answer:\n" +
            "\n".join(f"[{st['agent']}]: {r}" for st, r in zip(subtasks, results))
        )

Worker agents are specialized: code_writer, test_writer, security_reviewer, docs_writer. Each has a focused system prompt.

Used in: softco (my Claude Code skill), QuantumSketch video pipeline.

Pattern 2: Generator-Critic Loop

One agent generates; another critiques; the generator revises. Produces dramatically better output than a single agent's first pass:

async def generator_critic_loop(
    generator: Agent,
    critic: Agent,
    task: str,
    max_rounds: int = 3
) -> str:
    content = await generator.complete(task)

    for round in range(max_rounds):
        critique = await critic.complete(f"""
        Review this output for the task: {task}
        
        Output to review:
        {content}
        
        Identify specific problems. If acceptable, respond "APPROVED".
        """)

        if "APPROVED" in critique:
            break

        content = await generator.complete(f"""
        Revise your output based on this critique:
        {critique}
        
        Original task: {task}
        """)

    return content

The critic's system prompt is different from the generator's — often adversarial ("find problems, don't accept mediocrity"). This models the peer review process.

Used in: BikroyBuddy negotiation quality, ChessGoddess explanation quality.

Pattern 3: Map-Reduce for Large Inputs

When input is too large for one agent's context, distribute across multiple agents and aggregate:

func MapReduce(ctx context.Context, agents []Agent, input []Chunk, task string) (string, error) {
    // Map: each agent processes a chunk
    mapResults := make([]string, len(input))
    var wg sync.WaitGroup
    
    for i, chunk := range input {
        wg.Add(1)
        go func(idx int, c Chunk) {
            defer wg.Done()
            agent := agents[idx % len(agents)] // round-robin agent assignment
            mapResults[idx], _ = agent.Complete(ctx, fmt.Sprintf(
                "Task: %s\n\nInput chunk:\n%s", task, c.Text,
            ))
        }(i, chunk)
    }
    wg.Wait()

    // Reduce: aggregate all map results
    reducer := agents[0]
    return reducer.Complete(ctx, fmt.Sprintf(
        "Synthesize these partial results:\n%s",
        strings.Join(mapResults, "\n---\n"),
    ))
}

Used in: common-knowledge large document ingestion, exoplanet research paper analysis.

Pattern 4: Agent Swarm (Blackboard)

Multiple specialist agents share a "blackboard" — a shared state they all read and write. No central orchestrator; agents act based on what they see on the blackboard:

class Blackboard:
    def __init__(self):
        self.state = {}
        self.lock = asyncio.Lock()
    
    async def write(self, key: str, value: any, agent_id: str):
        async with self.lock:
            self.state[key] = {"value": value, "written_by": agent_id, "ts": time.time()}
    
    async def read(self, key: str) -> any:
        return self.state.get(key, {}).get("value")

class SpecialistAgent:
    def __init__(self, specialty: str, blackboard: Blackboard):
        self.specialty = specialty
        self.blackboard = blackboard

    async def run(self):
        while True:
            # Check if there's work in my specialty
            task = await self.blackboard.read(f"task_{self.specialty}")
            if task:
                result = await self.complete(task)
                await self.blackboard.write(f"result_{self.specialty}", result, self.specialty)
            await asyncio.sleep(0.1)

This pattern is more autonomous but harder to debug — use only when task decomposition isn't known upfront.

Observability: the hardest part

Multi-agent systems fail in complex ways. You need:

class TracingAgent:
    def __init__(self, agent: Agent, tracer: Tracer):
        self.agent = agent
        self.tracer = tracer

    async def complete(self, prompt: str) -> str:
        with self.tracer.start_span(f"agent.{self.agent.name}") as span:
            span.set_attribute("prompt_length", len(prompt))
            result = await self.agent.complete(prompt)
            span.set_attribute("response_length", len(result))
            span.set_attribute("tokens_used", self.agent.last_tokens_used)
            return result

OpenTelemetry traces that show the full agent call tree are essential for debugging.

FAQ

What is a multi-agent AI system? A multi-agent AI system uses multiple LLM-powered agents that collaborate on a task too complex for any single agent — through parallelism, specialization, critique loops, or distributed context.

What's the difference between orchestrator and swarm patterns? Orchestrator patterns have a central coordinating agent that delegates to workers (controlled, debuggable). Swarm patterns have agents acting autonomously on shared state (more flexible, harder to debug). Use orchestrators for deterministic tasks, swarms for emergent tasks.

How do you prevent agent loops? Set explicit turn limits, use a critic that can output "APPROVED" to terminate, and add timeouts on every agent call. Never rely on an agent to self-terminate a reasoning loop.

What's the biggest challenge with multi-agent systems? Observability — when something goes wrong, you need to trace which agent produced what output, in what order, with what prompts. Full OpenTelemetry tracing from the start saves enormous debugging time later.


Written by Shihab Shahriar Antor — AI Engineer & Founder of Shahriar Labs. See also: MCP Explained: Model Context Protocol for AI Agents · RAG in Production: Architecture That Actually Scales.