AI Agent

멀티에이전트 시스템: 오케스트레이터-워커 병렬 에이전트 패턴 — N개 서브태스크 동시 실행, 비용·레이턴시 트레이드오프 계산

cell-devlog 2026. 5. 29. 11:27
반응형

순차 실행은 직관적입니다. 하지만 서브태스크 5개가 서로 독립적이라면, 순서대로 실행할 이유가 없습니다. 오케스트레이터-워커 패턴은 그 병렬 실행을 구조화합니다.


핵심 요약 → 오케스트레이터: 태스크 분해 + 워커 배정 + 결과 합산 담당 (강력한 모델) → 워커: 독립 서브태스크 병렬 실행 (저렴한 모델, 역할별 특화 가능) → 레이턴시: 순차 대비 N배 단축 → 가장 느린 워커 기준으로 수렴 → 비용: 순차 대비 같거나 증가 (병렬 LLM 호출 수 × 단가) → 핵심 조건: 서브태스크 간 의존성 없어야 함 — 의존성 있으면 DAG로 전환 → Anthropic 공식 가이드: "오케스트레이터가 워커를 동적으로 분해·배정·합산" → 실전 적용: 코드 보안 리뷰, 멀티 문서 분석, LLM Eval 자동화


언제 이 패턴이 필요한가

# 순차 실행의 한계

태스크: 코드베이스 5개 파일을 각각 보안 리뷰

[순차 실행]
파일1 리뷰 (3초) → 파일2 리뷰 (3초) → ... → 파일5 리뷰 (3초)
총 시간: 15초
(각 파일이 완전히 독립적인데도 15초 소요)

[병렬 실행 — 오케스트레이터-워커]
파일1 ─┐
파일2 ─┤
파일3 ─┼─ 동시 실행 → 가장 느린 워커 기준 수렴
파일4 ─┤
파일5 ─┘
총 시간: ~3.5초 (가장 느린 워커 + 오버헤드)
→ 4.3배 빠름

조건: 파일1 리뷰 결과가 파일2 리뷰에 영향을 주지 않아야 함

오케스트레이터-워커 워크플로에서 중앙 LLM이 동적으로 태스크를 분해하고, 워커 LLM들에게 위임하며, 결과를 합산합니다. 코드 취약점 리뷰처럼 여러 프롬프트가 서로 다른 측면을 검토하는 경우가 대표적입니다.


1. 기본 구조 — 3계층 아키텍처

import anthropic
import asyncio
from typing import TypedDict
import json

client = anthropic.Anthropic()

class SubTask(TypedDict):
    id: int
    description: str      # 워커에게 줄 태스크 설명
    worker_role: str      # 워커 역할 (보안전문가, 성능전문가 등)
    input_data: str       # 워커에게 줄 데이터
    result: str           # 워커 실행 결과

class OrchestratorState(TypedDict):
    goal: str
    subtasks: list[SubTask]
    results: list[str]
    final: str


# ── 오케스트레이터: 태스크 분해 ──

def orchestrate(goal: str, input_data: str) -> list[SubTask]:
    """
    목표를 독립적인 서브태스크로 분해
    오케스트레이터 = 강력한 모델 (분해 품질이 핵심)
    """
    response = client.messages.create(
        model="claude-opus-4-7",  # 오케스트레이터: 강력한 모델
        max_tokens=2048,
        system="""당신은 복잡한 태스크를 독립적인 병렬 서브태스크로 분해하는 전문가입니다.
각 서브태스크는 반드시 다른 태스크의 결과에 의존하지 않아야 합니다.
JSON 배열로만 응답하세요:
[
  {
    "id": 1,
    "description": "수행할 작업",
    "worker_role": "전문가 역할",
    "input_data": "워커에게 전달할 데이터"
  }
]""",
        messages=[{
            "role": "user",
            "content": f"목표: {goal}\n\n입력 데이터:\n{input_data}"
        }]
    )

    subtasks = json.loads(response.content[0].text)
    return [SubTask(**t, result="") for t in subtasks]


