본문 바로가기

AI Agent

AI 에이전트 상태 관리 완전 가이드 — 장기 실행 에이전트에서 상태를 잃지 않는 법

반응형

에이전트가 30분 작업 중 20분에 크래시났습니다. 처음부터 다시 시작합니다. 이 문제를 구조적으로 해결하는 법을 정리했습니다.

[핵심 요약]
→ 문제: LLM 컨텍스트는 세션 종료 시 사라짐 → 장기 작업에서 치명적
→ 해결: 상태를 외부 저장소에 명시적으로 영속화
→ 레이어: 메모리 계층 (인메모리 → Redis → DB) 구분
→ 핵심 패턴: Checkpoint, Event Sourcing, 체크포인트 재시작
→ 도구: LangGraph Checkpointer, Redis, Supabase, MemGPT 패턴
→ 원칙: 에이전트가 죽어도 상태는 살아있어야 함

왜 상태 관리가 어려운가

일반 소프트웨어 상태 관리:
→ 변수에 값 저장 → 프로세스가 살아있는 동안 유지
→ DB 저장 → 영구 보존

AI 에이전트 상태 관리:
→ 컨텍스트 윈도우 = 단기 메모리 (세션 종료 시 소멸)
→ 긴 작업 = 컨텍스트 한도 초과
→ 멀티 에이전트 = 서로 다른 컨텍스트
→ 재시작 = 처음부터 다시

문제 상황:
에이전트: "파일 100개 분석 중... 47번째 분석 중..."
[서버 재시작]
에이전트: "안녕하세요! 무엇을 도와드릴까요?"
→ 47개 분석 결과 전부 소멸

 

[상태의 3가지 종류]
→ 실행 상태: 현재 어떤 스텝에 있는지, 무엇을 하고 있는지
→ 작업 상태: 지금까지 한 일, 수집한 데이터, 결정 내역
→ 메모리 상태: 이전 세션에서 학습한 정보, 사용자 선호도

실전 1 — 상태 계층 설계

from dataclasses import dataclass, field
from typing import Any
from enum import Enum
from datetime import datetime
import json

class AgentStatus(Enum):
    IDLE      = "idle"
    RUNNING   = "running"
    PAUSED    = "paused"
    COMPLETED = "completed"
    FAILED    = "failed"

@dataclass
class StepResult:
    """단일 스텝 실행 결과"""
    step_id:    str
    tool_name:  str
    input:      dict
    output:     Any
    timestamp:  str = field(default_factory=lambda: datetime.now().isoformat())
    success:    bool = True
    error:      str = None

@dataclass
class AgentState:
    """에이전트 전체 상태"""
    session_id:    str                    # 세션 고유 ID
    task:          str                    # 원래 작업 지시
    status:        AgentStatus            # 현재 상태
    current_step:  int = 0               # 몇 번째 스텝인지
    total_steps:   int = None            # 총 스텝 수 (알 경우)
    steps:         list[StepResult] = field(default_factory=list)
    context:       dict = field(default_factory=dict)  # 자유 형식 컨텍스트
    created_at:    str = field(default_factory=lambda: datetime.now().isoformat())
    updated_at:    str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        return {
            "session_id":   self.session_id,
            "task":         self.task,
            "status":       self.status.value,
            "current_step": self.current_step,
            "total_steps":  self.total_steps,
            "steps":        [vars(s) for s in self.steps],
            "context":      self.context,
            "created_at":   self.created_at,
            "updated_at":   self.updated_at
        }

    @classmethod
    def from_dict(cls, data: dict) -> "AgentState":
        data["status"] = AgentStatus(data["status"])
        data["steps"]  = [StepResult(**s) for s in data.get("steps", [])]
        return cls(**data)
[상태 설계 원칙]
→ 직렬화 가능: JSON으로 변환 가능해야 DB/Redis에 저장 가능
→ 재현 가능: 상태만 보고 어디서 멈췄는지 파악 가능
→ 최소화: 꼭 필요한 것만 저장 (컨텍스트 전체는 저장하지 않음)
→ 버전 관리: 상태 스키마 변경 시 마이그레이션 고려

