AI Agent

멀티에이전트 오케스트레이션 패턴 5가지 — 언제 무엇을 쓸 것인가

cell-devlog 2026. 5. 26. 16:37
반응형

"에이전트를 더 많이 쓰면 더 좋아진다." 2024년의 믿음이었습니다. 2026년 프로덕션 현장의 데이터는 다른 이야기를 합니다. Princeton NLP 연구에서 단일 에이전트가 동일한 툴과 컨텍스트를 주었을 때 멀티에이전트를 64%의 태스크에서 동등하거나 능가했습니다. 멀티에이전트는 정확도를 평균 2.1% 올리지만 비용은 약 2배입니다. Gartner가 Q1 2024에서 Q2 2025 사이 멀티에이전트 도입 문의가 1,445% 급증했다고 보고할 동안, Cognition은 "Don't Build Multi-Agents"를 발표했다가 8개월 후 "Devin이 Devin을 관리할 수 있다"로 돌아왔습니다. 이것이 이 분야의 현주소입니다. 5가지 핵심 패턴을 구조·코드·트레이드오프와 함께 정리하고, 언제 단일 에이전트로 충분한지도 명확히 말합니다.


이 포스트 한 줄 요약 → 패턴 1: Supervisor — 오케스트레이터가 서브에이전트를 스폰, 2026 프로덕션 기본값 → 패턴 2: Pipeline — 순차 핸드오프, 예측 가능한 선형 플로우 → 패턴 3: Fan-out / Fan-in (MapReduce) — 병렬 실행으로 레이턴시 60% 절감 → 패턴 4: Debate (Maker-Checker) — 두 에이전트 대립 + 심판, 정확도 우선 → 패턴 5: Dynamic Handoff (Swarm) — 에이전트가 다음 에이전트를 스스로 선택 → 단일 에이전트가 64% 태스크에서 동급 이상 — 멀티에이전트는 복잡도 비용이 있음 → 2026 기본값: Supervisor. 나머지는 구체적 요건이 있을 때만 → 가장 흔한 실패 모드: 무한 핸드오프 루프, 서브에이전트 컨텍스트 오염, 전체 트랜스크립트 전달


멀티에이전트를 쓰기 전에 물어볼 것

패턴 5가지를 배우기 전에 먼저 자문해야 할 질문이 있습니다.

단일 에이전트로 충분한가?

YES → 단일 에이전트 유지
  - 태스크가 선형이고 의존성이 명확한 경우
  - 64%의 벤치마크 태스크에서 단일이 멀티와 동급
  - 복잡도·비용·디버깅 어려움 추가 없음

NO → 멀티에이전트 고려
  - 서로 독립적인 대용량 병렬 작업 (Fan-out)
  - 다른 전문성이 필요한 구분된 도메인 (Supervisor)
  - 정확도가 최우선이고 다관점 검증 필요 (Debate)
  - 어떤 전문가가 필요한지 런타임에 결정 (Dynamic Handoff)
  - 단계마다 완전히 다른 컨텍스트·권한 필요 (Pipeline)

패턴 1 — Supervisor (2026 프로덕션 기본값)

구조: 오케스트레이터 하나가 전체 컨텍스트를 보유하고 서브에이전트를 스폰합니다. 서브에이전트는 독립된 컨텍스트에서 실행하고 요약 문자열만 반환합니다. P2P 통신 없음.

2026년 주요 5개 프레임워크가 이 패턴으로 수렴했습니다. Anthropic의 "brain/hands" 아키텍처, LangGraph의 Supervisor 패턴, OpenAI Agents SDK의 중첩 핸드오프가 모두 이 구조입니다.

from anthropic import Anthropic

client = Anthropic()

# ── 서브에이전트 정의 ──────────────────────────────
def run_subagent(
    role: str,
    system_prompt: str,
    task: str,
    tools: list | None = None,
) -> str:
    """
    격리된 서브에이전트 실행.
    중요: 전체 대화 기록이 아닌 태스크만 받고 요약만 반환.
    """
    messages = [{"role": "user", "content": task}]
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        system=f"당신은 {role} 전문가입니다.\n{system_prompt}",
        messages=messages,
        tools=tools or [],
    )
    # 전체 응답이 아닌 핵심 요약만 반환 (컨텍스트 오염 방지)
    return response.content[0].text