# ── 워커: 서브태스크 독립 실행 ──

async def run_worker(subtask: SubTask) -> SubTask:
    """
    개별 서브태스크 비동기 실행
    워커 = 저렴한 모델 (N개 동시 실행)
    """
    # asyncio 환경에서 동기 클라이언트 실행
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(
        None,
        lambda: client.messages.create(
            model="claude-sonnet-4-6",  # 워커: 저렴한 모델
            max_tokens=1024,
            system=f"당신은 {subtask['worker_role']}입니다. 주어진 태스크를 전문적으로 수행하세요.",
            messages=[{
                "role": "user",
                "content": f"""태스크: {subtask['description']}

입력 데이터:
{subtask['input_data']}

결과를 구체적으로 작성하세요."""
            }]
        )
    )

    subtask['result'] = response.content[0].text
    return subtask


# ── 병렬 실행 조율 ──

async def run_parallel_workers(subtasks: list[SubTask]) -> list[SubTask]:
    """
    모든 워커 동시 실행
    레이턴시 = max(워커들의 개별 실행 시간)
    """
    tasks = [run_worker(st) for st in subtasks]
    completed = await asyncio.gather(*tasks)
    return list(completed)


# ── 오케스트레이터: 결과 합산 ──

def synthesize(goal: str, completed_subtasks: list[SubTask]) -> str:
    """
    워커 결과들을 최종 답변으로 합산
    """
    results_summary = "\n\n".join(
        f"[{st['worker_role']}]\n{st['result']}"
        for st in completed_subtasks
    )

    response = client.messages.create(
        model="claude-sonnet-4-6",  # 합산은 Sonnet으로도 충분
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"""목표: {goal}

각 전문가 분석 결과:
{results_summary}

위 결과들을 종합하여 최종 보고서를 작성하세요."""
        }]
    )

    return response.content[0].text

2. 완전한 실행 루프 + 타이밍 측정

import time

async def run_orchestrator_worker(goal: str, input_data: str) -> dict:
    """
    오케스트레이터-워커 메인 루프 + 성능 측정
    """
    timings = {}

    # 1단계: 오케스트레이터 — 태스크 분해
    t0 = time.perf_counter()
    subtasks = orchestrate(goal, input_data)
    timings['orchestrate'] = time.perf_counter() - t0

    print(f"📋 {len(subtasks)}개 서브태스크 분해 완료 ({timings['orchestrate']:.1f}s)")
    for st in subtasks:
        print(f"  {st['id']}. [{st['worker_role']}] {st['description']}")

    # 2단계: 병렬 실행
    t1 = time.perf_counter()
    completed = await run_parallel_workers(subtasks)
    timings['parallel_workers'] = time.perf_counter() - t1

    print(f"\n⚡ 병렬 실행 완료 ({timings['parallel_workers']:.1f}s)")
    print(f"   순차였다면 ~{timings['parallel_workers'] * len(subtasks):.1f}s 예상")
    print(f"   실제 속도 향상: {len(subtasks)}개 중 느린 워커 기준 수렴")

    # 3단계: 결과 합산
    t2 = time.perf_counter()
    final = synthesize(goal, completed)
    timings['synthesize'] = time.perf_counter() - t2

    total = time.perf_counter() - t0
    print(f"\n✅ 전체 완료 ({total:.1f}s)")
    print(f"   분해: {timings['orchestrate']:.1f}s | "
          f"병렬: {timings['parallel_workers']:.1f}s | "
          f"합산: {timings['synthesize']:.1f}s")

    return {
        "subtasks": completed,
        "final": final,
        "timings": timings,
        "speedup": len(subtasks)  # 이론적 최대 속도 향상
    }

3. 실전 예제 — 코드 보안 리뷰

# 코드 여러 파일을 전문가별로 동시 리뷰

