AI Agent

AI 에이전트 Durable Execution 실전 1편 — 에이전트가 죽어도 이어지는 워크플로우 설계

cell-devlog 2026. 5. 21. 16:17
반응형

LLM 20번 호출하다 17번째에서 서버 죽으면? 처음부터 다시? 그 돈 다 날리는 거 맞습니다.

핵심 요약
→ Temporal Replay 2026 (5월): Netflix·NVIDIA·Stripe·Cursor 모두 사용 중, 고객 3,000+
→ 핵심 문제: 에이전트 47단계 중 실패 → 1단계부터 재시작 → 토큰 비용 폭탄
→ Durable Execution: 각 단계를 Event History에 저장 → 실패 지점부터 자동 재개
→ 핵심 규칙: Workflow = 결정론적 오케스트레이터 / Activity = 비결정론적 실제 작업
→ LLM 호출·툴 실행·API 요청 → 전부 Activity에 넣어야 함
→ Workflow 안에서 random, datetime.now(), 직접 API 호출 금지
→ Activity는 자동 재시도 — 횟수·간격·타임아웃 세밀하게 제어 가능
→ Replay 2026 신규: Serverless Workers(Lambda 배포), Workflow Streams(LLM 스트리밍)

 


실전 1 — 왜 일반 에이전트 코드가 프로덕션에서 죽나

# ❌ 일반 에이전트 코드 — 프로덕션에서 이렇게 짜면 안 됨
import anthropic

client = anthropic.Anthropic()

def research_agent(topic: str) -> str:
    # 1단계: 검색 쿼리 생성 (LLM 호출 1)
    queries = generate_search_queries(topic)   # 완료, 비용 발생
    
    # 2단계: 웹 검색 실행
    results = search_web(queries)              # 완료
    
    # 3단계: 내용 분석 (LLM 호출 2~5)
    analysis = analyze_results(results)        # 완료, 비용 발생
    
    # 4단계: 초안 작성 (LLM 호출 6)
    draft = write_draft(analysis)             # 완료, 비용 발생
    
    # 5단계: 검토 요청 이메일 전송
    send_email(draft)                          # 💥 SMTP 서버 타임아웃
    
    # 6단계: DB 저장
    save_to_db(draft)                          # ❌ 실행 안 됨

# 결과:
# - LLM 6번 호출 비용: $0.18 날아감
# - 이메일 안 감, DB 저장 안 됨
# - 재실행하면 1단계부터 다시 → 또 $0.18
# - 에이전트 47단계라면? → 전체 비용 × 재시도 횟수
# 왜 단순 try/except로 못 막나
def research_agent_with_retry(topic: str) -> str:
    # 이것도 불충분한 이유:
    
    # 문제 1: 어디까지 완료됐는지 추적 코드를 직접 짜야 함
    # 문제 2: 워커 프로세스 자체가 죽으면 (OOM, 배포) → 상태 사라짐
    # 문제 3: 분산 환경에서 여러 워커가 같은 작업 중복 실행 가능
    # 문제 4: 재시도 로직이 비즈니스 로직을 오염시킴
    
    completed_steps = load_checkpoint()  # 이 체크포인트 코드 자체가 복잡
    
    for step in steps:
        if step in completed_steps:
            continue
        try:
            execute(step)
            save_checkpoint(step)       # 체크포인트 저장 자체도 실패할 수 있음
        except Exception as e:
            # 지수 백오프, 재시도 횟수... 이걸 다 직접?
            ...
개념 정리
→ Durable Execution이 해결하는 핵심: 체크포인트·재시도·상태관리를 인프라가 담당
→ 개발자는 비즈니스 로직만 — "단계 5 완료했나?" 추적 코드 필요 없음
→ 워커 프로세스 죽어도 → 다른 워커가 Event History 읽고 이어서 실행
→ Temporal = "Workflow 상태를 DB에 자동 저장하는 런타임"

실전 2 — Temporal 핵심 구조: Workflow vs Activity

Temporal 아키텍처