# ── 오케스트레이터 ────────────────────────────────
ORCHESTRATOR_SYSTEM = """당신은 코드 리뷰 오케스트레이터입니다.
서브에이전트에게 태스크를 위임하고 결과를 종합합니다.
각 서브에이전트의 출력을 그대로 전달하지 말고 핵심만 통합하세요."""

def orchestrate_code_review(code: str, language: str = "python") -> dict:
    """
    Supervisor 패턴: 코드 리뷰를 3개 전문 서브에이전트에게 위임
    """

    # 1. 오케스트레이터가 태스크 분해
    decomp_response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=512,
        system=ORCHESTRATOR_SYSTEM,
        messages=[{
            "role": "user",
            "content": f"다음 {language} 코드를 리뷰해야 합니다:\n```{language}\n{code}\n```\n"
                       "어떤 측면을 각 전문가에게 위임할지 JSON으로 결정하세요.",
        }],
    )

    # 2. 서브에이전트 병렬(또는 순차) 실행
    import asyncio

    async def run_all():
        results = await asyncio.gather(
            asyncio.to_thread(run_subagent,
                "보안 전문가",
                "코드의 보안 취약점만 분석합니다.",
                f"다음 코드의 보안 취약점을 찾아 요약하세요:\n```{language}\n{code}\n```",
            ),
            asyncio.to_thread(run_subagent,
                "성능 전문가",
                "코드의 성능 및 복잡도만 분석합니다.",
                f"다음 코드의 성능 문제를 찾아 요약하세요:\n```{language}\n{code}\n```",
            ),
            asyncio.to_thread(run_subagent,
                "가독성 전문가",
                "코드의 가독성과 유지보수성만 분석합니다.",
                f"다음 코드의 가독성 문제를 요약하세요:\n```{language}\n{code}\n```",
            ),
        )
        return results

    security_summary, performance_summary, readability_summary = asyncio.run(run_all())

    # 3. 오케스트레이터가 서브에이전트 요약을 통합 (트랜스크립트 아님)
    final_response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=1024,
        system=ORCHESTRATOR_SYSTEM,
        messages=[{
            "role": "user",
            "content": f"""세 전문가의 리뷰 요약을 통합해 최종 리포트를 작성하세요.

보안 검토: {security_summary}

성능 검토: {performance_summary}

가독성 검토: {readability_summary}

우선순위 순으로 핵심 개선사항 5개 이내로 정리하세요.""",
        }],
    )

    return {
        "pattern": "supervisor",
        "subagent_results": {
            "security": security_summary,
            "performance": performance_summary,
            "readability": readability_summary,
        },
        "final_report": final_response.content[0].text,
    }

핵심 규칙 3가지 (2026 프로덕션 검증)

1. 서브에이전트는 독립된 시스템 프롬프트 필수
   → 오케스트레이터 프롬프트 재사용 금지
   → 역할 범위를 명확히 제한

2. 첫 번째 유저 메시지는 구조화된 브리프
   → "목표, 형식, 사용 가능한 툴, 경계"를 명시
   → 자유형식 위임은 documented failure mode

3. 요약 문자열만 반환, 전체 트랜스크립트 절대 금지
   → 트랜스크립트 인라인 포함 시 컨텍스트를 15× 더 소비
   → swarm vs supervisor 성능 차이의 ~50%가 이 규칙에서 옴

패턴 2 — Pipeline (순차 핸드오프)

구조: A → B → C → D 순서로 각 에이전트가 이전 에이전트의 출력을 받아 처리합니다. 순서가 절대 바뀌지 않고 각 단계가 다음 단계에 의존할 때 씁니다.

from dataclasses import dataclass, field
from typing import Callable, Any