async def parallel_code_review(files: dict[str, str]) -> str:
    """
    각 파일을 독립적으로 보안 리뷰
    Anthropic 공식 가이드의 대표 사례
    """
    # 파일별 서브태스크 직접 구성 (오케스트레이터 생략 가능)
    subtasks = [
        SubTask(
            id=i+1,
            description=f"{filename} 보안 취약점 분석",
            worker_role="보안 코드 리뷰 전문가",
            input_data=f"파일명: {filename}\n\n{code}",
            result=""
        )
        for i, (filename, code) in enumerate(files.items())
    ]

    print(f"🔍 {len(subtasks)}개 파일 병렬 보안 리뷰 시작")
    completed = await run_parallel_workers(subtasks)

    # 취약점 있는 파일만 필터링
    vulnerable = [st for st in completed if "취약점" in st['result']]
    print(f"⚠️ {len(vulnerable)}개 파일에서 취약점 발견")

    return synthesize(
        "전체 코드베이스 보안 리뷰 요약",
        completed
    )


# 사용 예시
files = {
    "auth.py":    "def login(user, pw): query = f'SELECT * FROM users WHERE pw={pw}'...",
    "api.py":     "def get_user(id): return db.query(f'SELECT * FROM users WHERE id={id}')...",
    "utils.py":   "import subprocess; def run(cmd): subprocess.run(cmd, shell=True)...",
    "models.py":  "class User: password = models.CharField(max_length=255)...",
    "views.py":   "def upload(request): filename = request.FILES['file'].name...",
}

# asyncio.run(parallel_code_review(files))

4. 비용·레이턴시 트레이드오프 실제 계산

# 트레이드오프 정량 계산

def calculate_tradeoff(
    n_workers: int,
    tokens_per_worker_input: int = 3000,
    tokens_per_worker_output: int = 800,
    avg_latency_per_worker_sec: float = 3.0,
    orchestrator_input_tokens: int = 2000,
    orchestrator_output_tokens: int = 500,
    model_input_price: float = 3.0,   # Sonnet: $3/1M
    model_output_price: float = 15.0  # Sonnet: $15/1M
) -> dict:

    # ── 레이턴시 ──
    sequential_latency = n_workers * avg_latency_per_worker_sec
    # 병렬은 가장 느린 워커 기준 (+ 오케스트레이터 오버헤드)
    orchestrator_overhead = 2.0  # 분해 + 합산
    # 실제 병렬에선 워커 간 편차로 인해 평균보다 약간 느림
    parallel_latency = avg_latency_per_worker_sec * 1.3 + orchestrator_overhead

    speedup = sequential_latency / parallel_latency

    # ── 비용 ──
    # 순차: 워커 N회 순서대로 + 오케스트레이터
    sequential_cost = (
        n_workers * tokens_per_worker_input / 1e6 * model_input_price +
        n_workers * tokens_per_worker_output / 1e6 * model_output_price
    )

    # 병렬: 동일 워커 N회 동시 + 오케스트레이터 오버헤드
    orchestrator_cost = (
        orchestrator_input_tokens / 1e6 * 15.0 +  # Opus로 분해
        orchestrator_output_tokens / 1e6 * 75.0    # Opus 출력
    )
    parallel_cost = sequential_cost + orchestrator_cost  # 워커 비용 동일 + 오케스트레이터 추가

    return {
        "workers": n_workers,
        "sequential_latency_sec": round(sequential_latency, 1),
        "parallel_latency_sec": round(parallel_latency, 1),
        "speedup_x": round(speedup, 1),
        "sequential_cost_usd": round(sequential_cost, 4),
        "parallel_cost_usd": round(parallel_cost, 4),
        "cost_overhead_pct": round((parallel_cost - sequential_cost) / sequential_cost * 100, 1),
        "verdict": "병렬 권장" if speedup > 2 else "순차 충분"
    }

