Search

FastAPI 요청-응답 로깅하기

OO님, 앞으로 FastAPI 모든 요청-응답 로깅 합시다.

풀어야할 문제가 생겼고, 이를 해결해 봅시다.
요청(request)-응답(response)은 이미 있는 것이니, 언제, 어디서 접근하고, 어떻게 기록할지 파악해서 정리합니다.
단순히 request, response 메타 정보만 로깅 할 수도 있지만,
1.
body 데이터 로깅
2.
view 필터링 ( 민감 정보 담긴 body 노출 을 막기 위해서 )
2가지 조건을 추가로 상정하겠습니다.

작업 타이밍

주어진 요구조건에 따라 req(request)와 함께 resp(response)까지 함께 로깅해야 합니다. 접근시점은 view(path operation func)가 완료된 이후 입니다. view 가 처리된 직후, route가 처리된 이후, 미들웨어에서 요청과 응답에 접근할 수 있습니다. 후술할 시점별 방법들을 비교한 뒤에 접근시점을 결정합니다.
접근 시점에 대한 논의와 별개로, 로깅을 처리할 시점을 선택 합니다.
1.
요청-응답 컨텍스트 중
2.
요청-응답 컨텍스트 종료후
1 번은 로깅 처리에서 난 에러들이 요청-응답 컨텍스트에 영향을 줄 수 있습니다. (e.g. raise Exception, Blocking, Connection problem, etc) 따라서 2번을 선택하는 것이 타당해보입니다.
FastAPI는 flask의 @app.teardown_appcontext 와 같은 방법을 제공하지 않습니다. 대신 BackgroundTask 라는 요청-응답 컨텍스트 종료 이후, 동작 작업을 등록할 수 있는 Task를 제공합니다. 이를 사용하면 Request 종료 후 요청-응답 컨텍스트 독립 작업이 가능합니다. BackgroundTask는 동기, 비동기 함수 모두 사용이 가능하며, FastAPI Depends 컨텍스트 유지도 됩니다. Tiaglo(Fastapi 메인테이너)는 가벼운 잡에만 사용하는 것을 권장하고 있습니다. (오래 걸리는 잡은 celery 같은 별도 비동기 잡을 사용하셔야 합니다.)
BackgroundTask를 사용해서 요청-응답 컨텍스트 종료 후 요청-응답을 로깅합니다.

로깅 저장방법

간략하게 비교합니다. 제가 사용 경험이 없어서 코멘트 주시면 많은 도움이 됩니다.

외부 로깅 전송

fluentd, prometheus: 일반 로깅이 아닌 수치형 자료 (cpu 사용량, memory 사용량등)의 자료 수집에 특화되어 있습니다.
logstash: 서버 로그를 전달합니다. 수집 및 저장은 ES(ElasticSearch)를 사용해야 하고, Kibana를 통해 사용성을 제공합니다. (ES 또는 Kibana까지 서비스를 추가로 관리해야 합니다.)
sentry: 플러그인을 통해 앱 내 에러, 성능을 알아서 추적해서 보고서 를 제공합니다. 일반 로깅 저장이 되지는 않습니다.
datadog: 로그 수집 및 자료의 다양한 시각화를 가능합니다. (Java 에서는 성능 분석까지 되는걸로 알고 있으나, python에서는 일반 로그 수준만 수집이 되는걸로 보입니다.) 가격정책이 부담이 될 수 있습니다.

앱 내 처리

DB 저장: 서비스외의 DB 부하를 만듭니다. 이미 연결된 DB가 있을 경우 복잡성이 줄어듭니다.
File 저장: 직관의 방법이나 저장소 관리가 필요합니다. (File rotation을 통해 운영 부하를 최소화 할 수 있습니다.) 멀티 인스턴스로 앱을 띄울 경우 관리가 복잡해집니다.
바로 도입할 수 있는 DB 저장을 선택하겠습니다. 독자들이 처하신 상황에 맞춰서 선택하시면 됩니다.

로깅 방법들

