[LLM 경제 뉴스 요약 서비스] # 02. FastAPI + PostgreSQL로 데이터 적재 REST API 구축

2026. 1. 3. 21:54Programming/Project

지난 n8n 워크플로우 구축에 이어서,
이번에는 DB에 데이터를 적재하는 간단한 REST API를 구축해보겠습니다.

FastAPI 기본 설정

환경

  • 패키지 매니저: poetry
  • 웹 프레임워크: FastAPI
  • ASGI 서버: uvicorn

FastAPI는 asyncio 기반의 비동기 프레임워크이기 때문에
DB 연결 역시 비동기 엔진(create_async_engine) 을 사용합니다.

  • create_engine
    • 동기(synchronous) 환경
    • 일반적인 Python 코드 기반
  • create_async_engine
    • 비동기(asynchronous) 환경
    • async/await 기반
    • AsyncSession + 비동기 드라이버(asyncpg, aiosqlite) 필요

핵심은 동기 vs 비동기,
즉 일반적인 Python 코드 흐름과 현대적인 비동기 애플리케이션 환경의 차이입니다.


아키텍처

구조

  1. Presentation Layer (클라이언트 요청 처리)
    • router
  2. Business Layer (핵심 비즈니스 로직 구현)
    • usecase / service(단순 CRUD 함수) / query
  3. Persistence Layer (데이터베이스 접근)
    • repository / query

src
├── application # usecase, query, service 묶음
│   ├── dto # 메인 DTO 모델
│   ├── query
│   ├── service
│   └── usecase
├── domain # 도메인 엔티티, repository 인터페이스
├── infrastructure
│   ├── config # 환경변수 
│   ├── database # session 객체
│   │   └── dao # db orm 객체
│   └── repository # repository 구현체
└── user_interface
    └── restapi
        ├── dto
        └── router # API 엔드포인트

 

설계 의도

  • Layered Architecture
    • 비즈니스 로직은 application 레이어에 집중
    • user_interface에서는 비즈니스 로직 제거
    • application 레이어의 DTO를 기준 모델로 사용
  • CQRS 패턴
    • 쓰기 작업: usecase
    • 읽기 작업: query
    • 읽기 쿼리 최적화 이점만 선택적으로 적용

PostgreSQL DB 연결

이제 DB를 세팅해보겠습니다-

환경 의존성, 이후 배포환경에서 컨테이너로 관리할 것을 고려해 Docker로 띄워서 구성하겠습니다.

1. Docker 이미지 pull

docker pull postgres:latest

2. Docker 컨테이너 실행

설정할 DB 비밀번호/이름/사용자 정보를 입력해서 실행해줍니다.

docker rm -f postgres || true
docker run --name postgres -itd -p 5432:5432 -e POSTGRES_PASSWORD=<DB비밀번호> -e POSTGRES_DB=<DB이름> -e POSTGRES_USER=<DB사용자> postgres:latest

 

MakeFile ver.
프로젝트 내에서 자주 사용하는 명령어는 MakeFile로 관리해줬습니다.
DOCKER := $(shell \[ -n "$$(command -v podman 2>/dev/null)" \] && echo podman || echo docker)

init\_db\_container:  
$(DOCKER) rm -f postgres || true  
$(DOCKER) run --name postgres -itd -p 5432:5432 -e POSTGRES\_PASSWORD=<DB비밀번호> -e POSTGRES\_DB=<DB이름> -e POSTGRES\_USER=<DB사용자> postgres:latest

3. 테스트

이제 PGAdmin에서 연결 가능합니다.


DB 모델링

이제 SQLAlchemy를 활용해서,
기사 요약 테이블(Summary) DAO를 모델링해보겠습니다.

Summary 테이블

id, content(내용), keywords(키워드 리스트), created_at(생성일) column을 선언했습니다.

# dao/summary.py
from datetime import datetime
from sqlalchemy import ARRAY, String, text
from sqlalchemy.orm import Mapped, mapped_column