실전 2 — 체크포인트 저장소 구현

import redis
import json
from abc import ABC, abstractmethod

class StateStore(ABC):
    """상태 저장소 추상 인터페이스"""

    @abstractmethod
    async def save(self, state: AgentState) -> None: ...

    @abstractmethod
    async def load(self, session_id: str) -> AgentState | None: ...

    @abstractmethod
    async def delete(self, session_id: str) -> None: ...


class RedisStateStore(StateStore):
    """Redis 기반 상태 저장소 — 빠른 읽기/쓰기, TTL 지원"""

    def __init__(self, host: str = "localhost", port: int = 6379,
                 ttl_seconds: int = 86400):
        self.redis = redis.Redis(host=host, port=port, decode_responses=True)
        self.ttl   = ttl_seconds  # 24시간 기본값

    def _key(self, session_id: str) -> str:
        return f"agent:state:{session_id}"

    async def save(self, state: AgentState) -> None:
        state.updated_at = datetime.now().isoformat()
        self.redis.setex(
            self._key(state.session_id),
            self.ttl,
            json.dumps(state.to_dict(), ensure_ascii=False)
        )

    async def load(self, session_id: str) -> AgentState | None:
        data = self.redis.get(self._key(session_id))
        if not data:
            return None
        return AgentState.from_dict(json.loads(data))

    async def delete(self, session_id: str) -> None:
        self.redis.delete(self._key(session_id))


class PostgresStateStore(StateStore):
    """PostgreSQL 기반 상태 저장소 — 영구 보존, 쿼리 가능"""

    def __init__(self, connection_string: str):
        import asyncpg
        self.conn_str = connection_string

    async def save(self, state: AgentState) -> None:
        import asyncpg
        conn = await asyncpg.connect(self.conn_str)
        try:
            await conn.execute("""
                INSERT INTO agent_states (session_id, state_data, updated_at)
                VALUES ($1, $2, NOW())
                ON CONFLICT (session_id)
                DO UPDATE SET
                    state_data = EXCLUDED.state_data,
                    updated_at = NOW()
            """, state.session_id, json.dumps(state.to_dict()))
        finally:
            await conn.close()

    async def load(self, session_id: str) -> AgentState | None:
        import asyncpg
        conn = await asyncpg.connect(self.conn_str)
        try:
            row = await conn.fetchrow(
                "SELECT state_data FROM agent_states WHERE session_id = $1",
                session_id
            )
            if not row:
                return None
            return AgentState.from_dict(json.loads(row["state_data"]))
        finally:
            await conn.close()

    async def delete(self, session_id: str) -> None:
        import asyncpg
        conn = await asyncpg.connect(self.conn_str)
        try:
            await conn.execute(
                "DELETE FROM agent_states WHERE session_id = $1",
                session_id
            )
        finally:
            await conn.close()
-- PostgreSQL 테이블 생성
CREATE TABLE agent_states (
    session_id  TEXT PRIMARY KEY,
    state_data  JSONB NOT NULL,
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    updated_at  TIMESTAMPTZ DEFAULT NOW()
);

-- 인덱스 (상태별 조회용)
CREATE INDEX idx_agent_states_status
ON agent_states ((state_data->>'status'));

실전 3 — 체크포인트 에이전트 구현

import uuid
import asyncio
from typing import Callable

