본문 바로가기

Claude

LLM 배치 처리 실전 — Anthropic Message Batches API로 비용 50% 절감

반응형

사용자가 화면을 보며 기다리나요? 동기 API. 사용자가 제출하고 다른 일을 하나요? 배치 API. 이 한 줄 판단으로 비용이 절반이 됩니다.


핵심 요약 → Message Batches API: 최대 100,000 요청을 단일 배치로 제출, 24시간 내 결과 → 비용: 입력+출력 토큰 모두 표준 가격의 정확히 50% — 요청 수와 무관, 10개도 50% → 출력 토큰 한도: 동기 API 대비 대폭 확대 (beta 헤더로 최대 300K/요청) → 표준 rate limit 별도 — 배치가 일반 API 한도에 영향 없음 → 2026년 3월: output-300k-2026-03-24 베타 헤더로 300K 출력 토큰 지원 → 실전 사례: 782 파일 처리 → 8 배치 → 25분 (100% 성공률) → 한계: 진행 상황 실시간 추적 불가, 개별 취소 불가 — 프로덕션 주의사항 있음 → 캐싱 + 배치 스태킹: 반복 시스템 프롬프트면 추가 85~90% 절감 가능


언제 배치 API를 쓰나 — 단 하나의 판단 기준

핵심 규칙: 사용자가 화면을 보며 결과를 기다린다면 표준 Messages API를 쓰세요. 사용자가 작업을 제출하고 다른 일로 넘어간다면 배치 API를 쓰세요.

# 배치 API 적합 판단 트리

def should_use_batch_api(task: dict) -> tuple[bool, str]:

    # ❌ 배치 API 금지 케이스
    if task.get("user_waiting_realtime"):
        return False, "사용자가 실시간 대기 중 → 표준 API"

    if task.get("request_count", 0) < 100:
        return False, "100개 미만 → 폴링 오버헤드가 할인보다 클 수 있음"

    if task.get("max_latency_hours", 24) < 1:
        return False, "1시간 이내 결과 필요 → 표준 API"

    # ✅ 배치 API 최적 케이스
    batch_use_cases = [
        "document_processing",     # 문서 대량 처리
        "data_enrichment",         # 데이터 보강 파이프라인
        "nightly_analytics",       # 야간 분석 배치
        "offline_evaluation",      # LLM Eval 오프라인 실행
        "content_generation_queue", # 콘텐츠 생성 큐
        "translation_pipeline",    # 대량 번역
        "classification_at_scale", # 대규모 분류
    ]

    if task.get("type") in batch_use_cases:
        return True, f"{task['type']} → 배치 API (50% 절감)"

    return True, "비동기 허용 워크로드 → 배치 API 권장"

1. 기본 사용법 — 배치 생성부터 결과 수신까지

import anthropic
import time
from anthropic.types.beta.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.beta.messages.batch_create_params import Request

client = anthropic.Anthropic()


# ── 1단계: 배치 생성 ──

def create_batch(texts: list[str], model: str = "claude-sonnet-4-6") -> str:
    """
    텍스트 리스트를 배치로 제출
    custom_id: 나중에 결과와 매칭할 식별자
    """
    requests = [
        Request(
            custom_id=f"item-{i:06d}",      # 결과 매칭용 ID
            params=MessageCreateParamsNonStreaming(
                model=model,
                max_tokens=1024,
                messages=[{
                    "role": "user",
                    "content": f"다음 텍스트의 감정을 분석해줘 (positive/negative/neutral):\n\n{text}"
                }]
            )
        )
        for i, text in enumerate(texts)
    ]

    batch = client.beta.messages.batches.create(requests=requests)

    print(f"배치 생성 완료: {batch.id}")
    print(f"요청 수: {batch.request_counts.processing}")
    return batch.id


# ── 2단계: 완료 대기 (폴링) ──

