uvicorn 부터 endpoint function 까지
django 는 내부 로직 흐름을 정리해주는 글들을 대략이라도 본 기억이 있습니다만, FastAPI 에서는 못 본것 같아 한번 정리해보면서 FastAPI 에 대한 이해를 높여봅니다.
uvicorn → asyncio → FastAPI ( middleware layer → router → function ) 으로 이어지는 코드 흐름을 따라가보겠습니다.
flowchart TD uvicorn --> asyncio --> 1 subgraph 1["FastAPI(starlette)"] mm["middleware layer"] --> router --> route --> m2["endpoint function"] end
Mermaid
복사
대략 적인 흐름은 위와 같습니다.
따라갈 준비하기
Pycharm으로 endpoint function에 디버깅 포인트를 만들고, frame stack 제일 밑에서부터 따라갑니다.
# main.py
from fastapi import FastAPI
from fastapi.routing import APIRouter
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
router = APIRouter(prefix="/api")
@router.get("/hello-world")
async def root():
return {"message": "Hello World"} # 🔴 set Debug Point
app = FastAPI()
app.include_router(router)
if __name__ == "__main__":
uvicorn.run(app, host="localhost", port=8000)
Python
복사
Pycharm Debug 모드로 main.py를 실행하고 http://localhost:8000/api/hello-world 요청을 날립니다.
스택프레임 추적
로 마킹된 라인이 실제로 동작되는 코드입니다.
한글로 적어놓은 코멘트들은 제가 작성한 것입니다. 영어로 적혀 있는 doc string 또는 주석들은 원본 코드에 있는 내용들을 그대로 적거나, 일부분만 적었습니다.
설명이 필요없는 코드들은 ... 으로 생략했습니다. 전체 맥락이 궁금하신 분들은 직접 확인하시길 바랍니다.
들여쓰기는 스페이스 2칸으로 조절 했습니다.
uvicorn
req-1
# .../uvicorn/main.py
def run(...):
...
server = Server(config=config)
... # 동작에 필요한 Config 들을 확인합니다.
🟠 server.run() #서버가 시작 됩니다.
... # 컨피그가 적합하지 않으면 종료합니다.
Python
복사
req-2
# .../uvicorn/server.py
def run(self, sockets: Optional[List[socket.socket]] = None) -> None:
self.config.setup_event_loop()
# asyncio를 사용해서 server의 method를 동작합니다.
🟠 return asyncio.run(self.serve(sockets=sockets))
Python
복사
asyncio
req-3
# .../asyncio/runners.py
def run(main, *, debug=None):
"""Execute the coroutine and return the result."""
...
# 독립된 이벤트루프를 가지고 제공받는 코루틴들을 처리하는 러너 입니다.
# 하나의 스레드에서 다른 이벤트 루프가 있을 경우 동작하지 않습니다.
with Runner(debug=debug) as runner:
🟠 return runner.run(main)
Python
복사
req-4
# .../asyncio/runners.py
def run(self, coro, *, context=None):
"""Run a coroutine inside the embedded event loop."""
# 코루틴 검사, 이벤트루프 검사, 동작스레드가 메인스레드인지 검사합니다.
...
# coroutine을 awaitable task로 만듭니다.
# coro: Server.serve()
task = self._loop.create_task(coro, context=context)
...
try:
# 이벤트루프에 Server.serve() Task를 등록합니다.
🟠 return self._loop.run_until_complete(task)
Python
복사
req-5
# .../asyncio/base_events.py
def run_until_complete(self, future):
"""Run until the Future is done.
If the argument is a coroutine, it is wrapped in a Task."""
# 루프의 상태 확인, future 객체 확인, 콜백 등록 후,
...
try:
# 이벤트 루프가 동작합니다.
🟠 self.run_forever()
Python
복사
req-6
# .../asyncio/base_events.py
def run_forever(self):
"""Run until stop() is called."""
...
# while 문이 등장합니다. self._stopping == True가 될때까지, _run_once()를 반복 합니다.
while True:
🟠 self._run_once()
if self._stopping:
break
Python
복사
req-7
# .../asyncio/base_events.py
def _run_once(self):
# doc string 으로 설명 갈음합니다.
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
...
else:
🟠 handle._run()
Python
복사
req-8
# .../asyncio/events.py
def _run(self):
try:
# 전달받은 콜백을 실행합니다.
🟠 self._context.run(self._callback, *self._args)
Python
복사
uvicorn
req-9
# .../uvicorn/protocols/http/h11_impl.py
# 다시 uvicorn 입니다.
# 이제 request, response 처리로 들어갑니다.
class RequestResponseCycle:
...
async def run_asgi(self, app: "ASGI3Application") -> None:
try:
# app: ProxyHeadersMiddleware(fastapi)에
# self.scope, self.receive, self.send 를 전달 합니다.
# self.scope: http 관련 요소들을 담고 있습니다. 아래에서 자세히 설명합니다.
# self.receive: ASGIReceiveEvent 를 반환하는 method 입니다.
# self.send: ASGISendEvent 를 처리하는 method 입니다.
🟠 result = await app( # type: ignore[func-returns-value]
self.scope, self.receive, self.send
)
Python
복사
req-10
# .../uvicorn/middleware/proxy_headers.py
# FastAPI 를 래핑 하고 있는 ProxyHeadersMiddleware 에서
# FastAPI 에 scope, receive, send를 전달합니다.
class ProxyHeadersMiddleware:
...
async def __call__(
self, scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable"
) -> None:
# request 관련 작업들이 이루어집니다.
...
🟠 return await self.app(scope, receive, send)
Python
복사
ASGI3Application, scope 타입보기
FastAPI(starlette)
req-11
# .../fastapi/applications.py
# Starlette을 상속받은 FastAPI 클래스가 등장합니다.
# Starlette __call__로 인자를 전달합니다.
class FastAPI(Starlette):
...
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if self.root_path:
scope["root_path"] = self.root_path
# scope, receive, send를 Starlette으로 넘깁니다.
🟠 await super().__call__(scope, receive, send)
Python
복사
req-12
# .../starlette/applications.py
class Starlette:
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
scope["app"] = self
# Starlette instance에서 사용할 middleware를 추가해줍니다.
# Starlette은 2개의 Middleware를 기본 사용합니다.
# 1. ServerErrorMiddleware: except 처리 안된 서버 에러를 다룹니다.
# 2. ExceptionMiddleware: except 처리 된 서버에러를 다룹니다. e.g. (@app.exception_handler(SomtException))
if self.middleware_stack is None:
self.middleware_stack = self.build_middleware_stack()
# middleware_stack 에 받은 scope, receive, send 를 다시 전달합니다.
# middleware_stack은 ServerErrorMiddleware instance 입니다.
🟠 await self.middleware_stack(scope, receive, send)
Python
복사
req-13
# .../starlette/middleware/errors.py
# ServerErrorMiddleware로 공이 넘어 왔습니다.
class ServerErrorMiddleware:
...
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
...
try:
# self.app은 FastAPI 앱일까요?
# 아닙니다. ExceptionMiddleware instance 입니다.
# Middleware들이 양파같은 구조로 안쪽 middleware를 호출하고 있는 것을 알수 있습니다.
🟠 await self.app(scope, receive, _send)
Python
복사
req-14
# .../starlette/middlware/exceptions.py
class ExceptionMiddleware:
...
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
...
try:
# 그럼 이제는 등장할까요?
# self.app: fastapi의 AsyncExitStackMiddleware instance 입니다.
🟠 await self.app(scope, receive, sender)
Python
복사
req-15
# .../fastapi/middlware/asyncexitstack.py
# fastapi 로 넘어왔습니다.
class AsyncExitStackMiddleware:
def __init__(self, app: ASGIApp, context_name: str = "fastapi_astack") -> None:
self.app = app
self.context_name = context_name
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
dependency_exception: Optional[Exception] = None
async with AsyncExitStack() as stack:
"""AsyncExitStack: Async context manager for dynamic management of a stack of exit
callbacks."""
# fastapi에서 async context를 다루는 매니저를 scope에 등록합니다.
scope[self.context_name] = stack
try:
# self.app: fastapi APIRouter object
# self.app에 scope, receive, send 를 전달합니다.
🟠 await self.app(scope, receive, send)
Python
복사
req-16
# .../starlette/routing.py
# fastapi APIRouter는 starlette의 Router를 상속 받은 것으로 보입니다.
# starlette Router의 __call__()을 진행합니다.
class Router:
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
The main entry point to the Router class.
"""
...
for route in self.routes:
# Determine if any route matches the incoming scope,
# and hand over to the matching route if found.
match, child_scope = route.matches(scope)
if match == Match.FULL:
scope.update(child_scope)
# Path를 확인하고 일치할 경우 scope, receive, send 를 route.handle로 전달합니다.
🟠 await route.handle(scope, receive, send)
return
Python
복사
req-17
# .../starlette/routing.py
class Route(BaseRoute):
async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
if self.methods and scope["method"] not in self.methods:
headers = {"Allow": ", ".join(self.methods)}
if "app" in scope:
raise HTTPException(status_code=405, headers=headers)
else:
response = PlainTextResponse(
"Method Not Allowed", status_code=405, headers=headers
)
await response(scope, receive, send)
else:
# self.app: starlette 의 request_response func에서 생성된 app입니다.
🟠 await self.app(scope, receive, send)
Python
복사
req-18
# .../starlette/routing.py
def request_response(func: typing.Callable) -> ASGIApp:
"""
Takes a function or coroutine `func(request) -> response`,
and returns an ASGI application.
"""
async def app(scope: Scope, receive: Receive, send: Send) -> None:
# 전달 받은 정보들을 가지고 request 객체를 만들고,
# 이를 func에 전달합니다. func(request) 는 response 를 반환합니다.
request = Request(scope, receive=receive, send=send)
if is_coroutine:
# func: fastapi의 get_request_handler 입니다.
🟠 response = await func(request)
else:
# func가 코루틴이 아닐 경우, fastapi는 쓰레드풀에서 함수를 동작시킵니다.
response = await run_in_threadpool(func, request)
await response(scope, receive, send)
return app
Python
복사
req-19
# .../fastapi/routing.py
def get_request_handler(
...
) -> Callable[[Request], Coroutine[Any, Any, Response]]:
async def app(request: Request) -> Response:
...
# func에 달린 dependency들을 해결 합니다.
solved_result = await solve_dependencies(
request=request,
dependant=dependant,
body=body,
dependency_overrides_provider=dependency_overrides_provider,
)
values, errors, background_tasks, sub_response, _ = solved_result
# solve_dependencies에서 에러 발생시 RequestValidationError를 raise합니다.
if errors:
raise RequestValidationError(_normalize_errors(errors), body=body)
# 의존성과, 의존성에 주입할 대상들, 코루틴 플래그와 넘깁니다.
else:
🟠 raw_response = await run_endpoint_function(
dependant=dependant, values=values, is_coroutine=is_coroutine
)
Python
복사
req-20
# .../fastapi/routing.py
async def run_endpoint_function(
*, dependant: Dependant, values: Dict[str, Any], is_coroutine: bool
) -> Any:
# Only called by get_request_handler. Has been split into its own function to
# facilitate profiling endpoints, since inner functions are harder to profile.
assert dependant.call is not None, "dependant.call must be a function"
# 코루틴 여부에 따라 직접 await 하거나, run_in_threadpool에 전달합니다.
if is_coroutine:
# dependant.call <= main.py에 정의한 endpoint function 입니다.
🟠 return await dependant.call(**values)
else:
return await run_in_threadpool(dependant.call, **values)
Python
복사
req-21
# main.py
# endpoint function에 도착했습니다.
@router.get("/hello-world")
async def root():
🟠 return {"message": "Hello World"}
Python
복사
Custom Middleware가 들어갈 경우
req_with_custom_middleware-1
# main.py
...
import time
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time() # 🔴 set Debug Point
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
Python
복사
공식문서에 있는 커스텀미들웨어를 따라서 등록합니다.
req_with_custom_middleware-2
# req-13
# .../starlette/middleware/errors.py
# ServerErrorMiddleware 에서
# ==> .../starlette/middleware/base.py
# 으로 넘어 왔습니다.
class BaseHTTPMiddleware:
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
# awaitable을 다루기 쉽게 asynchronous context에 anyio 를 사용합니다.
async with anyio.create_task_group() as task_group:
request = Request(scope, receive=receive)
# self.dispatch_func: add_process_time_header 제가 위에서 작성한 middleware 입니다.
🟠 response = await self.dispatch_func(request, call_next)
await response(scope, receive, send)
response_sent.set()
Python
복사
req_with_custom_middleware-3
# main.py
# 제가 작성한 미들웨어가 동작합니다.
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
🟠 response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
Python
복사
req_with_custom_middleware-4
# .../starlette/middleware/base.py
# 다시 base middleware로 돌아왔습니다.
class BaseHTTPMiddleware:
...
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
...
async def call_next(request: Request) -> Response:
...
async def coro() -> None:
nonlocal app_exc
async with send_stream:
try:
# self.app: req-14 의 ExceptionMiddleware 입니다.
🟠 await self.app(scope, receive_or_disconnect, send_no_error)
except Exception as exc:
app_exc = exc
Python
복사
req-14 부터 동일한 flow로 진행합니다.
After endpoint function
Response 처리
resp-1
# .../fastapi/routing.py
# get_request_handler 에서 response 가공을 시작합니다.
🟠raw_response = await run_endpoint_function(
dependant=dependant, values=values, is_coroutine=is_coroutine
)
# Background 작업을 달아줍니다.
if isinstance(raw_response, Response):
if raw_response.background is None:
raw_response.background = background_tasks
return raw_response
response_args: Dict[str, Any] = {"background": background_tasks}
# If status_code was set, use it, otherwise use the default from the
# response class, in the case of redirect it's 307
# 응답 코드 관련 작업을 합니다.
current_status_code = (
status_code if status_code else sub_response.status_code
)
if current_status_code is not None:
response_args["status_code"] = current_status_code
if sub_response.status_code:
response_args["status_code"] = sub_response.status_code
# response 데이터 직렬화 작업을 합니다.
content = await serialize_response(
...
)
# 새로운 Response 클래스로 만듭니다.
response = actual_response_class(content, **response_args)
if not is_body_allowed_for_status_code(response.status_code):
response.body = b""
response.headers.raw.extend(sub_response.headers.raw)
🟠return response
Python
복사
resp-2
# .../starlette/routing.py
def request_response(func: typing.Callable) -> ASGIApp:
async def app(scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive=receive, send=send)
if is_coroutine:
# response 가 만들어 졌습니다.
# response는 starlette의 Response instance 입니다.
🟠 response = await func(request)
else:
response = await run_in_threadpool(func, request)
# response instance __call__ 메소드를 호출합니다.
🟠 await response(scope, receive, send)
return app
Python
복사
resp-3
# .../starlette/response.py
class Response:
...
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
# response가 시작되었음을 send func에 전달 합니다.
# send: <function ExceptionMiddleware.__call__.<locals>.sender at 0x10fbe8720>
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)
# response body(bytes)를 send func에 전달 합니다.
# send: <function ExceptionMiddleware.__call__.<locals>.sender at 0x10fbe8720>
🟠 await send({"type": "http.response.body", "body": self.body})
# background 작업이 있으면 여기서 동작을 시작합니다.
if self.background is not None:
await self.background()
Python
복사
resp-4
# .../startlette/middleware/exceptions.py
class ExceptionMiddleware:
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
...
async def sender(message: Message) -> None:
nonlocal response_started
if message["type"] == "http.response.start":
response_started = True
# 전달받은 메시지를 전달합니다.
# send: <function ServerErrorMiddleware.__call__.<locals>._send at 0x103448ea0>
await send(message)
Python
복사
응답 처리는 커스텀 미들웨어가 없는 경우, 있는 경우에 따라 프로세스가 조금 달라집니다.
커스텀 미들웨어가 없는 경우
ExceptionMiddleware → ServerErrorMiddleware → RequestResponseCycle 순으로 로직이 진행됩니다.
resp_without_custom_middleware-1
# .../startlette/middleware/errors.py
class ServerErrorMiddleware
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
...
async def _send(message: Message) -> None:
nonlocal response_started, send
if message["type"] == "http.response.start":
response_started = True
# 전달받은 메시지를 전달합니다.
# send: <bound method RequestResponseCycle.send of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x103424390>>
await send(message)
Python
복사
resp_without_custom_middleware-2
# .../uvicorn/protocols/http/h11_impl.py
class RequestResponseCycle:
async def send(self, message: "ASGISendEvent") -> None:
message_type = message["type"]
...
# res-3 번에서 await send(...)를 두번 하는 이유가 여기에서 나옵니다.
# 1번 요청으로 요청 처리 준비를 합니다.
if not self.response_started:
...
# Console에 표기되는 응답 로깅을 여기서 합니다.
if self.access_log:
self.access_logger.info(
'%s - "%s %s HTTP/%s" %d',
get_client_addr(self.scope),
self.scope["method"],
get_path_with_query_string(self.scope),
self.scope["http_version"],
status,
)
# Write response status line and headers
reason = STATUS_PHRASES[status]
response = h11.Response(status_code=status, headers=headers, reason=reason)
output = self.conn.send(event=response)
# 1번 send(...)처리
🟠 self.transport.write(output)
# 2 번째 send(...) 로 보내진 내용을 처리합니다.
elif not self.response_complete:
# Sending response body
if message_type != "http.response.body":
msg = "Expected ASGI message 'http.response.body', but got '%s'."
raise RuntimeError(msg % message_type)
message = cast("HTTPResponseBodyEvent", message)
body = message.get("body", b"")
more_body = message.get("more_body", False)
# Write response body
data = b"" if self.scope["method"] == "HEAD" else body
output = self.conn.send(event=h11.Data(data=data))
self.transport.write(output)
# Handle response completion
if not more_body:
self.response_complete = True
self.message_event.set()
output = self.conn.send(event=h11.EndOfMessage())
🟠 self.transport.write(output)
else:
# Response already sent
msg = "Unexpected ASGI message '%s' sent, after response already completed."
raise RuntimeError(msg % message_type)
# Response 전달이 완료 되면, 커넥션을 종료합니다.
if self.response_complete:
if self.conn.our_state is h11.MUST_CLOSE or not self.keep_alive:
self.conn.send(event=h11.ConnectionClosed())
🟠 self.transport.close()
self.on_response()
Python
복사
커스텀 미들웨어가 있는 경우
ExceptionMiddleware → BaseHTTPMiddleware 에서 미들웨어들 처리가 될때까지 메모리 객체에 스트리밍으로 전달하고 _StreamingResponse 에서 ASGIReceiveEvent 가 다 처리되길 기다린다음, 메모리객체에서 값을 꺼내 RequestResponseCycle 로 전달합니다.
resp_with_custom_middleware-1
# .../starlette/middleware/base.py
class BaseHTTPMiddleware:
...
async def call_next(request: Request) -> Response:
...
async def send_no_error(message: Message) -> None:
try:
# send_stream: MemoryObjectSendStream
🟠 await send_stream.send(message)
Python
복사
resp_with_custom_middleware-2
# .../starlette/middleware/base.py
class BaseHTTPMiddleware:
...
async def call_next(request: Request) -> Response:
...
# MemoryObjectReceiveStream에서 처음 값을 꺼냅니다.
try:
# recv_stream: MemoryObjectReceiveStream
message = await recv_stream.receive()
info = message.get("info", None)
if message["type"] == "http.response.debug" and info is not None:
message = await recv_stream.receive()
except anyio.EndOfStream:
if app_exc is not None:
raise app_exc
raise RuntimeError("No response returned.")
assert message["type"] == "http.response.start"
# MemoryObjectReceiveStream에서 message를 꺼내는 Generator입니다.
async def body_stream() -> typing.AsyncGenerator[bytes, None]:
async with recv_stream:
async for message in recv_stream:
assert message["type"] == "http.response.body"
body = message.get("body", b"")
if body:
yield body
if app_exc is not None:
raise app_exc
# 리스폰스를 stream 리스폰스로 만들고 body_stream() 을 전달 합니다.
response = _StreamingResponse(
status_code=message["status"], content=body_stream(), info=info
)
response.raw_headers = message["headers"]
🟠 return response
async with anyio.create_task_group() as task_group:
request = Request(scope, receive=receive)
response = await self.dispatch_func(request, call_next)
# response: <starlette.middleware.base._StreamingResponse object at 0x10d259010>
🟠 await response(scope, receive, send)
response_sent.set()
Python
복사
resp_with_custom_middleware-3
# .../starlette/response.py
class StreamingResponse(Response):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
# receive: <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x10d33e490>>
# send: <function ServerErrorMiddleware.__call__.<locals>._send at 0x10d313ec0>
async with anyio.create_task_group() as task_group:
async def wrap(func: "typing.Callable[[], typing.Awaitable[None]]") -> None:
await func()
task_group.cancel_scope.cancel()
task_group.start_soon(wrap, partial(self.stream_response, send))
# self.listen_for_disconnect: <bound method StreamingResponse.listen_for_disconnect of <starlette.middleware.base._StreamingResponse object at 0x10d119d10>>
🟠 await wrap(partial(self.listen_for_disconnect, receive))
if self.background is not None:
await self.background()
...
# ASGIReceiveEvent가 다 처리되길 await 합니다.
# message["type"] == http.disconnect가 나올때 까지 값을 꺼냅니다.
async def listen_for_disconnect(self, receive: Receive) -> None:
while True:
# receive: <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x10c816150>>
🟠 message = await receive()
if message["type"] == "http.disconnect":
break
Python
복사
resp_with_custom_middleware-4
# .../uvicorn/protocols/http/h11_impl.py
class RequestResponseCycle:
...
async def receive(self) -> "ASGIReceiveEvent":
...
# ASGIReceiveEvent를 다 처리합니다.
if not self.disconnected and not self.response_complete:
self.flow.resume_reading()
🟠 await self.message_event.wait()
self.message_event.clear()
Python
복사
resp_with_custom_middleware-5
# .../starlette/response.py
class StreamingResponse(Response):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
# receive: <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x10d33e490>>
# send: <function ServerErrorMiddleware.__call__.<locals>._send at 0x10d313ec0>
async with anyio.create_task_group() as task_group:
async def wrap(func: "typing.Callable[[], typing.Awaitable[None]]") -> None:
await func()
task_group.cancel_scope.cancel()
# response send 하는 함수를 새로운 스레드 풀에서 시작합니다.
# self.stream_response: <bound method _StreamingResponse.stream_response of <starlette.middleware.base._StreamingResponse object at 0x10c79a190>>
# send: <function ServerErrorMiddleware.__call__.<locals>._send at 0x10c767880>
🟠 task_group.start_soon(wrap, partial(self.stream_response, send))
await wrap(partial(self.listen_for_disconnect, receive))
if self.background is not None:
await self.background()
Python
복사
resp_with_custom_middleware-6
# .../starlette/response.py
class _StreamingResponse(StreamingResponse):
...
async def stream_response(self, send: Send) -> None:
if self._info:
await send({"type": "http.response.debug", "info": self._info})
# 부모클래스(StreamingResponse)의 메소드를 호출합니다.
🟠 return await super().stream_response(send)
Python
복사
resp_with_custom_middleware-7
# .../starlette/response.py
class StreamingResponse(Response):
async def stream_response(self, send: Send) -> None:
# 첫 메시지를 send에 전달합니다.
# send: <function ServerErrorMiddleware.__call__.<locals>._send at 0x10c767880>
🟠 await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)
# resp 바디를 generator 에서 꺼내면서 send에 전달합니다.
# self.body_iterator: <async_generator object BaseHTTPMiddleware.__call__.<locals>.call_next.<locals>.body_stream at 0x10c7eb740>
async for chunk in self.body_iterator:
if not isinstance(chunk, bytes):
chunk = chunk.encode(self.charset)
🟠 await send({"type": "http.response.body", "body": chunk, "more_body": True})
await send({"type": "http.response.body", "body": b"", "more_body": False})
Python
복사
resp_without_custom_middleware-1 로 이어집니다.
Request 완료
# req-21. endpoint function에서 역순으로 res-8까지 쭉 되돌아 올라갑니다.
# .../asyncio/events.py
class Handle:
def _run(self):
try:
self._context.run(self._callback, *self._args)
...
🟠 self = None # Needed to break cycles when an exception occurs.
Python
복사
# .../asyncio/base_events.py
def _run_once(self):
# doc string 으로 설명 갈음합니다.
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
...
else:
# 이벤트루프 이터레이션이 다 도는 것으로 Request 처리가 완료 되었습니다.
🟠 handle._run()
handle = None
Python
복사
마무리
1.
Pycham 으로 스택 추적해보기 한번 해볼만 합니다.
a.
사용되는 변수들이 어떤 값인지, 인스턴스라면 어떤 attr을 가지고 있는지 확인 할 수 있습니다.
2.
FastAPI는 Starlette 없이는 존재할 수 없을 것으로 보입니다.
a.
기본 클래스도 Starlette 상속받아 만들어 졌고, Request, Response, Router, Middleware 등이 모두 Starlette을 베이스로 만들어 졌습니다.
b.
FastAPI 코드보다 starlette 코드를 더 많이 보게 됩니다.
c.
FastAPI 떼고, Starlette으로 앱을 만들어 볼수도 있겠다는 생각이 듭니다.
3.
Uvicorn Starlette FastAPI 역할이 좀더 선명하게 느껴집니다.
a.
Uvicorn: 외부 통신 인터페이스
b.
Starlette: 서버 내부 동작 인터페이스
c.
FastAPI: Starlette 위에 endpoint func 작성의 편의성 (Depends(), Pydantic을 통한 Param의 검증 등등 )얹어서 Uvicorn과 상호 작용
4.
미들웨어에 대한 이해가 좀 더 생겼습니다.
a.
ServerErrorMiddleware → 커스텀 미들웨어들 → ExceptionMiddleware 로 Request 처리가 진행 됨을 알 수 있습니다.
b.
@app.exception_handler(Exception) 로 달아놓은 handler는 ExceptionMiddleware에서 처리가 되고, 그 외 서버에러는 ServerErrorMiddleware 에서 처리가 됩니다.
c.
미들웨어들 블락킹 방지하기 위해 병렬 처리 돌리는 call_next 처리가 쉽게 이해가 가지는 않습니다.
5.
단순해 보이는 글이지만 마냥 단순하지는 않은 글쓰기였습니다.
a.
동작 자체는 단순하지만, 내부에서 비동기로 미들웨어들을 돌리기 위해 사용하는 구조에서 헤맸습니다.
b.
디버깅으로 코드플로우 추적을 하게되면, 쉬울줄 알았는데 호출되는 코드로 추적을 하게되는것이라서 맥락 파악이 쉽지는 않았습니다.
P.S. 피드백은 언제나 환영입니다.
블로그 메인