AI Agent

AI 에이전트 Durable Execution 실전 2편 — Human-in-the-Loop·멀티에이전트·Serverless

cell-devlog 2026. 5. 21. 16:32
반응형

1편에서 기본 Workflow/Activity 구조를 잡았다면, 2편은 프로덕션에서 실제로 필요한 고급 패턴입니다.

핵심 요약
→ Signal: 실행 중 워크플로우에 외부에서 이벤트 주입 — 인간 승인 대기 구현
→ wait_condition: 조건 충족 전까지 컴퓨팅 자원 0 소비하며 무한 대기
→ 승인 대기 중 서버 죽어도 → 재시작 후 대기 상태 그대로 복구
→ Child Workflow: 서브에이전트를 독립 워크플로우로 분리 → 병렬 실행
→ asyncio.gather로 여러 Child Workflow 동시 시작 → 전체 완료까지 대기
→ Workflow Streams (Replay 2026): LLM 토큰 스트리밍을 Durable하게 처리
→ Serverless Workers (Replay 2026): AWS Lambda에 Worker 배포, 트래픽 0이면 비용 0
→ 실전 조합: Workflow 오케스트레이터 + Activity 실제 작업 + Signal 인간 개입

실전 1 — Human-in-the-Loop: Signal로 인간 승인 대기

에이전트가 위험한 작업(DB 삭제, 대량 이메일, 결제)을 하기 전에 사람한테 "이거 할까요?" 묻고 기다리는 패턴.

# human_in_loop.py
import asyncio
from dataclasses import dataclass
from enum import StrEnum
from datetime import timedelta
import temporalio.workflow as workflow
import temporalio.activity as activity
from temporalio.common import RetryPolicy
import anthropic

# ─────────────────────────────────────
# Signal 데이터 타입 정의
# ─────────────────────────────────────
class ApprovalDecision(StrEnum):
    APPROVE = "APPROVE"   # 승인 → 실행
    REJECT = "REJECT"     # 거부 → 취소
    EDIT = "EDIT"         # 수정 요청 → 프롬프트 변경 후 재생성

@dataclass
class ApprovalSignal:
    decision: ApprovalDecision
    comment: str = ""           # 거부/수정 이유
    edited_prompt: str = ""     # EDIT 시 새 지시사항

# ─────────────────────────────────────
# Activities
# ─────────────────────────────────────
client = anthropic.Anthropic()

@activity.defn
async def analyze_and_propose(task: str) -> dict:
    """LLM이 태스크 분석 후 실행 계획 제안"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""
태스크: {task}

다음을 JSON으로 반환해줘:
- action: 수행할 작업 설명
- risk_level: "low" / "medium" / "high"
- requires_approval: risk_level이 medium 이상이면 true
"""
        }]
    )
    import json
    return json.loads(response.content[0].text)

@activity.defn
async def execute_action(action: str) -> str:
    """실제 작업 실행"""
    # 실제 구현: DB 작업, API 호출 등
    return f"완료: {action}"

@activity.defn
async def notify_human(workflow_id: str, proposal: dict) -> None:
    """슬랙/이메일로 승인 요청 알림 발송"""
    import httpx
    await httpx.AsyncClient().post(
        "https://hooks.slack.com/your-webhook",
        json={
            "text": f"⚠️ 에이전트 승인 요청\n"
                    f"작업: {proposal['action']}\n"
                    f"위험도: {proposal['risk_level']}\n"
                    f"승인 명령어: `temporal workflow signal "
                    f"--workflow-id {workflow_id} "
                    f"--name approval_signal "
                    f"--input '{{\"decision\":\"APPROVE\"}}'`"
        }
    )

