본문 바로가기

AI Development

AI 네이티브 앱 아키텍처 설계 — 처음부터 AI를 고려한 풀스택 구조 (with Supabase)

반응형

"AI 기능 추가해야 해"라는 말을 들으면 많은 개발자가 기존 앱에 LLM API 호출을 끼워 넣어요.

# 이렇게 하면 안 돼요
@app.post("/chat")
def chat(message: str):
    response = openai.chat.completions.create(...)  # 그냥 때려넣기
    return response

이렇게 만들면 사용자가 100명만 돼도 무너져요. LLM은 일반 API 호출과 물리학이 달라요.

일반 API:   10~100ms, 결정론적, 토큰 비용 없음
LLM API:  500ms~30초, 확률론적, 토큰마다 비용 발생

이 차이가 아키텍처 전체를 바꿔요. 이번 글에서는 처음부터 AI를 고려한 풀스택 구조를 실전 코드와 함께 정리해 드릴게요.


전체 아키텍처 구조


백엔드 — FastAPI 설계

1. LLM 호출 레이어

LLM 호출을 직접 하지 말고 레이어로 감싸요. 모델 교체, 폴백, 비용 추적을 한 곳에서 관리할 수 있어요.

# llm_service.py
from anthropic import Anthropic
from openai import OpenAI
import time
import logging

class LLMService:
    def __init__(self):
        self.anthropic = Anthropic()
        self.openai = OpenAI()
        self.primary_model = "claude-sonnet-4-6"
        self.fallback_model = "gpt-4o"

    async def generate(
        self,
        messages: list,
        user_id: str,
        max_tokens: int = 1024,
        stream: bool = False
    ):
        start_time = time.time()

        try:
            cached = await self.check_semantic_cache(messages)
            if cached:
                await self.log_usage(user_id, 0, 0, cached=True)
                return cached

            response = await self._call_claude(messages, max_tokens, stream)

            await self.log_usage(
                user_id,
                response.usage.input_tokens,
                response.usage.output_tokens
            )
            return response

        except Exception as e:
            logging.error(f"Primary model failed: {e}")
            return await self._call_openai_fallback(messages, max_tokens, stream)

    async def generate_stream(self, messages: list, user_id: str):
        """스트리밍 제너레이터"""
        async with self.anthropic.messages.stream(
            model=self.primary_model,
            max_tokens=1024,
            messages=messages
        ) as stream:
            async for text in stream.text_stream:
                yield text

    async def _call_claude(self, messages, max_tokens, stream):
        return self.anthropic.messages.create(
            model=self.primary_model,
            max_tokens=max_tokens,
            messages=messages,
            stream=stream
        )

    async def _call_openai_fallback(self, messages, max_tokens, stream):
        logging.warning("Falling back to OpenAI")
        return self.openai.chat.completions.create(
            model=self.fallback_model,
            max_tokens=max_tokens,
            messages=messages,
            stream=stream
        )

    async def log_usage(self, user_id, input_tokens, output_tokens, cached=False):
        await supabase.table("token_usage").insert({
            "user_id": user_id,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cached": cached
        }).execute()

2. 스트리밍 API 엔드포인트

사용자는 8초짜리 빈 화면을 못 참아요. SSE(Server-Sent Events)로 토큰이 생성되는 즉시 보내야 해요.

# router/chat.py
from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

@router.post("/chat/stream")
async def chat_stream(
    request: ChatRequest,
    llm_service: LLMService = Depends(get_llm_service),
    current_user: User = Depends(get_current_user)
):
    async def event_generator():
        try:
            history = await get_conversation_history(
                request.conversation_id,
                current_user.id
            )

            if request.use_rag:
                context = await rag_service.retrieve(request.message)
                system_prompt = f"다음 컨텍스트를 참고해서 답변하세요:\n{context}"
            else:
                system_prompt = "당신은 도움이 되는 어시스턴트입니다."

            messages = history + [{"role": "user", "content": request.message}]

            full_response = ""
            async for token in llm_service.generate_stream(messages, current_user.id):
                full_response += token
                yield {"event": "token", "data": token}

            await save_message(request.conversation_id, "assistant", full_response)
            yield {"event": "done", "data": "[DONE]"}

        except Exception as e:
            yield {"event": "error", "data": str(e)}

    return EventSourceResponse(event_generator())