우리는 다양한 시점에 App들의 req-resp 사이클에 접근할 수 있습니다. flask에서는 접근할 수 있는 Context 데코레이터를 제공하고 있습니다.
app = Flask() @app.before_first_request ... @app.before_request ... @app.after_request ... @app.teardown_request ... @app.teardown_appcontext ...
Python
복사
FastAPI는 같은 방법을 제공 하고 있지 않고, middleware를 통한 접근을 알려줍니다.
FastAPI Github 이슈
409
issues

미들웨어 로깅

모든 Request와 Response가 통과하는 레이어이므로 제일 먼저 떠올릴 방법입니다.
1.
FastAPI가 의존하고 있는 Starlette에서 제약이 있습니다. request, response 가 streaming 객체로 되어 있어서 일반적인 접근이 안됩니다. Body에 접근을 하려면 streaming 객체를 읽고 Byte를 디코딩하고, 다시 새로운 streaming 객체를 만들어줘야 합니다. 이는 오버헤드가 될 여지가 있어 보입니다.
2.
일부 view를 로깅에서 예외로 하고 싶을 경우 별도 관리가 필요합니다. 미들웨어 스코프에서는 view context 접근이 불가능합니다. (dispatch param으로 받는 call_next 는 view가 아닙니다. ) request.url.path를 읽어서 별도관리 목록과 비교를 하는 조건절이 필요합니다.
3.
response 객체에 BackgroudTask에 데이터 처리 함수를 달아줍니다. 샘플용으로 단순한 함수를 달았고, DB 저장 함수를 달아주시면 요청-응답 프로세스 종료후 처리가 됩니다.
모든 요청-응답을 동일하게 처리하고 싶다면 한번에 정리할 수 있는 선택지 입니다.
# Ref: https://stackoverflow.com/questions/69670125/how-to-log-raw-http-request-response-in-python-fastapi from json import JSONDecodeError import json import logging from typing import Callable, Awaitable, Tuple, Dict, List from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response, StreamingResponse from starlette.types import Scope, Message from fastapi import BackgroundTasks # Set up your custom logger here # Below is for example fix for what you want def log_handler(data: dict): # data handling ... class RequestWithBody(Request): """Creation of new request with body""" def __init__(self, scope: Scope, body: bytes) -> None: super().__init__(scope, self._receive) self._body = body self._body_returned = False async def _receive(self) -> Message: if self._body_returned: return {"type": "http.disconnect"} else: self._body_returned = True return {"type": "http.request", "body": self._body, "more_body": False} class CustomLoggingMiddleware(BaseHTTPMiddleware): """ Use of custom middleware since reading the request body and the response consumes the bytestream. Hence this approach to basically generate a new request/response when we read the attributes for logging. """ async def _get_response_params(self, response: StreamingResponse) -> Tuple[bytes, Dict[str, str], int]: """Getting the response parameters of a response and create a new response.""" response_byte_chunks: List[bytes] = [] response_status: List[int] = [] response_headers: List[Dict[str, str]] = [] async def send(message: Message) -> None: if message["type"] == "http.response.start": response_status.append(message["status"]) response_headers.append({k.decode("utf8"): v.decode("utf8") for k, v in message["headers"]}) else: response_byte_chunks.append(message["body"]) await response.stream_response(send) content = b"".join(response_byte_chunks) return content, response_headers[0], response_status[0] async def dispatch( # type: ignore self, request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]] ) -> Response: # Store request body in a variable and generate new request as it is consumed. request_body_bytes = await request.body() request_with_body = RequestWithBody(request.scope, request_body_bytes) # Store response body in a variable and generate new response as it is consumed. response = await call_next(request_with_body) response_content_bytes, response_headers, response_status = await self._get_response_params(response) # Logging # If there is no request body handle exception, otherwise convert bytes to JSON. try: req_body = json.loads(request_body_bytes) except JSONDecodeError: req_body = "" resp_body = json.loads(response.body.decode("utf8")) # Logging of relevant variables. log_data = { # added from auth middleware 'user_id': request.user.user_id, 'group_id': request.user.group_id, "authority": request.user.user_authority, # basic infomation "ip": request.client.host, "port": request.client.port, 'method': request.method, "path": request.url.path, # custom infomation "agent": dict(request.headers.items())["user-agent"], "request_body": req_body, "response_body": response_body, "response_status": response.status_code } # add BackgroudTask pre_background = response.background response.background = BackgroundTasks() if pre_background: response.background = BackgroundTasks([pre_background]) response.background.add_task(func=log_handler, data={ "req-resp": data }) # Finally, return the newly instantiated response values return Response(response_content_bytes, response_status, response_headers) # Register Middleware # app.py from fastapi import FastAPI # First way app = FastAPI(...) app.add_middleware(CustomLoggingMiddleware) # Second way from fastapi import FastAPI from fastapi.middleware import Middleware app = FastAPI( middleware=[Middleware(TimeHeaderMiddleware)] )
Python
복사