class CheckpointAgent:
    """체크포인트 기반 장기 실행 에이전트"""

    def __init__(
        self,
        store: StateStore,
        llm_client,
        checkpoint_every: int = 1  # 몇 스텝마다 저장할지
    ):
        self.store            = store
        self.llm              = llm_client
        self.checkpoint_every = checkpoint_every

    async def run(
        self,
        task: str,
        session_id: str = None,
        resume: bool = False
    ) -> str:
        """
        에이전트 실행 (재시작 지원)

        Args:
            task: 작업 지시
            session_id: 세션 ID (없으면 자동 생성)
            resume: True면 기존 세션 재개
        """
        # 상태 초기화 또는 복원
        if resume and session_id:
            state = await self.store.load(session_id)
            if state:
                print(f"세션 {session_id} 재개 (스텝 {state.current_step}부터)")
            else:
                print(f"저장된 상태 없음 — 새로 시작")
                state = self._new_state(task, session_id)
        else:
            session_id = session_id or str(uuid.uuid4())
            state = self._new_state(task, session_id)

        # 이미 완료된 세션
        if state.status == AgentStatus.COMPLETED:
            return self._summarize(state)

        state.status = AgentStatus.RUNNING
        await self.store.save(state)

        try:
            # 메인 실행 루프
            while state.status == AgentStatus.RUNNING:
                # LLM에게 다음 행동 결정 요청
                action = await self._decide_next_action(state)

                if action is None:
                    # 작업 완료
                    state.status = AgentStatus.COMPLETED
                    break

                # 액션 실행
                result = await self._execute_action(action, state)

                # 결과 기록
                step_result = StepResult(
                    step_id=f"step_{state.current_step}",
                    tool_name=action["tool"],
                    input=action["args"],
                    output=result,
                    success=True
                )
                state.steps.append(step_result)
                state.current_step += 1

                # 주기적 체크포인트 저장
                if state.current_step % self.checkpoint_every == 0:
                    await self.store.save(state)
                    print(f"체크포인트 저장 (스텝 {state.current_step})")

            # 최종 상태 저장
            await self.store.save(state)
            return self._summarize(state)

        except Exception as e:
            state.status = AgentStatus.FAILED
            state.context["last_error"] = str(e)
            await self.store.save(state)
            raise

    def _new_state(self, task: str, session_id: str) -> AgentState:
        return AgentState(
            session_id=session_id,
            task=task,
            status=AgentStatus.IDLE
        )

    async def _decide_next_action(self, state: AgentState) -> dict | None:
        """LLM에게 다음 행동 결정 요청"""
        # 지금까지의 상태를 LLM 컨텍스트로 구성
        context = self._build_context(state)

        response = await self.llm.complete(context)

        if response.is_done:
            return None
        return response.next_action

    def _build_context(self, state: AgentState) -> str:
        """상태 → LLM 컨텍스트 변환 (컨텍스트 윈도우 절약)"""
        # 최근 N개 스텝만 포함 (전체 히스토리는 너무 길어짐)
        recent_steps = state.steps[-10:]

        return f"""
작업: {state.task}
현재 스텝: {state.current_step}
지금까지 한 일:
{chr(10).join(f"- {s.tool_name}: {s.output}" for s in recent_steps)}

다음에 무엇을 해야 하나요? 완료됐으면 DONE을 반환하세요.
"""

    def _summarize(self, state: AgentState) -> str:
        """최종 결과 요약"""
        return f"완료 ({state.current_step}개 스텝, {len(state.steps)}개 액션)"


# 사용 예시
async def main():
    store = RedisStateStore()
    agent = CheckpointAgent(store=store, llm_client=my_llm)

    # 첫 실행
    session_id = "task_20260428_001"
    await agent.run(
        task="레포지토리 100개 파일 분석 후 보고서 작성",
        session_id=session_id
    )

    # 실패 후 재시작
    await agent.run(
        task="레포지토리 100개 파일 분석 후 보고서 작성",
        session_id=session_id,
        resume=True  # 47번째 스텝부터 재개
    )

실전 4 — LangGraph Checkpointer 활용

LangGraph를 쓴다면 내장 체크포인터를 사용하는 게 제일 빠릅니다.

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.redis import RedisSaver
from typing import TypedDict, Annotated
import operator

# 그래프 상태 정의
class GraphState(TypedDict):
    task:           str
    files_analyzed: Annotated[list, operator.add]  # 리스트 누적
    current_file:   int
    report:         str
    errors:         Annotated[list, operator.add]