@dataclass
class PipelineStage:
    name: str
    agent_fn: Callable
    system_prompt: str
    model: str = "claude-sonnet-4-6"

class AgentPipeline:
    """
    Pipeline 패턴: 선형 순차 핸드오프.
    각 스테이지는 이전 스테이지의 출력을 입력으로 받음.
    """

    def __init__(self, stages: list[PipelineStage]):
        self.stages = stages

    def run(self, initial_input: str) -> dict:
        current = initial_input
        history = [{"stage": "input", "output": initial_input}]

        for stage in self.stages:
            response = client.messages.create(
                model=stage.model,
                max_tokens=2048,
                system=stage.system_prompt,
                messages=[{
                    "role": "user",
                    "content": current,
                }],
            )
            current = response.content[0].text
            history.append({
                "stage": stage.name,
                "output": current,
                "tokens_used": response.usage.output_tokens,
            })

        return {
            "pattern": "pipeline",
            "final_output": current,
            "stage_history": history,
        }


# 사용 예: 기술 문서 작성 파이프라인
doc_pipeline = AgentPipeline([
    PipelineStage(
        name="researcher",
        agent_fn=None,
        system_prompt="주어진 주제를 조사해 핵심 사실과 개념을 정리합니다.",
        model="claude-sonnet-4-6",
    ),
    PipelineStage(
        name="writer",
        agent_fn=None,
        system_prompt="조사 결과를 바탕으로 기술 문서 초안을 작성합니다.",
        model="claude-sonnet-4-6",
    ),
    PipelineStage(
        name="editor",
        agent_fn=None,
        system_prompt="문서를 검토해 명확성·정확성·흐름을 개선합니다.",
        model="claude-opus-4-7",  # 편집은 더 강력한 모델
    ),
    PipelineStage(
        name="formatter",
        agent_fn=None,
        system_prompt="마크다운 형식으로 최종 포맷팅합니다. 헤더, 코드 블록, 목차를 추가합니다.",
        model="claude-haiku-4-5-20251001",  # 형식 작업은 경량 모델
    ),
])

result = doc_pipeline.run("Python 비동기 프로그래밍의 핵심 개념")

Pipeline vs Supervisor 선택 기준

Pipeline 선택:
  ✅ 단계 순서가 항상 동일 (검색→분석→작성→편집)
  ✅ 각 단계가 이전 단계 완료에 의존
  ✅ 단계별 다른 모델/비용 최적화 필요

Supervisor 선택:
  ✅ 태스크를 런타임에 분해해야 함
  ✅ 병렬 처리 가능한 독립적 서브태스크
  ✅ 어떤 전문가가 필요한지 미리 결정 불가

패턴 3 — Fan-out / Fan-in (MapReduce)

구조: 오케스트레이터가 동일한 태스크를 여러 에이전트에게 동시에 배포(Fan-out)하고, 모든 결과를 수집해 통합(Fan-in)합니다. LangGraph에서 Send API로 네이티브 지원. 레이턴시 60% 절감 가능.

import asyncio
from anthropic import Anthropic

client = Anthropic()

async def process_chunk_async(
    chunk: str,
    chunk_id: int,
    system_prompt: str,
    model: str = "claude-sonnet-4-6",
) -> dict:
    """단일 청크를 비동기로 처리하는 워커"""
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(
        None,
        lambda: client.messages.create(
            model=model,
            max_tokens=512,
            system=system_prompt,
            messages=[{"role": "user", "content": chunk}],
        )
    )
    return {
        "chunk_id": chunk_id,
        "result": response.content[0].text,
        "tokens": response.usage.output_tokens,
    }