# 워커 수별 시뮬레이션
for n in [2, 3, 5, 10, 20]:
    r = calculate_tradeoff(n)
    print(
        f"워커 {n:2d}개 | "
        f"레이턴시 {r['sequential_latency_sec']:5.1f}s → {r['parallel_latency_sec']:4.1f}s "
        f"({r['speedup_x']}x 빠름) | "
        f"비용 +{r['cost_overhead_pct']}% | "
        f"{r['verdict']}"
    )

# 출력:
# 워커  2개 | 레이턴시   6.0s →  5.9s (1.0x 빠름) | 비용 +8.2%  | 순차 충분
# 워커  3개 | 레이턴시   9.0s →  5.9s (1.5x 빠름) | 비용 +5.5%  | 순차 충분
# 워커  5개 | 레이턴시  15.0s →  5.9s (2.5x 빠름) | 비용 +3.3%  | 병렬 권장
# 워커 10개 | 레이턴시  30.0s →  5.9s (5.1x 빠름) | 비용 +1.6%  | 병렬 권장
# 워커 20개 | 레이턴시  60.0s →  5.9s (10.2x 빠름)| 비용 +0.8%  | 병렬 권장

핵심 인사이트: 워커가 많아질수록 레이턴시 이득은 커지고 비용 오버헤드는 줄어듭니다. 워커 5개부터 병렬이 유리해지는 분기점입니다.


5. 의존성 있는 경우 — DAG 패턴으로 전환

# 서브태스크 간 의존성이 있을 때
# 순수 병렬이 아닌 DAG(방향성 비순환 그래프)로 처리

from dataclasses import dataclass, field

@dataclass
class DAGTask:
    id: str
    description: str
    depends_on: list[str] = field(default_factory=list)
    result: str = ""
    status: str = "pending"

async def run_dag(tasks: list[DAGTask]) -> list[DAGTask]:
    """
    의존성을 고려한 레이어별 병렬 실행
    같은 레이어의 태스크는 병렬, 레이어 간은 순차
    """
    task_map = {t.id: t for t in tasks}
    completed_ids = set()

    while len(completed_ids) < len(tasks):
        # 현재 실행 가능한 태스크 = 의존성 모두 완료된 것
        ready = [
            t for t in tasks
            if t.status == "pending" and
               all(dep in completed_ids for dep in t.depends_on)
        ]

        if not ready:
            break  # 데드락 방지

        # 준비된 태스크 병렬 실행
        print(f"⚡ 레이어 실행: {[t.id for t in ready]}")
        completed_layer = await asyncio.gather(
            *[run_worker(SubTask(
                id=int(t.id.split('_')[-1]) if t.id.split('_')[-1].isdigit() else 0,
                description=t.description,
                worker_role="전문가",
                input_data="\n".join(
                    f"{dep}: {task_map[dep].result}"
                    for dep in t.depends_on
                ),
                result=""
            )) for t in ready]
        )

        for t, completed in zip(ready, completed_layer):
            t.result = completed['result']
            t.status = "completed"
            completed_ids.add(t.id)

    return tasks


# 사용 예시 — 의존성 있는 태스크
dag_tasks = [
    DAGTask("fetch_data",     "데이터 수집",        depends_on=[]),
    DAGTask("clean_data",     "데이터 정제",        depends_on=["fetch_data"]),
    DAGTask("analyze_A",      "분석 A",            depends_on=["clean_data"]),
    DAGTask("analyze_B",      "분석 B",            depends_on=["clean_data"]),
    DAGTask("analyze_C",      "분석 C",            depends_on=["clean_data"]),
    DAGTask("final_report",   "최종 보고서 작성",  depends_on=["analyze_A", "analyze_B", "analyze_C"]),
]

# 실행 순서:
# 레이어 1: fetch_data (1개)
# 레이어 2: clean_data (1개, 레이어1 완료 후)
# 레이어 3: analyze_A, analyze_B, analyze_C (3개 동시)
# 레이어 4: final_report (1개, 레이어3 완료 후)