def wait_for_batch(batch_id: str, poll_interval: int = 60) -> object:
    """
    배치 완료까지 폴링
    대부분 1시간 이내 완료, 최대 24시간
    """
    print(f"배치 처리 대기 중: {batch_id}")

    while True:
        batch = client.beta.messages.batches.retrieve(batch_id)

        counts = batch.request_counts
        total = counts.processing + counts.succeeded + counts.errored + counts.canceled + counts.expired

        print(
            f"  진행: {counts.succeeded}✅ {counts.errored}❌ "
            f"{counts.processing}⏳ / {total}개 "
            f"({counts.succeeded/total*100:.0f}%)"
        )

        if batch.processing_status == "ended":
            print(f"배치 완료!")
            return batch

        time.sleep(poll_interval)


# ── 3단계: 결과 수신 ──

def get_batch_results(batch_id: str) -> dict:
    """
    배치 결과를 custom_id → 결과 딕셔너리로 반환
    """
    results = {}

    for result in client.beta.messages.batches.results(batch_id):
        cid = result.custom_id

        if result.result.type == "succeeded":
            results[cid] = {
                "status": "success",
                "text": result.result.message.content[0].text,
                "input_tokens": result.result.message.usage.input_tokens,
                "output_tokens": result.result.message.usage.output_tokens,
            }
        elif result.result.type == "errored":
            results[cid] = {
                "status": "error",
                "error": str(result.result.error),
            }

    return results


# ── 전체 파이프라인 ──

def process_large_dataset(texts: list[str]) -> dict:
    """
    대량 텍스트 감정 분석 파이프라인
    """
    batch_id = create_batch(texts)
    wait_for_batch(batch_id)
    results = get_batch_results(batch_id)

    success_count = sum(1 for r in results.values() if r["status"] == "success")
    print(f"\n완료: {success_count}/{len(texts)} 성공")

    return results


# 실행
texts = [f"고객 리뷰 {i}: ..." for i in range(10_000)]
results = process_large_dataset(texts)

2. 비용 계산 — 실제 절감 규모

# 배치 API 비용 절감 계산

STANDARD_PRICES = {
    "claude-haiku-4-5":    {"input": 0.80,  "output": 4.00},
    "claude-sonnet-4-6":   {"input": 3.00,  "output": 15.00},
    "claude-opus-4-7":     {"input": 5.00,  "output": 25.00},
}

def calculate_batch_savings(
    model: str,
    monthly_requests: int,
    avg_input_tokens: int = 500,
    avg_output_tokens: int = 200,
) -> dict:
    """
    월 배치 비용 절감 계산
    """
    prices = STANDARD_PRICES[model]

    monthly_input_tokens  = monthly_requests * avg_input_tokens
    monthly_output_tokens = monthly_requests * avg_output_tokens

    # 표준 API 비용
    standard_cost = (
        monthly_input_tokens / 1_000_000 * prices["input"] +
        monthly_output_tokens / 1_000_000 * prices["output"]
    )

    # 배치 API 비용 (50% 할인)
    batch_cost = standard_cost * 0.5

    # 캐싱 + 배치 스태킹
    # 시스템 프롬프트 1000 토큰을 모든 요청에서 재사용한다면
    cached_portion = 1000 * monthly_requests / 1_000_000 * prices["input"]
    cache_savings  = cached_portion * 0.90   # 캐시 히트 시 90% 절감
    stacked_cost   = batch_cost - cache_savings

    return {
        "model": model,
        "monthly_requests": f"{monthly_requests:,}",
        "standard_monthly": f"${standard_cost:,.0f}",
        "batch_monthly":    f"${batch_cost:,.0f}",
        "stacked_monthly":  f"${stacked_cost:,.0f}",
        "batch_savings":    f"${standard_cost - batch_cost:,.0f}",
        "total_savings":    f"${standard_cost - stacked_cost:,.0f}",
        "savings_pct":      f"{(1 - stacked_cost/standard_cost)*100:.0f}%",
    }


# 시뮬레이션
scenarios = [
    ("claude-sonnet-4-6", 100_000),   # 월 10만 건
    ("claude-sonnet-4-6", 500_000),   # 월 50만 건
    ("claude-opus-4-7",   50_000),    # 월 5만 건 (고가 모델)
]

for model, reqs in scenarios:
    r = calculate_batch_savings(model, reqs)
    print(
        f"{r['model']} {r['monthly_requests']}건/월:\n"
        f"  표준: {r['standard_monthly']} → 배치: {r['batch_monthly']} "
        f"→ 배치+캐싱: {r['stacked_monthly']} ({r['savings_pct']} 절감)\n"
    )

