콘텐츠로 건너뛰기

FastAPI와 BigQuery 공개데이터로 REST API 만들기

  • 테크

Google Cloud의 BigQuery는 대규모 데이터를 SQL 기반으로 분석할 수 있는 서비스이며,
FastAPI는 Python 기반의 고성능 웹 프레임워크로 RESTful API 구현에 적합합니다.

이번 포스트에서는 Google이 제공하는 공개 데이터셋인 bigquery-public-data.google_analytics_sample을 FastAPI를 통해 외부에서 호출 가능한 API 형태로 변환함으로써, BigQuery 데이터를 서비스 백엔드 수준에서 다루는 과정을 단계별로 살펴보겠습니다.

1. 개발 환경 구축 및 GCP 설정

FastAPI와 BigQuery 클라이언트를 실행하기 위한 로컬 개발 환경과 GCP 프로젝트를 설정합니다. Python 가상환경을 만들고 필요한 패키지를 설치한 뒤, BigQuery API가 활성화된 GCP 프로젝트를 준비합니다.

1.1 python 및 라이브러리 설치
https://www.python.org/downloads/
설치 후 아래 명령어로 버전을 확인합니다.

python –version

1.2 가상환경 생성 및 활성화

python -m venv .venv
.venv/scripts/activate

1.3 pip 업그레이드 및 라이브러리 설치

python.exe -m pip install --upgrade pip
pip install fastapi[standard]  google-cloud-bigquery google-cloud-bigquery-storage pandas python-dotenv pydantic-settings pyarrow 

1.4 GCP 프로젝트 생성
https://console.cloud.google.com/
새 프로젝트를 생성합니다.

1.5 BigQuery API 활성화
API 및 서비스 → 라이브러리 → BigQuery API → 사용 설정

1.6 서비스 계정 생성
IAM 및 관리자 → 서비스 계정 → 서비스 계정 만들기
권한부여: BigQuery 작업 사용자, 데이터 뷰어

1.7 인증 키 발급
생성된 서비스 계정을 클릭 → 키 → 키 추가 → 새 키 만들기
JSON 형식의 비공개 키 파일을 다운로드 후 프로젝트 루트 디렉터리에 저장합니다.

2. 데이터 셋 소개

Rest API에 활용할 샘플 데이터는 bigquery-public-data.google_analytics_sample 입니다.
Google Merchandise Store의 실제 웹로그 데이터를 기반으로 만들어진 샘플이며, BigQuery에서 누구나 접근할 수 있습니다.

구글 빅쿼리 public ata.google_analytics_sample  이미지
bigquery-public-data.google_analytics_sample.INFORMATION_SCHEMA.COLUMNS

핵심 테이블은 ga_sessions_* 입니다. 날짜 단위로 쪼개져 있으며, 각 세션의 정보가 담겨 있습니다. 스키마는 다소 복잡하지만, 크게 보면 다음과 같은 그룹으로 구성됩니다.

  • 트랜잭션 정보: 구매 여부, 구매 건수, 매출 금액
  • 디바이스 정보: 브라우저, 운영체제, 모바일 여부 등
  • 지역 정보: 국가, 도시, 대륙
  • 행동 정보: 페이지뷰, 세션 길이 등

3. 프로젝트 파일 구조

아래는 FastAPI 애플리케이션의 계층형 구조 예시입니다. 각 디렉터리는 역할별로 모듈화되어 있으며, 의존 방향은 routers → services → repositories → core 순으로 구성됩니다.
현재 예제에서는 app.state.bq를 통해 BigQuery 클라이언트를 전역으로 주입하지만, 규모가 커질 경우 의존성 관리와 리소스 생명주기 제어를 위해 Provider(LOC 컨테이너) 역할을 하는 별도의 모듈이 필요합니다.

pytest/
├── app/
│   ├── main.py                   # FastAPI 앱 진입점
│   │
│   ├── core/                     # BigQuery 클라이언트
│   │   ├── __init__.py
│   │   └── bigquery.py
│   │
│   ├── models/                   # 데이터 모델
│   │   ├── __init__.py
│   │   └── ga_sessions.py
│   │
│   ├── repositories/             # 데이터 접근 계층
│   │   ├── __init__.py
│   │   └── ga_sessions.py
│   │
│   ├── services/                  # 비즈니스 로직 계층
│   │   ├── __init__.py
│   │   └── ga_sessions.py
│   │
│   └── routers/                   # API 엔드포인트 계층
│       ├── __init__.py
│       └── ga_sessions.py
│
└── config/                        # 설정 파일 (BigQuery 인증 키 파일 위치)

4. .env 설정