from src.infrastructure.database.dao.base import Base, UTCTimestamp

class SummaryDao(Base):
    __tablename__ = "summary"

    id: Mapped[int] = mapped_column("summary_id", primary_key=True)
    content: Mapped[str] = mapped_column("content", String(500), nullable=False)
    keywords: Mapped[list[str]] = mapped_column("keywords", ARRAY(String), nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        "created_at",
        UTCTimestamp,
        server_default=text("CURRENT_TIMESTAMP"),
    )
[SQLAlchemy 2.0 변경점]
SQLAlchemy 2.0부터는 Mapped[T] + mapped_column() 문법 도입
(선택) [Base + UTC Timestamp]
Base 모델과 타임stamp 일관성을 위한 공용 모델을 선언해줬습니다.
# dao/base.py
from datetime import UTC
from sqlalchemy import TIMESTAMP, TypeDecorator
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
# 추후 공통 util 처리를 위함.
    pass

class UTCTimestamp(TypeDecorator):
    impl = TIMESTAMP
    cache_ok = True

    def process_bind_param(self, value, _):
        if value is not None:
            if not value.tzinfo:
                raise TypeError("tzinfo is required")
            value = value.astimezone(UTC).replace(tzinfo=None)
        return value

    def process_result_value(self, value, _):
        if value is not None:
            value = value.replace(tzinfo=UTC)
        return value

DB 마이그레이션 설정 (Alembic)

위 코드로 선언한 모델을 DB에 반영하기 위한 마이그레이션 도구로 Alembic을 설정해보겠습니다.

1. 설치 후 초기화

설치 후 init 명령어를 수행하면,

alembic.ini 파일과 migrations/ 폴더가 생성됩니다

poetry add alembic
alembic init migrations

2. evn.py

Alembic이 내 소스 코드의 Base 모델을 인식할 수 있도록 migrations/env.py 파일을 수정해야 합니다.

# migrations/env.py 상단
from my_app.models import Base  # 본인의 Base 클래스 경로
target_metadata = Base.metadata

# 파일 내의 sqlalchemy.url 부분은 
# 실제 DB 접속 정보(예: postgresql://user:pw@localhost/dbname)로 수정해야 합니다.

3. 마이그레이션 파일 생성

명령어를 통해 자동으로 마이그레이션 파일이 생성됩니다.

alembic revision --autogenerate -m "create summary table"

 

def upgrade() -> None:  
"""Upgrade schema."""  
\# ### commands auto generated by Alembic - please adjust! ###  
op.create\_table('summary',  
sa.Column('summary\_id', sa.Integer(), nullable=False),  
sa.Column('content', sa.String(length=500), nullable=False),  
sa.Column('keywords', sa.ARRAY(sa.String()), nullable=True),  
sa.PrimaryKeyConstraint('summary\_id')  
)  
\# ### end Alembic commands ###

4. DB에 적용

최상위 버전으로 DB를 업그레이드 해줍니다.

alembic upgrade head

 

테스트 확인


API 구현

이제 (GET/POST)API를 구현해보겠습니다.

GET /summary

DTO

요약 내용들은 list로 받기 위해,

PaginationDto와 response 모델을 선언해줬습니다.

class PaginationDto(BaseModel):
    total: int
    offset: int
    limit: int
    asc: bool
    
class SummaryGetResponse(BaseModel):
    id: int
    content: str
    keywords: list[str]
    created_at: datetime

class SummaryGetResponseWithPagination(PaginationDto):
    summaries: list[SummaryGetResponse]

Query 로직 (비지니스 계층)

  • count 조회
  • created_at 기준 정렬
  • offset / limit 페이징