# 예시 출력:
# claude-sonnet-4-6 100,000건/월:
#   표준: $1,900 → 배치: $950 → 배치+캐싱: $445 (77% 절감)
#
# claude-sonnet-4-6 500,000건/월:
#   표준: $9,500 → 배치: $4,750 → 배치+캐싱: $2,225 (77% 절감)
#
# claude-opus-4-7 50,000건/월:
#   표준: $6,250 → 배치: $3,125 → 배치+캐싱: $1,838 (71% 절감)

3. 100,000개 요청 처리 — 청크 분할 패턴

import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator

@dataclass
class BatchProcessor:
    """
    100K 상한을 고려한 자동 청크 분할 배치 프로세서
    """
    client: anthropic.Anthropic
    model: str = "claude-sonnet-4-6"
    chunk_size: int = 10_000    # 배치당 최대 요청 수 (안전 마진)
    poll_interval: int = 60     # 폴링 간격 (초)

    batch_ids: list[str] = field(default_factory=list)
    results: dict = field(default_factory=dict)

    def _make_request(self, custom_id: str, content: str) -> Request:
        return Request(
            custom_id=custom_id,
            params=MessageCreateParamsNonStreaming(
                model=self.model,
                max_tokens=512,
                system="당신은 데이터 분석 전문가입니다.",
                messages=[{"role": "user", "content": content}]
            )
        )

    async def submit_all(self, items: dict[str, str]) -> list[str]:
        """
        items: {custom_id: prompt_content} 딕셔너리
        자동으로 chunk_size 단위로 분할 제출
        """
        items_list = list(items.items())
        chunks = [
            items_list[i:i+self.chunk_size]
            for i in range(0, len(items_list), self.chunk_size)
        ]

        print(f"총 {len(items)}개 요청 → {len(chunks)}개 배치로 분할")

        for chunk_idx, chunk in enumerate(chunks):
            requests = [
                self._make_request(cid, content)
                for cid, content in chunk
            ]
            batch = self.client.beta.messages.batches.create(requests=requests)
            self.batch_ids.append(batch.id)
            print(f"  배치 {chunk_idx+1}/{len(chunks)} 제출: {batch.id}")

        return self.batch_ids

    async def wait_all(self) -> None:
        """모든 배치 완료 대기"""
        pending = set(self.batch_ids)
        completed = set()

        while pending:
            for batch_id in list(pending):
                batch = self.client.beta.messages.batches.retrieve(batch_id)
                if batch.processing_status == "ended":
                    completed.add(batch_id)
                    pending.discard(batch_id)
                    print(f"  배치 완료: {batch_id}")

            if pending:
                print(f"  대기 중: {len(pending)}개 배치...")
                await asyncio.sleep(self.poll_interval)

        print(f"전체 {len(completed)}개 배치 완료")

    def collect_results(self) -> dict:
        """모든 배치에서 결과 수집"""
        for batch_id in self.batch_ids:
            for result in self.client.beta.messages.batches.results(batch_id):
                if result.result.type == "succeeded":
                    self.results[result.custom_id] = {
                        "text": result.result.message.content[0].text,
                        "tokens": {
                            "input":  result.result.message.usage.input_tokens,
                            "output": result.result.message.usage.output_tokens,
                        }
                    }
                else:
                    self.results[result.custom_id] = {
                        "error": str(result.result.error)
                    }

        return self.results


# 사용 예시
async def process_100k_documents():
    client = anthropic.Anthropic()
    processor = BatchProcessor(client=client, chunk_size=10_000)

    # 10만 개 문서
    items = {
        f"doc-{i:06d}": f"문서 {i}: ..."
        for i in range(100_000)
    }

    # 제출 → 대기 → 수집
    await processor.submit_all(items)
    await processor.wait_all()
    results = processor.collect_results()

    success = sum(1 for r in results.values() if "text" in r)
    print(f"최종: {success}/{len(items)} 성공")
    return results

4. 300K 출력 토큰 베타 — 장문 생성 워크로드