함수 데커레이터 로깅

요청이 처리된 직후도 떠올릴 만한 접근 지점입니다. Python에서는 Decorator를 통해 view 전, 후 문맥에 접근이 가능합니다.
1.
미들웨어와 달리 선택적인 로깅이 쉬워집니다. @데코레이터를 달기만 하면 로깅이 됩니다.
2.
view parameter 명시필요: request를 명시 해줘야 @데코레이터에서 request에 접근이 가능합니다.
3.
response는 starlette의 resp 객체가 아닌, func 의 resp입니다. (dict)
starlette resp의 attr이 없습니다. (status_code, background)
4.
BackgroudTask로 처리가 불가능하기 때문에 요청-응답에 영향이 갈 수 있습니다.
request에 대해서 확인하는 정도까지는 괜찮지만 추가 작업이 들어간다면, 악영향을 끼칠 수 있어서 좋은 선택지가 되긴 어려워 보입니다.
from fastapi import Request, BackgroundTasks def log_req_and_resp(func): @wraps(func) async def wrapper(request: Request, *args, **kwargs): resp = await func(request, *args, **kwargs) body_content = await request.body() # 바디 내용을 문자열로 변환 body_str = body_content.decode("utf-8") # JSON 문자열을 파이썬 딕셔너리로 변환 body_dict = json.loads(body_str) log_data = { # added from auth middleware 'user_id': request.user.user_id, 'group_id': request.user.group_id, "authority": request.user.user_authority, # basic infomation "ip": request.client.host, "port": request.client.port, 'method': request.method, "path": request.url.path, # custom infomation "agent": dict(request.headers.items())["user-agent"], # Below line raise Exception cause response has no status_code 'response_status': resp.status_code, } # Below 2 lines raise Exception cause resp is not starlette's Response resp.background = BackgroundTasks() resp.background.add_task(log_handler, log_data) # handle data log_handler(log_data) return resp # <= {"message": "OK"} return wrapper @app.get("/test") @log_req_and_resp async def test(): ... return {"message": "OK"}
Python
복사

라우트 클래스

