에이전트가 30분 작업 중 20분에 크래시났습니다. 처음부터 다시 시작합니다. 이 문제를 구조적으로 해결하는 법을 정리했습니다.
[핵심 요약]
→ 문제: LLM 컨텍스트는 세션 종료 시 사라짐 → 장기 작업에서 치명적
→ 해결: 상태를 외부 저장소에 명시적으로 영속화
→ 레이어: 메모리 계층 (인메모리 → Redis → DB) 구분
→ 핵심 패턴: Checkpoint, Event Sourcing, 체크포인트 재시작
→ 도구: LangGraph Checkpointer, Redis, Supabase, MemGPT 패턴
→ 원칙: 에이전트가 죽어도 상태는 살아있어야 함
왜 상태 관리가 어려운가
일반 소프트웨어 상태 관리:
→ 변수에 값 저장 → 프로세스가 살아있는 동안 유지
→ DB 저장 → 영구 보존
AI 에이전트 상태 관리:
→ 컨텍스트 윈도우 = 단기 메모리 (세션 종료 시 소멸)
→ 긴 작업 = 컨텍스트 한도 초과
→ 멀티 에이전트 = 서로 다른 컨텍스트
→ 재시작 = 처음부터 다시
문제 상황:
에이전트: "파일 100개 분석 중... 47번째 분석 중..."
[서버 재시작]
에이전트: "안녕하세요! 무엇을 도와드릴까요?"
→ 47개 분석 결과 전부 소멸
[상태의 3가지 종류]
→ 실행 상태: 현재 어떤 스텝에 있는지, 무엇을 하고 있는지
→ 작업 상태: 지금까지 한 일, 수집한 데이터, 결정 내역
→ 메모리 상태: 이전 세션에서 학습한 정보, 사용자 선호도
실전 1 — 상태 계층 설계
from dataclasses import dataclass, field
from typing import Any
from enum import Enum
from datetime import datetime
import json
class AgentStatus(Enum):
IDLE = "idle"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class StepResult:
"""단일 스텝 실행 결과"""
step_id: str
tool_name: str
input: dict
output: Any
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
success: bool = True
error: str = None
@dataclass
class AgentState:
"""에이전트 전체 상태"""
session_id: str # 세션 고유 ID
task: str # 원래 작업 지시
status: AgentStatus # 현재 상태
current_step: int = 0 # 몇 번째 스텝인지
total_steps: int = None # 총 스텝 수 (알 경우)
steps: list[StepResult] = field(default_factory=list)
context: dict = field(default_factory=dict) # 자유 형식 컨텍스트
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> dict:
return {
"session_id": self.session_id,
"task": self.task,
"status": self.status.value,
"current_step": self.current_step,
"total_steps": self.total_steps,
"steps": [vars(s) for s in self.steps],
"context": self.context,
"created_at": self.created_at,
"updated_at": self.updated_at
}
@classmethod
def from_dict(cls, data: dict) -> "AgentState":
data["status"] = AgentStatus(data["status"])
data["steps"] = [StepResult(**s) for s in data.get("steps", [])]
return cls(**data)
[상태 설계 원칙]
→ 직렬화 가능: JSON으로 변환 가능해야 DB/Redis에 저장 가능
→ 재현 가능: 상태만 보고 어디서 멈췄는지 파악 가능
→ 최소화: 꼭 필요한 것만 저장 (컨텍스트 전체는 저장하지 않음)
→ 버전 관리: 상태 스키마 변경 시 마이그레이션 고려
실전 2 — 체크포인트 저장소 구현
import redis
import json
from abc import ABC, abstractmethod
class StateStore(ABC):
"""상태 저장소 추상 인터페이스"""
@abstractmethod
async def save(self, state: AgentState) -> None: ...
@abstractmethod
async def load(self, session_id: str) -> AgentState | None: ...
@abstractmethod
async def delete(self, session_id: str) -> None: ...
class RedisStateStore(StateStore):
"""Redis 기반 상태 저장소 — 빠른 읽기/쓰기, TTL 지원"""
def __init__(self, host: str = "localhost", port: int = 6379,
ttl_seconds: int = 86400):
self.redis = redis.Redis(host=host, port=port, decode_responses=True)
self.ttl = ttl_seconds # 24시간 기본값
def _key(self, session_id: str) -> str:
return f"agent:state:{session_id}"
async def save(self, state: AgentState) -> None:
state.updated_at = datetime.now().isoformat()
self.redis.setex(
self._key(state.session_id),
self.ttl,
json.dumps(state.to_dict(), ensure_ascii=False)
)
async def load(self, session_id: str) -> AgentState | None:
data = self.redis.get(self._key(session_id))
if not data:
return None
return AgentState.from_dict(json.loads(data))
async def delete(self, session_id: str) -> None:
self.redis.delete(self._key(session_id))
class PostgresStateStore(StateStore):
"""PostgreSQL 기반 상태 저장소 — 영구 보존, 쿼리 가능"""
def __init__(self, connection_string: str):
import asyncpg
self.conn_str = connection_string
async def save(self, state: AgentState) -> None:
import asyncpg
conn = await asyncpg.connect(self.conn_str)
try:
await conn.execute("""
INSERT INTO agent_states (session_id, state_data, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (session_id)
DO UPDATE SET
state_data = EXCLUDED.state_data,
updated_at = NOW()
""", state.session_id, json.dumps(state.to_dict()))
finally:
await conn.close()
async def load(self, session_id: str) -> AgentState | None:
import asyncpg
conn = await asyncpg.connect(self.conn_str)
try:
row = await conn.fetchrow(
"SELECT state_data FROM agent_states WHERE session_id = $1",
session_id
)
if not row:
return None
return AgentState.from_dict(json.loads(row["state_data"]))
finally:
await conn.close()
async def delete(self, session_id: str) -> None:
import asyncpg
conn = await asyncpg.connect(self.conn_str)
try:
await conn.execute(
"DELETE FROM agent_states WHERE session_id = $1",
session_id
)
finally:
await conn.close()
-- PostgreSQL 테이블 생성
CREATE TABLE agent_states (
session_id TEXT PRIMARY KEY,
state_data JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- 인덱스 (상태별 조회용)
CREATE INDEX idx_agent_states_status
ON agent_states ((state_data->>'status'));
실전 3 — 체크포인트 에이전트 구현
import uuid
import asyncio
from typing import Callable
class CheckpointAgent:
"""체크포인트 기반 장기 실행 에이전트"""
def __init__(
self,
store: StateStore,
llm_client,
checkpoint_every: int = 1 # 몇 스텝마다 저장할지
):
self.store = store
self.llm = llm_client
self.checkpoint_every = checkpoint_every
async def run(
self,
task: str,
session_id: str = None,
resume: bool = False
) -> str:
"""
에이전트 실행 (재시작 지원)
Args:
task: 작업 지시
session_id: 세션 ID (없으면 자동 생성)
resume: True면 기존 세션 재개
"""
# 상태 초기화 또는 복원
if resume and session_id:
state = await self.store.load(session_id)
if state:
print(f"세션 {session_id} 재개 (스텝 {state.current_step}부터)")
else:
print(f"저장된 상태 없음 — 새로 시작")
state = self._new_state(task, session_id)
else:
session_id = session_id or str(uuid.uuid4())
state = self._new_state(task, session_id)
# 이미 완료된 세션
if state.status == AgentStatus.COMPLETED:
return self._summarize(state)
state.status = AgentStatus.RUNNING
await self.store.save(state)
try:
# 메인 실행 루프
while state.status == AgentStatus.RUNNING:
# LLM에게 다음 행동 결정 요청
action = await self._decide_next_action(state)
if action is None:
# 작업 완료
state.status = AgentStatus.COMPLETED
break
# 액션 실행
result = await self._execute_action(action, state)
# 결과 기록
step_result = StepResult(
step_id=f"step_{state.current_step}",
tool_name=action["tool"],
input=action["args"],
output=result,
success=True
)
state.steps.append(step_result)
state.current_step += 1
# 주기적 체크포인트 저장
if state.current_step % self.checkpoint_every == 0:
await self.store.save(state)
print(f"체크포인트 저장 (스텝 {state.current_step})")
# 최종 상태 저장
await self.store.save(state)
return self._summarize(state)
except Exception as e:
state.status = AgentStatus.FAILED
state.context["last_error"] = str(e)
await self.store.save(state)
raise
def _new_state(self, task: str, session_id: str) -> AgentState:
return AgentState(
session_id=session_id,
task=task,
status=AgentStatus.IDLE
)
async def _decide_next_action(self, state: AgentState) -> dict | None:
"""LLM에게 다음 행동 결정 요청"""
# 지금까지의 상태를 LLM 컨텍스트로 구성
context = self._build_context(state)
response = await self.llm.complete(context)
if response.is_done:
return None
return response.next_action
def _build_context(self, state: AgentState) -> str:
"""상태 → LLM 컨텍스트 변환 (컨텍스트 윈도우 절약)"""
# 최근 N개 스텝만 포함 (전체 히스토리는 너무 길어짐)
recent_steps = state.steps[-10:]
return f"""
작업: {state.task}
현재 스텝: {state.current_step}
지금까지 한 일:
{chr(10).join(f"- {s.tool_name}: {s.output}" for s in recent_steps)}
다음에 무엇을 해야 하나요? 완료됐으면 DONE을 반환하세요.
"""
def _summarize(self, state: AgentState) -> str:
"""최종 결과 요약"""
return f"완료 ({state.current_step}개 스텝, {len(state.steps)}개 액션)"
# 사용 예시
async def main():
store = RedisStateStore()
agent = CheckpointAgent(store=store, llm_client=my_llm)
# 첫 실행
session_id = "task_20260428_001"
await agent.run(
task="레포지토리 100개 파일 분석 후 보고서 작성",
session_id=session_id
)
# 실패 후 재시작
await agent.run(
task="레포지토리 100개 파일 분석 후 보고서 작성",
session_id=session_id,
resume=True # 47번째 스텝부터 재개
)
실전 4 — LangGraph Checkpointer 활용
LangGraph를 쓴다면 내장 체크포인터를 사용하는 게 제일 빠릅니다.
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.redis import RedisSaver
from typing import TypedDict, Annotated
import operator
# 그래프 상태 정의
class GraphState(TypedDict):
task: str
files_analyzed: Annotated[list, operator.add] # 리스트 누적
current_file: int
report: str
errors: Annotated[list, operator.add]
# 노드 함수들
async def analyze_file(state: GraphState) -> dict:
"""파일 하나 분석"""
file_idx = state["current_file"]
# 실제 분석 로직
result = f"파일 {file_idx} 분석 완료"
return {
"files_analyzed": [result],
"current_file": file_idx + 1
}
async def generate_report(state: GraphState) -> dict:
"""보고서 생성"""
report = "\n".join(state["files_analyzed"])
return {"report": report}
def should_continue(state: GraphState) -> str:
"""다음 노드 결정"""
if state["current_file"] >= 100:
return "generate_report"
return "analyze_file"
# 그래프 구성
workflow = StateGraph(GraphState)
workflow.add_node("analyze_file", analyze_file)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_file")
workflow.add_conditional_edges("analyze_file", should_continue)
workflow.add_edge("generate_report", END)
# 체크포인터 설정 (SQLite — 로컬 개발용)
checkpointer = SqliteSaver.from_conn_string("./checkpoints.db")
# Redis 체크포인터 (프로덕션)
# checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
app = workflow.compile(checkpointer=checkpointer)
# 실행
config = {"configurable": {"thread_id": "task_20260428_001"}}
# 첫 실행
result = await app.ainvoke(
{"task": "파일 100개 분석", "current_file": 0,
"files_analyzed": [], "report": "", "errors": []},
config=config
)
# 재시작 (자동으로 마지막 체크포인트에서 재개)
result = await app.ainvoke(None, config=config)
[LangGraph Checkpointer 지원 백엔드]
→ SqliteSaver: 로컬 개발, 단일 프로세스
→ RedisSaver: 프로덕션, 빠른 읽기/쓰기
→ PostgresSaver: 영구 보존, 복잡한 쿼리 필요 시
→ MemorySaver: 테스트용 (프로세스 종료 시 소멸)
실전 5 — 메모리 계층 구조
장기 에이전트는 메모리를 4개 계층으로 나눠서 관리합니다.
class AgentMemorySystem:
"""4계층 메모리 시스템"""
def __init__(self, redis_client, db_client, embedding_client):
self.redis = redis_client # L1: 작업 메모리
self.db = db_client # L2: 에피소드 메모리
self.embedding = embedding_client # L3: 시맨틱 검색용
# ===== L1: 작업 메모리 (현재 세션, TTL 있음) =====
async def get_working_memory(self, session_id: str) -> dict:
"""현재 세션의 단기 컨텍스트"""
data = self.redis.get(f"working:{session_id}")
return json.loads(data) if data else {}
async def update_working_memory(self, session_id: str, updates: dict):
existing = await self.get_working_memory(session_id)
existing.update(updates)
self.redis.setex(
f"working:{session_id}",
3600, # 1시간 TTL
json.dumps(existing)
)
# ===== L2: 에피소드 메모리 (완료된 작업 기록) =====
async def save_episode(self, session_id: str, summary: str, outcome: str):
"""완료된 작업을 에피소드로 저장"""
await self.db.execute("""
INSERT INTO agent_episodes
(session_id, summary, outcome, embedding, created_at)
VALUES ($1, $2, $3, $4, NOW())
""", session_id, summary, outcome,
await self._embed(summary))
# ===== L3: 시맨틱 메모리 (유사 경험 검색) =====
async def recall_similar(self, query: str, limit: int = 3) -> list[dict]:
"""현재 작업과 유사한 과거 에피소드 검색"""
query_embedding = await self._embed(query)
return await self.db.fetch("""
SELECT summary, outcome,
1 - (embedding <=> $1) as similarity
FROM agent_episodes
ORDER BY similarity DESC
LIMIT $2
""", query_embedding, limit)
# ===== L4: 절차 메모리 (학습된 패턴) =====
async def get_learned_patterns(self, task_type: str) -> list[str]:
"""성공한 작업 패턴 조회"""
rows = await self.db.fetch("""
SELECT pattern FROM learned_patterns
WHERE task_type = $1
AND success_rate > 0.8
ORDER BY success_rate DESC
LIMIT 5
""", task_type)
return [r["pattern"] for r in rows]
async def _embed(self, text: str) -> list[float]:
"""텍스트 → 임베딩 벡터"""
return await self.embedding.embed(text)
# 에이전트에서 메모리 시스템 활용
class MemoryAwareAgent:
def __init__(self, memory: AgentMemorySystem, ...):
self.memory = memory
async def run(self, task: str, session_id: str):
# 유사한 과거 경험 검색
similar_episodes = await self.memory.recall_similar(task)
# 학습된 패턴 적용
patterns = await self.memory.get_learned_patterns("coding")
# 컨텍스트에 과거 경험 주입
context = self._build_context_with_memory(
task, similar_episodes, patterns
)
# 실행...
result = await self._execute(context, session_id)
# 완료 후 에피소드 저장
await self.memory.save_episode(
session_id=session_id,
summary=f"작업: {task[:100]}",
outcome=result
)
return result
[4계층 메모리 요약]
L1 작업 메모리: 현재 세션 컨텍스트, Redis TTL, 1시간
L2 에피소드: 완료된 작업 기록, PostgreSQL, 영구 보존
L3 시맨틱: 유사 경험 벡터 검색, pgvector, 과거 참고
L4 절차: 성공 패턴 학습, PostgreSQL, 반복 작업 최적화
마무리
✅ 상태 관리 필수인 상황
→ 30분 이상 걸리는 장기 에이전트 작업
→ 외부 API 호출이 많아 중간 실패 가능성이 높을 때
→ 비용이 비싼 작업 (재시도 시 비용 두 배)
→ 사용자가 진행 상황을 확인해야 할 때
→ 멀티 에이전트 협업 (공유 상태 필요)
❌ 과한 경우
→ 30초 이내 완료되는 단순 Q&A
→ 실패해도 재시작 비용이 낮은 작업
→ 프로토타입/실험 단계
[구현 순서 추천]
1단계: 인메모리 상태 (dict) — 가장 단순
2단계: Redis 체크포인트 — 재시작 지원
3단계: PostgreSQL 영구 저장 — 감사 추적 필요 시
4단계: 벡터 메모리 — 경험 기반 학습 필요 시
관련 글:
https://cell-devlog.tistory.com/29
AI 에이전트가 기억하는 법 — 단기/장기 메모리 아키텍처와 MemGPT 완전 정리
AI 에이전트를 쓰다 보면 이런 답답함이 생겨요."지난주에 분명히 말했는데 또 처음부터 설명해야 하네."LLM은 기본적으로 상태가 없어요(stateless). 대화가 끝나면 모든 걸 잊어요. 컨텍스트 창 안
cell-devlog.tistory.com
https://cell-devlog.tistory.com/14
Thought, Action, Observation을 코드로 — LangGraph + ReAct 완전 정리
AI 에이전트를 만들다 보면 이런 상황이 생깁니다."LLM이 도구를 써야 할 때도 있고, 바로 답할 수 있을 때도 있는데 이걸 어떻게 처리하지?"이걸 깔끔하게 해결하는 패턴이 ReAct이고, 이를 코드로
cell-devlog.tistory.com
https://cell-devlog.tistory.com/24
쿼리 재작성, 반복 검색, 멀티소스 라우팅 — Agentic RAG 동작 원리와 동적 검색 전략 완전 정리
RAG 시스템을 만들고 나면 이런 한계가 생겨요."단순한 질문은 잘 답하는데, '2024년 실적을 바탕으로 2025년 전략을 분석해줘' 같은 복잡한 질문은 엉뚱한 답이 나온다."이건 일반 RAG의 구조적 한계
cell-devlog.tistory.com
'AI Agent' 카테고리의 다른 글
| LLM-as-Judge 완전 가이드 — AI로 AI 출력을 자동 평가하는 법 (0) | 2026.04.30 |
|---|---|
| AI 에이전트 롤백 전략 완전 가이드 — 에이전트가 망쳤을 때 복구하는 법 (0) | 2026.04.28 |
| AI 에이전트 테스트 전략 완전 가이드 — 단위 테스트부터 통합 테스트, E2E까지 (0) | 2026.04.28 |
| MCP 9700만 설치 — Linux Foundation 오픈 거버넌스 채택, AI 에이전트 표준 인프라가 됐습니다 (0) | 2026.04.28 |
| Fabric MCP 서버 완전 가이드 — Claude Code에 240개 AI 패턴과 개발자 지식베이스 연결하기 (0) | 2026.04.27 |