# ─────────────────────────────────────
# Workflow — Signal Handler 포함
# ─────────────────────────────────────
@workflow.defn(name="ApprovalWorkflow")
class ApprovalWorkflow:
    
    def __init__(self):
        self._approval: ApprovalSignal | None = None
    
    # ✅ Signal Handler 등록
    # 외부에서 temporal workflow signal 명령으로 호출
    @workflow.signal(name="approval_signal")
    async def receive_approval(self, signal: ApprovalSignal) -> None:
        self._approval = signal
    
    # Query Handler: 현재 상태를 외부에서 조회
    @workflow.query(name="get_status")
    def get_status(self) -> str:
        if self._approval is None:
            return "승인 대기 중"
        return f"결정됨: {self._approval.decision}"
    
    @workflow.run
    async def run(self, task: str) -> str:
        
        # 1단계: LLM이 작업 분석 + 실행 계획 제안
        proposal = await workflow.execute_activity(
            analyze_and_propose,
            task,
            start_to_close_timeout=timedelta(seconds=30),
        )
        
        # 위험도 낮으면 바로 실행
        if not proposal.get("requires_approval"):
            result = await workflow.execute_activity(
                execute_action,
                proposal["action"],
                start_to_close_timeout=timedelta(minutes=5),
            )
            return f"자동 실행 완료: {result}"
        
        # 위험도 높음 → 인간 승인 필요
        # 2단계: 슬랙 알림 발송
        await workflow.execute_activity(
            notify_human,
            args=[workflow.info().workflow_id, proposal],
            start_to_close_timeout=timedelta(seconds=10),
        )
        
        # 3단계: 승인 Signal 대기
        # ✅ 핵심: wait_condition 대기 중 컴퓨팅 자원 0 소비
        # 서버 재시작 후에도 대기 상태 그대로 복구됨
        await workflow.wait_condition(
            lambda: self._approval is not None,
            timeout=timedelta(hours=24),  # 24시간 내 응답 없으면 타임아웃
        )
        
        # 4단계: 결정에 따라 분기
        decision = self._approval.decision
        
        if decision == ApprovalDecision.REJECT:
            return f"거부됨: {self._approval.comment}"
        
        if decision == ApprovalDecision.EDIT:
            # 수정된 지시사항으로 재분석
            new_proposal = await workflow.execute_activity(
                analyze_and_propose,
                self._approval.edited_prompt,
                start_to_close_timeout=timedelta(seconds=30),
            )
            proposal = new_proposal
        
        # 5단계: 승인됨 → 실행
        result = await workflow.execute_activity(
            execute_action,
            proposal["action"],
            start_to_close_timeout=timedelta(minutes=5),
        )
        return f"승인 후 실행 완료: {result}"
# 승인 Signal 전송 (CLI로 직접 또는 API로 자동화)
temporal workflow signal \
  --workflow-id "approval-wf-001" \
  --name "approval_signal" \
  --input '{"decision": "APPROVE", "comment": "확인 완료"}'

# 거부
temporal workflow signal \
  --workflow-id "approval-wf-001" \
  --name "approval_signal" \
  --input '{"decision": "REJECT", "comment": "위험도 너무 높음"}'

# 현재 상태 조회
temporal workflow query \
  --workflow-id "approval-wf-001" \
  --type "get_status"
개념 정리
→ Signal: 실행 중 워크플로우에 외부 이벤트 주입 — HTTP로도 전송 가능
→ wait_condition: 람다가 True 반환할 때까지 대기 — CPU 0% 사용
→ timeout 설정: 24시간 내 응답 없으면 자동 타임아웃 처리
→ Query: 워크플로우 상태를 읽기 전용으로 조회 — 상태 변경 없음
→ 서버 재시작 시: Signal 대기 상태를 Event History에서 복구 → 사람이 이틀 후 응답해도 OK

실전 2 — 멀티에이전트: Child Workflow로 병렬 오케스트레이션

에이전트 하나가 모든 걸 하지 말고, 전문 서브에이전트를 병렬로 돌려서 속도 극대화.

# multi_agent.py
import asyncio
from dataclasses import dataclass
from datetime import timedelta
import temporalio.workflow as workflow
import temporalio.activity as activity