┌─────────────────────────────────────┐
│           Temporal Server           │
│  ┌─────────────────────────────┐   │
│  │       Event History         │   │  ← 모든 단계 영구 저장
│  │  [Step1: ✅] [Step2: ✅]    │   │
│  │  [Step3: ✅] [Step4: 실행중]│   │
│  └─────────────────────────────┘   │
└─────────────────────────────────────┘
           ↑ 폴링           ↓ 결과 보고
┌─────────────────────────────────────┐
│             Worker                  │
│  ┌──────────────┐ ┌──────────────┐ │
│  │   Workflow   │ │   Activity   │ │
│  │ (결정론적)   │ │ (비결정론적) │ │
│  │ - 오케스트레 │ │ - LLM 호출  │ │
│  │ - 순서 결정 │ │ - API 호출  │ │
│  │ - 상태 없음 │ │ - DB 쓰기   │ │
│  └──────────────┘ └──────────────┘ │
└─────────────────────────────────────┘
# 설치
# pip install temporalio anthropic
# activities.py — 실제 작업은 전부 여기
import temporalio.activity as activity
import anthropic
import httpx

client = anthropic.Anthropic()

# ✅ Activity: 비결정론적 작업 담당
# - LLM 호출, API 요청, DB 접근 전부 여기
# - Temporal이 자동 재시도