@router.post("/chat/async")
async def chat_async(
    request: HeavyTaskRequest,
    current_user: User = Depends(get_current_user)
):
    """무거운 작업은 큐에 넣고 즉시 job_id 반환"""
    job_id = await task_queue.enqueue(
        "process_heavy_ai_task",
        user_id=current_user.id,
        task_data=request.dict()
    )
    return {"job_id": job_id, "status": "queued"}


@router.get("/chat/async/{job_id}")
async def get_job_status(job_id: str):
    job = await task_queue.get_job(job_id)
    return {
        "job_id": job_id,
        "status": job.status,
        "result": job.result if job.status == "completed" else None
    }

3. 시맨틱 캐시 (비용 절감 핵심)

# cache/semantic_cache.py
from supabase import create_client
import numpy as np

class SemanticCache:
    def __init__(self):
        self.supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
        self.similarity_threshold = 0.92

    async def get(self, query: str) -> str | None:
        query_embedding = await self.embed(query)

        # Supabase pgvector로 유사도 검색
        result = self.supabase.rpc(
            "match_semantic_cache",
            {
                "query_embedding": query_embedding,
                "match_threshold": self.similarity_threshold,
                "match_count": 1
            }
        ).execute()

        if result.data:
            return result.data[0]["response"]
        return None

    async def set(self, query: str, response: str):
        query_embedding = await self.embed(query)

        self.supabase.table("semantic_cache").insert({
            "query": query,
            "response": response,
            "embedding": query_embedding
        }).execute()

    async def embed(self, text: str) -> list[float]:
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

4. 사용량 제한 (비용 폭탄 방지)

# middleware/rate_limit.py
from fastapi import HTTPException

class TokenBudgetMiddleware:
    DAILY_LIMIT = {
        "free": 10_000,
        "pro": 500_000,
        "enterprise": -1
    }

    async def check_budget(self, user_id: str, plan: str, estimated_tokens: int):
        daily_limit = self.DAILY_LIMIT.get(plan, 10_000)

        if daily_limit == -1:
            return True

        # Supabase로 오늘 사용량 조회
        result = self.supabase.rpc(
            "get_daily_token_usage",
            {"p_user_id": user_id}
        ).execute()

        used_today = result.data[0]["total"] if result.data else 0

        if used_today + estimated_tokens > daily_limit:
            raise HTTPException(
                status_code=429,
                detail={
                    "error": "daily_token_limit_exceeded",
                    "used": used_today,
                    "limit": daily_limit,
                    "reset_in": "내일 자정에 초기화됩니다"
                }
            )

프론트엔드 — Next.js 설계

1. 스트리밍 응답 처리

// hooks/useStreamingChat.ts
import { useState, useCallback } from "react";

interface Message {
  role: "user" | "assistant";
  content: string;
  isStreaming?: boolean;
}

export function useStreamingChat(conversationId: string) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);

  const sendMessage = useCallback(async (userMessage: string) => {
    setMessages(prev => [...prev, { role: "user", content: userMessage }]);
    setMessages(prev => [...prev, {
      role: "assistant",
      content: "",
      isStreaming: true
    }]);
    setIsLoading(true);
    setError(null);

    try {
      const response = await fetch("/api/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          message: userMessage,
          conversation_id: conversationId
        })
      });

      if (!response.ok) throw new Error("API 오류");

      const reader = response.body!.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split("\n");

        for (const line of lines) {
          if (line.startsWith("data: ")) {
            const data = line.slice(6);

            if (data === "[DONE]") {
              setMessages(prev => prev.map((msg, idx) =>
                idx === prev.length - 1
                  ? { ...msg, isStreaming: false }
                  : msg
              ));
              break;
            }

            setMessages(prev => prev.map((msg, idx) =>
              idx === prev.length - 1
                ? { ...msg, content: msg.content + data }
                : msg
            ));
          }

          if (line.startsWith("event: error")) {
            setError("응답 생성 중 오류가 발생했습니다.");
          }
        }
      }
    } catch (err) {
      setError("서버 연결에 실패했습니다. 다시 시도해 주세요.");
      setMessages(prev => prev.slice(0, -1));
    } finally {
      setIsLoading(false);
    }
  }, [conversationId]);

  return { messages, isLoading, error, sendMessage };
}