# ─────────────────────────────────────
# 서브에이전트 Workflow 정의
# 각자 독립적으로 실행 + 실패해도 독립적으로 재시도
# ─────────────────────────────────────
@workflow.defn(name="SearchAgent")
class SearchAgent:
    @workflow.run
    async def run(self, query: str) -> list[str]:
        results = await workflow.execute_activity(
            web_search,
            query,
            start_to_close_timeout=timedelta(minutes=2),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        return results

@workflow.defn(name="AnalysisAgent")
class AnalysisAgent:
    @workflow.run
    async def run(self, data: list[str]) -> str:
        analysis = await workflow.execute_activity(
            analyze_data,
            data,
            start_to_close_timeout=timedelta(minutes=5),
        )
        return analysis

@workflow.defn(name="WritingAgent")
class WritingAgent:
    @workflow.run
    async def run(self, topic: str, research: str) -> str:
        draft = await workflow.execute_activity(
            write_content,
            args=[topic, research],
            start_to_close_timeout=timedelta(minutes=10),
        )
        return draft

# ─────────────────────────────────────
# 오케스트레이터 Workflow
# 서브에이전트들을 병렬로 실행하고 결과 취합
# ─────────────────────────────────────
@workflow.defn(name="ResearchOrchestratorWorkflow")
class ResearchOrchestratorWorkflow:
    
    @workflow.run
    async def run(self, topic: str) -> str:
        
        # 1단계: 검색 쿼리 여러 개 생성
        queries = [
            f"{topic} 최신 동향",
            f"{topic} 사례 연구",
            f"{topic} 기술 분석",
        ]
        
        # ✅ 핵심: 검색 에이전트 3개 동시 병렬 실행
        # Child Workflow마다 독립적 Event History → 각자 실패해도 독립 재시도
        search_handles = await asyncio.gather(*[
            workflow.start_child_workflow(
                SearchAgent.run,
                query,
                id=f"search-{workflow.info().workflow_id}-{i}",
                task_queue="agent-queue",
            )
            for i, query in enumerate(queries)
        ])
        
        # 모든 검색 완료 대기
        search_results = await asyncio.gather(*[
            handle.result() for handle in search_handles
        ])
        
        # 검색 결과 평탄화
        all_results = [r for results in search_results for r in results]
        
        # 2단계: 분석 + 작성 병렬 실행
        # 분석은 오래 걸리고, 목차 초안은 별도로 먼저 생성
        analysis_handle = await workflow.start_child_workflow(
            AnalysisAgent.run,
            all_results,
            id=f"analysis-{workflow.info().workflow_id}",
            task_queue="agent-queue",
        )
        
        # 분석 완료 대기
        analysis = await analysis_handle.result()
        
        # 3단계: 최종 작성
        final_draft = await workflow.execute_child_workflow(
            WritingAgent.run,
            args=[topic, analysis],
            id=f"writing-{workflow.info().workflow_id}",
            task_queue="agent-queue",
        )
        
        return final_draft

# ─────────────────────────────────────
# Worker — 모든 Workflow + Activity 등록
# ─────────────────────────────────────
async def run_worker():
    client = await temporal_client.Client.connect("localhost:7233")
    
    worker = temporal_worker.Worker(
        client,
        task_queue="agent-queue",
        workflows=[
            ResearchOrchestratorWorkflow,
            SearchAgent,
            AnalysisAgent,
            WritingAgent,
        ],
        activities=[web_search, analyze_data, write_content],
    )
    await worker.run()
개념 정리
→ start_child_workflow: Child Workflow 시작 후 handle 반환 — 비동기 병렬 시작 가능
→ execute_child_workflow: 시작 + 완료 대기 한 번에
→ asyncio.gather: 여러 Child Workflow 동시 시작 / 동시 완료 대기
→ Child Workflow ID: 유니크해야 함 — 부모 ID + 인덱스 조합 권장
→ Child Workflow 실패 → 기본적으로 부모도 실패 (Parent Close Policy로 조정 가능)
→ 병렬 3개 검색 → 순차 대비 속도 3x → LLM 비용은 동일

실전 3 — Workflow Streams: LLM 스트리밍을 Durable하게

Replay 2026 신규 기능. LLM 토큰 스트리밍 출력을 Temporal의 내구성 보장과 결합.

# workflow_streams.py
# Replay 2026 신규: Workflow Streams (Public Preview)
import temporalio.workflow as workflow
from temporalio.workflow import UpdateInfo

@workflow.defn(name="StreamingAgentWorkflow")
class StreamingAgentWorkflow:
    
    def __init__(self):
        self._tokens: list[str] = []      # 누적 토큰
        self._is_complete: bool = False
        self._error: str | None = None
    
    # Update Handler: 클라이언트가 토큰 청크를 폴링
    @workflow.update(name="get_tokens")
    async def get_tokens(self, since_index: int) -> dict:
        """since_index 이후 새 토큰 반환"""
        return {
            "tokens": self._tokens[since_index:],
            "is_complete": self._is_complete,
            "error": self._error,
        }
    
    @workflow.run
    async def run(self, prompt: str) -> str:
        
        # LLM 스트리밍 Activity 실행
        # Activity가 토큰을 하나씩 Signal로 워크플로우에 전송
        await workflow.execute_activity(
            stream_llm_response,
            args=[prompt, workflow.info().workflow_id],
            start_to_close_timeout=timedelta(minutes=10),
        )
        
        self._is_complete = True
        return "".join(self._tokens)
    
    # Signal: Activity가 토큰을 실시간으로 전송
    @workflow.signal(name="token_chunk")
    async def receive_token(self, chunk: str) -> None:
        self._tokens.append(chunk)

@activity.defn
async def stream_llm_response(prompt: str, workflow_id: str) -> None:
    """LLM 스트리밍 응답을 토큰 단위로 Workflow에 Signal 전송"""
    client_temporal = await temporal_client.Client.connect("localhost:7233")
    handle = client_temporal.get_workflow_handle(workflow_id)
    
    anthropic_client = anthropic.Anthropic()
    
    # 스트리밍 모드로 LLM 호출
    with anthropic_client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2000,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        buffer = ""
        for text in stream.text_stream:
            buffer += text
            # 단어 단위로 묶어서 Signal 전송 (토큰 하나씩은 너무 빈번)
            if len(buffer) > 20 or text in ".!?\n":
                await handle.signal("token_chunk", buffer)
                buffer = ""
        
        if buffer:  # 남은 버퍼 flush
            await handle.signal("token_chunk", buffer)

# 클라이언트 측 스트리밍 수신
async def display_streaming_response(workflow_id: str):
    client = await temporal_client.Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id)
    
    index = 0
    print("에이전트 응답: ", end="", flush=True)
    
    while True:
        result = await handle.execute_update("get_tokens", index)
        
        new_tokens = result["tokens"]
        for token in new_tokens:
            print(token, end="", flush=True)
        index += len(new_tokens)
        
        if result["is_complete"]:
            print()  # 줄바꿈
            break
        
        if result["error"]:
            print(f"\n오류: {result['error']}")
            break
        
        await asyncio.sleep(0.1)  # 100ms 폴링