@activity.defn(name="generate_search_queries")
async def generate_search_queries(topic: str) -> list[str]:
    """LLM으로 검색 쿼리 생성"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=200,
        messages=[{
            "role": "user",
            "content": f"'{topic}'를 조사하기 위한 검색 쿼리 5개를 JSON 배열로 반환해줘"
        }]
    )
    import json
    return json.loads(response.content[0].text)

@activity.defn(name="search_web")
async def search_web(queries: list[str]) -> list[dict]:
    """웹 검색 실행"""
    results = []
    async with httpx.AsyncClient() as http:
        for query in queries:
            resp = await http.get(
                "https://api.search-provider.com/search",
                params={"q": query},
                timeout=10.0
            )
            results.extend(resp.json().get("results", []))
    return results

@activity.defn(name="analyze_and_write")
async def analyze_and_write(
    topic: str,
    search_results: list[dict]
) -> str:
    """검색 결과 분석 후 초안 작성"""
    context = "\n".join([r["snippet"] for r in search_results[:10]])
    
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2000,
        messages=[{
            "role": "user",
            "content": f"주제: {topic}\n\n참고자료:\n{context}\n\n위 자료를 바탕으로 상세 보고서를 작성해줘"
        }]
    )
    return response.content[0].text

@activity.defn(name="save_report")
async def save_report(topic: str, content: str) -> str:
    """DB 저장 (예시)"""
    import asyncpg
    conn = await asyncpg.connect("postgresql://localhost/reports")
    try:
        result = await conn.fetchrow(
            "INSERT INTO reports (topic, content) VALUES ($1, $2) RETURNING id",
            topic, content
        )
        return str(result["id"])
    finally:
        await conn.close()
# workflows.py — 오케스트레이션 로직
import temporalio.workflow as workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
from activities import (
    generate_search_queries,
    search_web,
    analyze_and_write,
    save_report,
)

# ✅ Workflow: 결정론적 오케스트레이터
# 금지 사항:
#   ❌ random, uuid4() → workflow.uuid4() 사용
#   ❌ datetime.now() → workflow.now() 사용
#   ❌ 직접 API 호출 → Activity로 위임
#   ❌ asyncio.sleep() → workflow.sleep() 사용

@workflow.defn(name="ResearchWorkflow")
class ResearchWorkflow:
    
    @workflow.run
    async def run(self, topic: str) -> dict:
        
        # 1단계: 검색 쿼리 생성
        # execute_activity가 Event History에 결과 저장
        # 실패 → 자동 재시도, 성공 후 재실행 → 저장된 결과 그대로 사용
        queries = await workflow.execute_activity(
            generate_search_queries,
            topic,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                backoff_coefficient=2.0,
                maximum_attempts=3,
            )
        )
        
        # 2단계: 웹 검색
        search_results = await workflow.execute_activity(
            search_web,
            queries,
            start_to_close_timeout=timedelta(minutes=2),
            retry_policy=RetryPolicy(
                maximum_attempts=5,
                non_retryable_error_types=["InvalidQueryError"],
            )
        )
        
        # 3단계: 분석 및 초안 작성 (LLM 호출 — 오래 걸릴 수 있음)
        draft = await workflow.execute_activity(
            analyze_and_write,
            args=[topic, search_results],
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=2),
                maximum_attempts=3,
            )
        )
        
        # 4단계: DB 저장
        report_id = await workflow.execute_activity(
            save_report,
            args=[topic, draft],
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=RetryPolicy(maximum_attempts=10),  # DB는 많이 재시도
        )
        
        return {
            "report_id": report_id,
            "topic": topic,
            "draft_length": len(draft),
        }
개념 정리
→ execute_activity: 결과를 Event History에 저장 → 재실행 시 재사용
→ start_to_close_timeout: Activity 한 번 실행의 최대 시간
→ retry_policy: 실패 시 재시도 설정 (간격·배수·최대 횟수)
→ non_retryable_error_types: 이 에러는 재시도 안 함 (영구 실패로 처리)
→ Workflow 안에서 crash 나도 → 다른 워커가 Event History 읽고 queries부터 재개

실전 3 — Worker 실행 및 워크플로우 시작

# worker.py — Worker 프로세스
import asyncio
import temporalio.client as temporal_client
import temporalio.worker as temporal_worker
from workflows import ResearchWorkflow
from activities import (
    generate_search_queries,
    search_web,
    analyze_and_write,
    save_report,
)

async def run_worker():
    # Temporal Server에 연결 (로컬 개발)
    client = await temporal_client.Client.connect("localhost:7233")
    
    # Worker 생성 — Workflow와 Activity 등록
    worker = temporal_worker.Worker(
        client,
        task_queue="research-queue",       # 작업 큐 이름
        workflows=[ResearchWorkflow],
        activities=[
            generate_search_queries,
            search_web,
            analyze_and_write,
            save_report,
        ],
    )
    
    print("Worker 시작됨. Ctrl+C로 종료.")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(run_worker())
# starter.py — 워크플로우 실행 요청
import asyncio
import temporalio.client as temporal_client
from workflows import ResearchWorkflow

async def start_research(topic: str):
    client = await temporal_client.Client.connect("localhost:7233")
    
    # 워크플로우 실행 요청
    # Temporal Server가 워커에 작업 분배
    handle = await client.start_workflow(
        ResearchWorkflow.run,
        topic,
        id=f"research-{topic.replace(' ', '-')}",  # 고유 ID (중복 실행 방지)
        task_queue="research-queue",
    )
    
    print(f"워크플로우 시작: {handle.id}")
    
    # 결과 대기 (선택 — 비동기로 실행하고 나중에 조회도 가능)
    result = await handle.result()
    print(f"완료: {result}")
    return result

asyncio.run(start_research("2026년 AI 에이전트 시장 동향"))
# 로컬 Temporal Server 실행 (개발용)
# Docker 없이 단일 바이너리로 실행 가능
brew install temporal       # macOS
temporal server start-dev   # 포트 7233

# 또는 Docker
docker run --rm -p 7233:7233 temporalio/auto-setup:latest

# Web UI: http://localhost:8233
# → 워크플로우 실행 상태, Event History, 재시도 현황 실시간 확인
개념 정리
→ task_queue: Worker와 Workflow를 연결하는 채널 — 같은 큐 이름으로 매칭
→ workflow id: 고유 식별자 — 같은 ID로 중복 실행 시 기존 실행 재사용 (멱등성)
→ handle.result(): 완료 대기 — 백그라운드 실행도 가능 (start_workflow만 호출)
→ Web UI (localhost:8233): 실행 중 워크플로우 실시간 모니터링, 강제 종료·신호 전송 가능

실전 4 — Activity Retry Policy 실전 패턴

# retry_patterns.py — 실무에서 자주 쓰는 재시도 패턴
from temporalio.common import RetryPolicy
from datetime import timedelta

# ─────────────────────────────────────
# 패턴 1: LLM API 호출 — rate limit 대응
# ─────────────────────────────────────
LLM_RETRY_POLICY = RetryPolicy(
    initial_interval=timedelta(seconds=2),    # 첫 재시도 2초 후
    backoff_coefficient=2.0,                  # 매번 2배 증가 (2→4→8→16...)
    maximum_interval=timedelta(seconds=60),   # 최대 60초 대기
    maximum_attempts=5,                       # 최대 5회
    non_retryable_error_types=[
        "InvalidRequestError",                # 잘못된 프롬프트 → 재시도 의미 없음
        "AuthenticationError",                # API 키 오류 → 재시도 의미 없음
    ]
)

# ─────────────────────────────────────
# 패턴 2: DB 쓰기 — 낙관적 재시도
# ─────────────────────────────────────
DB_RETRY_POLICY = RetryPolicy(
    initial_interval=timedelta(milliseconds=500),
    backoff_coefficient=1.5,
    maximum_attempts=10,                      # DB는 많이 재시도
    non_retryable_error_types=[
        "UniqueViolationError",               # 중복 키 → 재시도 안 함
        "ForeignKeyViolationError",           # 참조 오류 → 재시도 안 함
    ]
)

# ─────────────────────────────────────
# 패턴 3: 이메일 발송 — 한 번만
# ─────────────────────────────────────
EMAIL_RETRY_POLICY = RetryPolicy(
    maximum_attempts=1,                       # 중복 발송 방지
)

# ─────────────────────────────────────
# 패턴 4: 외부 API — 멱등성 확인 후 재시도
# ─────────────────────────────────────
EXTERNAL_API_RETRY_POLICY = RetryPolicy(
    initial_interval=timedelta(seconds=5),
    maximum_attempts=3,
    non_retryable_error_types=[
        "HTTP_400",   # 잘못된 요청
        "HTTP_401",   # 인증 실패
        "HTTP_403",   # 권한 없음
        "HTTP_404",   # 리소스 없음
    ]
    # HTTP 429, 500, 502, 503은 재시도 O
)

# 워크플로우에서 적용
@workflow.defn
class SmartResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> str:
        
        # LLM 호출
        draft = await workflow.execute_activity(
            analyze_and_write,
            args=[topic, []],
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=LLM_RETRY_POLICY,       # LLM 전용 정책
        )
        
        # DB 저장
        report_id = await workflow.execute_activity(
            save_report,
            args=[topic, draft],
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=DB_RETRY_POLICY,         # DB 전용 정책
        )
        
        # 이메일 발송 (중복 방지)
        await workflow.execute_activity(
            send_notification_email,
            report_id,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=EMAIL_RETRY_POLICY,      # 1회만
        )
        
        return report_id
개념 정리
→ backoff_coefficient: 재시도 간격 증가 배율 — 2.0이면 지수적 증가
→ maximum_interval: 지수 증가에 상한선 — 무한정 늘어나지 않게
→ non_retryable_error_types: 이 에러는 즉시 실패 처리 → 불필요한 재시도 차단
→ 이메일·결제처럼 중복 실행이 위험한 작업 → maximum_attempts=1 또는 멱등키 사용
→ Activity는 원자적: 3단계 중 2단계에서 실패 → 전체 재시도 → 멱등성 설계 필수

마무리 (1편)

✅ Temporal Durable Execution 적용 후
→ 에이전트 47단계 중 실패 → 실패 지점부터 자동 재개 (1단계부터 재시작 없음)
→ LLM 호출 비용 중복 발생 없음 — 이미 완료된 Activity 결과 재사용
→ 워커 프로세스 죽어도 → 다른 워커가 이어받아 계속 실행
→ 재시도 로직을 비즈니스 코드에 안 씀 — Retry Policy로 선언만
→ Web UI에서 실행 중 워크플로우 실시간 모니터링

❌ 일반 코드로 계속 에이전트 짜면
→ 매 실패마다 처음부터 → LLM 비용 N배 낭비
→ 복잡한 체크포인트 코드가 비즈니스 로직을 오염
→ "어디까지 완료됐나" 추적이 곧 새 프로젝트 수준
→ 분산 환경에서 중복 실행, 레이스 컨디션 직접 해결

 

반응형