async def fan_out_fan_in(
    items: list[str],
    worker_system_prompt: str,
    reducer_system_prompt: str,
    max_parallel: int = 8,    # 동시 실행 최대값
    worker_model: str = "claude-sonnet-4-6",
    reducer_model: str = "claude-opus-4-7",
) -> dict:
    """
    Fan-out/Fan-in 패턴.
    items를 병렬로 처리 후 단일 결과로 집약.
    """

    semaphore = asyncio.Semaphore(max_parallel)

    async def bounded_process(item, idx):
        async with semaphore:
            return await process_chunk_async(item, idx, worker_system_prompt, worker_model)

    # Fan-out: 병렬 실행
    start = asyncio.get_event_loop().time()
    worker_results = await asyncio.gather(
        *[bounded_process(item, i) for i, item in enumerate(items)]
    )
    parallel_time = asyncio.get_event_loop().time() - start

    # Fan-in: 결과 집약
    combined = "\n\n".join([
        f"[결과 {r['chunk_id']+1}]\n{r['result']}"
        for r in sorted(worker_results, key=lambda x: x["chunk_id"])
    ])

    loop = asyncio.get_event_loop()
    final = await loop.run_in_executor(
        None,
        lambda: client.messages.create(
            model=reducer_model,
            max_tokens=2048,
            system=reducer_system_prompt,
            messages=[{
                "role": "user",
                "content": f"다음 {len(items)}개 분석 결과를 통합해 최종 요약을 작성하세요:\n\n{combined}",
            }],
        )
    )

    total_tokens = sum(r["tokens"] for r in worker_results)
    return {
        "pattern": "fan_out_fan_in",
        "n_workers": len(items),
        "parallel_execution_seconds": round(parallel_time, 2),
        "sequential_estimate_seconds": round(parallel_time * len(items) / max_parallel, 2),
        "latency_reduction_pct": round((1 - 1/min(len(items), max_parallel)) * 100, 1),
        "worker_tokens": total_tokens,
        "final_result": final.content[0].text,
    }


# 사용 예: 대규모 코드베이스 분석
async def analyze_codebase(file_contents: list[str]) -> dict:
    return await fan_out_fan_in(
        items=file_contents,
        worker_system_prompt="주어진 코드 파일의 의존성, 책임, 잠재적 문제를 2~3문장으로 요약합니다.",
        reducer_system_prompt="각 파일 분석을 통합해 전체 아키텍처 평가와 핵심 개선사항을 작성합니다.",
    )

LangGraph 네이티브 Fan-out (Send API)

from langgraph.graph import StateGraph, END
from langgraph.constants import Send
from typing import TypedDict, Annotated
import operator

class MapReduceState(TypedDict):
    items: list[str]
    results: Annotated[list[str], operator.add]  # 병렬 결과 자동 합산
    final_output: str

def map_node(state: dict) -> list[Send]:
    """Fan-out: 각 아이템을 독립 워커로 분산"""
    return [
        Send("worker", {"item": item, "item_id": i})
        for i, item in enumerate(state["items"])
    ]

def worker_node(state: dict) -> dict:
    """병렬 워커 — LangGraph가 자동으로 동시 실행"""
    result = process_item(state["item"])
    return {"results": [result]}

def reduce_node(state: dict) -> dict:
    """Fan-in: 모든 워커 결과를 수집해 통합"""
    final = aggregate_results(state["results"])
    return {"final_output": final}

workflow = StateGraph(MapReduceState)
workflow.add_node("dispatcher", map_node)
workflow.add_node("worker", worker_node)
workflow.add_node("reducer", reduce_node)

workflow.set_entry_point("dispatcher")
workflow.add_conditional_edges("dispatcher", map_node)  # Fan-out
workflow.add_edge("worker", "reducer")                  # Fan-in
workflow.add_edge("reducer", END)

패턴 4 — Debate (Maker-Checker)

구조: 두 에이전트가 같은 문제에 대해 독립적으로 답을 내거나 서로의 답을 비판하고, 심판 에이전트가 최종 결정합니다. 정확도가 속도보다 중요할 때, 고위험 결정에 씁니다.

from anthropic import Anthropic
import json

client = Anthropic()