class SummaryQuery:
    @staticmethod
    async def get_list(
        session: AsyncSession,
        offset: int,
        limit: int,
        asc: bool,
    ):
        base_query = select(SummaryDao)

        total_count: int | None = await session.scalar(
            select(func.count()).select_from(base_query)
        )

        query = base_query.order_by(
            SummaryDao.created_at.asc() if asc else SummaryDao.created_at.desc()
        )

        rows = (await session.execute(query)).unique().all()[offset : offset + limit]
        result: dict[str, int | bool | list] = {
            "total": total_count if total_count else 0,
            "offset": offset,
            "limit": limit,
            "asc": asc,
        }
        summaries = []

        for [summary_dao] in rows:
            summary = SummaryMapper.dao_to_entity(summary_dao)
            summaries.append(summary.model_dump())

        result["summaries"] = summaries
        return result

Router

위에서 만든 모델과 Query 함수로 router를 구성해줍니다.

summary_router = APIRouter(
    prefix="/summary",
    tags=["summary"],
)
SessionDeps = typing.Annotated[AsyncSession, Depends(get_db)]


@summary_router.get(
    "",
    response_model=SummaryGetResponseWithPagination,
)
async def get_summary_list_v5(
    session: SessionDeps,
    limit: int = Query(10, description="페이지 당 데이터 개수"),
    offset: int = Query(0, description="페이지 인덱스"),
    asc: bool = Query(False, description="오름차순 정렬 여부"),
):
    return await SummaryQuery.get_list(
        session,
        offset=offset,
        limit=limit,
        asc=asc,
    )

 

POST /summary

Persistence 계층

Repository 클래스(app-db 연결 레벨)의 bulk함수에 리스트를 add_all 해주는 함수

# infrastructure/repository/summary.py


class SummaryRepository(IBaseRepository[Summary]):
    def __init__(self, session: AsyncSession):
        self.session = session
	...
    async def bulk(self, entities: list[Summary]) -> None:
        daos = [SummaryMapper.entity_to_dao(entity) for entity in entities]
        self.session.add_all(daos)

 

Usecase 비지니스 계층

service(단순 CRUD 수준의 query만 존재) 과 usecase에 create_list 함수를 선업해줍니다.

# application/service/summary.py

class SummaryService:
    def __init__(self, session: AsyncSession):
        self.session = session

        self._summary_repo = SummaryRepository(session)

    async def create_list(
        self,
        summary_list_data: list[CreateSummaryData],
    ) -> list[Summary]:
        return await self._summary_repo.bulk(
            [
                Summary(content=summary.content, keywords=summary.keywords)
                for summary in summary_list_data
            ]
        )
        
# application/usecase/summary.py

class SummaryUsecase(BaseUseCase):
    def __init__(
        self,
        session: AsyncSession,
    ):
        super().__init__(session)
        self._summary_service = SummaryService(session)

    async def create_list(
        self,
        summary_list_create_data: list[CreateSummaryData],
    ) -> None:
        return await self._summary_service.create_list(summary_list_create_data)

 

Router

위에서 만든 usecase 함수로 router를 구성해줍니다.

@summary_router.post("", status_code=status.HTTP_201_CREATED, response_model=None)
async def create_project_v5(
    session: SessionDeps,
    body: SummaryListCreateData,
):
    async with SummaryUsecase(session) as usecase:
        return await usecase.create_list(body.summary_list)

테스트

서버 실행 후 postman으로 테스트 해보면 정상 동작을 확인할 수 있습니다!

uvicorn src.user_interface.restapi:app --reload

 



다음 포스팅에서는 배포 후 n8n 워크플로우랑 연결까지 해보겠습니다.

 

간단 회고

 

간단한 API 기능이지만 이후 확장될 서비스를 고려해

아키텍쳐 구조와 패턴 내용도 담아봤어요-

 

개발 프로젝트를 하게 될 수록

서비스의 구조를 초반에 어떻게 잡느냐.. 그 설계의 중요성을 뼈저리게 느끼게 되더라구요.

 

이전 직장에서 사용하던 구조를 배워 적용해 봤는데,

직접 제로부터 구축하니 어떤 문제를 고려했는지 다시 짚어볼 수 있어 의미있었네요.

 

반응형