개념 정리
→ Workflow Streams: Signal + Update 기반 스트리밍 — Temporal 내구성 그대로 유지
→ Signal(token_chunk): Activity → Workflow 실시간 토큰 전송
→ Update(get_tokens): 클라이언트가 폴링으로 새 토큰 수신
→ 스트리밍 중 서버 재시작 → 누적된 _tokens에서 이어서 전송 가능
→ 토큰 하나당 Signal은 너무 빈번 → 20자 단위 버퍼링 권장

실전 4 — Serverless Workers: AWS Lambda 배포 (Replay 2026 신규)

상시 서버 안 띄우고, 트래픽 있을 때만 Lambda 실행 → 비용 극소화.

# lambda_worker.py — AWS Lambda용 Temporal Worker
import asyncio
import temporalio.client as temporal_client
import temporalio.worker as temporal_worker
from workflows import ResearchWorkflow, ApprovalWorkflow
from activities import (
    generate_search_queries, search_web,
    analyze_and_write, save_report,
)

# Lambda 핸들러
def handler(event, context):
    asyncio.run(run_worker_once())

async def run_worker_once():
    """Lambda 실행마다 Worker 시작 → 작업 처리 → 종료"""
    client = await temporal_client.Client.connect(
        "your-namespace.tmprl.cloud:7233",   # Temporal Cloud 엔드포인트
        namespace="your-namespace",
        tls=True,
    )
    
    # Serverless Worker: max_concurrent_activities 낮게 설정
    # Lambda 메모리·실행 시간 제한 고려
    worker = temporal_worker.Worker(
        client,
        task_queue="research-queue",
        workflows=[ResearchWorkflow, ApprovalWorkflow],
        activities=[
            generate_search_queries, search_web,
            analyze_and_write, save_report,
        ],
        max_concurrent_activities=5,        # Lambda 동시성 제한
        max_concurrent_workflow_tasks=10,
    )
    
    # ✅ Serverless 핵심: 작업 있을 때만 실행, 없으면 종료
    # Temporal Server가 Lambda를 자동 호출·종료 관리
    await worker.run()