환경 변수 파일 .env를 통해 BigQuery 자격 증명 경로, 프로젝트 ID, 리전 등의 설정값을 관리합니다.
이 값들은 코드 내에서 os.getenv()로 불러와 클라이언트 초기화에 사용됩니다.

GOOGLE_APPLICATION_CREDENTIALS=
BIGQUERY_PROJECT_ID=
BIGQUERY_LOCATION=US

5. BigQuery 클라이언트 설정

앱 시작 시 FastAPI의 lifespan 컨텍스트 내에서 BigQuery 클라이언트를 초기화하고, 앱 종료 시 세션을 닫습니다.
이 클라이언트는 Depends를 통해 각 요청 핸들러에서 주입됩니다.

#app/core/bigquery.py
from dotenv import load_dotenv
from contextlib import asynccontextmanager
from google.cloud import bigquery
import os
from fastapi import Request

load_dotenv()

@asynccontextmanager
async def lifespan(app):
    client = bigquery.Client.from_service_account_json(
      os.getenv("GOOGLE_APPLICATION_CREDENTIALS"),
      project=os.getenv("BIGQUERY_PROJECT_ID"),
      location=os.getenv("BIGQUERY_LOCATION"),
    )
    app.state.bq = client
    yield
    client.close()

def get_bq_client(request: Request):
    return request.app.state.bq

6. 모델 정의

Pydantic 모델을 사용해 BigQuery 쿼리 결과를 구조화합니다.
필요한 주요 필드만 정의하여 응답 스키마를 간결하게 유지하고, 추가 필드는 무시합니다.

# app/models/ga_sessions.py
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field, ConfigDict

class TotalsSlim(BaseModel):
    visits: Optional[int] = None
    hits: Optional[int] = None
    pageviews: Optional[int] = None
    timeOnSite: Optional[int] = None

class TrafficSourceSlim(BaseModel):
    source: Optional[str] = None
    medium: Optional[str] = None
    referralPath: Optional[str] = None

class DeviceSlim(BaseModel):
    browser: Optional[str] = None
    operatingSystem: Optional[str] = None
    isMobile: Optional[bool] = None

class GeoSlim(BaseModel):
    continent: Optional[str] = None
    country: Optional[str] = None

class CustomDimension(BaseModel):
    index: Optional[int] = None
    value: Optional[str] = None

class GaSessionSlim(BaseModel):
    visitorId: Optional[str] = Field(None, description="INT64 → string")
    visitId: Optional[str] = Field(None, description="INT64 → string")
    fullVisitorId: Optional[str] = None

    visitNumber: Optional[int] = None
    visitStartTime: Optional[int] = None
    date: Optional[str] = None
    channelGrouping: Optional[str] = None

    totals: Optional[TotalsSlim] = None
    trafficSource: Optional[TrafficSourceSlim] = None
    device: Optional[DeviceSlim] = None
    geoNetwork: Optional[GeoSlim] = None
    customDimensions: Optional[List[CustomDimension]] = None

    # 선언하지 않은 필드는 무시
    model_config = ConfigDict(extra="ignore")

7. Repository 구현: 공개데이터 조회

Repository 계층은 실제 BigQuery SQL을 실행해 데이터를 가져옵니다.
날짜 포맷 검증을 통해 SQL 인젝션이나 잘못된 요청을 방지하고, 파라미터 바인딩으로 limit을 제어합니다.

# app/repositories/ga_sessions.py
import re
from typing import Iterable, Dict, Any, List
from google.cloud import bigquery

_YMD_RE = re.compile(r"^\d{8}$")

def _validate_yyyymmdd(yyyymmdd: str) -> None:
    if not _YMD_RE.fullmatch(yyyymmdd):
        raise ValueError("yyyymmdd must be 8 digits like 20160801")

def query_sessions(bq: bigquery.Client, yyyymmdd: str, limit: int) -> Iterable[Dict[str, Any]]:
    _validate_yyyymmdd(yyyymmdd)
    table = f"`bigquery-public-data.google_analytics_sample.ga_sessions_{yyyymmdd}`"

    sql = f"""
    SELECT
      CAST(visitorId AS STRING)  AS visitorId,
      CAST(visitId   AS STRING)  AS visitId,
      fullVisitorId,
      visitNumber,
      visitStartTime,
      date,
      channelGrouping,
      totals.visits              AS totals_visits,
      totals.hits                AS totals_hits,
      totals.pageviews           AS totals_pageviews,
      totals.timeOnSite          AS totals_timeOnSite,
      trafficSource.source       AS trafficSource_source,
      trafficSource.medium       AS trafficSource_medium,
      trafficSource.referralPath AS trafficSource_referralPath,
      device.browser             AS device_browser,
      device.operatingSystem     AS device_operatingSystem,
      device.isMobile            AS device_isMobile,
      geoNetwork.continent       AS geoNetwork_continent,
      geoNetwork.country         AS geoNetwork_country,
      customDimensions           AS customDimensions
    FROM {table}
    LIMIT @limit
    """

    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("limit", "INT64", limit)]
    )
    rows = bq.query(sql, job_config=job_config).result()
    for r in rows:
        yield {k: r.get(k) for k in r.keys()}