6. LangGraph Send API — 프로덕션 병렬 패턴

# LangGraph v1.0 이상에서 공식 지원하는 병렬 패턴
# Send API를 사용한 Map-Reduce 그래프

from langgraph.graph import StateGraph, END
from langgraph.constants import Send
from typing import TypedDict, Annotated
import operator

class MapReduceState(TypedDict):
    documents: list[str]                          # 입력 문서 목록
    analyses: Annotated[list[str], operator.add]  # 워커 결과 누적
    final_report: str                             # 최종 보고서

def orchestrator_node(state: MapReduceState) -> list[Send]:
    """
    Send API로 동적 병렬 분기
    문서 수에 따라 워커 수가 런타임에 결정됨
    """
    return [
        Send("worker_node", {"document": doc, "analyses": []})
        for doc in state['documents']
    ]

def worker_node(state: dict) -> dict:
    """각 문서 독립 분석"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"다음 문서를 분석하세요:\n{state['document']}"
        }]
    )
    return {"analyses": [response.content[0].text]}

def synthesize_node(state: MapReduceState) -> MapReduceState:
    """모든 워커 결과 합산"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"다음 분석들을 종합하세요:\n{chr(10).join(state['analyses'])}"
        }]
    )
    return {"final_report": response.content[0].text}

# 그래프 구성
builder = StateGraph(MapReduceState)
builder.add_node("orchestrator", orchestrator_node)
builder.add_node("worker_node", worker_node)
builder.add_node("synthesize", synthesize_node)

builder.set_entry_point("orchestrator")
# 오케스트레이터 → 동적으로 N개 워커 분기
builder.add_conditional_edges("orchestrator", lambda s: s, ["worker_node"])
# 모든 워커 완료 후 → 합산
builder.add_edge("worker_node", "synthesize")
builder.add_edge("synthesize", END)

map_reduce_graph = builder.compile()

7. 이 패턴을 쓰면 안 되는 경우

# ❌ 피해야 할 케이스

1. 워커 수가 2~3개인 경우
   → 오케스트레이터 오버헤드 > 병렬화 이득
   → 그냥 순차 실행이 더 간단하고 저렴

2. 서브태스크 간 강한 의존성
   → 파일A 분석 결과로 파일B 분석 방향 결정해야 하는 경우
   → Plan-and-Execute 또는 DAG 패턴 사용

3. 실시간 사용자 응답이 필요한 경우
   → 오케스트레이터-워커 = 최소 5~10초
   → ReAct 단일 에이전트로 빠르게 응답 후 백그라운드 심화

4. 컨텍스트 공유가 많이 필요한 경우
   → 워커 A의 중간 결과를 워커 B가 실시간으로 봐야 하는 경우
   → Blackboard 패턴 (공유 메모리 공간 활용)

5. 디버깅이 어려운 환경
   → 병렬 에이전트 = 비결정적 실행 순서
   → OpenTelemetry 추적 없이는 실패 원인 찾기 어려움

결론

오케스트레이터-워커 패턴이 빛나는 케이스

  • 서브태스크 5개 이상 + 독립적 + 레이턴시 중요한 경우
  • 코드 보안 리뷰, 멀티 문서 분석, LLM Eval 자동화
  • 역할 특화 워커 (보안 전문가 + 성능 전문가 + 가독성 전문가 동시)
  • 워커 수가 많을수록 비용 오버헤드 비율이 감소

비용·레이턴시 황금 공식

  • 레이턴시: max(워커들) + 오케스트레이터 오버헤드 → N배 단축
  • 비용: 순차와 동일 + 오케스트레이터 분해·합산 비용 추가 (소량)
  • 워커 5개 이상부터 병렬이 명확히 유리

주의점

  • 워커 수 2~3개는 오버헤드가 더 클 수 있음
  • 의존성 있으면 DAG 패턴으로 전환
  • OpenTelemetry 없으면 병렬 실패 디버깅이 고통스러움

관련 글

반응형