사용자가 화면을 보며 기다리나요? 동기 API. 사용자가 제출하고 다른 일을 하나요? 배치 API. 이 한 줄 판단으로 비용이 절반이 됩니다.
핵심 요약 → Message Batches API: 최대 100,000 요청을 단일 배치로 제출, 24시간 내 결과 → 비용: 입력+출력 토큰 모두 표준 가격의 정확히 50% — 요청 수와 무관, 10개도 50% → 출력 토큰 한도: 동기 API 대비 대폭 확대 (beta 헤더로 최대 300K/요청) → 표준 rate limit 별도 — 배치가 일반 API 한도에 영향 없음 → 2026년 3월: output-300k-2026-03-24 베타 헤더로 300K 출력 토큰 지원 → 실전 사례: 782 파일 처리 → 8 배치 → 25분 (100% 성공률) → 한계: 진행 상황 실시간 추적 불가, 개별 취소 불가 — 프로덕션 주의사항 있음 → 캐싱 + 배치 스태킹: 반복 시스템 프롬프트면 추가 85~90% 절감 가능
언제 배치 API를 쓰나 — 단 하나의 판단 기준
핵심 규칙: 사용자가 화면을 보며 결과를 기다린다면 표준 Messages API를 쓰세요. 사용자가 작업을 제출하고 다른 일로 넘어간다면 배치 API를 쓰세요.
# 배치 API 적합 판단 트리
def should_use_batch_api(task: dict) -> tuple[bool, str]:
# ❌ 배치 API 금지 케이스
if task.get("user_waiting_realtime"):
return False, "사용자가 실시간 대기 중 → 표준 API"
if task.get("request_count", 0) < 100:
return False, "100개 미만 → 폴링 오버헤드가 할인보다 클 수 있음"
if task.get("max_latency_hours", 24) < 1:
return False, "1시간 이내 결과 필요 → 표준 API"
# ✅ 배치 API 최적 케이스
batch_use_cases = [
"document_processing", # 문서 대량 처리
"data_enrichment", # 데이터 보강 파이프라인
"nightly_analytics", # 야간 분석 배치
"offline_evaluation", # LLM Eval 오프라인 실행
"content_generation_queue", # 콘텐츠 생성 큐
"translation_pipeline", # 대량 번역
"classification_at_scale", # 대규모 분류
]
if task.get("type") in batch_use_cases:
return True, f"{task['type']} → 배치 API (50% 절감)"
return True, "비동기 허용 워크로드 → 배치 API 권장"
1. 기본 사용법 — 배치 생성부터 결과 수신까지
import anthropic
import time
from anthropic.types.beta.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.beta.messages.batch_create_params import Request
client = anthropic.Anthropic()
# ── 1단계: 배치 생성 ──
def create_batch(texts: list[str], model: str = "claude-sonnet-4-6") -> str:
"""
텍스트 리스트를 배치로 제출
custom_id: 나중에 결과와 매칭할 식별자
"""
requests = [
Request(
custom_id=f"item-{i:06d}", # 결과 매칭용 ID
params=MessageCreateParamsNonStreaming(
model=model,
max_tokens=1024,
messages=[{
"role": "user",
"content": f"다음 텍스트의 감정을 분석해줘 (positive/negative/neutral):\n\n{text}"
}]
)
)
for i, text in enumerate(texts)
]
batch = client.beta.messages.batches.create(requests=requests)
print(f"배치 생성 완료: {batch.id}")
print(f"요청 수: {batch.request_counts.processing}")
return batch.id
# ── 2단계: 완료 대기 (폴링) ──
def wait_for_batch(batch_id: str, poll_interval: int = 60) -> object:
"""
배치 완료까지 폴링
대부분 1시간 이내 완료, 최대 24시간
"""
print(f"배치 처리 대기 중: {batch_id}")
while True:
batch = client.beta.messages.batches.retrieve(batch_id)
counts = batch.request_counts
total = counts.processing + counts.succeeded + counts.errored + counts.canceled + counts.expired
print(
f" 진행: {counts.succeeded}✅ {counts.errored}❌ "
f"{counts.processing}⏳ / {total}개 "
f"({counts.succeeded/total*100:.0f}%)"
)
if batch.processing_status == "ended":
print(f"배치 완료!")
return batch
time.sleep(poll_interval)
# ── 3단계: 결과 수신 ──
def get_batch_results(batch_id: str) -> dict:
"""
배치 결과를 custom_id → 결과 딕셔너리로 반환
"""
results = {}
for result in client.beta.messages.batches.results(batch_id):
cid = result.custom_id
if result.result.type == "succeeded":
results[cid] = {
"status": "success",
"text": result.result.message.content[0].text,
"input_tokens": result.result.message.usage.input_tokens,
"output_tokens": result.result.message.usage.output_tokens,
}
elif result.result.type == "errored":
results[cid] = {
"status": "error",
"error": str(result.result.error),
}
return results
# ── 전체 파이프라인 ──
def process_large_dataset(texts: list[str]) -> dict:
"""
대량 텍스트 감정 분석 파이프라인
"""
batch_id = create_batch(texts)
wait_for_batch(batch_id)
results = get_batch_results(batch_id)
success_count = sum(1 for r in results.values() if r["status"] == "success")
print(f"\n완료: {success_count}/{len(texts)} 성공")
return results
# 실행
texts = [f"고객 리뷰 {i}: ..." for i in range(10_000)]
results = process_large_dataset(texts)
2. 비용 계산 — 실제 절감 규모
# 배치 API 비용 절감 계산
STANDARD_PRICES = {
"claude-haiku-4-5": {"input": 0.80, "output": 4.00},
"claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
"claude-opus-4-7": {"input": 5.00, "output": 25.00},
}
def calculate_batch_savings(
model: str,
monthly_requests: int,
avg_input_tokens: int = 500,
avg_output_tokens: int = 200,
) -> dict:
"""
월 배치 비용 절감 계산
"""
prices = STANDARD_PRICES[model]
monthly_input_tokens = monthly_requests * avg_input_tokens
monthly_output_tokens = monthly_requests * avg_output_tokens
# 표준 API 비용
standard_cost = (
monthly_input_tokens / 1_000_000 * prices["input"] +
monthly_output_tokens / 1_000_000 * prices["output"]
)
# 배치 API 비용 (50% 할인)
batch_cost = standard_cost * 0.5
# 캐싱 + 배치 스태킹
# 시스템 프롬프트 1000 토큰을 모든 요청에서 재사용한다면
cached_portion = 1000 * monthly_requests / 1_000_000 * prices["input"]
cache_savings = cached_portion * 0.90 # 캐시 히트 시 90% 절감
stacked_cost = batch_cost - cache_savings
return {
"model": model,
"monthly_requests": f"{monthly_requests:,}",
"standard_monthly": f"${standard_cost:,.0f}",
"batch_monthly": f"${batch_cost:,.0f}",
"stacked_monthly": f"${stacked_cost:,.0f}",
"batch_savings": f"${standard_cost - batch_cost:,.0f}",
"total_savings": f"${standard_cost - stacked_cost:,.0f}",
"savings_pct": f"{(1 - stacked_cost/standard_cost)*100:.0f}%",
}
# 시뮬레이션
scenarios = [
("claude-sonnet-4-6", 100_000), # 월 10만 건
("claude-sonnet-4-6", 500_000), # 월 50만 건
("claude-opus-4-7", 50_000), # 월 5만 건 (고가 모델)
]
for model, reqs in scenarios:
r = calculate_batch_savings(model, reqs)
print(
f"{r['model']} {r['monthly_requests']}건/월:\n"
f" 표준: {r['standard_monthly']} → 배치: {r['batch_monthly']} "
f"→ 배치+캐싱: {r['stacked_monthly']} ({r['savings_pct']} 절감)\n"
)
# 예시 출력:
# claude-sonnet-4-6 100,000건/월:
# 표준: $1,900 → 배치: $950 → 배치+캐싱: $445 (77% 절감)
#
# claude-sonnet-4-6 500,000건/월:
# 표준: $9,500 → 배치: $4,750 → 배치+캐싱: $2,225 (77% 절감)
#
# claude-opus-4-7 50,000건/월:
# 표준: $6,250 → 배치: $3,125 → 배치+캐싱: $1,838 (71% 절감)
3. 100,000개 요청 처리 — 청크 분할 패턴
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator
@dataclass
class BatchProcessor:
"""
100K 상한을 고려한 자동 청크 분할 배치 프로세서
"""
client: anthropic.Anthropic
model: str = "claude-sonnet-4-6"
chunk_size: int = 10_000 # 배치당 최대 요청 수 (안전 마진)
poll_interval: int = 60 # 폴링 간격 (초)
batch_ids: list[str] = field(default_factory=list)
results: dict = field(default_factory=dict)
def _make_request(self, custom_id: str, content: str) -> Request:
return Request(
custom_id=custom_id,
params=MessageCreateParamsNonStreaming(
model=self.model,
max_tokens=512,
system="당신은 데이터 분석 전문가입니다.",
messages=[{"role": "user", "content": content}]
)
)
async def submit_all(self, items: dict[str, str]) -> list[str]:
"""
items: {custom_id: prompt_content} 딕셔너리
자동으로 chunk_size 단위로 분할 제출
"""
items_list = list(items.items())
chunks = [
items_list[i:i+self.chunk_size]
for i in range(0, len(items_list), self.chunk_size)
]
print(f"총 {len(items)}개 요청 → {len(chunks)}개 배치로 분할")
for chunk_idx, chunk in enumerate(chunks):
requests = [
self._make_request(cid, content)
for cid, content in chunk
]
batch = self.client.beta.messages.batches.create(requests=requests)
self.batch_ids.append(batch.id)
print(f" 배치 {chunk_idx+1}/{len(chunks)} 제출: {batch.id}")
return self.batch_ids
async def wait_all(self) -> None:
"""모든 배치 완료 대기"""
pending = set(self.batch_ids)
completed = set()
while pending:
for batch_id in list(pending):
batch = self.client.beta.messages.batches.retrieve(batch_id)
if batch.processing_status == "ended":
completed.add(batch_id)
pending.discard(batch_id)
print(f" 배치 완료: {batch_id}")
if pending:
print(f" 대기 중: {len(pending)}개 배치...")
await asyncio.sleep(self.poll_interval)
print(f"전체 {len(completed)}개 배치 완료")
def collect_results(self) -> dict:
"""모든 배치에서 결과 수집"""
for batch_id in self.batch_ids:
for result in self.client.beta.messages.batches.results(batch_id):
if result.result.type == "succeeded":
self.results[result.custom_id] = {
"text": result.result.message.content[0].text,
"tokens": {
"input": result.result.message.usage.input_tokens,
"output": result.result.message.usage.output_tokens,
}
}
else:
self.results[result.custom_id] = {
"error": str(result.result.error)
}
return self.results
# 사용 예시
async def process_100k_documents():
client = anthropic.Anthropic()
processor = BatchProcessor(client=client, chunk_size=10_000)
# 10만 개 문서
items = {
f"doc-{i:06d}": f"문서 {i}: ..."
for i in range(100_000)
}
# 제출 → 대기 → 수집
await processor.submit_all(items)
await processor.wait_all()
results = processor.collect_results()
success = sum(1 for r in results.values() if "text" in r)
print(f"최종: {success}/{len(items)} 성공")
return results
4. 300K 출력 토큰 베타 — 장문 생성 워크로드
# 2026년 3월 추가 기능:
# output-300k-2026-03-24 베타 헤더로 최대 300K 출력 토큰
# 배치 API 전용 (동기 Messages API 미지원)
def create_long_form_batch(prompts: list[dict]) -> str:
"""
장문 콘텐츠 생성 배치 (최대 300K 출력 토큰)
적합 케이스: 기술 문서, 코드 스캐폴딩, 장문 보고서
주의: 단일 300K 요청 완료에 1시간 이상 소요 가능
"""
requests = [
Request(
custom_id=p["id"],
params=MessageCreateParamsNonStreaming(
model="claude-opus-4-7", # 장문 생성은 Opus 권장
max_tokens=300_000, # 최대 300K
messages=[{"role": "user", "content": p["prompt"]}]
)
)
for p in prompts
]
batch = client.beta.messages.batches.create(
requests=requests,
betas=["output-300k-2026-03-24"] # 베타 헤더 필수
)
print(f"300K 배치 생성: {batch.id}")
print("주의: 300K 요청은 완료에 1시간+ 소요 가능")
return batch.id
# 사용 케이스
long_form_tasks = [
{
"id": "tech-spec-001",
"prompt": "분산 시스템 아키텍처 완전 가이드를 작성해줘 (패턴, 일관성 모델, 장애 내성 포함)"
},
{
"id": "code-scaffold-001",
"prompt": "FastAPI + PostgreSQL + Redis로 이커머스 백엔드 전체 코드 스캐폴딩을 작성해줘"
}
]
batch_id = create_long_form_batch(long_form_tasks)
5. 에러 처리 + 재시도 패턴
from enum import Enum
import json
from pathlib import Path
class BatchState(Enum):
SUBMITTED = "submitted"
COMPLETED = "completed"
FAILED = "failed"
def robust_batch_pipeline(
items: dict[str, str],
state_file: str = "batch_state.json",
max_retries: int = 3,
) -> dict:
"""
상태 파일 기반 내결함성 배치 파이프라인
중단 후 재시작 시 완료된 배치 건너뜀
"""
state_path = Path(state_file)
# 기존 상태 로드 (재시작 시)
if state_path.exists():
with open(state_path) as f:
state = json.load(f)
print(f"기존 상태 로드: 배치 {len(state['batches'])}개")
else:
state = {"batches": {}, "results": {}, "retry_counts": {}}
# 미완료 배치 확인
for batch_id, batch_state in list(state["batches"].items()):
if batch_state == BatchState.SUBMITTED.value:
batch = client.beta.messages.batches.retrieve(batch_id)
if batch.processing_status == "ended":
# 결과 수집
for result in client.beta.messages.batches.results(batch_id):
if result.result.type == "succeeded":
state["results"][result.custom_id] = (
result.result.message.content[0].text
)
else:
# 에러 항목 재시도 큐에 추가
cid = result.custom_id
retry_count = state["retry_counts"].get(cid, 0)
if retry_count < max_retries:
state["retry_counts"][cid] = retry_count + 1
# items에서 원본 내용 가져와 재시도
if cid in items:
print(f" 재시도 예약: {cid} (시도 {retry_count+1}/{max_retries})")
state["batches"][batch_id] = BatchState.COMPLETED.value
# 상태 저장
with open(state_path, "w") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
return state["results"]
# 실패한 항목만 재시도
def retry_failed_items(
failed_ids: list[str],
original_items: dict[str, str],
model: str = "claude-sonnet-4-6"
) -> str:
"""실패한 custom_id만 모아 새 배치로 재시도"""
retry_items = {
cid: original_items[cid]
for cid in failed_ids
if cid in original_items
}
if not retry_items:
return None
print(f"재시도 배치 생성: {len(retry_items)}개 항목")
requests = [
Request(
custom_id=cid,
params=MessageCreateParamsNonStreaming(
model=model,
max_tokens=512,
messages=[{"role": "user", "content": content}]
)
)
for cid, content in retry_items.items()
]
batch = client.beta.messages.batches.create(requests=requests)
return batch.id
6. 캐싱 + 배치 스태킹 — 최대 절감
# 배치 API + 프롬프트 캐싱 동시 적용
# 반복 시스템 프롬프트 + 공통 문서가 있는 경우 추가 85~90% 절감
def create_cached_batch(
texts: list[str],
system_prompt: str,
shared_document: str = "",
) -> str:
"""
배치 + 캐싱 동시 적용
system_prompt와 shared_document는 cache_control로 마킹
"""
requests = []
for i, text in enumerate(texts):
# 시스템 프롬프트에 캐시 마커 추가
system_content = [
{
"type": "text",
"text": system_prompt,
"cache_control": {"type": "ephemeral"} # ← 캐시 마킹
}
]
# 메시지 구성
user_content = []
if shared_document:
# 공유 문서도 캐시 마킹
user_content.append({
"type": "text",
"text": f"참조 문서:\n{shared_document}",
"cache_control": {"type": "ephemeral"}
})
# 개별 텍스트 (캐시 안 됨)
user_content.append({
"type": "text",
"text": f"분석 대상:\n{text}"
})
requests.append(
Request(
custom_id=f"item-{i:06d}",
params=MessageCreateParamsNonStreaming(
model="claude-sonnet-4-6",
max_tokens=512,
system=system_content,
messages=[{"role": "user", "content": user_content}]
)
)
)
batch = client.beta.messages.batches.create(requests=requests)
return batch.id
# 비용 효과 계산
# 예: 시스템 프롬프트 1,000 토큰 + 공유 문서 5,000 토큰
# → 10,000 요청 기준
# 배치 전용 (50% 할인):
# (6,000 토큰 × 10,000회) / 1M × $3.00 × 0.5 = $90
# 배치 + 캐싱 스태킹:
# 캐시 히트 시 6,000 토큰의 90% = 5,400 토큰 × 10% 가격
# 개별 텍스트 나머지 → 표준 배치 가격
# → 실질 비용 $90의 약 30% = ~$27 (총 표준 대비 ~86% 절감)
7. Webhook 대안 — 폴링 없이 완료 알림
# Anthropic Batch API는 아직 Webhook 미지원 (2026.05 기준)
# 폴링 대신 다음 대안 패턴 사용
# ── 패턴 1: 짧은 폴링 간격으로 빠른 완료 감지 ──
def smart_poll(batch_id: str) -> object:
"""
처음에는 짧게, 시간 지날수록 간격 늘림
"""
intervals = [30, 60, 120, 300, 600] # 30초 → 10분
attempt = 0
while True:
batch = client.beta.messages.batches.retrieve(batch_id)
if batch.processing_status == "ended":
return batch
interval = intervals[min(attempt, len(intervals)-1)]
print(f"대기 중... {interval}초 후 재확인")
time.sleep(interval)
attempt += 1
# ── 패턴 2: 배치 제출 후 Temporal로 상태 영속 관리 ──
# Temporal을 쓰면 서버 재시작해도 폴링 상태 유지
# ── 패턴 3: 배치 ID를 DB에 저장하고 크론잡으로 확인 ──
def submit_and_store(items: dict, db_connection) -> None:
"""배치 제출 후 DB에 ID 저장, 크론잡이 완료 체크"""
batch_id = create_batch(list(items.values()))
db_connection.execute("""
INSERT INTO batch_jobs (batch_id, status, created_at, item_count)
VALUES (?, 'submitted', datetime('now'), ?)
""", (batch_id, len(items)))
print(f"배치 제출 완료. ID 저장됨: {batch_id}")
# 이후 크론잡이 30분마다 미완료 배치 확인
8. 실전 주의사항 — 프로덕션에서 배운 것
실제 프로덕션 사용에서 Batch API는 4시간+ 완료 시간, 개별 항목 진행 상황 없음, 취소 지원 없음, 불투명한 실패 등의 문제를 보였습니다. 이후 이 프로젝트는 asyncio.TaskGroup 병렬 처리로 전환해 수 시간을 수 분으로 단축했습니다.
# 프로덕션 주의사항 체크리스트
PRODUCTION_GOTCHAS = {
"완료 시간 불확실":
"대부분 1시간 이내지만 최대 24시간 가능\n"
"→ 타임라인이 확실한 워크로드에만 사용",
"개별 항목 진행 불가":
"배치 전체 성공/실패 수만 확인 가능\n"
"→ 어떤 항목이 실패했는지는 완료 후에만 알 수 있음\n"
"→ custom_id로 사후 매칭",
"개별 취소 불가":
"제출된 배치는 개별 요청 단위 취소 없음\n"
"배치 전체만 취소 가능: batches.cancel(batch_id)",
"결과 만료":
"결과는 제출 후 29일간 보관 후 자동 삭제\n"
"→ 완료 즉시 결과 수집 후 자체 저장소에 보관",
"실시간 UI 금지":
"배치는 비동기 — 사용자가 기다리는 시나리오에 절대 사용 금지\n"
"→ '제출 완료, 결과는 이메일로 드립니다' 패턴이 적합",
"에러 처리":
"일부 요청 실패해도 배치는 'ended' 상태가 됨\n"
"→ result.type == 'errored' 항목 별도 처리 필수\n"
"→ 재시도 배치 자동화 권장",
}
# ── 대안: 실시간성이 필요하면 asyncio 병렬 처리 ──
import asyncio
async def parallel_process(items: list[str], concurrency: int = 50) -> list:
"""
배치 API 대신: asyncio로 50개씩 동시 처리
장점: 실시간 진행 상황, 개별 재시도, 즉시 결과
단점: 표준 가격 (50% 할인 없음)
"""
semaphore = asyncio.Semaphore(concurrency)
async def process_one(item: str) -> str:
async with semaphore:
# async anthropic client 사용
async_client = anthropic.AsyncAnthropic()
response = await async_client.messages.create(
model="claude-sonnet-4-6",
max_tokens=512,
messages=[{"role": "user", "content": item}]
)
return response.content[0].text
return await asyncio.gather(*[process_one(item) for item in items])
9. 워크로드별 최적 선택
# 워크로드별 권장 접근법
워크로드 추천 이유
─────────────────────────────────────────────────────────────
월간 문서 처리 10만 건+ 배치 API 50% 절감 + 처리량
LLM Eval 오프라인 실행 배치 API 비용 최소화
야간 데이터 보강 배치 API 시간 여유, 비용 중요
콘텐츠 생성 대기열 배치 API 실시간 불필요
장문 기술 문서 생성 배치 + 300K 베타 장문 지원 필수
사용자 대기 채팅 동기 API 실시간 필수
빠른 프로토타입 (<100건) 동기 API 배치 오버헤드 불필요
1시간 이내 결과 필요 asyncio 병렬 실시간 + 병렬
진행 상황 UI 보여줘야 함 asyncio 병렬 개별 상태 추적
취소 기능 필요 asyncio 병렬 개별 취소 가능
결론
✅ Batch API가 확실히 유리한 케이스
- 비동기 허용 + 100건 이상 + 비용 중요 → 50% 할인 무조건
- 반복 시스템 프롬프트 + 캐싱 스태킹 → 최대 85% 절감
- 장문 콘텐츠 생성 → 300K 출력 토큰 (동기 API 불가)
- 표준 rate limit 별도 → 대량 처리 시 한도 충돌 없음
✅ 지금 바로 적용할 수 있는 워크로드
- 야간 분석 크론잡 → 표준 API → 배치 전환
- LLM Eval 오프라인 실행 → Pydantic Evals + 배치 조합
- 문서/피드백 대량 분류 → 배치로 전환 후 비용 확인
❌ 반드시 기억할 것
- 실시간 UI에 배치 쓰면 안 됨 — "가짜 동기 호출" 패턴 금지
- 결과 29일 후 자동 삭제 → 완료 즉시 자체 저장소 보관
- 4시간+ 완료 경험 있음 → 24시간 여유 없으면 asyncio 병렬 대안 고려
'Claude' 카테고리의 다른 글
| Claude Opus 4.8 Fast Mode 완전 분석 — 2.5배 빠르고 3배 싸다는 게 실제로 맞는가 (0) | 2026.06.01 |
|---|---|
| Claude Opus 4.8 — 69.2: GPT-5.5가 SWE-bench Pro 58.6%로 정상을 노리던 그 자리, Anthropic이 41일 만에 답했습니다. (0) | 2026.06.01 |
| Claude Code Hooks 완전가이드 — 프롬프트 요청이 아닌 보장된 실행 (0) | 2026.05.29 |
| 일본 정부 + 3대 메가뱅크 Claude Mythos 도입 — 왜 하필 일본이 첫 번째 비(非)영미권 파트너인가 (1) | 2026.05.29 |
| Anthropic이 월스트리트를 노린다 — 10개 금융 에이전트 + $1.5B JV, 무엇이 바뀌나 (0) | 2026.05.28 |