# 노드 함수들
async def analyze_file(state: GraphState) -> dict:
    """파일 하나 분석"""
    file_idx = state["current_file"]
    # 실제 분석 로직
    result = f"파일 {file_idx} 분석 완료"
    return {
        "files_analyzed": [result],
        "current_file": file_idx + 1
    }

async def generate_report(state: GraphState) -> dict:
    """보고서 생성"""
    report = "\n".join(state["files_analyzed"])
    return {"report": report}

def should_continue(state: GraphState) -> str:
    """다음 노드 결정"""
    if state["current_file"] >= 100:
        return "generate_report"
    return "analyze_file"

# 그래프 구성
workflow = StateGraph(GraphState)
workflow.add_node("analyze_file", analyze_file)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_file")
workflow.add_conditional_edges("analyze_file", should_continue)
workflow.add_edge("generate_report", END)

# 체크포인터 설정 (SQLite — 로컬 개발용)
checkpointer = SqliteSaver.from_conn_string("./checkpoints.db")

# Redis 체크포인터 (프로덕션)
# checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")

app = workflow.compile(checkpointer=checkpointer)

# 실행
config = {"configurable": {"thread_id": "task_20260428_001"}}

# 첫 실행
result = await app.ainvoke(
    {"task": "파일 100개 분석", "current_file": 0,
     "files_analyzed": [], "report": "", "errors": []},
    config=config
)

# 재시작 (자동으로 마지막 체크포인트에서 재개)
result = await app.ainvoke(None, config=config)
[LangGraph Checkpointer 지원 백엔드]
→ SqliteSaver: 로컬 개발, 단일 프로세스
→ RedisSaver: 프로덕션, 빠른 읽기/쓰기
→ PostgresSaver: 영구 보존, 복잡한 쿼리 필요 시
→ MemorySaver: 테스트용 (프로세스 종료 시 소멸)

실전 5 — 메모리 계층 구조

장기 에이전트는 메모리를 4개 계층으로 나눠서 관리합니다.

class AgentMemorySystem:
    """4계층 메모리 시스템"""

    def __init__(self, redis_client, db_client, embedding_client):
        self.redis     = redis_client  # L1: 작업 메모리
        self.db        = db_client     # L2: 에피소드 메모리
        self.embedding = embedding_client  # L3: 시맨틱 검색용

    # ===== L1: 작업 메모리 (현재 세션, TTL 있음) =====
    async def get_working_memory(self, session_id: str) -> dict:
        """현재 세션의 단기 컨텍스트"""
        data = self.redis.get(f"working:{session_id}")
        return json.loads(data) if data else {}

    async def update_working_memory(self, session_id: str, updates: dict):
        existing = await self.get_working_memory(session_id)
        existing.update(updates)
        self.redis.setex(
            f"working:{session_id}",
            3600,  # 1시간 TTL
            json.dumps(existing)
        )

    # ===== L2: 에피소드 메모리 (완료된 작업 기록) =====
    async def save_episode(self, session_id: str, summary: str, outcome: str):
        """완료된 작업을 에피소드로 저장"""
        await self.db.execute("""
            INSERT INTO agent_episodes
            (session_id, summary, outcome, embedding, created_at)
            VALUES ($1, $2, $3, $4, NOW())
        """, session_id, summary, outcome,
           await self._embed(summary))

    # ===== L3: 시맨틱 메모리 (유사 경험 검색) =====
    async def recall_similar(self, query: str, limit: int = 3) -> list[dict]:
        """현재 작업과 유사한 과거 에피소드 검색"""
        query_embedding = await self._embed(query)
        return await self.db.fetch("""
            SELECT summary, outcome,
                   1 - (embedding <=> $1) as similarity
            FROM agent_episodes
            ORDER BY similarity DESC
            LIMIT $2
        """, query_embedding, limit)

    # ===== L4: 절차 메모리 (학습된 패턴) =====
    async def get_learned_patterns(self, task_type: str) -> list[str]:
        """성공한 작업 패턴 조회"""
        rows = await self.db.fetch("""
            SELECT pattern FROM learned_patterns
            WHERE task_type = $1
            AND success_rate > 0.8
            ORDER BY success_rate DESC
            LIMIT 5
        """, task_type)
        return [r["pattern"] for r in rows]

    async def _embed(self, text: str) -> list[float]:
        """텍스트 → 임베딩 벡터"""
        return await self.embedding.embed(text)