8. Service

Service 계층은 Repository에서 가져온 flat한 쿼리 결과를 Pydantic 모델 형태로 변환합니다.
이 과정을 통해 API 응답이 일관된 구조(GaSessionSlim)를 유지합니다.

# app/services/ga_sessions.py
from typing import Dict, Any, List
from google.cloud.bigquery import Client
from app.models.ga_sessions import GaSessionSlim
from app.repositories.ga_sessions import query_sessions

def _nest(record: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "visitorId": record.get("visitorId"),
        "visitId": record.get("visitId"),
        "fullVisitorId": record.get("fullVisitorId"),
        "visitNumber": record.get("visitNumber"),
        "visitStartTime": record.get("visitStartTime"),
        "date": record.get("date"),
        "channelGrouping": record.get("channelGrouping"),
        "totals": {
            "visits":     record.get("totals_visits"),
            "hits":       record.get("totals_hits"),
            "pageviews":  record.get("totals_pageviews"),
            "timeOnSite": record.get("totals_timeOnSite"),
        },
        "trafficSource": {
            "source":       record.get("trafficSource_source"),
            "medium":       record.get("trafficSource_medium"),
            "referralPath": record.get("trafficSource_referralPath"),
        },
        "device": {
            "browser":         record.get("device_browser"),
            "operatingSystem": record.get("device_operatingSystem"),
            "isMobile":        record.get("device_isMobile"),
        },
        "geoNetwork": {
            "continent": record.get("geoNetwork_continent"),
            "country":   record.get("geoNetwork_country"),
        },
        "customDimensions": record.get("customDimensions"),
    }

def to_ga_sessions(records: List[Dict[str, Any]]) -> List[GaSessionSlim]:
    return [GaSessionSlim(**_nest(r)) for r in records]

def get_ga_sessions(bq: Client, yyyymmdd: str, limit: int) -> List[GaSessionSlim]:
    records = list(query_sessions(bq, yyyymmdd, limit))
    return to_ga_sessions(records)

9. Router 및 요청 예시

라우터는 /ga_sessions/{yyyymmdd} 형태의 엔드포인트를 제공하며, limit 쿼리 파라미터로 결과 개수를 제한합니다.

# app/routers/ga_sessions.py
from typing import List
from fastapi import APIRouter, Depends, HTTPException, Query
from google.cloud.bigquery import Client
from app.core.bigquery import get_bq_client
from app.models.ga_sessions import GaSessionSlim
from app.services.ga_sessions import get_ga_sessions

router = APIRouter(prefix="/ga_sessions", tags=["ga_sessions"])

@router.get("/{yyyymmdd}", response_model=List[GaSessionSlim])
def list_ga_sessions(
    yyyymmdd: str,
    limit: int = Query(100, ge=1, le=1000),
    bq: Client = Depends(get_bq_client),
):
    try:
        return get_ga_sessions(bq, yyyymmdd, limit)
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        raise HTTPException(status_code=500, detail="Query failed")

10. 실행 및 테스트

아래 명령어로 로컬 서버를 실행한 후 브라우저에서 /ga_sessions/20160801 엔드포인트를 호출해 응답을 확인합니다.

uvicorn app.main:app --reload --host 127.0.0.1 --port 8000
/ga_sessions/20160801 호출 결과

이번 포스트에서는 다음 과정을 통해 BigQuery 공개 데이터셋을 REST API 형태로 제공하는 전체 흐름을 구현했습니다. 공개 데이터셋뿐 아니라 다른 BigQuery 프로젝트에도 동일한 구조를 적용할 수 있으며, 인증·캐싱·비동기 쿼리 처리 등을 추가해 확장할 수도 있습니다.

Ref.
FastAPI
구글 애널리틱스 샘플

최신 마케팅/고객 데이터 활용 사례를 받아보실 수 있습니다.

비즈스프링 뉴스레터 구독하기 →

"~에 맞는 제품 추천해줘" 잠재고객은 이제 검색창이 아닌 AI에게 묻습니다. 당신의 브랜드는 AI 대화창에서 추천되고 있습니까?

X