본문 바로가기

AI Agent

LangGraph 상태 영속성(Checkpointing) — 에이전트가 죽어도 이어서 실행하는 법

반응형

에이전트가 10분째 실행 중이었습니다. 서버가 죽었습니다. 처음부터 다시입니다. Checkpointing을 붙이면 마지막 노드에서 이어서 실행합니다.

[핵심 요약]
→ Checkpointing: 매 노드 실행 후 그래프 상태를 DB에 저장
→ Thread: 대화/작업 단위 식별자 (thread_id로 상태 분리)
→ 백엔드: MemorySaver(개발) → SQLite(단일서버) → PostgreSQL(프로덕션)
→ 3가지 핵심 기능: 크래시 복구 / 멀티턴 메모리 / 타임트래블
→ Human-in-the-Loop: interrupt_before로 중간 승인 게이트 구현 가능
→ langgraph-checkpoint 최신: 4.1.0 (2026년 5월 12일 릴리즈)

Checkpointing이 없으면 생기는 일

Checkpointing 없는 에이전트:
→ 실행 중 서버 크래시 → 처음부터 재시작
→ 세션 종료 → 대화 기록 증발
→ 장시간 작업 → 중간 결과 저장 불가
→ 멀티 유저 → 상태 분리 불가
→ 디버깅 → 실행 흐름 추적 불가

Checkpointing 붙인 에이전트:
→ 크래시 → 마지막 노드부터 재개
→ 세션 종료 → thread_id로 이어서 대화
→ 장시간 작업 → 각 노드 완료마다 저장
→ 멀티 유저 → thread_id로 상태 완전 분리
→ 디버깅 → 과거 체크포인트로 타임트래블

실전 1 — MemorySaver (개발 환경)

가장 빠르게 시작하는 방법입니다. 메모리에 저장하므로 프로세스 종료 시 사라집니다. 개발/테스트 전용입니다.

pip install langgraph langchain-anthropic
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_anthropic import ChatAnthropic
import operator

# 상태 정의
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]  # 메시지 누적
    task_status: str
    retry_count: int

# 노드 함수
llm = ChatAnthropic(model="claude-sonnet-4-5")

def process_node(state: AgentState):
    response = llm.invoke(state["messages"])
    return {
        "messages": [response],
        "task_status": "processing"
    }

def review_node(state: AgentState):
    # 결과 검토 로직
    return {"task_status": "completed"}

# 그래프 구성
builder = StateGraph(AgentState)
builder.add_node("process", process_node)
builder.add_node("review", review_node)
builder.set_entry_point("process")
builder.add_edge("process", "review")
builder.add_edge("review", END)

# ✅ Checkpointer 주입
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# 실행 — thread_id로 대화 단위 구분
config = {"configurable": {"thread_id": "user-123-session-1"}}

result = graph.invoke(
    {"messages": [("user", "데이터 분석 작업 시작해줘")],
     "task_status": "started",
     "retry_count": 0},
    config=config
)
[MemorySaver 핵심 개념]
→ thread_id: 대화/작업 단위 식별자. 같은 thread_id면 상태 이어받음
→ 매 노드 전환 시 상태 자동 스냅샷 저장
→ 프로세스 재시작 시 상태 소멸 (개발/테스트 전용)
→ 멀티스레드 안전 (동일 thread_id 동시 접근 차단)

이어서 대화하려면 같은 thread_id로 호출합니다.

# 같은 thread_id → 이전 대화 기억
result2 = graph.invoke(
    {"messages": [("user", "아까 분석 결과 요약해줘")]},
    config=config  # thread_id: "user-123-session-1" 동일
)

# 현재 상태 조회
state = graph.get_state(config)
print(state.values)        # 현재 상태값
print(state.next)          # 다음 실행 노드
print(state.metadata)      # 체크포인트 메타데이터

# 체크포인트 히스토리 전체 조회
for checkpoint in graph.get_state_history(config):
    print(checkpoint.metadata["step"], checkpoint.values["task_status"])

실전 2 — SQLiteSaver (단일 서버)

프로세스 재시작 후에도 상태가 유지됩니다. 단일 서버 프로덕션 또는 로컬 에이전트에 적합합니다.

pip install langgraph-checkpoint-sqlite
from langgraph.checkpoint.sqlite import SqliteSaver