# 2026년 3월 추가 기능:
# output-300k-2026-03-24 베타 헤더로 최대 300K 출력 토큰
# 배치 API 전용 (동기 Messages API 미지원)

def create_long_form_batch(prompts: list[dict]) -> str:
    """
    장문 콘텐츠 생성 배치 (최대 300K 출력 토큰)
    적합 케이스: 기술 문서, 코드 스캐폴딩, 장문 보고서
    주의: 단일 300K 요청 완료에 1시간 이상 소요 가능
    """
    requests = [
        Request(
            custom_id=p["id"],
            params=MessageCreateParamsNonStreaming(
                model="claude-opus-4-7",    # 장문 생성은 Opus 권장
                max_tokens=300_000,         # 최대 300K
                messages=[{"role": "user", "content": p["prompt"]}]
            )
        )
        for p in prompts
    ]

    batch = client.beta.messages.batches.create(
        requests=requests,
        betas=["output-300k-2026-03-24"]   # 베타 헤더 필수
    )

    print(f"300K 배치 생성: {batch.id}")
    print("주의: 300K 요청은 완료에 1시간+ 소요 가능")
    return batch.id


# 사용 케이스
long_form_tasks = [
    {
        "id": "tech-spec-001",
        "prompt": "분산 시스템 아키텍처 완전 가이드를 작성해줘 (패턴, 일관성 모델, 장애 내성 포함)"
    },
    {
        "id": "code-scaffold-001",
        "prompt": "FastAPI + PostgreSQL + Redis로 이커머스 백엔드 전체 코드 스캐폴딩을 작성해줘"
    }
]

batch_id = create_long_form_batch(long_form_tasks)

5. 에러 처리 + 재시도 패턴

from enum import Enum
import json
from pathlib import Path

class BatchState(Enum):
    SUBMITTED = "submitted"
    COMPLETED = "completed"
    FAILED    = "failed"

def robust_batch_pipeline(
    items: dict[str, str],
    state_file: str = "batch_state.json",
    max_retries: int = 3,
) -> dict:
    """
    상태 파일 기반 내결함성 배치 파이프라인
    중단 후 재시작 시 완료된 배치 건너뜀
    """
    state_path = Path(state_file)

    # 기존 상태 로드 (재시작 시)
    if state_path.exists():
        with open(state_path) as f:
            state = json.load(f)
        print(f"기존 상태 로드: 배치 {len(state['batches'])}개")
    else:
        state = {"batches": {}, "results": {}, "retry_counts": {}}

    # 미완료 배치 확인
    for batch_id, batch_state in list(state["batches"].items()):
        if batch_state == BatchState.SUBMITTED.value:
            batch = client.beta.messages.batches.retrieve(batch_id)

            if batch.processing_status == "ended":
                # 결과 수집
                for result in client.beta.messages.batches.results(batch_id):
                    if result.result.type == "succeeded":
                        state["results"][result.custom_id] = (
                            result.result.message.content[0].text
                        )
                    else:
                        # 에러 항목 재시도 큐에 추가
                        cid = result.custom_id
                        retry_count = state["retry_counts"].get(cid, 0)

                        if retry_count < max_retries:
                            state["retry_counts"][cid] = retry_count + 1
                            # items에서 원본 내용 가져와 재시도
                            if cid in items:
                                print(f"  재시도 예약: {cid} (시도 {retry_count+1}/{max_retries})")

                state["batches"][batch_id] = BatchState.COMPLETED.value

    # 상태 저장
    with open(state_path, "w") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)

    return state["results"]


# 실패한 항목만 재시도
def retry_failed_items(
    failed_ids: list[str],
    original_items: dict[str, str],
    model: str = "claude-sonnet-4-6"
) -> str:
    """실패한 custom_id만 모아 새 배치로 재시도"""
    retry_items = {
        cid: original_items[cid]
        for cid in failed_ids
        if cid in original_items
    }

    if not retry_items:
        return None

    print(f"재시도 배치 생성: {len(retry_items)}개 항목")

    requests = [
        Request(
            custom_id=cid,
            params=MessageCreateParamsNonStreaming(
                model=model,
                max_tokens=512,
                messages=[{"role": "user", "content": content}]
            )
        )
        for cid, content in retry_items.items()
    ]

    batch = client.beta.messages.batches.create(requests=requests)
    return batch.id