2. 채팅 UI 컴포넌트

// components/ChatInterface.tsx
import { useStreamingChat } from "@/hooks/useStreamingChat";
import { useState, useRef, useEffect } from "react";

export function ChatInterface({ conversationId }: { conversationId: string }) {
  const { messages, isLoading, error, sendMessage } = useStreamingChat(conversationId);
  const [input, setInput] = useState("");
  const bottomRef = useRef<HTMLDivElement>(null);

  useEffect(() => {
    bottomRef.current?.scrollIntoView({ behavior: "smooth" });
  }, [messages]);

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (!input.trim() || isLoading) return;
    sendMessage(input);
    setInput("");
  };

  return (
    <div className="flex flex-col h-full">
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg, idx) => (
          <div key={idx} className={`flex ${msg.role === "user" ? "justify-end" : "justify-start"}`}>
            <div className={`max-w-[80%] rounded-lg p-3 ${
              msg.role === "user"
                ? "bg-blue-500 text-white"
                : "bg-gray-100 text-gray-800"
            }`}>
              {msg.content}
              {msg.isStreaming && (
                <span className="inline-block w-2 h-4 bg-gray-500 animate-pulse ml-1" />
              )}
            </div>
          </div>
        ))}

        {error && (
          <div className="bg-red-50 border border-red-200 rounded p-3 text-red-600 text-sm">
            {error}
          </div>
        )}
        <div ref={bottomRef} />
      </div>

      <form onSubmit={handleSubmit} className="p-4 border-t">
        <div className="flex gap-2">
          <input
            value={input}
            onChange={(e) => setInput(e.target.value)}
            placeholder="메시지를 입력하세요..."
            disabled={isLoading}
            className="flex-1 border rounded-lg px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"
          />
          <button
            type="submit"
            disabled={isLoading || !input.trim()}
            className="bg-blue-500 text-white px-4 py-2 rounded-lg disabled:opacity-50"
          >
            {isLoading ? "생성 중..." : "전송"}
          </button>
        </div>
      </form>
    </div>
  );
}

3. 무거운 작업 처리 (폴링)

// hooks/useAsyncTask.ts
import { useState, useEffect } from "react";

export function useAsyncTask() {
  const [jobId, setJobId] = useState<string | null>(null);
  const [status, setStatus] = useState<"idle" | "queued" | "processing" | "completed" | "failed">("idle");
  const [result, setResult] = useState<any>(null);

  const startTask = async (taskData: any) => {
    const response = await fetch("/api/chat/async", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(taskData)
    });
    const { job_id } = await response.json();
    setJobId(job_id);
    setStatus("queued");
  };

  useEffect(() => {
    if (!jobId || status === "completed" || status === "failed") return;

    const interval = setInterval(async () => {
      const response = await fetch(`/api/chat/async/${jobId}`);
      const data = await response.json();
      setStatus(data.status);

      if (data.status === "completed") {
        setResult(data.result);
        clearInterval(interval);
      } else if (data.status === "failed") {
        clearInterval(interval);
      }
    }, 2000);

    return () => clearInterval(interval);
  }, [jobId, status]);

  return { startTask, status, result };
}

DB 설계 — Supabase 기준

Supabase는 PostgreSQL 기반이라 SQL 스키마를 그대로 쓸 수 있어요. 거기에 pgvector 확장으로 임베딩 검색까지 한 곳에서 해결돼요.

테이블 스키마

Supabase SQL Editor에서 실행하면 돼요.

-- pgvector 확장 활성화 (Supabase에서 기본 지원)
create extension if not exists vector;

-- 사용자 테이블
-- Supabase Auth와 연동하는 구조
create table public.profiles (
    id uuid references auth.users(id) on delete cascade primary key,
    email text unique not null,
    plan text default 'free' check (plan in ('free', 'pro', 'enterprise')),
    created_at timestamp with time zone default timezone('utc', now()),
    updated_at timestamp with time zone default timezone('utc', now())
);

-- 신규 유저 가입 시 profiles 자동 생성
create or replace function public.handle_new_user()
returns trigger as $$
begin
    insert into public.profiles (id, email)
    values (new.id, new.email);
    return new;
end;
$$ language plpgsql security definer;

create trigger on_auth_user_created
    after insert on auth.users
    for each row execute procedure public.handle_new_user();