# SQLite 파일 기반 체크포인터
with SqliteSaver.from_conn_string("./agent_checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "task-456"}}

    # 첫 실행
    graph.invoke(
        {"messages": [("user", "긴 작업 시작")],
         "task_status": "started",
         "retry_count": 0},
        config=config
    )

# --- 서버 재시작 후 ---
with SqliteSaver.from_conn_string("./agent_checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

    # ✅ 같은 thread_id → 중단된 지점부터 재개
    state = graph.get_state({"configurable": {"thread_id": "task-456"}})
    print(f"재개 위치: {state.next}")
    print(f"저장된 상태: {state.values['task_status']}")
[SQLiteSaver 특징]
→ 파일 기반 (.db) → 프로세스 재시작 후에도 상태 유지
→ 설치 간단, 외부 DB 불필요
→ 동시 접속 제한 (SQLite 특성상 단일 프로세스 권장)
→ 적합: 로컬 에이전트, 단일 서버, 소규모 프로덕션
→ 한계: 분산 환경 불가, 동시 다중 사용자 부적합

실전 3 — PostgresSaver (프로덕션)

멀티 유저, 분산 환경, 고가용성이 필요한 프로덕션 환경의 표준입니다.

pip install langgraph-checkpoint-postgres psycopg
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg

DB_URI = "postgresql://user:password@localhost:5432/agent_db"

# 연결 풀 설정
conn = psycopg.connect(DB_URI, autocommit=True)
checkpointer = PostgresSaver(conn)

# 최초 1회 — 체크포인트 테이블 생성
checkpointer.setup()

graph = builder.compile(checkpointer=checkpointer)

# 멀티 유저 — thread_id로 완전 분리
users = ["user-001", "user-002", "user-003"]

for user_id in users:
    config = {"configurable": {"thread_id": f"session-{user_id}"}}
    graph.invoke(
        {"messages": [("user", f"{user_id}의 작업")],
         "task_status": "started",
         "retry_count": 0},
        config=config
    )

# 특정 유저 상태 조회
state = graph.get_state({"configurable": {"thread_id": "session-user-001"}})
print(state.values)
[PostgresSaver 프로덕션 설정]
→ connection pool: psycopg_pool.AsyncConnectionPool 사용 권장
→ 테이블 자동 생성: checkpointer.setup() 최초 1회 실행
→ 분산 환경: 여러 워커 노드에서 동일 DB 공유 가능
→ 장애 복구: 워커 죽어도 다른 워커가 동일 thread 재개
→ 인덱스: thread_id, checkpoint_id 자동 인덱싱

비동기 환경(FastAPI 등)에서는 AsyncPostgresSaver를 사용합니다.

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg_pool import AsyncConnectionPool

async def setup_graph():
    pool = AsyncConnectionPool(
        conninfo=DB_URI,
        max_size=20,  # 최대 동시 연결
        kwargs={"autocommit": True}
    )
    async with pool.connection() as conn:
        checkpointer = AsyncPostgresSaver(conn)
        await checkpointer.setup()

    graph = builder.compile(checkpointer=checkpointer)
    return graph

실전 4 — 타임트래블 & Human-in-the-Loop

타임트래블 — 과거 체크포인트로 롤백

# 체크포인트 히스토리 전체 조회
history = list(graph.get_state_history(config))

# 히스토리 출력
for i, checkpoint in enumerate(history):
    print(f"Step {checkpoint.metadata['step']}: "
          f"{checkpoint.values.get('task_status')} "
          f"→ next: {checkpoint.next}")

# Step 3으로 롤백
target_checkpoint = history[2]  # step 3
rollback_config = target_checkpoint.config

# ✅ 과거 시점부터 재실행 (다른 입력으로 분기 가능)
graph.invoke(
    {"messages": [("user", "다른 방향으로 다시 시도")]},
    config=rollback_config
)

Human-in-the-Loop — 중간 승인 게이트

# interrupt_before로 승인 게이트 설정
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["review"]  # review 노드 전에 일시정지
)

config = {"configurable": {"thread_id": "approval-task-789"}}

# 1단계: process 노드까지 실행 후 자동 중단
result = graph.invoke(
    {"messages": [("user", "중요한 작업 실행")],
     "task_status": "started",
     "retry_count": 0},
    config=config
)

# 현재 상태 확인 (review 노드 앞에서 대기 중)
state = graph.get_state(config)
print(f"대기 중인 노드: {state.next}")  # ('review',)
print(f"현재 결과: {state.values['messages'][-1].content}")

# 사람이 검토 후 승인 → None 전달로 재개
graph.invoke(None, config=config)

# 또는 상태 수정 후 재개
graph.update_state(
    config,
    {"task_status": "approved"},  # 상태 수정
    as_node="review"
)
graph.invoke(None, config=config)
[Human-in-the-Loop 패턴]
→ interrupt_before: 특정 노드 실행 전 중단 (검토 후 승인)
→ interrupt_after: 특정 노드 실행 후 중단 (결과 확인 후 계속)
→ graph.update_state(): 재개 전 상태 수동 수정 가능
→ None 전달: 중단된 지점에서 그대로 재개
→ 활용: 코드 실행 전 승인, 외부 API 호출 전 확인, 비용 큰 작업 게이트

마무리

✅ Checkpointing 반드시 써야 하는 경우
→ 실행 시간이 1분 이상인 에이전트
→ 멀티턴 대화가 필요한 챗봇/어시스턴트
→ 사람 승인이 필요한 워크플로우 (Human-in-the-Loop)
→ 멀티 유저를 동시에 처리하는 서비스
→ 외부 API 호출 포함 (실패 시 재시도 비용 큰 경우)

❌ Checkpointing 없어도 되는 경우
→ 단발성 요청 (질문 하나에 답 하나)
→ 상태가 없는 순수 함수형 파이프라인
→ 실행 시간 수초 이내의 단순 에이전트
→ 대화 기록 유지가 필요 없는 배치 처리

관련 글

 

반응형