* Router 와 Route의 차이 Route는 개별 endpoint func를 지칭합니다. Router는 Route의 집합 입니다.
Route 클래스는 view의 Wrapper입니다. view의 resp를 Response 객체로 만들어서 리턴합니다. route에서 접근시에는 view 데코레이터에서 접근이 안되었던 resp에 접근이 가능합니다. 라우트별 선택은 안되지만, 라우트 모음인 라우터에 선택 적용 가능합니다.
데코레이터처럼 단순하지는 않지만 선택 적용이 되고, req, resp 바디에 대한 접근이 가능합니다.
# log_router import time from typing import Callable from fastapi import Request, Response, BackgroundTasks from fastapi.routing import APIRoute from dependency_injector.wiring import Provide, inject class LogRoute(APIRoute): def get_route_handler(self) -> Callable: original_route_handler = super().get_route_handler() async def custom_route_handler(request: Request) -> Response: response: Response = await original_route_handler(request) body_content = await request.body() req_body = json.loads(body_content.decode("utf-8")) resp_body = json.loads(response.body.decode("utf8")) log_data = { # added from auth middleware 'user_id': request.user.user_id, 'group_id': request.user.group_id, "authority": request.user.user_authority, # basic infomation "ip": request.client.host, "port": request.client.port, 'method': request.method, "path": request.url.path, "agent": dict(request.headers.items())["user-agent"], 'response_status': response.status_code, "req_body": req_body, "resp_body": resp_body } pre_background = response.background response.background = BackgroundTasks() if pre_background: response.background = BackgroundTasks([pre_background]) response.background.add_task(func=log_handler, data={ "req-resp": log_data }) return response return custom_route_handler # api.py some_router = APIRouter(route_class=LogRoute) # some_router로 decorated된 view는 이제 로깅이 됩니다. @some_router.get("/some") async def some_view(...): ... app.include_router(some_router)
Python
복사
상정한 2가지 조건에 부합하고, 로직 처리가 쉬운 라우트 클래스 overide를 선택합니다.

에러 처리

요청-응답 처리 도중 raise Exception을 하게 될 경우, 위의 3개 스택 중 어디서든지 escape 합니다.
그럼 어디서 로직 플로우에 접근을 할수 있을까요? fastapi.exception_handler를 통해서 가능합니다. CustomException 을 정의했다면 Exception별로 별도 로직을 정의할수 있습니다. 그리고 로깅이 필요하다면 로깅 로직을 추가하면 됩니다.
exception_handler 처리 규칙 1. Exception 위계가 있고, 상위, 하위 에러핸들러가 있을 경우, 하위 클래스 에러는 하위 클래스 에러 핸들러에서 처리 됩니다. 2. 하위 클래스 핸들러가 없을 경우, 상위 클래스 에러 핸들러에서 처리 됩니다. 3. 등록 순서는 상관이 없습니다. 하위 클래스를 나중에 등록해도 하위클래스에서 처리 됩니다. 4. raise 된 SomeException 핸들러가 없을 경우 Exception 핸들러로 넘어갑니다. 5. ExceptionGroup은 핸들러 등록이 되지 않습니다. raise될 경우 Exception 핸들러로 넘어갑니다.
# app.py app_ = FastAPI(...) @app_.exception_handler(Exception) async def base_exception_handler(request: Request, exc: Exception): """ 따로 명시 되지 않은 모든 에러를 잡아내는 핸들러 """ try: try: log_data = { "user_id": request.user.user_id, "group_id": request.user.group_id, "authority": request.user.user_authority, "ip": request.client.host if request.client else None, "port": request.client.port if request.client else None, "method": request.method, "path": request.url.path, "agent": dict(request.headers.items())["user-agent"], "response_status": HTTPStatus.INTERNAL_SERVER_ERROR, } # authentication middleware에서 Unhandled Error가 발생한 경우, # request.user 접근시 AssertionError가 발생합니다. except AssertionError: log_data = { "user_id": None, "group_id": None, "authority": None, "ip": request.client.host if request.client else None, "port": request.client.port if request.client else None, "method": request.method, "path": request.url.path, "agent": dict(request.headers.items())["user-agent"], "response_status": HTTPStatus.INTERNAL_SERVER_ERROR, } await log_handler(log_data) return JSONResponse( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, content={ "code": "99999", "message": "RUNTIME_ERROR", }, ) except Exception as e: ...
Python
복사

OO님, FastAPI 요청-응답을 정리하기 위해서 아래와 같이 결정합니다.

항목
결정사항
로그 데이터 처리 시점
요청-응답 종료 후, BackgroudTask 사용 (부하가 클 경우, Celery)
데이터 접근 시점
route handler 동작 이후
로그 방법
Route class overide(데이터 처리 함수 추가) → api router 주입
로그 저장
DB 저장 (DB 부하 클시 외부 솔루션 도입)
에러케이스 처리
exception_handler 등록
Ref.
블로그 메인