# serverless.yml (AWS SAM / Serverless Framework)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  TemporalWorker:
    Type: AWS::Serverless::Function
    Properties:
      Handler: lambda_worker.handler
      Runtime: python3.12
      Timeout: 900          # Lambda 최대 15분
      MemorySize: 1024      # LLM 호출엔 1GB 권장
      
      Environment:
        Variables:
          ANTHROPIC_API_KEY: !Ref AnthropicApiKey
          TEMPORAL_NAMESPACE: your-namespace
          TEMPORAL_HOST: your-namespace.tmprl.cloud:7233
      
      # Temporal Cloud가 Lambda를 자동 호출
      # 별도 EventBridge/SQS 트리거 불필요
# 기존 상시 Worker vs Serverless Worker 비용 비교

ALWAYS_ON_COST = {
    "EC2 t3.medium": 0.0416,           # $0.0416/시간
    "월 비용": 0.0416 * 24 * 30,       # $29.95/월
    "트래픽 없어도": "동일 비용 발생",
}

SERVERLESS_COST = {
    "Lambda 실행": 0.0000166667,        # $0.0000166667/GB-초
    "1GB 메모리 × 60초": 0.001,         # 요청당 $0.001
    "월 1000회 실행": 1.0,              # $1/월
    "트래픽 없으면": "$0",
}

# ✅ 일 평균 100회 미만 에이전트 실행 → Serverless가 90%+ 저렴
# ❌ 상시 고빈도 트래픽 → EC2/ECS 상시 Worker가 유리
개념 정리
→ Serverless Workers (Replay 2026, Pre-release): Lambda에 Worker 배포
→ Temporal Server가 자동으로 Lambda 호출·종료 → 인프라 관리 불필요
→ 스케일 to zero: 트래픽 없으면 Lambda 안 돌아감 → 비용 0
→ 적합한 경우: 간헐적 에이전트 (일 100회 미만), 배치 작업, 개발/스테이징
→ 부적합한 경우: 상시 고빈도 트래픽 → Lambda cold start 레이턴시 이슈
→ 현재 AWS Lambda 지원, 추후 Google Cloud Functions / Azure Functions 예정

마무리 (2편 최종)

✅ 2편 패턴 모두 적용하면
→ 위험 작업 전 자동 승인 요청 — 사람이 이틀 후 응답해도 에이전트 대기 중
→ 검색·분석·작성 서브에이전트 병렬 실행 → 속도 3x
→ LLM 스트리밍 응답을 내구성 보장하며 실시간 전달
→ 트래픽 없으면 서버 비용 $0 — Serverless Worker

❌ 이 패턴 없이 프로덕션 에이전트 운영하면
→ 위험 작업 승인 체계 없음 → 에이전트가 DB 통째로 날릴 수도
→ 순차 실행으로 속도 느림 → 병렬화 직접 구현하면 복잡도 폭발
→ 상시 서버 비용 월 $30+ → 개발 환경에도 동일 청구
→ 에이전트 = 아직도 블랙박스 → 어디서 무슨 결정 했는지 불명

관련글


 

반응형