def run_debate(
    question: str,
    n_rounds: int = 2,
    proponent_model: str = "claude-opus-4-7",
    critic_model: str = "claude-opus-4-7",
    judge_model: str = "claude-opus-4-7",
) -> dict:
    """
    Debate 패턴: 두 에이전트의 대립 + 심판 판정.
    n_rounds: 주장-반박 반복 횟수
    """

    debate_history = []
    current_claim = None

    # 1라운드: 초기 주장
    proponent_response = client.messages.create(
        model=proponent_model,
        max_tokens=768,
        system="""당신은 주어진 질문에 최선의 답을 제시하는 전문가입니다.
논리적 근거와 함께 명확한 입장을 밝히세요.""",
        messages=[{"role": "user", "content": question}],
    )
    current_claim = proponent_response.content[0].text
    debate_history.append({"role": "proponent", "content": current_claim, "round": 0})

    # n라운드 반박-재반박 반복
    for round_num in range(1, n_rounds + 1):
        # 비판자: 현재 주장의 약점 공격
        critic_response = client.messages.create(
            model=critic_model,
            max_tokens=512,
            system="""당신은 비판적 사고 전문가입니다.
주어진 주장의 논리적 오류, 누락된 반례, 약점을 찾아 구체적으로 반박하세요.
근거 없는 비판은 하지 마세요.""",
            messages=[{
                "role": "user",
                "content": f"질문: {question}\n\n주장: {current_claim}\n\n이 주장의 약점을 비판하세요.",
            }],
        )
        criticism = critic_response.content[0].text
        debate_history.append({"role": "critic", "content": criticism, "round": round_num})

        # 주장자: 비판에 대한 방어 및 강화
        if round_num < n_rounds:
            defense_response = client.messages.create(
                model=proponent_model,
                max_tokens=512,
                system="주장에 대한 비판을 검토하고 논거를 강화하거나 필요 시 수정하세요.",
                messages=[{
                    "role": "user",
                    "content": f"원래 주장: {current_claim}\n\n비판: {criticism}\n\n비판에 응답하세요.",
                }],
            )
            current_claim = defense_response.content[0].text
            debate_history.append({
                "role": "proponent_defense",
                "content": current_claim,
                "round": round_num,
            })

    # 심판: 토론 전체를 보고 최종 판정
    debate_summary = "\n\n".join([
        f"[{d['role'].upper()} - 라운드 {d['round']}]\n{d['content']}"
        for d in debate_history
    ])

    judge_response = client.messages.create(
        model=judge_model,
        max_tokens=1024,
        system="""당신은 공정한 심판입니다.
토론을 검토하고 가장 논리적이고 정확한 결론을 내립니다.
어느 쪽에도 치우치지 않고 오직 논거의 강도로 판단합니다.""",
        messages=[{
            "role": "user",
            "content": f"질문: {question}\n\n토론 기록:\n{debate_summary}\n\n최종 판정과 근거를 JSON으로 출력하세요:\n"
                       '{"verdict": "결론", "reasoning": "근거", "confidence": "high|medium|low"}',
        }],
    )

    try:
        judgment = json.loads(judge_response.content[0].text)
    except Exception:
        judgment = {"verdict": judge_response.content[0].text, "confidence": "medium"}

    return {
        "pattern": "debate",
        "question": question,
        "debate_history": debate_history,
        "judgment": judgment,
        "n_api_calls": len(debate_history) + 1,
        "cost_multiplier": f"~{len(debate_history) + 1}×",
    }


# Maker-Checker 변형 — 더 단순한 버전
def maker_checker(
    task: str,
    maker_model: str = "claude-sonnet-4-6",
    checker_model: str = "claude-opus-4-7",
) -> dict:
    """
    Maker-Checker: 생성 → 독립 검증.
    코드 리뷰, 수학 검증, 팩트 체크에 적합.
    """

    # Maker: 결과물 생성
    made = client.messages.create(
        model=maker_model,
        max_tokens=1024,
        messages=[{"role": "user", "content": task}],
    ).content[0].text

    # Checker: 독립적으로 검증 (Maker 결과를 모르는 척)
    check_prompt = f"""다음 결과물을 독립적으로 검증하세요.
오류, 엣지 케이스, 개선점을 찾으세요.
문제가 없으면 "PASS"를 출력하세요.

[원래 태스크]
{task}

[검증할 결과물]
{made}"""

    checked = client.messages.create(
        model=checker_model,
        max_tokens=512,
        messages=[{"role": "user", "content": check_prompt}],
    ).content[0].text

    passed = "PASS" in checked.upper() and len(checked) < 100

    return {
        "pattern": "maker_checker",
        "result": made,
        "check_result": checked,
        "passed": passed,
        "needs_revision": not passed,
    }