-- 대화 테이블
create table public.conversations (
    id uuid default gen_random_uuid() primary key,
    user_id uuid references public.profiles(id) on delete cascade not null,
    title text,
    created_at timestamp with time zone default timezone('utc', now()),
    updated_at timestamp with time zone default timezone('utc', now())
);

-- 메시지 테이블
create table public.messages (
    id uuid default gen_random_uuid() primary key,
    conversation_id uuid references public.conversations(id) on delete cascade not null,
    role text not null check (role in ('user', 'assistant', 'system')),
    content text not null,
    model text,
    input_tokens integer default 0,
    output_tokens integer default 0,
    created_at timestamp with time zone default timezone('utc', now())
);

-- 토큰 사용량 테이블
create table public.token_usage (
    id uuid default gen_random_uuid() primary key,
    user_id uuid references public.profiles(id) on delete cascade not null,
    input_tokens integer not null default 0,
    output_tokens integer not null default 0,
    model text,
    cached boolean default false,
    created_at timestamp with time zone default timezone('utc', now())
);

-- 시맨틱 캐시 테이블 (pgvector 사용)
create table public.semantic_cache (
    id uuid default gen_random_uuid() primary key,
    query text not null,
    response text not null,
    embedding vector(1536),  -- text-embedding-3-small 차원
    created_at timestamp with time zone default timezone('utc', now()),
    -- 1시간 후 만료
    expires_at timestamp with time zone default timezone('utc', now()) + interval '1 hour'
);

-- RAG용 문서 청크 테이블
create table public.document_chunks (
    id uuid default gen_random_uuid() primary key,
    user_id uuid references public.profiles(id) on delete cascade,
    content text not null,
    embedding vector(1536),
    metadata jsonb default '{}',
    created_at timestamp with time zone default timezone('utc', now())
);

인덱스 및 RLS 설정

-- 성능 인덱스
create index idx_conversations_user_id
    on public.conversations(user_id, created_at desc);

create index idx_messages_conversation_id
    on public.messages(conversation_id, created_at asc);

create index idx_token_usage_user_date
    on public.token_usage(user_id, created_at desc);

-- pgvector 인덱스 (빠른 유사도 검색)
create index idx_semantic_cache_embedding
    on public.semantic_cache
    using ivfflat (embedding vector_cosine_ops)
    with (lists = 100);

create index idx_document_chunks_embedding
    on public.document_chunks
    using ivfflat (embedding vector_cosine_ops)
    with (lists = 100);

-- 만료된 캐시 자동 삭제
create index idx_semantic_cache_expires
    on public.semantic_cache(expires_at);

-- RLS (Row Level Security) 활성화
alter table public.profiles enable row level security;
alter table public.conversations enable row level security;
alter table public.messages enable row level security;
alter table public.token_usage enable row level security;
alter table public.document_chunks enable row level security;

-- RLS 정책 — 본인 데이터만 접근 가능
create policy "본인 프로필만 조회"
    on public.profiles for select
    using (auth.uid() = id);

create policy "본인 대화만 조회"
    on public.conversations for all
    using (auth.uid() = user_id);

create policy "본인 메시지만 조회"
    on public.messages for all
    using (
        conversation_id in (
            select id from public.conversations
            where user_id = auth.uid()
        )
    );

create policy "본인 사용량만 조회"
    on public.token_usage for all
    using (auth.uid() = user_id);

Supabase 함수 (RPC)

-- 시맨틱 캐시 유사도 검색 함수
create or replace function match_semantic_cache(
    query_embedding vector(1536),
    match_threshold float,
    match_count int
)
returns table (
    id uuid,
    query text,
    response text,
    similarity float
)
language sql stable
as $$
    select
        id,
        query,
        response,
        1 - (embedding <=> query_embedding) as similarity
    from public.semantic_cache
    where
        expires_at > now()
        and 1 - (embedding <=> query_embedding) > match_threshold
    order by embedding <=> query_embedding
    limit match_count;
$$;

-- RAG 문서 검색 함수
create or replace function match_documents(
    query_embedding vector(1536),
    match_threshold float,
    match_count int,
    p_user_id uuid
)
returns table (
    id uuid,
    content text,
    metadata jsonb,
    similarity float
)
language sql stable
as $$
    select
        id,
        content,
        metadata,
        1 - (embedding <=> query_embedding) as similarity
    from public.document_chunks
    where
        user_id = p_user_id
        and 1 - (embedding <=> query_embedding) > match_threshold
    order by embedding <=> query_embedding
    limit match_count;
