반응형
순차 실행은 직관적입니다. 하지만 서브태스크 5개가 서로 독립적이라면, 순서대로 실행할 이유가 없습니다. 오케스트레이터-워커 패턴은 그 병렬 실행을 구조화합니다.
핵심 요약 → 오케스트레이터: 태스크 분해 + 워커 배정 + 결과 합산 담당 (강력한 모델) → 워커: 독립 서브태스크 병렬 실행 (저렴한 모델, 역할별 특화 가능) → 레이턴시: 순차 대비 N배 단축 → 가장 느린 워커 기준으로 수렴 → 비용: 순차 대비 같거나 증가 (병렬 LLM 호출 수 × 단가) → 핵심 조건: 서브태스크 간 의존성 없어야 함 — 의존성 있으면 DAG로 전환 → Anthropic 공식 가이드: "오케스트레이터가 워커를 동적으로 분해·배정·합산" → 실전 적용: 코드 보안 리뷰, 멀티 문서 분석, LLM Eval 자동화
언제 이 패턴이 필요한가
# 순차 실행의 한계
태스크: 코드베이스 5개 파일을 각각 보안 리뷰
[순차 실행]
파일1 리뷰 (3초) → 파일2 리뷰 (3초) → ... → 파일5 리뷰 (3초)
총 시간: 15초
(각 파일이 완전히 독립적인데도 15초 소요)
[병렬 실행 — 오케스트레이터-워커]
파일1 ─┐
파일2 ─┤
파일3 ─┼─ 동시 실행 → 가장 느린 워커 기준 수렴
파일4 ─┤
파일5 ─┘
총 시간: ~3.5초 (가장 느린 워커 + 오버헤드)
→ 4.3배 빠름
조건: 파일1 리뷰 결과가 파일2 리뷰에 영향을 주지 않아야 함
오케스트레이터-워커 워크플로에서 중앙 LLM이 동적으로 태스크를 분해하고, 워커 LLM들에게 위임하며, 결과를 합산합니다. 코드 취약점 리뷰처럼 여러 프롬프트가 서로 다른 측면을 검토하는 경우가 대표적입니다.
1. 기본 구조 — 3계층 아키텍처
import anthropic
import asyncio
from typing import TypedDict
import json
client = anthropic.Anthropic()
class SubTask(TypedDict):
id: int
description: str # 워커에게 줄 태스크 설명
worker_role: str # 워커 역할 (보안전문가, 성능전문가 등)
input_data: str # 워커에게 줄 데이터
result: str # 워커 실행 결과
class OrchestratorState(TypedDict):
goal: str
subtasks: list[SubTask]
results: list[str]
final: str
# ── 오케스트레이터: 태스크 분해 ──
def orchestrate(goal: str, input_data: str) -> list[SubTask]:
"""
목표를 독립적인 서브태스크로 분해
오케스트레이터 = 강력한 모델 (분해 품질이 핵심)
"""
response = client.messages.create(
model="claude-opus-4-7", # 오케스트레이터: 강력한 모델
max_tokens=2048,
system="""당신은 복잡한 태스크를 독립적인 병렬 서브태스크로 분해하는 전문가입니다.
각 서브태스크는 반드시 다른 태스크의 결과에 의존하지 않아야 합니다.
JSON 배열로만 응답하세요:
[
{
"id": 1,
"description": "수행할 작업",
"worker_role": "전문가 역할",
"input_data": "워커에게 전달할 데이터"
}
]""",
messages=[{
"role": "user",
"content": f"목표: {goal}\n\n입력 데이터:\n{input_data}"
}]
)
subtasks = json.loads(response.content[0].text)
return [SubTask(**t, result="") for t in subtasks]
# ── 워커: 서브태스크 독립 실행 ──
async def run_worker(subtask: SubTask) -> SubTask:
"""
개별 서브태스크 비동기 실행
워커 = 저렴한 모델 (N개 동시 실행)
"""
# asyncio 환경에서 동기 클라이언트 실행
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: client.messages.create(
model="claude-sonnet-4-6", # 워커: 저렴한 모델
max_tokens=1024,
system=f"당신은 {subtask['worker_role']}입니다. 주어진 태스크를 전문적으로 수행하세요.",
messages=[{
"role": "user",
"content": f"""태스크: {subtask['description']}
입력 데이터:
{subtask['input_data']}
결과를 구체적으로 작성하세요."""
}]
)
)
subtask['result'] = response.content[0].text
return subtask
# ── 병렬 실행 조율 ──
async def run_parallel_workers(subtasks: list[SubTask]) -> list[SubTask]:
"""
모든 워커 동시 실행
레이턴시 = max(워커들의 개별 실행 시간)
"""
tasks = [run_worker(st) for st in subtasks]
completed = await asyncio.gather(*tasks)
return list(completed)
# ── 오케스트레이터: 결과 합산 ──
def synthesize(goal: str, completed_subtasks: list[SubTask]) -> str:
"""
워커 결과들을 최종 답변으로 합산
"""
results_summary = "\n\n".join(
f"[{st['worker_role']}]\n{st['result']}"
for st in completed_subtasks
)
response = client.messages.create(
model="claude-sonnet-4-6", # 합산은 Sonnet으로도 충분
max_tokens=2048,
messages=[{
"role": "user",
"content": f"""목표: {goal}
각 전문가 분석 결과:
{results_summary}
위 결과들을 종합하여 최종 보고서를 작성하세요."""
}]
)
return response.content[0].text
2. 완전한 실행 루프 + 타이밍 측정
import time
async def run_orchestrator_worker(goal: str, input_data: str) -> dict:
"""
오케스트레이터-워커 메인 루프 + 성능 측정
"""
timings = {}
# 1단계: 오케스트레이터 — 태스크 분해
t0 = time.perf_counter()
subtasks = orchestrate(goal, input_data)
timings['orchestrate'] = time.perf_counter() - t0
print(f"📋 {len(subtasks)}개 서브태스크 분해 완료 ({timings['orchestrate']:.1f}s)")
for st in subtasks:
print(f" {st['id']}. [{st['worker_role']}] {st['description']}")
# 2단계: 병렬 실행
t1 = time.perf_counter()
completed = await run_parallel_workers(subtasks)
timings['parallel_workers'] = time.perf_counter() - t1
print(f"\n⚡ 병렬 실행 완료 ({timings['parallel_workers']:.1f}s)")
print(f" 순차였다면 ~{timings['parallel_workers'] * len(subtasks):.1f}s 예상")
print(f" 실제 속도 향상: {len(subtasks)}개 중 느린 워커 기준 수렴")
# 3단계: 결과 합산
t2 = time.perf_counter()
final = synthesize(goal, completed)
timings['synthesize'] = time.perf_counter() - t2
total = time.perf_counter() - t0
print(f"\n✅ 전체 완료 ({total:.1f}s)")
print(f" 분해: {timings['orchestrate']:.1f}s | "
f"병렬: {timings['parallel_workers']:.1f}s | "
f"합산: {timings['synthesize']:.1f}s")
return {
"subtasks": completed,
"final": final,
"timings": timings,
"speedup": len(subtasks) # 이론적 최대 속도 향상
}
3. 실전 예제 — 코드 보안 리뷰
# 코드 여러 파일을 전문가별로 동시 리뷰
async def parallel_code_review(files: dict[str, str]) -> str:
"""
각 파일을 독립적으로 보안 리뷰
Anthropic 공식 가이드의 대표 사례
"""
# 파일별 서브태스크 직접 구성 (오케스트레이터 생략 가능)
subtasks = [
SubTask(
id=i+1,
description=f"{filename} 보안 취약점 분석",
worker_role="보안 코드 리뷰 전문가",
input_data=f"파일명: {filename}\n\n{code}",
result=""
)
for i, (filename, code) in enumerate(files.items())
]
print(f"🔍 {len(subtasks)}개 파일 병렬 보안 리뷰 시작")
completed = await run_parallel_workers(subtasks)
# 취약점 있는 파일만 필터링
vulnerable = [st for st in completed if "취약점" in st['result']]
print(f"⚠️ {len(vulnerable)}개 파일에서 취약점 발견")
return synthesize(
"전체 코드베이스 보안 리뷰 요약",
completed
)
# 사용 예시
files = {
"auth.py": "def login(user, pw): query = f'SELECT * FROM users WHERE pw={pw}'...",
"api.py": "def get_user(id): return db.query(f'SELECT * FROM users WHERE id={id}')...",
"utils.py": "import subprocess; def run(cmd): subprocess.run(cmd, shell=True)...",
"models.py": "class User: password = models.CharField(max_length=255)...",
"views.py": "def upload(request): filename = request.FILES['file'].name...",
}
# asyncio.run(parallel_code_review(files))
4. 비용·레이턴시 트레이드오프 실제 계산
# 트레이드오프 정량 계산
def calculate_tradeoff(
n_workers: int,
tokens_per_worker_input: int = 3000,
tokens_per_worker_output: int = 800,
avg_latency_per_worker_sec: float = 3.0,
orchestrator_input_tokens: int = 2000,
orchestrator_output_tokens: int = 500,
model_input_price: float = 3.0, # Sonnet: $3/1M
model_output_price: float = 15.0 # Sonnet: $15/1M
) -> dict:
# ── 레이턴시 ──
sequential_latency = n_workers * avg_latency_per_worker_sec
# 병렬은 가장 느린 워커 기준 (+ 오케스트레이터 오버헤드)
orchestrator_overhead = 2.0 # 분해 + 합산
# 실제 병렬에선 워커 간 편차로 인해 평균보다 약간 느림
parallel_latency = avg_latency_per_worker_sec * 1.3 + orchestrator_overhead
speedup = sequential_latency / parallel_latency
# ── 비용 ──
# 순차: 워커 N회 순서대로 + 오케스트레이터
sequential_cost = (
n_workers * tokens_per_worker_input / 1e6 * model_input_price +
n_workers * tokens_per_worker_output / 1e6 * model_output_price
)
# 병렬: 동일 워커 N회 동시 + 오케스트레이터 오버헤드
orchestrator_cost = (
orchestrator_input_tokens / 1e6 * 15.0 + # Opus로 분해
orchestrator_output_tokens / 1e6 * 75.0 # Opus 출력
)
parallel_cost = sequential_cost + orchestrator_cost # 워커 비용 동일 + 오케스트레이터 추가
return {
"workers": n_workers,
"sequential_latency_sec": round(sequential_latency, 1),
"parallel_latency_sec": round(parallel_latency, 1),
"speedup_x": round(speedup, 1),
"sequential_cost_usd": round(sequential_cost, 4),
"parallel_cost_usd": round(parallel_cost, 4),
"cost_overhead_pct": round((parallel_cost - sequential_cost) / sequential_cost * 100, 1),
"verdict": "병렬 권장" if speedup > 2 else "순차 충분"
}
# 워커 수별 시뮬레이션
for n in [2, 3, 5, 10, 20]:
r = calculate_tradeoff(n)
print(
f"워커 {n:2d}개 | "
f"레이턴시 {r['sequential_latency_sec']:5.1f}s → {r['parallel_latency_sec']:4.1f}s "
f"({r['speedup_x']}x 빠름) | "
f"비용 +{r['cost_overhead_pct']}% | "
f"{r['verdict']}"
)
# 출력:
# 워커 2개 | 레이턴시 6.0s → 5.9s (1.0x 빠름) | 비용 +8.2% | 순차 충분
# 워커 3개 | 레이턴시 9.0s → 5.9s (1.5x 빠름) | 비용 +5.5% | 순차 충분
# 워커 5개 | 레이턴시 15.0s → 5.9s (2.5x 빠름) | 비용 +3.3% | 병렬 권장
# 워커 10개 | 레이턴시 30.0s → 5.9s (5.1x 빠름) | 비용 +1.6% | 병렬 권장
# 워커 20개 | 레이턴시 60.0s → 5.9s (10.2x 빠름)| 비용 +0.8% | 병렬 권장
핵심 인사이트: 워커가 많아질수록 레이턴시 이득은 커지고 비용 오버헤드는 줄어듭니다. 워커 5개부터 병렬이 유리해지는 분기점입니다.
5. 의존성 있는 경우 — DAG 패턴으로 전환
# 서브태스크 간 의존성이 있을 때
# 순수 병렬이 아닌 DAG(방향성 비순환 그래프)로 처리
from dataclasses import dataclass, field
@dataclass
class DAGTask:
id: str
description: str
depends_on: list[str] = field(default_factory=list)
result: str = ""
status: str = "pending"
async def run_dag(tasks: list[DAGTask]) -> list[DAGTask]:
"""
의존성을 고려한 레이어별 병렬 실행
같은 레이어의 태스크는 병렬, 레이어 간은 순차
"""
task_map = {t.id: t for t in tasks}
completed_ids = set()
while len(completed_ids) < len(tasks):
# 현재 실행 가능한 태스크 = 의존성 모두 완료된 것
ready = [
t for t in tasks
if t.status == "pending" and
all(dep in completed_ids for dep in t.depends_on)
]
if not ready:
break # 데드락 방지
# 준비된 태스크 병렬 실행
print(f"⚡ 레이어 실행: {[t.id for t in ready]}")
completed_layer = await asyncio.gather(
*[run_worker(SubTask(
id=int(t.id.split('_')[-1]) if t.id.split('_')[-1].isdigit() else 0,
description=t.description,
worker_role="전문가",
input_data="\n".join(
f"{dep}: {task_map[dep].result}"
for dep in t.depends_on
),
result=""
)) for t in ready]
)
for t, completed in zip(ready, completed_layer):
t.result = completed['result']
t.status = "completed"
completed_ids.add(t.id)
return tasks
# 사용 예시 — 의존성 있는 태스크
dag_tasks = [
DAGTask("fetch_data", "데이터 수집", depends_on=[]),
DAGTask("clean_data", "데이터 정제", depends_on=["fetch_data"]),
DAGTask("analyze_A", "분석 A", depends_on=["clean_data"]),
DAGTask("analyze_B", "분석 B", depends_on=["clean_data"]),
DAGTask("analyze_C", "분석 C", depends_on=["clean_data"]),
DAGTask("final_report", "최종 보고서 작성", depends_on=["analyze_A", "analyze_B", "analyze_C"]),
]
# 실행 순서:
# 레이어 1: fetch_data (1개)
# 레이어 2: clean_data (1개, 레이어1 완료 후)
# 레이어 3: analyze_A, analyze_B, analyze_C (3개 동시)
# 레이어 4: final_report (1개, 레이어3 완료 후)
6. LangGraph Send API — 프로덕션 병렬 패턴
# LangGraph v1.0 이상에서 공식 지원하는 병렬 패턴
# Send API를 사용한 Map-Reduce 그래프
from langgraph.graph import StateGraph, END
from langgraph.constants import Send
from typing import TypedDict, Annotated
import operator
class MapReduceState(TypedDict):
documents: list[str] # 입력 문서 목록
analyses: Annotated[list[str], operator.add] # 워커 결과 누적
final_report: str # 최종 보고서
def orchestrator_node(state: MapReduceState) -> list[Send]:
"""
Send API로 동적 병렬 분기
문서 수에 따라 워커 수가 런타임에 결정됨
"""
return [
Send("worker_node", {"document": doc, "analyses": []})
for doc in state['documents']
]
def worker_node(state: dict) -> dict:
"""각 문서 독립 분석"""
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=512,
messages=[{
"role": "user",
"content": f"다음 문서를 분석하세요:\n{state['document']}"
}]
)
return {"analyses": [response.content[0].text]}
def synthesize_node(state: MapReduceState) -> MapReduceState:
"""모든 워커 결과 합산"""
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"다음 분석들을 종합하세요:\n{chr(10).join(state['analyses'])}"
}]
)
return {"final_report": response.content[0].text}
# 그래프 구성
builder = StateGraph(MapReduceState)
builder.add_node("orchestrator", orchestrator_node)
builder.add_node("worker_node", worker_node)
builder.add_node("synthesize", synthesize_node)
builder.set_entry_point("orchestrator")
# 오케스트레이터 → 동적으로 N개 워커 분기
builder.add_conditional_edges("orchestrator", lambda s: s, ["worker_node"])
# 모든 워커 완료 후 → 합산
builder.add_edge("worker_node", "synthesize")
builder.add_edge("synthesize", END)
map_reduce_graph = builder.compile()
7. 이 패턴을 쓰면 안 되는 경우
# ❌ 피해야 할 케이스
1. 워커 수가 2~3개인 경우
→ 오케스트레이터 오버헤드 > 병렬화 이득
→ 그냥 순차 실행이 더 간단하고 저렴
2. 서브태스크 간 강한 의존성
→ 파일A 분석 결과로 파일B 분석 방향 결정해야 하는 경우
→ Plan-and-Execute 또는 DAG 패턴 사용
3. 실시간 사용자 응답이 필요한 경우
→ 오케스트레이터-워커 = 최소 5~10초
→ ReAct 단일 에이전트로 빠르게 응답 후 백그라운드 심화
4. 컨텍스트 공유가 많이 필요한 경우
→ 워커 A의 중간 결과를 워커 B가 실시간으로 봐야 하는 경우
→ Blackboard 패턴 (공유 메모리 공간 활용)
5. 디버깅이 어려운 환경
→ 병렬 에이전트 = 비결정적 실행 순서
→ OpenTelemetry 추적 없이는 실패 원인 찾기 어려움
결론
✅ 오케스트레이터-워커 패턴이 빛나는 케이스
- 서브태스크 5개 이상 + 독립적 + 레이턴시 중요한 경우
- 코드 보안 리뷰, 멀티 문서 분석, LLM Eval 자동화
- 역할 특화 워커 (보안 전문가 + 성능 전문가 + 가독성 전문가 동시)
- 워커 수가 많을수록 비용 오버헤드 비율이 감소
✅ 비용·레이턴시 황금 공식
- 레이턴시: max(워커들) + 오케스트레이터 오버헤드 → N배 단축
- 비용: 순차와 동일 + 오케스트레이터 분해·합산 비용 추가 (소량)
- 워커 5개 이상부터 병렬이 명확히 유리
❌ 주의점
- 워커 수 2~3개는 오버헤드가 더 클 수 있음
- 의존성 있으면 DAG 패턴으로 전환
- OpenTelemetry 없으면 병렬 실패 디버깅이 고통스러움
관련 글
반응형
'AI Agent' 카테고리의 다른 글
| OpenTelemetry로 LLM 에이전트 추적 — 스팬 계측, 토큰 비용 추적, 프로덕션 디버깅 (0) | 2026.05.29 |
|---|---|
| Instructor 라이브러리로 구조화 출력 실전 2026 — LLM에서 신뢰할 수 있는 JSON을 뽑는 법 (0) | 2026.05.29 |
| Plan-and-Execute 에이전트 패턴 — 계획과 실행을 분리하면 비용이 절반이 된다 (0) | 2026.05.29 |
| AI 에이전트 배포 의사결정 매트릭스 2026 — SaaS·자체호스팅·하이브리드, 어떤 것을 선택해야 하나 (0) | 2026.05.28 |
| Grok 4.20 Multi-Agent 완전 분석 — 4개 에이전트가 서로 논쟁하고 답을 내는 모델 (0) | 2026.05.28 |