비용 경고: Debate는 구조적으로 최소 2~4× 비용이 발생합니다. 정확도가 속도·비용보다 중요한 경우에만 사용합니다. 모든 태스크에 Debate를 쓰는 것은 가장 흔한 멀티에이전트 안티패턴 중 하나입니다.


패턴 5 — Dynamic Handoff (Swarm)

구조: 에이전트가 자신의 역할을 완수하면 스스로 다음 에이전트를 선택합니다. 중앙 오케스트레이터 없이 에이전트 간 직접 제어권 이전. 어떤 전문가가 필요한지 런타임에만 알 수 있을 때 씁니다.

from anthropic import Anthropic
from typing import Callable
import json

client = Anthropic()

class Agent:
    def __init__(
        self,
        name: str,
        system_prompt: str,
        available_handoffs: list["Agent"] | None = None,
        model: str = "claude-sonnet-4-6",
    ):
        self.name = name
        self.system_prompt = system_prompt
        self.handoffs = available_handoffs or []
        self.model = model

    def run(self, message: str, max_handoffs: int = 5) -> dict:
        """
        에이전트 실행 + 필요 시 다음 에이전트로 핸드오프.
        max_handoffs: 무한 루프 방지
        """
        handoff_count = 0
        current_agent = self
        current_message = message
        trace = []

        while handoff_count <= max_handoffs:
            # 핸드오프 툴 정의
            handoff_tools = [
                {
                    "name": f"handoff_to_{agent.name}",
                    "description": f"{agent.name}에게 제어권을 이전합니다.",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "reason": {"type": "string", "description": "핸드오프 이유"},
                            "context": {"type": "string", "description": "다음 에이전트에게 전달할 컨텍스트"},
                        },
                        "required": ["reason", "context"],
                    },
                }
                for agent in current_agent.handoffs
            ]

            response = client.messages.create(
                model=current_agent.model,
                max_tokens=1024,
                system=current_agent.system_prompt,
                tools=handoff_tools,
                messages=[{"role": "user", "content": current_message}],
            )

            # 핸드오프 발생 여부 확인
            handoff_call = None
            text_output = ""
            for block in response.content:
                if hasattr(block, "text"):
                    text_output = block.text
                elif hasattr(block, "name") and block.name.startswith("handoff_to_"):
                    handoff_call = block

            trace.append({
                "agent": current_agent.name,
                "output": text_output,
                "handoff": handoff_call.name if handoff_call else None,
            })

            if not handoff_call:
                # 핸드오프 없음 → 최종 결과
                return {
                    "pattern": "dynamic_handoff",
                    "final_agent": current_agent.name,
                    "final_output": text_output,
                    "trace": trace,
                    "n_handoffs": handoff_count,
                }

            # 다음 에이전트 찾기
            target_name = handoff_call.name.replace("handoff_to_", "")
            next_agent = next(
                (a for a in current_agent.handoffs if a.name == target_name), None
            )

            if not next_agent:
                break  # 핸드오프 대상을 못 찾으면 현재 에이전트가 마무리

            # 핸드오프 실행
            handoff_input = json.loads(handoff_call.input or "{}")
            current_message = (
                f"이전 에이전트({current_agent.name})로부터 핸드오프:\n"
                f"이유: {handoff_input.get('reason', '')}\n"
                f"컨텍스트: {handoff_input.get('context', '')}"
            )
            current_agent = next_agent
            handoff_count += 1

        return {
            "pattern": "dynamic_handoff",
            "final_agent": current_agent.name,
            "final_output": text_output,
            "trace": trace,
            "max_handoffs_reached": handoff_count > max_handoffs,
        }


