FastAPI + Next.js JWT 인증과 TOTP 2FA 구현
JWT 3종 토큰 전략, pyotp TOTP 2FA, Next.js Edge 미들웨어 인증을 아우르는 실전 보안 구현
인증 아키텍처 전체 플로우
외부에 노출되는 웹 애플리케이션에서는 견고한 인증 시스템이 필수다. 이 글에서는 FastAPI 백엔드의 JWT + TOTP 2FA 인증과 Next.js 프론트엔드 미들웨어의 토큰 검증을 조합한 인증 아키텍처를 구현한다.
시스템 구성
인증 플로우 4단계
전체 인증 과정은 다음 4단계로 이루어진다.
1단계: 로그인 (비밀번호 검증)
POST /api/auth/login { username, password }
→ 200 { requires_2fa: true, temp_token: "..." }2단계: 2FA 검증
POST /api/auth/verify-2fa { temp_token, otp_code }
→ 200 { access_token, refresh_token }3단계: API 요청
GET /api/account/balance
Authorization: Bearer <access_token>
→ 200 { ... }4단계: 토큰 갱신
POST /api/auth/refresh { refresh_token }
→ 200 { access_token, new_refresh_token }JWT 3종 토큰 전략
단일 토큰 대신 용도별로 분리된 3종 토큰을 사용한다. 각 토큰은 type 클레임으로 구분되며, 만료 시간과 저장 위치가 다르다.
| 토큰 | 만료 | 저장 위치 | 용도 |
|---|---|---|---|
| Access Token | 30분 | 메모리 (JS 변수) | API 요청 인증 |
| Refresh Token | 7일 | HttpOnly Cookie | Access Token 갱신 |
| Temp Token | 5분 | 메모리 | 2FA 대기 중 임시 인증 |
Access Token을 메모리에만 저장하면 XSS 공격으로 토큰이 탈취될 위험이 줄어든다. Refresh Token은
HttpOnly+Secure+SameSite=Strict쿠키에 저장하여 JavaScript에서 접근할 수 없게 한다.
의존성 설치
# pyproject.toml
[project]
dependencies = [
"fastapi>=0.115",
"python-jose[cryptography]>=3.3", # JWT
"passlib[bcrypt]>=1.7", # 비밀번호 해싱
"pyotp>=2.9", # TOTP 2FA
"qrcode[pil]>=7.4", # QR 코드 생성
]인증 설정
# app/auth/config.py
from pydantic_settings import BaseSettings
class AuthSettings(BaseSettings):
# JWT
jwt_secret: str # openssl rand -hex 32
jwt_algorithm: str = "HS256"
access_token_expire_minutes: int = 30 # 30분
refresh_token_expire_days: int = 7 # 7일
temp_token_expire_minutes: int = 5 # 2FA 대기 토큰 5분
# TOTP
totp_issuer: str = "PersonalTrader"
totp_interval: int = 30 # 30초 주기
# 보안
max_login_attempts: int = 5 # 5회 실패 시 잠금
lockout_minutes: int = 15 # 15분 잠금
class Config:
env_prefix = "AUTH_"비밀번호 해싱
평문 비밀번호를 절대 저장하지 않는다. bcrypt를 사용해 단방향 해싱한다.
# app/auth/password.py
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)JWT 토큰 생성 및 검증
3종 토큰은 모두 같은 시크릿 키로 서명하되, type 클레임으로 용도를 구분한다.
# app/auth/jwt.py
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from app.auth.config import AuthSettings
settings = AuthSettings()
def create_access_token(
sub: str,
extra: dict | None = None,
expires_delta: timedelta | None = None,
) -> str:
"""Access Token 생성 (기본 30분)."""
now = datetime.now(timezone.utc)
expire = now + (expires_delta or timedelta(
minutes=settings.access_token_expire_minutes
))
payload = {
"sub": sub,
"exp": expire,
"iat": now,
"type": "access",
}
if extra:
payload.update(extra)
return jwt.encode(
payload, settings.jwt_secret, algorithm=settings.jwt_algorithm
)
def create_refresh_token(sub: str) -> str:
"""Refresh Token 생성 (7일)."""
now = datetime.now(timezone.utc)
expire = now + timedelta(days=settings.refresh_token_expire_days)
payload = {
"sub": sub,
"exp": expire,
"iat": now,
"type": "refresh",
}
return jwt.encode(
payload, settings.jwt_secret, algorithm=settings.jwt_algorithm
)
def create_temp_token(sub: str) -> str:
"""2FA 대기용 임시 토큰 (5분)."""
now = datetime.now(timezone.utc)
expire = now + timedelta(
minutes=settings.temp_token_expire_minutes
)
payload = {
"sub": sub,
"exp": expire,
"iat": now,
"type": "temp_2fa",
}
return jwt.encode(
payload, settings.jwt_secret, algorithm=settings.jwt_algorithm
)
def decode_token(token: str, expected_type: str = "access") -> dict:
"""JWT 토큰 디코딩 및 type 클레임 검증."""
payload = jwt.decode(
token, settings.jwt_secret,
algorithms=[settings.jwt_algorithm]
)
if payload.get("type") != expected_type:
raise ValueError(
f"Expected '{expected_type}', got '{payload.get('type')}'"
)
return payloadFastAPI 의존성 주입
보호가 필요한 라우트에 Depends를 사용해 인증을 적용한다.
# app/auth/deps.py
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from app.auth.jwt import decode_token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
) -> dict:
"""현재 인증된 사용자 반환."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_token(token, expected_type="access")
username: str | None = payload.get("sub")
if username is None:
raise credentials_exception
except (JWTError, ValueError):
raise credentials_exception
return {"username": username}
# 보호 라우트에서 사용
CurrentUser = Annotated[dict, Depends(get_current_user)]pyotp TOTP 구현과 QR 코드
TOTP 동작 원리
TOTP(Time-based One-Time Password)는 공유 비밀키와 현재 시간을 기반으로 일회용 코드를 생성한다. 서버와 클라이언트(인증 앱)가 동일한 비밀키를 보유하고, 30초 간격으로 같은 6자리 코드를 생성한다.
pyotp 기본 사용법
import pyotp
# 비밀키 생성 (사용자별 1회)
secret = pyotp.random_base32() # 'JBSWY3DPEHPK3PXP' 형태
# TOTP 객체 생성
totp = pyotp.TOTP(secret)
# 현재 OTP 코드 (6자리)
code = totp.now() # '492039'
# 코드 검증
totp.verify("492039") # True (30초 이내)
totp.verify("492039") # False (30초 경과 후)
# QR 코드용 프로비저닝 URI
uri = totp.provisioning_uri(
name="user@example.com",
issuer_name="PersonalTrader",
)
# → 'otpauth://totp/PersonalTrader:user@example.com?secret=...&issuer=PersonalTrader'2FA 설정 엔드포인트
# app/auth/routes.py
import io
import pyotp
import qrcode
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
router = APIRouter(prefix="/api/auth", tags=["auth"])
class OTPSetupResponse(BaseModel):
otp_secret: str
otp_uri: str
@router.post("/2fa/setup", response_model=OTPSetupResponse)
async def setup_2fa(current_user: CurrentUser):
"""2FA 설정 - OTP 비밀키 및 QR URI 반환."""
secret = pyotp.random_base32()
totp = pyotp.TOTP(secret)
uri = totp.provisioning_uri(
name=current_user["username"],
issuer_name="PersonalTrader",
)
# DB에 otp_secret 임시 저장 (활성화 전)
# await user_repo.set_pending_otp_secret(username, secret)
return OTPSetupResponse(otp_secret=secret, otp_uri=uri)
@router.get("/2fa/qrcode")
async def get_2fa_qrcode(current_user: CurrentUser):
"""2FA QR 코드 이미지 반환."""
# DB에서 pending otp_secret 조회
secret = "PENDING_SECRET" # 실제 구현 시 DB 조회
totp = pyotp.TOTP(secret)
uri = totp.provisioning_uri(
name=current_user["username"],
issuer_name="PersonalTrader",
)
img = qrcode.make(uri)
buf = io.BytesIO()
img.save(buf, format="PNG")
buf.seek(0)
return StreamingResponse(buf, media_type="image/png")사용자 플로우:
/2fa/setup호출 →otp_secret과otp_uri수신otp_uri를 QR 코드로 변환하거나/2fa/qrcode엔드포인트에서 이미지 수신- Google Authenticator 등에서 QR 스캔
- 인증 앱에 표시되는 코드로 검증 → 2FA 활성화
로그인 플로우 (비밀번호 + 2FA)
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
requires_2fa: bool = False
temp_token: str | None = None
access_token: str | None = None
refresh_token: str | None = None
token_type: str = "bearer"
@router.post("/login", response_model=LoginResponse)
async def login(req: LoginRequest):
"""1단계: 비밀번호 검증."""
# user = await user_repo.get_by_username(req.username)
# if not user or not verify_password(req.password, user.hashed_password):
# raise HTTPException(401, "Incorrect username or password")
# 로그인 시도 횟수 확인 (브루트포스 방지)
# if user.login_attempts >= settings.max_login_attempts:
# raise HTTPException(429, "Account locked. Try again later.")
has_2fa = True # user.otp_enabled
if has_2fa:
temp_token = create_temp_token(sub=req.username)
return LoginResponse(requires_2fa=True, temp_token=temp_token)
access_token = create_access_token(sub=req.username)
refresh_token = create_refresh_token(sub=req.username)
return LoginResponse(
access_token=access_token,
refresh_token=refresh_token,
)
class Verify2FARequest(BaseModel):
temp_token: str
otp_code: str
@router.post("/verify-2fa", response_model=LoginResponse)
async def verify_2fa_login(req: Verify2FARequest):
"""2단계: TOTP 코드 검증 후 실제 토큰 발급."""
try:
payload = decode_token(req.temp_token, expected_type="temp_2fa")
except Exception:
raise HTTPException(401, "Invalid or expired temp token")
username = payload["sub"]
# DB에서 otp_secret 조회
otp_secret = "USER_OTP_SECRET"
totp = pyotp.TOTP(otp_secret)
if not totp.verify(req.otp_code, valid_window=1): # +-30초 허용
raise HTTPException(400, "Invalid OTP code")
access_token = create_access_token(sub=username)
refresh_token = create_refresh_token(sub=username)
return LoginResponse(
access_token=access_token,
refresh_token=refresh_token,
)
valid_window=1은 현재 시간 기준으로 전후 1개 구간(총 90초)까지 코드를 허용한다. 서버와 클라이언트 간 시간 차이로 인한 인증 실패를 방지한다.
Refresh Token Rotation과 HttpOnly Cookie
Refresh Token Rotation
Refresh Token을 사용할 때마다 새로운 Refresh Token을 발급하고, 기존 토큰은 블랙리스트에 등록한다. 탈취된 토큰이 재사용되면 즉시 감지할 수 있다.
@router.post("/refresh")
async def refresh_token(req: RefreshRequest):
"""Refresh Token Rotation - 갱신 시 새 Refresh Token도 발급."""
payload = decode_token(req.refresh_token, expected_type="refresh")
# 기존 Refresh Token 무효화 (DB 블랙리스트)
# await token_repo.blacklist(req.refresh_token)
access_token = create_access_token(sub=payload["sub"])
new_refresh_token = create_refresh_token(sub=payload["sub"])
return LoginResponse(
access_token=access_token,
refresh_token=new_refresh_token,
)HttpOnly Cookie 설정
Refresh Token을 HttpOnly Cookie로 전달하면 JavaScript에서 접근할 수 없으므로 XSS 공격에 안전하다.
from fastapi.responses import JSONResponse
@router.post("/login-cookie")
async def login_with_cookie(req: LoginRequest):
"""Refresh Token을 HttpOnly Cookie로 설정."""
# ... 인증 로직 ...
access_token = create_access_token(sub=req.username)
refresh_token = create_refresh_token(sub=req.username)
response = JSONResponse(content={
"access_token": access_token,
"token_type": "bearer",
})
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True, # JS 접근 차단
secure=True, # HTTPS only
samesite="strict", # CSRF 방지
max_age=7 * 24 * 3600, # 7일
path="/api/auth/refresh", # 갱신 엔드포인트에서만 전송
)
return response쿠키 보안 옵션:
| 옵션 | 값 | 효과 |
|---|---|---|
httponly | True | document.cookie로 접근 불가 (XSS 방어) |
secure | True | HTTPS 연결에서만 쿠키 전송 |
samesite | strict | 다른 사이트에서의 요청 시 쿠키 미전송 (CSRF 방어) |
path | /api/auth/refresh | 갱신 엔드포인트로만 쿠키 범위 제한 |
Next.js Edge Runtime 미들웨어
Edge Runtime에서의 JWT 검증
Next.js Middleware는 Edge Runtime에서 실행된다. jsonwebtoken 라이브러리는 Node.js 전용이므로 사용할 수 없고, Edge 호환 라이브러리인 jose를 사용해야 한다.
pnpm add jose미들웨어 구현
// middleware.ts (프로젝트 루트)
import { NextRequest, NextResponse } from "next/server";
import { jwtVerify } from "jose";
const JWT_SECRET = new TextEncoder().encode(
process.env.JWT_SECRET!
);
// 인증 불필요한 경로
const PUBLIC_PATHS = ["/login", "/api/auth"];
export async function middleware(request: NextRequest) {
const { pathname } = request.nextUrl;
// 공개 경로는 통과
if (PUBLIC_PATHS.some((path) => pathname.startsWith(path))) {
return NextResponse.next();
}
// Access Token 확인 (Cookie 또는 Authorization 헤더)
const token =
request.cookies.get("access_token")?.value ||
request.headers
.get("authorization")
?.replace("Bearer ", "");
if (!token) {
return NextResponse.redirect(new URL("/login", request.url));
}
try {
// JWT 검증 (jose - Edge Runtime 호환)
const { payload } = await jwtVerify(token, JWT_SECRET, {
algorithms: ["HS256"],
});
// 사용자 정보를 헤더에 추가
const response = NextResponse.next();
response.headers.set("x-user", payload.sub as string);
return response;
} catch {
// 토큰 만료/무효 → 로그인 리디렉트
return NextResponse.redirect(new URL("/login", request.url));
}
}
export const config = {
matcher: [
"/((?!_next/static|_next/image|favicon.ico).*)",
],
};클라이언트 자동 토큰 갱신
Access Token 만료 시 자동으로 Refresh Token을 사용해 갱신하는 유틸리티 함수다.
// lib/auth.ts
let accessToken: string | null = null;
export async function fetchWithAuth(
url: string,
options: RequestInit = {},
): Promise<Response> {
const headers = new Headers(options.headers);
if (accessToken) {
headers.set("Authorization", `Bearer ${accessToken}`);
}
let response = await fetch(url, { ...options, headers });
// 401이면 Refresh Token으로 갱신 시도
if (response.status === 401) {
const refreshResponse = await fetch("/api/auth/refresh", {
method: "POST",
credentials: "include", // HttpOnly Cookie 전송
});
if (refreshResponse.ok) {
const data = await refreshResponse.json();
accessToken = data.access_token;
// 원래 요청 재시도
headers.set("Authorization", `Bearer ${accessToken}`);
response = await fetch(url, { ...options, headers });
} else {
// Refresh도 실패 → 로그인 페이지로
window.location.href = "/login";
}
}
return response;
}보안 헤더 설정
Next.js 보안 헤더
next.config.ts에서 모든 응답에 보안 헤더를 추가한다.
// next.config.ts
const securityHeaders = [
{
key: "X-DNS-Prefetch-Control",
value: "on",
},
{
key: "Strict-Transport-Security",
value: "max-age=63072000; includeSubDomains; preload",
},
{
key: "X-Frame-Options",
value: "SAMEORIGIN",
},
{
key: "X-Content-Type-Options",
value: "nosniff",
},
{
key: "Referrer-Policy",
value: "origin-when-cross-origin",
},
{
key: "Permissions-Policy",
value: "camera=(), microphone=(), geolocation=()",
},
];
const nextConfig = {
async headers() {
return [
{
source: "/(.*)",
headers: securityHeaders,
},
];
},
};
export default nextConfig;각 헤더의 역할:
| 헤더 | 효과 |
|---|---|
Strict-Transport-Security | HTTPS 강제 (2년, 서브도메인 포함) |
X-Frame-Options | 클릭재킹 방지 (iframe 삽입 차단) |
X-Content-Type-Options | MIME 스니핑 방지 |
Referrer-Policy | 외부 이동 시 전체 URL 대신 origin만 전송 |
Permissions-Policy | 카메라, 마이크, 위치 정보 API 비활성화 |
Content Security Policy (CSP)
CSP는 XSS 공격의 근본적 방어책이다. nonce 기반으로 신뢰할 수 있는 스크립트만 실행을 허용한다.
// middleware.ts (CSP 적용)
export function middleware(request: NextRequest) {
const nonce = Buffer.from(crypto.randomUUID())
.toString("base64");
const cspHeader = `
default-src 'self';
script-src 'self' 'nonce-${nonce}' 'strict-dynamic';
style-src 'self' 'unsafe-inline';
img-src 'self' blob: data:;
font-src 'self';
connect-src 'self' wss://jongkwan.dev ws://localhost:*;
frame-ancestors 'none';
base-uri 'self';
form-action 'self';
`.replace(/\s{2,}/g, " ").trim();
const response = NextResponse.next();
response.headers.set("Content-Security-Policy", cspHeader);
response.headers.set("x-nonce", nonce);
return response;
}FastAPI CORS 설정
프론트엔드와 백엔드가 다른 도메인(또는 포트)에서 동작하므로 CORS 설정이 필요하다.
# app/main.py
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://jongkwan.dev",
"http://localhost:3000", # 개발 환경
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
allow_credentials=True와allow_origins에 와일드카드(*)는 동시에 사용할 수 없다. 반드시 구체적인 origin을 명시해야 한다.
Rate Limiting
브루트포스 공격을 방지하기 위해 로그인 엔드포인트에 속도 제한을 적용한다.
# app/auth/middleware.py
from collections import defaultdict
from datetime import datetime, timezone
from fastapi import Request, HTTPException
login_attempts: dict[str, list[datetime]] = defaultdict(list)
RATE_LIMIT_WINDOW = 900 # 15분
RATE_LIMIT_MAX = 5 # 5회
async def check_login_rate_limit(request: Request):
"""로그인 시도 횟수 제한 (IP 기반)."""
client_ip = request.client.host if request.client else "unknown"
now = datetime.now(timezone.utc)
# 윈도우 밖의 기록 정리
attempts = login_attempts[client_ip]
cutoff = now.timestamp() - RATE_LIMIT_WINDOW
attempts[:] = [a for a in attempts if a.timestamp() > cutoff]
if len(attempts) >= RATE_LIMIT_MAX:
raise HTTPException(
status_code=429,
detail=f"Too many login attempts. "
f"Try again in {RATE_LIMIT_WINDOW // 60} minutes.",
)
attempts.append(now)설정 요약:
| 항목 | 값 |
|---|---|
| 최대 시도 횟수 | 15분당 5회 |
| 잠금 시간 | 15분 |
| 제한 기준 | 클라이언트 IP |
CVE-2025-29927 대응
취약점 개요
Next.js에서 x-middleware-subrequest 헤더를 통해 미들웨어를 완전히 우회할 수 있는 심각한 취약점이 발견되었다.
영향 버전: Next.js < 15.2.3, < 14.2.25, < 13.5.9
심각도: Critical이 취약점을 악용하면 공격자가 인증 미들웨어를 건너뛰고 보호된 라우트에 직접 접근할 수 있다.
대응 방법
최우선: Next.js 업데이트
pnpm add next@latest # 15.2.3 이상임시 방어 (업데이트 전)
// middleware.ts 최상단에 추가
if (request.headers.get("x-middleware-subrequest")) {
return new NextResponse(null, { status: 403 });
}이 임시 방어는 어디까지나 업데이트 전 긴급 조치다. 반드시 Next.js를 최신 버전으로 업데이트해야 한다.
보안 체크리스트
전체 인증 시스템의 보안을 점검하는 체크리스트다.
| 항목 | 설명 | 중요도 |
|---|---|---|
| JWT Secret | 최소 256비트 (openssl rand -hex 32) | 필수 |
| HTTPS Only | Cloudflare Tunnel이 보장 | 필수 |
| HttpOnly Cookie | Refresh Token의 XSS 방지 | 필수 |
| SameSite=Strict | CSRF 방지 | 필수 |
| Rate Limiting | 로그인 5회/15분 | 필수 |
| HSTS | max-age 2년, preload | 필수 |
| X-Frame-Options | SAMEORIGIN (클릭재킹 방지) | 필수 |
| Token Rotation | Refresh 시 새 토큰 발급 | 권장 |
| TOTP valid_window=1 | 전후 30초만 허용 | 권장 |
| CSP | Script-src nonce 기반 | 권장 |
| CVE-2025-29927 | Next.js 15.2.3 이상 | 필수 |
데이터 모델
# app/models/user.py
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy.sql import func
from app.db.base import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False, index=True)
email = Column(String(255), unique=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
# 2FA
otp_enabled = Column(Boolean, default=False)
otp_secret = Column(String(32), nullable=True)
otp_verified = Column(Boolean, default=False)
# 보안
login_attempts = Column(Integer, default=0)
locked_until = Column(DateTime(timezone=True), nullable=True)
last_login = Column(DateTime(timezone=True), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())환경 변수
# JWT
AUTH_JWT_SECRET=<openssl rand -hex 32>
AUTH_JWT_ALGORITHM=HS256
AUTH_ACCESS_TOKEN_EXPIRE_MINUTES=30
AUTH_REFRESH_TOKEN_EXPIRE_DAYS=7
# TOTP
AUTH_TOTP_ISSUER=PersonalTrader
# Next.js
JWT_SECRET=<AUTH_JWT_SECRET과 동일한 값>JWT 시크릿은 반드시 openssl rand -hex 32로 생성한 최소 256비트 랜덤 값을 사용해야 한다. FastAPI와 Next.js가 동일한 시크릿을 공유해야 양쪽에서 토큰을 검증할 수 있다.