はじめに
今回は、FastAPIのAPIのエンドポイントを定義する方法である
Path Operation
に渡す関数を
async def
にした時の挙動について調べました。この手の記事はすでにたくさんありますが、今回自分で調べて裏をとりたかったため、調べた内容をまとめてみました。
Path Operationと並行処理
FastAPIでAPIを定義するには、Path Operationデコレータを使います。以下がFastAPIの最小限のコードです。
1 2 3 4 5 6 7 | from fastapi import FastAPI app = FastAPI() @app.get("/") def read_root(): return {"Hello": "World"} |
@app.get("/")
といったデコレータに関数を登録することで、指定されたパスとメソッドへのリクエストに対応する関数を定義できます。
この関数は
async def
にすることもできます。
1 2 3 | @app.get("/") async def read_root(): return {"Hello": "World"} |
どちらのコードでも、実際に動かしてみると一見挙動に違いは見られません。しかし、実はこの関数の呼び出し方が異なるのです。
通常の
def
は、APIリクエストがあった時に直接呼び出されるのではなく、FastAPIのスレッドプールによって呼び出され、 awaitされます。こうすることで、同期関数で重たい処理があっても、他のリクエストをブロックすることはなく、「並列」処理されます。
ところが、
async def
はイベントループから呼び出されます。イベントループは「並行」処理なので、シングルスレッドでタスクを切り替えながら実行されます。そのため、同期で重たい処理やI/O待ちなどが発生してしまうと、制御が解放されず、他のタスクが実行待ちのままとなってしまいます。
結果として、他のAPIリクエストを受け付けられなくなってしまい、ブロッキングしてしまうというわけです。
挙動の確認
以下のコードを使って実験してみます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | import asyncpg import requests from fastapi import FastAPI app = FastAPI() def request_get(): # レスポンスに10秒かかる遅いAPIを呼び出す response = requests.get("http://localhost:8001/very_slow_task") return response.json() # ブロッキングされていないか確認するためのエンドポイント @app.get("/") def read_root(): return {"Hello": "World"} # asyncなのにI/Oブロックしてしまうエンドポイント @app.get("/req_very_slow_task") async def req_very_slow_task(): # asyncpgを使用しているので、async defにせざるを得ない conn = await asyncpg.connect( user="postgres", password="example", database="sample", host="127.0.0.1" ) values = await conn.fetch("SELECT * FROM users") await conn.close() # しかし、同期でリクエストを送信しているため、ここでI/Oブロックしてしまう!!!! res = request_get() return {"task_result": res, "db_result": values} |
このサンプルでは、asyncpgを使ってDBにSELECTしつつ、requestsで外部APIにリクエストしています。asyncpgは非同期なので、このAPIの関数も
async def
にせざるを得ません。
にもかかわらず、requestsは同期処理となっているため、外部APIのレスポンス待ちが発生し、I/Oブロックしてしまいます。
対応方法
この問題の対応方法を3パターン考えてみました。
asyncioを使って自力で非同期化する
組み込みモジュールであるasyncioを使い、イベントループ内で関数を実行することで、制御を解放します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | import asyncio import asyncpg import requests from fastapi import FastAPI app = FastAPI() def request_get(): # レスポンスに10秒かかる遅いAPIを呼び出す response = requests.get("http://localhost:8001/very_slow_task") return response.json() # ブロッキングされていないか確認するためのエンドポイント @app.get("/") def read_root(): return {"Hello": "World"} @app.get("/req_very_slow_task") async def req_very_slow_task(): # asyncpgを使用しているので、async defにせざるを得ない conn = await asyncpg.connect( user="postgres", password="example", database="sample", host="127.0.0.1" ) values = await conn.fetch("SELECT * FROM users") await conn.close() # asyncioを使ってイベントループを取得 loop = asyncio.get_running_loop() # ブロッキング処理を非同期処理させる res = await loop.run_in_executor(None, request_get) return {"task_result": res, "db_result": values} |
このコードを使うと、
GET /req_very_slow_task
のレスポンスを待っている最中にも、
GET /
はブロッキングされず、すぐレスポンスが返ってきます。
今回は、requestsを使いましたが、例えばboto3でも同じ手法で非同期化することができます。
async対応済みのパッケージを使う
先ほどの例ではrequestsを使いましたが、同じHTTPクライアントである
httpx
は初めから非同期をサポートしています。(ちなみに、requestsはメンテナンスが止まっているので、今現在では同期の場合でもhttpxがおすすめです。)
httpxを使って、次のようにコードを変更します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | import asyncpg import httpx from fastapi import FastAPI app = FastAPI() async def request_get(): # httpxの非同期クライアントをオープン async with httpx.AsyncClient() as client: # レスポンスに10秒かかる遅いAPIを呼び出す response = await client.get("http://localhost:8001/very_slow_task") return response.json() # ブロッキングされていないか確認するためのエンドポイント @app.get("/") def read_root(): return {"Hello": "World"} # asyncなのにI/Oブロックしてしまうエンドポイント @app.get("/req_very_slow_task") async def req_very_slow_task(): # asyncpgを使用しているので、async defにせざるを得ない conn = await asyncpg.connect( user="postgres", password="example", database="sample", host="127.0.0.1" ) values = await conn.fetch("SELECT * FROM users") await conn.close() # レスポンスに10秒かかる遅いAPIをawaitで呼び出す res = await request_get() return {"task_result": res, "db_result": values} |
httpx.AsyncClient
を使うことで、自力でasyncioを使わなくても簡単に非同期化することができています。そのため、先ほどよりコードがすっきりしました。
その他の同期パッケージも、他の代替えがある場合にはそれを使用することで、自力でasyncioを使わなくとも非同期化することができます。
やりたいこと | 同期 | 非同期 |
HTTPクライアント | httpx or requestsなど | httpxなど |
AWS クライアント | boto3 | aioboto3 ※全機能が使えるのはs3のみ |
ファイルI/O | 組み込みの open() 関数 | aiofiles |
PostgreSQL | psycopgなど | psycopg or asyncpg など |
MySQL | mysqlclient or PyMySQL など | asyncmy or aiomysql ※開発は止まっている模様 |
async defをやめる
アプリケーション全体でasyncpgを使うと決めた場合、おそらくほとんどのエンドポイントが
async def
になると思いますが、それは至るとこでI/Oバウンドなどによるブロッキングが起きないか、気をつけなければならないことになります。
そこまでパフォーマンスにシビアでなければ、アプリケーション全体で使用するであろう処理を同期にすることで、コード全体をすっきりとさせ、やるべきことに集中することができるかもしれません。
冒頭で話した通り、同期の
def
もスレッドプール内のスレッドによって実行されるため、他のAPIをブロッキングすることはありません。もちろん、スレッドの切り替え(コンテキストスイッチ)が発生することから、asyncよりかはパフォーマンスが劣るみたいですが、パフォーマンスをどこまで切り詰めるかと、コードのわかりやすさのバランスが重要だと思います。
さいごに
非同期は奥が深いですね。CPUをどのように使うのか、並列と並行ではアプローチが異なることも含めて、改めて勉強になりました。