Google Cloud의 BigQuery는 대규모 데이터를 SQL 기반으로 분석할 수 있는 서비스이며,
FastAPI는 Python 기반의 고성능 웹 프레임워크로 RESTful API 구현에 적합합니다.
이번 포스트에서는 Google이 제공하는 공개 데이터셋인 bigquery-public-data.google_analytics_sample을 FastAPI를 통해 외부에서 호출 가능한 API 형태로 변환함으로써, BigQuery 데이터를 서비스 백엔드 수준에서 다루는 과정을 단계별로 살펴보겠습니다.
1. 개발 환경 구축 및 GCP 설정
FastAPI와 BigQuery 클라이언트를 실행하기 위한 로컬 개발 환경과 GCP 프로젝트를 설정합니다. Python 가상환경을 만들고 필요한 패키지를 설치한 뒤, BigQuery API가 활성화된 GCP 프로젝트를 준비합니다.
1.1 python 및 라이브러리 설치
https://www.python.org/downloads/
설치 후 아래 명령어로 버전을 확인합니다.
python –version
1.2 가상환경 생성 및 활성화
python -m venv .venv
.venv/scripts/activate
1.3 pip 업그레이드 및 라이브러리 설치
python.exe -m pip install --upgrade pip
pip install fastapi[standard] google-cloud-bigquery google-cloud-bigquery-storage pandas python-dotenv pydantic-settings pyarrow
1.4 GCP 프로젝트 생성
https://console.cloud.google.com/
새 프로젝트를 생성합니다.
1.5 BigQuery API 활성화
API 및 서비스 → 라이브러리 → BigQuery API → 사용 설정
1.6 서비스 계정 생성
IAM 및 관리자 → 서비스 계정 → 서비스 계정 만들기
권한부여: BigQuery 작업 사용자, 데이터 뷰어
1.7 인증 키 발급
생성된 서비스 계정을 클릭 → 키 → 키 추가 → 새 키 만들기
JSON 형식의 비공개 키 파일을 다운로드 후 프로젝트 루트 디렉터리에 저장합니다.
2. 데이터 셋 소개
Rest API에 활용할 샘플 데이터는 bigquery-public-data.google_analytics_sample 입니다.
Google Merchandise Store의 실제 웹로그 데이터를 기반으로 만들어진 샘플이며, BigQuery에서 누구나 접근할 수 있습니다.
핵심 테이블은 ga_sessions_* 입니다. 날짜 단위로 쪼개져 있으며, 각 세션의 정보가 담겨 있습니다. 스키마는 다소 복잡하지만, 크게 보면 다음과 같은 그룹으로 구성됩니다.
- 트랜잭션 정보: 구매 여부, 구매 건수, 매출 금액
- 디바이스 정보: 브라우저, 운영체제, 모바일 여부 등
- 지역 정보: 국가, 도시, 대륙
- 행동 정보: 페이지뷰, 세션 길이 등
3. 프로젝트 파일 구조
아래는 FastAPI 애플리케이션의 계층형 구조 예시입니다. 각 디렉터리는 역할별로 모듈화되어 있으며, 의존 방향은 routers → services → repositories → core 순으로 구성됩니다.
현재 예제에서는 app.state.bq를 통해 BigQuery 클라이언트를 전역으로 주입하지만, 규모가 커질 경우 의존성 관리와 리소스 생명주기 제어를 위해 Provider(LOC 컨테이너) 역할을 하는 별도의 모듈이 필요합니다.
pytest/
├── app/
│ ├── main.py # FastAPI 앱 진입점
│ │
│ ├── core/ # BigQuery 클라이언트
│ │ ├── __init__.py
│ │ └── bigquery.py
│ │
│ ├── models/ # 데이터 모델
│ │ ├── __init__.py
│ │ └── ga_sessions.py
│ │
│ ├── repositories/ # 데이터 접근 계층
│ │ ├── __init__.py
│ │ └── ga_sessions.py
│ │
│ ├── services/ # 비즈니스 로직 계층
│ │ ├── __init__.py
│ │ └── ga_sessions.py
│ │
│ └── routers/ # API 엔드포인트 계층
│ ├── __init__.py
│ └── ga_sessions.py
│
└── config/ # 설정 파일 (BigQuery 인증 키 파일 위치)
4. .env 설정
환경 변수 파일 .env를 통해 BigQuery 자격 증명 경로, 프로젝트 ID, 리전 등의 설정값을 관리합니다.
이 값들은 코드 내에서 os.getenv()로 불러와 클라이언트 초기화에 사용됩니다.
GOOGLE_APPLICATION_CREDENTIALS=
BIGQUERY_PROJECT_ID=
BIGQUERY_LOCATION=US
5. BigQuery 클라이언트 설정
앱 시작 시 FastAPI의 lifespan 컨텍스트 내에서 BigQuery 클라이언트를 초기화하고, 앱 종료 시 세션을 닫습니다.
이 클라이언트는 Depends를 통해 각 요청 핸들러에서 주입됩니다.
#app/core/bigquery.py
from dotenv import load_dotenv
from contextlib import asynccontextmanager
from google.cloud import bigquery
import os
from fastapi import Request
load_dotenv()
@asynccontextmanager
async def lifespan(app):
client = bigquery.Client.from_service_account_json(
os.getenv("GOOGLE_APPLICATION_CREDENTIALS"),
project=os.getenv("BIGQUERY_PROJECT_ID"),
location=os.getenv("BIGQUERY_LOCATION"),
)
app.state.bq = client
yield
client.close()
def get_bq_client(request: Request):
return request.app.state.bq
6. 모델 정의
Pydantic 모델을 사용해 BigQuery 쿼리 결과를 구조화합니다.
필요한 주요 필드만 정의하여 응답 스키마를 간결하게 유지하고, 추가 필드는 무시합니다.
# app/models/ga_sessions.py
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field, ConfigDict
class TotalsSlim(BaseModel):
visits: Optional[int] = None
hits: Optional[int] = None
pageviews: Optional[int] = None
timeOnSite: Optional[int] = None
class TrafficSourceSlim(BaseModel):
source: Optional[str] = None
medium: Optional[str] = None
referralPath: Optional[str] = None
class DeviceSlim(BaseModel):
browser: Optional[str] = None
operatingSystem: Optional[str] = None
isMobile: Optional[bool] = None
class GeoSlim(BaseModel):
continent: Optional[str] = None
country: Optional[str] = None
class CustomDimension(BaseModel):
index: Optional[int] = None
value: Optional[str] = None
class GaSessionSlim(BaseModel):
visitorId: Optional[str] = Field(None, description="INT64 → string")
visitId: Optional[str] = Field(None, description="INT64 → string")
fullVisitorId: Optional[str] = None
visitNumber: Optional[int] = None
visitStartTime: Optional[int] = None
date: Optional[str] = None
channelGrouping: Optional[str] = None
totals: Optional[TotalsSlim] = None
trafficSource: Optional[TrafficSourceSlim] = None
device: Optional[DeviceSlim] = None
geoNetwork: Optional[GeoSlim] = None
customDimensions: Optional[List[CustomDimension]] = None
# 선언하지 않은 필드는 무시
model_config = ConfigDict(extra="ignore")
7. Repository 구현: 공개데이터 조회
Repository 계층은 실제 BigQuery SQL을 실행해 데이터를 가져옵니다.
날짜 포맷 검증을 통해 SQL 인젝션이나 잘못된 요청을 방지하고, 파라미터 바인딩으로 limit을 제어합니다.
# app/repositories/ga_sessions.py
import re
from typing import Iterable, Dict, Any, List
from google.cloud import bigquery
_YMD_RE = re.compile(r"^\d{8}$")
def _validate_yyyymmdd(yyyymmdd: str) -> None:
if not _YMD_RE.fullmatch(yyyymmdd):
raise ValueError("yyyymmdd must be 8 digits like 20160801")
def query_sessions(bq: bigquery.Client, yyyymmdd: str, limit: int) -> Iterable[Dict[str, Any]]:
_validate_yyyymmdd(yyyymmdd)
table = f"`bigquery-public-data.google_analytics_sample.ga_sessions_{yyyymmdd}`"
sql = f"""
SELECT
CAST(visitorId AS STRING) AS visitorId,
CAST(visitId AS STRING) AS visitId,
fullVisitorId,
visitNumber,
visitStartTime,
date,
channelGrouping,
totals.visits AS totals_visits,
totals.hits AS totals_hits,
totals.pageviews AS totals_pageviews,
totals.timeOnSite AS totals_timeOnSite,
trafficSource.source AS trafficSource_source,
trafficSource.medium AS trafficSource_medium,
trafficSource.referralPath AS trafficSource_referralPath,
device.browser AS device_browser,
device.operatingSystem AS device_operatingSystem,
device.isMobile AS device_isMobile,
geoNetwork.continent AS geoNetwork_continent,
geoNetwork.country AS geoNetwork_country,
customDimensions AS customDimensions
FROM {table}
LIMIT @limit
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("limit", "INT64", limit)]
)
rows = bq.query(sql, job_config=job_config).result()
for r in rows:
yield {k: r.get(k) for k in r.keys()}
8. Service
Service 계층은 Repository에서 가져온 flat한 쿼리 결과를 Pydantic 모델 형태로 변환합니다.
이 과정을 통해 API 응답이 일관된 구조(GaSessionSlim)를 유지합니다.
# app/services/ga_sessions.py
from typing import Dict, Any, List
from google.cloud.bigquery import Client
from app.models.ga_sessions import GaSessionSlim
from app.repositories.ga_sessions import query_sessions
def _nest(record: Dict[str, Any]) -> Dict[str, Any]:
return {
"visitorId": record.get("visitorId"),
"visitId": record.get("visitId"),
"fullVisitorId": record.get("fullVisitorId"),
"visitNumber": record.get("visitNumber"),
"visitStartTime": record.get("visitStartTime"),
"date": record.get("date"),
"channelGrouping": record.get("channelGrouping"),
"totals": {
"visits": record.get("totals_visits"),
"hits": record.get("totals_hits"),
"pageviews": record.get("totals_pageviews"),
"timeOnSite": record.get("totals_timeOnSite"),
},
"trafficSource": {
"source": record.get("trafficSource_source"),
"medium": record.get("trafficSource_medium"),
"referralPath": record.get("trafficSource_referralPath"),
},
"device": {
"browser": record.get("device_browser"),
"operatingSystem": record.get("device_operatingSystem"),
"isMobile": record.get("device_isMobile"),
},
"geoNetwork": {
"continent": record.get("geoNetwork_continent"),
"country": record.get("geoNetwork_country"),
},
"customDimensions": record.get("customDimensions"),
}
def to_ga_sessions(records: List[Dict[str, Any]]) -> List[GaSessionSlim]:
return [GaSessionSlim(**_nest(r)) for r in records]
def get_ga_sessions(bq: Client, yyyymmdd: str, limit: int) -> List[GaSessionSlim]:
records = list(query_sessions(bq, yyyymmdd, limit))
return to_ga_sessions(records)
9. Router 및 요청 예시
라우터는 /ga_sessions/{yyyymmdd} 형태의 엔드포인트를 제공하며, limit 쿼리 파라미터로 결과 개수를 제한합니다.
# app/routers/ga_sessions.py
from typing import List
from fastapi import APIRouter, Depends, HTTPException, Query
from google.cloud.bigquery import Client
from app.core.bigquery import get_bq_client
from app.models.ga_sessions import GaSessionSlim
from app.services.ga_sessions import get_ga_sessions
router = APIRouter(prefix="/ga_sessions", tags=["ga_sessions"])
@router.get("/{yyyymmdd}", response_model=List[GaSessionSlim])
def list_ga_sessions(
yyyymmdd: str,
limit: int = Query(100, ge=1, le=1000),
bq: Client = Depends(get_bq_client),
):
try:
return get_ga_sessions(bq, yyyymmdd, limit)
except ValueError as ve:
raise HTTPException(status_code=400, detail=str(ve))
except Exception as e:
raise HTTPException(status_code=500, detail="Query failed")
10. 실행 및 테스트
아래 명령어로 로컬 서버를 실행한 후 브라우저에서 /ga_sessions/20160801 엔드포인트를 호출해 응답을 확인합니다.
uvicorn app.main:app --reload --host 127.0.0.1 --port 8000
이번 포스트에서는 다음 과정을 통해 BigQuery 공개 데이터셋을 REST API 형태로 제공하는 전체 흐름을 구현했습니다. 공개 데이터셋뿐 아니라 다른 BigQuery 프로젝트에도 동일한 구조를 적용할 수 있으며, 인증·캐싱·비동기 쿼리 처리 등을 추가해 확장할 수도 있습니다.
Ref.
FastAPI
구글 애널리틱스 샘플
최신 마케팅/고객 데이터 활용 사례를 받아보실 수 있습니다.