# 에이전트에서 메모리 시스템 활용
class MemoryAwareAgent:

    def __init__(self, memory: AgentMemorySystem, ...):
        self.memory = memory

    async def run(self, task: str, session_id: str):
        # 유사한 과거 경험 검색
        similar_episodes = await self.memory.recall_similar(task)

        # 학습된 패턴 적용
        patterns = await self.memory.get_learned_patterns("coding")

        # 컨텍스트에 과거 경험 주입
        context = self._build_context_with_memory(
            task, similar_episodes, patterns
        )

        # 실행...
        result = await self._execute(context, session_id)

        # 완료 후 에피소드 저장
        await self.memory.save_episode(
            session_id=session_id,
            summary=f"작업: {task[:100]}",
            outcome=result
        )

        return result
[4계층 메모리 요약]
L1 작업 메모리:  현재 세션 컨텍스트, Redis TTL, 1시간
L2 에피소드:     완료된 작업 기록, PostgreSQL, 영구 보존
L3 시맨틱:       유사 경험 벡터 검색, pgvector, 과거 참고
L4 절차:         성공 패턴 학습, PostgreSQL, 반복 작업 최적화

마무리

✅ 상태 관리 필수인 상황
→ 30분 이상 걸리는 장기 에이전트 작업
→ 외부 API 호출이 많아 중간 실패 가능성이 높을 때
→ 비용이 비싼 작업 (재시도 시 비용 두 배)
→ 사용자가 진행 상황을 확인해야 할 때
→ 멀티 에이전트 협업 (공유 상태 필요)

❌ 과한 경우
→ 30초 이내 완료되는 단순 Q&A
→ 실패해도 재시작 비용이 낮은 작업
→ 프로토타입/실험 단계

[구현 순서 추천]
1단계: 인메모리 상태 (dict) — 가장 단순
2단계: Redis 체크포인트 — 재시작 지원
3단계: PostgreSQL 영구 저장 — 감사 추적 필요 시
4단계: 벡터 메모리 — 경험 기반 학습 필요 시

 


관련 글:

https://cell-devlog.tistory.com/29

 

AI 에이전트가 기억하는 법 — 단기/장기 메모리 아키텍처와 MemGPT 완전 정리

AI 에이전트를 쓰다 보면 이런 답답함이 생겨요."지난주에 분명히 말했는데 또 처음부터 설명해야 하네."LLM은 기본적으로 상태가 없어요(stateless). 대화가 끝나면 모든 걸 잊어요. 컨텍스트 창 안

cell-devlog.tistory.com

 

https://cell-devlog.tistory.com/14

 

Thought, Action, Observation을 코드로 — LangGraph + ReAct 완전 정리

AI 에이전트를 만들다 보면 이런 상황이 생깁니다."LLM이 도구를 써야 할 때도 있고, 바로 답할 수 있을 때도 있는데 이걸 어떻게 처리하지?"이걸 깔끔하게 해결하는 패턴이 ReAct이고, 이를 코드로

cell-devlog.tistory.com

https://cell-devlog.tistory.com/24

 

쿼리 재작성, 반복 검색, 멀티소스 라우팅 — Agentic RAG 동작 원리와 동적 검색 전략 완전 정리

RAG 시스템을 만들고 나면 이런 한계가 생겨요."단순한 질문은 잘 답하는데, '2024년 실적을 바탕으로 2025년 전략을 분석해줘' 같은 복잡한 질문은 엉뚱한 답이 나온다."이건 일반 RAG의 구조적 한계

cell-devlog.tistory.com

 

반응형