6. 캐싱 + 배치 스태킹 — 최대 절감

# 배치 API + 프롬프트 캐싱 동시 적용
# 반복 시스템 프롬프트 + 공통 문서가 있는 경우 추가 85~90% 절감

def create_cached_batch(
    texts: list[str],
    system_prompt: str,
    shared_document: str = "",
) -> str:
    """
    배치 + 캐싱 동시 적용
    system_prompt와 shared_document는 cache_control로 마킹
    """
    requests = []

    for i, text in enumerate(texts):
        # 시스템 프롬프트에 캐시 마커 추가
        system_content = [
            {
                "type": "text",
                "text": system_prompt,
                "cache_control": {"type": "ephemeral"}  # ← 캐시 마킹
            }
        ]

        # 메시지 구성
        user_content = []

        if shared_document:
            # 공유 문서도 캐시 마킹
            user_content.append({
                "type": "text",
                "text": f"참조 문서:\n{shared_document}",
                "cache_control": {"type": "ephemeral"}
            })

        # 개별 텍스트 (캐시 안 됨)
        user_content.append({
            "type": "text",
            "text": f"분석 대상:\n{text}"
        })

        requests.append(
            Request(
                custom_id=f"item-{i:06d}",
                params=MessageCreateParamsNonStreaming(
                    model="claude-sonnet-4-6",
                    max_tokens=512,
                    system=system_content,
                    messages=[{"role": "user", "content": user_content}]
                )
            )
        )

    batch = client.beta.messages.batches.create(requests=requests)
    return batch.id


# 비용 효과 계산
# 예: 시스템 프롬프트 1,000 토큰 + 공유 문서 5,000 토큰
# → 10,000 요청 기준

# 배치 전용 (50% 할인):
#   (6,000 토큰 × 10,000회) / 1M × $3.00 × 0.5 = $90

# 배치 + 캐싱 스태킹:
#   캐시 히트 시 6,000 토큰의 90% = 5,400 토큰 × 10% 가격
#   개별 텍스트 나머지 → 표준 배치 가격
#   → 실질 비용 $90의 약 30% = ~$27 (총 표준 대비 ~86% 절감)

7. Webhook 대안 — 폴링 없이 완료 알림

# Anthropic Batch API는 아직 Webhook 미지원 (2026.05 기준)
# 폴링 대신 다음 대안 패턴 사용

# ── 패턴 1: 짧은 폴링 간격으로 빠른 완료 감지 ──
def smart_poll(batch_id: str) -> object:
    """
    처음에는 짧게, 시간 지날수록 간격 늘림
    """
    intervals = [30, 60, 120, 300, 600]  # 30초 → 10분
    attempt = 0

    while True:
        batch = client.beta.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch

        interval = intervals[min(attempt, len(intervals)-1)]
        print(f"대기 중... {interval}초 후 재확인")
        time.sleep(interval)
        attempt += 1


# ── 패턴 2: 배치 제출 후 Temporal로 상태 영속 관리 ──
# Temporal을 쓰면 서버 재시작해도 폴링 상태 유지

# ── 패턴 3: 배치 ID를 DB에 저장하고 크론잡으로 확인 ──
def submit_and_store(items: dict, db_connection) -> None:
    """배치 제출 후 DB에 ID 저장, 크론잡이 완료 체크"""
    batch_id = create_batch(list(items.values()))

    db_connection.execute("""
        INSERT INTO batch_jobs (batch_id, status, created_at, item_count)
        VALUES (?, 'submitted', datetime('now'), ?)
    """, (batch_id, len(items)))

    print(f"배치 제출 완료. ID 저장됨: {batch_id}")
    # 이후 크론잡이 30분마다 미완료 배치 확인

8. 실전 주의사항 — 프로덕션에서 배운 것

실제 프로덕션 사용에서 Batch API는 4시간+ 완료 시간, 개별 항목 진행 상황 없음, 취소 지원 없음, 불투명한 실패 등의 문제를 보였습니다. 이후 이 프로젝트는 asyncio.TaskGroup 병렬 처리로 전환해 수 시간을 수 분으로 단축했습니다.