$$;

-- 일별 토큰 사용량 집계 함수
create or replace function get_daily_token_usage(p_user_id uuid)
returns table (total bigint)
language sql stable
as $$
    select coalesce(sum(input_tokens + output_tokens), 0) as total
    from public.token_usage
    where
        user_id = p_user_id
        and created_at >= now() - interval '1 day';
$$;

Python에서 Supabase 사용

# db/supabase_client.py
from supabase import create_client, Client
import os

supabase: Client = create_client(
    os.environ["SUPABASE_URL"],
    os.environ["SUPABASE_SERVICE_KEY"]  # 백엔드는 서비스 키 사용
)

# 메시지 저장
async def save_message(conversation_id: str, role: str, content: str, model: str = None):
    supabase.table("messages").insert({
        "conversation_id": conversation_id,
        "role": role,
        "content": content,
        "model": model
    }).execute()

# 대화 기록 불러오기
async def get_conversation_history(conversation_id: str, user_id: str) -> list:
    result = supabase.table("messages") \
        .select("role, content") \
        .eq("conversation_id", conversation_id) \
        .order("created_at") \
        .execute()

    return [{"role": m["role"], "content": m["content"]} for m in result.data]

# RAG 검색
async def search_documents(query_embedding: list, user_id: str) -> list:
    result = supabase.rpc("match_documents", {
        "query_embedding": query_embedding,
        "match_threshold": 0.7,
        "match_count": 5,
        "p_user_id": user_id
    }).execute()

    return [r["content"] for r in result.data]

폴백 전략 — 장애 대응

# services/llm_with_fallback.py

class ResilientLLMService:
    MODELS = [
        {"provider": "anthropic", "model": "claude-sonnet-4-6"},
        {"provider": "openai", "model": "gpt-4o"},
        {"provider": "openai", "model": "gpt-4o-mini"},
    ]

    async def generate_with_fallback(self, messages: list) -> str:
        last_error = None

        for model_config in self.MODELS:
            try:
                response = await self._call_model(model_config, messages)
                if model_config["model"] != "claude-sonnet-4-6":
                    logging.warning(f"Using fallback model: {model_config['model']}")
                return response

            except RateLimitError:
                last_error = "rate_limit"
                continue

            except APIError as e:
                last_error = str(e)
                continue

        raise AllModelsFailedError(f"All models failed. Last error: {last_error}")

프로덕션 체크리스트

✅ 스트리밍 — 모든 대화형 응답은 SSE 방식
✅ 시맨틱 캐시 — Supabase pgvector로 유사 질문 캐시
✅ 사용량 제한 — 사용자별 토큰 예산 설정
✅ 폴백 — 기본 모델 장애 시 대체 모델 자동 전환
✅ 비동기 큐 — 30초 이상 걸리는 작업은 큐로
✅ 토큰 추적 — Supabase token_usage 테이블에 기록
✅ RLS — Row Level Security로 데이터 격리
✅ 에러 처리 — LLM 실패해도 앱이 죽지 않게
✅ 출력 검증 — LLM 응답을 그대로 신뢰하지 않기
✅ 레이트 리밋 — 한 사용자가 무한 루프 돌리면 차단

마무리

AI 네이티브 앱 아키텍처의 핵심 원칙 세 가지예요.

첫째, LLM은 동기 API가 아니다. 무조건 비동기 + 스트리밍으로 설계해야 해요. 동기 처리는 사용자 경험을 망치고 서버를 죽여요.

둘째, 비용은 아키텍처 수준에서 관리해야 한다. 시맨틱 캐시, 토큰 추적, 사용량 제한을 처음부터 넣지 않으면 나중에 청구서 보고 충격받아요.

셋째, LLM은 언제든 실패할 수 있다. 폴백, 재시도, 에러 처리가 없으면 프로덕션에서 버티지 못해요.

Supabase를 쓰면 PostgreSQL + pgvector + Auth + Realtime을 하나로 해결할 수 있어서 초기 인프라 설정이 훨씬 빨라요. 😄

 

반응형