# 사용 예: 고객 지원 라우팅
billing_agent = Agent(
    name="billing",
    system_prompt="결제·청구 문의를 처리합니다. 기술 문제는 tech 에이전트로 핸드오프하세요.",
)
tech_agent = Agent(
    name="tech_support",
    system_prompt="기술 지원을 담당합니다. 환불 요청은 billing 에이전트로 핸드오프하세요.",
)
triage_agent = Agent(
    name="triage",
    system_prompt="고객 문의를 분류해 적절한 전문 에이전트로 핸드오프합니다.",
    available_handoffs=[billing_agent, tech_agent],
)

# 연결 설정
billing_agent.handoffs = [tech_agent]
tech_agent.handoffs = [billing_agent]

result = triage_agent.run("지난 달 결제가 두 번 되었고 앱도 계속 오류가 나요.")

무한 루프 방지가 핵심입니다. A → B → A → B 순환은 Dynamic Handoff에서 가장 흔한 실패 모드입니다. max_handoffs 설정과 방문한 에이전트 기록으로 반드시 방어해야 합니다.


패턴 선택 결정 트리

태스크가 있을 때:

[단계가 순서대로 정해져 있는가?]
  YES → Pipeline
  NO  ↓

[독립적으로 병렬 처리 가능한 서브태스크인가?]
  YES → Fan-out / Fan-in
  NO  ↓

[정확도가 최우선이고 비용 2~4× 감수 가능한가?]
  YES → Debate / Maker-Checker
  NO  ↓

[어떤 전문가가 필요한지 런타임에만 알 수 있는가?]
  YES → Dynamic Handoff
  NO  ↓

[태스크를 오케스트레이터가 분해하고 전문가에게 위임?]
  YES → Supervisor (기본값)

[모두 해당 없음?]
  → 단일 에이전트 재검토

패턴별 비교 요약

패턴 API 비용 배수 레이턴시 복잡도 최적 상황

Supervisor 1~3× 중간 낮음 도메인 격리, 전문화 위임
Pipeline 높음 (순차) 최저 단계 순서 고정, 각 단계 의존성
Fan-out/Fan-in 낮음 (병렬) 중간 독립적 대용량 병렬 처리
Debate 3~5× 높음 중간 고위험 결정, 정확도 최우선
Dynamic Handoff 1~n× 예측 불가 높음 동적 라우팅, 전문가 불확실

✅ 결론

항목 핵심

2026 기본값 ✅ Supervisor — 5개 주요 프레임워크가 수렴
단일 에이전트 우선 ✅ 64% 태스크에서 단일이 멀티와 동급 이상
가장 흔한 실패 ❌ 무한 핸드오프 루프, 전체 트랜스크립트 전달
서브에이전트 황금 규칙 ✅ 요약만 반환, 독립 시스템 프롬프트, 구조화 브리프
비용 현실 ⚠️ Debate는 3~5×, Fan-out은 n× — 필요할 때만
하이브리드 ✅ Supervisor 안에 Pipeline, Fan-out 안에 Debate 중첩 가능

멀티에이전트는 복잡도에 대한 투자입니다. 단순한 문제를 멀티에이전트로 해결하려 하면 비용과 디버깅 난이도만 올라갑니다. 패턴을 알면 "더 많은 에이전트"가 아닌 "올바른 패턴"을 선택할 수 있게 됩니다.


https://cell-devlog.tistory.com/256

 

에이전트 코드 다 짤 필요 없습니다 — Managed Agents vs 직접 오케스트레이션 실전 비교

에이전트를 만들려면 LangGraph 그래프를 설계하고, 샌드박스를 프로비저닝하고, 에이전트 루프를 유지하고, 상태를 관리해야 한다 — 2026년 5월 18일까지는 맞는 말이었습니다. 5월 19일 Google I/O에

cell-devlog.tistory.com

 

반응형