# 프로덕션 주의사항 체크리스트

PRODUCTION_GOTCHAS = {

    "완료 시간 불확실":
        "대부분 1시간 이내지만 최대 24시간 가능\n"
        "→ 타임라인이 확실한 워크로드에만 사용",

    "개별 항목 진행 불가":
        "배치 전체 성공/실패 수만 확인 가능\n"
        "→ 어떤 항목이 실패했는지는 완료 후에만 알 수 있음\n"
        "→ custom_id로 사후 매칭",

    "개별 취소 불가":
        "제출된 배치는 개별 요청 단위 취소 없음\n"
        "배치 전체만 취소 가능: batches.cancel(batch_id)",

    "결과 만료":
        "결과는 제출 후 29일간 보관 후 자동 삭제\n"
        "→ 완료 즉시 결과 수집 후 자체 저장소에 보관",

    "실시간 UI 금지":
        "배치는 비동기 — 사용자가 기다리는 시나리오에 절대 사용 금지\n"
        "→ '제출 완료, 결과는 이메일로 드립니다' 패턴이 적합",

    "에러 처리":
        "일부 요청 실패해도 배치는 'ended' 상태가 됨\n"
        "→ result.type == 'errored' 항목 별도 처리 필수\n"
        "→ 재시도 배치 자동화 권장",
}

# ── 대안: 실시간성이 필요하면 asyncio 병렬 처리 ──
import asyncio

async def parallel_process(items: list[str], concurrency: int = 50) -> list:
    """
    배치 API 대신: asyncio로 50개씩 동시 처리
    장점: 실시간 진행 상황, 개별 재시도, 즉시 결과
    단점: 표준 가격 (50% 할인 없음)
    """
    semaphore = asyncio.Semaphore(concurrency)

    async def process_one(item: str) -> str:
        async with semaphore:
            # async anthropic client 사용
            async_client = anthropic.AsyncAnthropic()
            response = await async_client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=512,
                messages=[{"role": "user", "content": item}]
            )
            return response.content[0].text

    return await asyncio.gather(*[process_one(item) for item in items])

9. 워크로드별 최적 선택

# 워크로드별 권장 접근법

워크로드                       추천              이유
─────────────────────────────────────────────────────────────
월간 문서 처리 10만 건+         배치 API          50% 절감 + 처리량
LLM Eval 오프라인 실행          배치 API          비용 최소화
야간 데이터 보강                배치 API          시간 여유, 비용 중요
콘텐츠 생성 대기열              배치 API          실시간 불필요
장문 기술 문서 생성             배치 + 300K 베타  장문 지원 필수

사용자 대기 채팅                동기 API          실시간 필수
빠른 프로토타입 (<100건)        동기 API          배치 오버헤드 불필요
1시간 이내 결과 필요            asyncio 병렬      실시간 + 병렬
진행 상황 UI 보여줘야 함        asyncio 병렬      개별 상태 추적
취소 기능 필요                  asyncio 병렬      개별 취소 가능

결론

Batch API가 확실히 유리한 케이스

  • 비동기 허용 + 100건 이상 + 비용 중요 → 50% 할인 무조건
  • 반복 시스템 프롬프트 + 캐싱 스태킹 → 최대 85% 절감
  • 장문 콘텐츠 생성 → 300K 출력 토큰 (동기 API 불가)
  • 표준 rate limit 별도 → 대량 처리 시 한도 충돌 없음

지금 바로 적용할 수 있는 워크로드

  • 야간 분석 크론잡 → 표준 API → 배치 전환
  • LLM Eval 오프라인 실행 → Pydantic Evals + 배치 조합
  • 문서/피드백 대량 분류 → 배치로 전환 후 비용 확인

반드시 기억할 것

  • 실시간 UI에 배치 쓰면 안 됨 — "가짜 동기 호출" 패턴 금지
  • 결과 29일 후 자동 삭제 → 완료 즉시 자체 저장소 보관
  • 4시간+ 완료 경험 있음 → 24시간 여유 없으면 asyncio 병렬 대안 고려

 

반응형