Skip to the content.

セキュリティの実装

ここでは、Json Web Tokenを利用したセキュリティを実装します。

目次

Json Web Token(JWT)とは

JSON Web Token(ジェイソン・ウェブ・トークン)は、JSONデータに署名や暗号化を施す方法を定めたオープン標準である。(from Wikipedia)

JWTには、任意の情報を保持することができます。例えば、ユーザーIDやトークンの有効期限などの情報を持たせることができ、フロントエンドがそのトークンを持っていれば、認証済みであることの証明に使えます。

トークンは、秘密鍵によって署名されており、悪意を持ったユーザーがトークンを書き換えたとしても、署名が一致しないので、修正されたことを検知できます。

JWTは、ヘッダー、ペイロード、署名の3要素から成ります。ペイロードに任意の情報を持たせることができます。

※JWTの説明は、FastAPI公式サイトのパスワード(およびハッシュ化)によるOAuth2、JWTトークンによるBearerにもあります。

ヘッダー

{
  "alg" : "HS256",
  "typ" : "JWT"
}

署名生成に使用したアルゴリズムを格納します。左記のHS256は、このトークンがHMAC-SHA256で署名されていることを示します。

ペイロード

{
  "sub": "1234567890",
  "name": "John Doe",
  "iat": 1516239022
}

認証情報などを格納します。

署名

HMAC-SHA256(
 base64urlEncoding(header) + '.' +
 base64urlEncoding(payload),
 secret
)

トークン検証用の署名。署名はヘッダーとペイロードをBase64urlエンコーディングしてピリオドで結合したものから生成します。署名はヘッダーで指定された暗号化アルゴリズムにより生成されます。

フロントエンド、APIの認証の流れ

フロントエンドを考慮した認証の流れは以下のようになります。

sequenceDiagram
    participant front as フロントエンド
    participant api as API
    front->>api: ユーザー名+パスワード
    api->>api: ユーザー認証
    api->>front: JWT
    front->>api: JWT+リクエスト
    api->>api: JWT確認
    api->>front: レスポンス

※ここの内容は、FastAPI公式サイトのチュートリアルのセキュリティにも記述してあります。

パッケージのインストール

ここでは、フォームデータを扱うため、python-multipartが必要になります。以下のコマンドでインストールしてください。

pip install python-multipart

JWTの発行には、python-joseを使用します。また、暗号を扱うためのパッケージが追加で必要となるので、ここでは、推奨されているcryptographyを使用します。以下のコマンドでインストールできます。

pip install "python-jose[cryptography]"

JWTを作成する関数の実装

まず、JWTに持たせるトークンのペイロードのスキーマを定義します。ここでは、ユーザーIDを持たせることとします。以下のapp/schemas/token.pyを作成してください。トークンの有効期限は、必ず持たせることとするので、ここには、記述しなくてよいです。

app/schemas/token.py

from pydantic import BaseModel


class TokenPayload(BaseModel):
    sub: str

スキーマを新しく定義したら、合わせてapp/schemas/__init__.pyも編集しましょう。以下の記述を追加してください。

app/schemas/__init__.py

from .token import TokenPayload

次に、トークンの有効期限をトークン発行後、何分とするかの設定を記述します。ここでは、60分に設定します。また、JWTの署名に利用する秘密鍵も設定します。app/core/config.pyを以下のように編集してください。

app/core/config.py

import secrets
from pydantic import BaseModel, PostgresDsn


class Settings(BaseModel):
    SQLALCHEMY_DATABASE_URI: PostgresDsn = (
        "postgresql://postgres:postgres@localhost:5000/test"
    )
    SECRET_KEY: str = secrets.token_urlsafe(32)
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 60


settings = Settings()

注意: ここで、SECRET_KEYは、secrets.token_urlsafe(32)としていますが、デプロイの際は、secrets.token_urlsafe(32)の実行結果を設定するようにしてください。このままだと、APIを起動するごとにSECRET_KEYが変わることになり、APIがステートフルになってしまいます。そうなるとロードバランサを導入した際など複数のプロセスで動作させた時に、不都合が生じます。

さて、次にJWTを作成する関数を実装しましょう。この関数は、app/core/security.pyに記述します。以下ように変更してください。

from datetime import datetime, timedelta, UTC

import bcrypt
from jose import jwt

from app import schemas
from app.core.config import settings

ALGORITHM = "HS256"


def create_access_token(
    payload: schemas.TokenPayload,
    expires_delta: timedelta | None = None,
    return_expire=False,
):
    if expires_delta:
        exp = datetime.now(UTC) + expires_delta
    else:
        exp = datetime.now(UTC) + timedelta(
            minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
        )

    to_encode = {key: value for key, value in payload}
    to_encode.update({"exp": exp})

    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)

    if return_expire:
        return encoded_jwt, exp
    else:
        return encoded_jwt


def get_password_hash(password: str) -> str:
    pwd_bytes = password.encode("utf-8")
    salt = bcrypt.gensalt()
    hashed_password = bcrypt.hashpw(password=pwd_bytes, salt=salt)
    return hashed_password.decode("utf-8")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    pwd_bytes = plain_password.encode("utf-8")
    hashed_pwd_bytes = hashed_password.encode("utf-8")
    return bcrypt.checkpw(password=pwd_bytes, hashed_password=hashed_pwd_bytes)

JWTを発行するエンドポイントの実装

ここでは、POST /login/tokenにJWTを発行するエンドポイントを実装します。

まず、トークンの発行にあたり、ユーザー認証を行う必要があります。そこでapp/crud/user.pyCRUDUserクラスにユーザー認証を行うメソッドを追加しましょう。このメソッドでsignin_idpasswordが正しいか確認します。app/crud/user.pyCRUDUserに以下のメソッドを追加してください。

app/crud/user.py

from app.core.security import verify_password

class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
  ...

  def authenticate(
        self, db: Session, signin_id: str, password: str
    ) -> Optional[User]:
        user = self.read_by_signin_id(db, signin_id=signin_id)
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

次はレスポンスとして返却するトークンのスキーマを定義しましょう。app/schemas/token.pyに以下を追加してください。

app/schemas/token.py

class Token(BaseModel):
    access_token: str
    token_type: str

ここでも、新しくスキーマを定義したので、合わせてapp/schemas/__init__.pyも編集します。以下の記述を追加してください。

app/schemas/__init__.py

from .token import TokenPayload, Token

最後にパスオペレーション関数の定義です。app/api/endpoints/auth.pyを作成してください。

app/api/endpoints/auth.py

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from app.api.deps import get_db
from app.core import security
from app import schemas, crud

router = APIRouter()


@router.post("/login/token", response_model=schemas.Token)
def login(
    db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
):
    user = crud.user.authenticate(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect id or password")
    payload = schemas.TokenPayload(sub=str(user.id))
    access_token = security.create_access_token(payload)
    return {"access_token": access_token, "token_type": "bearer"}

作成したパスオペレーション関数をapiに含めましょう。app/api/api.pyを以下のように編集してください。

app/api/api.py

from fastapi import APIRouter

from app.api.endpoints import root, users, auth

api_router = APIRouter()
api_router.include_router(root.router, tags=["test"])
api_router.include_router(auth.router, tags=["auth"])
api_router.include_router(users.router, tags=["users"], prefix="/users")

それでは、SwaggerUIでトークンが発行できるか確かめてみましょう。サーバーを起動uvicorn app.main:app --reload --port 8000し、SwaggerUIにアクセスしてください。

POST /login/tokenを試してみましょう。Try it outを押し、usernamepasswordを入力し、Executeを押してみましょう。以下のようにaccess_tokenが返されれば、成功です。 JWT endpoint

IDやパスワードを間違えていると以下のようなレスポンスになるはずです。

Bad Request

パスオペレーション関数:依存関係の追加

ここからは、パスオペレーション関数がトークンを受け取り、トークンの検証を行えるようにしましょう。トークンを検証し、トークンのユーザーを返す関数を作成します。この関数は、パスオペレーション関数に依存関係として加えるので、app/api/deps.pyに記述します。以下のようにファイルを編集してください。

app/api/deps.py

from typing import Generator

from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session

from app.db.session import SessionLocal
from app import models, schemas, crud
from app.core.config import settings
from app.core import security


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login/token")


def get_db() -> Generator:
    try:
        db = SessionLocal()
        yield db
    finally:
        db.close()


def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
        token_data = schemas.TokenPayload(**payload)
    except (jwt.JWTError, ValidationError):
        raise HTTPException(
            status_code=403,
            detail="Could not validate credentials",
        )
    user = crud.user.read(db, id=token_data.sub)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

パスオペレーション関数の引数にOAuth2PasswordBearerのインスタンスであるoauth2_schemeを使ってtoken: str = Depends(oauth2_scheme)とすることで、リクエストのAuthorizationヘッダーを探しに行き、その値がBearerとトークンを含んでいるかどうかチェックし、トークンをstrとして返します。

もしAuthorizationヘッダーが見つからなかったり、値がBearerトークンを持っていなかったりすると、401 ステータスコードエラー (UNAUTHORIZED) で直接応答します。

また、OAuth2PasswordBearerの引数にtokenUrlにJWTを発行するURLを追加しておくと、Swagger UIに認証フォームが追加され、JWTによるユーザー認証のテストが行えるので、ここでは、oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login/token")としています。

このget_current_userは、パスオペレーション関数の引数に以下のように加えることで、そのエンドポイントには、認証が必要になります。

@router.get()
def test(current_user = Depends(get_current_user)):
    pass

それでは、app/api/endpoints/users.pyのパスオペレーション関数に、JWTによる認証を付けましょう。パスオペレーション関数の引数に以下を加えてください。

current_user: models.User = Depends(get_current_user)

変更後のapp/api/endpoints/users.pyは以下のようになります。

app/api/endpoints/users.py

from typing import List

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from app.api.deps import get_db, get_current_user
from app import crud, schemas, models

router = APIRouter()


@router.post("", response_model=schemas.UserResponse)
def create_user(
    user_create: schemas.UserCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
):
    user = crud.user.read_by_signin_id(db, user_create.signin_id)
    if user:
        raise HTTPException(
            status_code=400,
            detail="The id already exists in the system.",
        )
    user = crud.user.create(db, user_create)
    return user


@router.get("", response_model=List[schemas.UserResponse])
def read_all_users(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(get_current_user),
):
    users = crud.user.read_multi(db, skip=skip, limit=limit)
    return users


@router.put("/{signin_id}", response_model=schemas.UserResponse)
def update_user(
    signin_id: str,
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
):
    db_obj = crud.user.read_by_signin_id(db, signin_id)
    if db_obj is None:
        raise HTTPException(status_code=404, detail="User not found")
    user = crud.user.update(db, user_update, db_obj)
    return user


@router.delete("/{signin_id}", response_model=None)
def delete_user(
    signin_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
):
    user = crud.user.read_by_signin_id(db, signin_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    crud.user.delete(db, user.id)

サーバーを起動し、Swagger UIで見てみましょう。

Swagger UI after JWT

上部にAuthorizeボタンが追加され、JWTの認証を付けたエンドポイントには、鍵のアイコンが付いていることが分かります。

実際に試してみましょう。鍵のアイコンがついたエンドポイントを試してみると、Unauthorizedとなり、レスポンスは以下のようになっているはずです。

Unauthorized

それでは、鍵のアイコンが付いたエンドポイントは、JWTが必要です。Authorizeボタンを押してください。以下のようなフォームが表示されるので、usernamepasswordを入力して、フォーム下のAuthorizeボタンを押してください。

oauth2 form

認証に成功すると、モーダル画面が以下のようになります。

oauth2 form after login

これで、鍵のアイコンが付いたエンドポイントも使えるようになりました。GET usersを試してみましょう。今度は、うまく実行できるはずです。

Execute後のResponsesにあるCurlを見てみましょう。Curlは、HTTPリクエストを送るコマンドです。curlの後にリクエストの内容が含まれています。そこを見てみると次のようなAuthorizationヘッダーが追加されていることが分かります。

curl -X 'GET' \
  'http://localhost:8000/users?skip=0&limit=100' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOjEsImV4cCI6MTY5Nzk1NTE4OH0.yxjWDw7QRmXbG32ntVoSH7EUEvlRbxE5AW1gkX2bl1A'

Swagger UIの機能で、先ほどのモーダル画面から取得したトークンをリクエストのAuthorizationヘッダーに自動的に追加しています。

ユーザーロールによるエンドポイントの制限

ここでは、管理者のみが扱えるエンドポイントを実装します。ユーザーの作成、削除、任意のユーザー情報の更新は、管理者のみが使えるものにしたいです。

実装方法は、様々です。すぐ実装できるのは、それぞれのパスオペレーション関数でcurrent_user.role == "Admin"の判定を行い、Falseであれば、raise HTTPExceptionとすれば、できそうです。

実際に、この方法でも実装することができますが、使いまわしが容易で、ロジックが分離されているように実装するため、パスオペレーション関数の依存関係として、get_current_userではなく、get_current_admin_userを実装しましょう。get_current_admin_userでは、トークンに記述されているIDのユーザー情報をDBから読み込み、そのユーザーがadminであるか判定を行います。

まず、ユーザーがアドミンであるかを判定するメソッドをapp/curd/user.pyCRUDUserクラスに加えましょう。以下のメソッドをCRUDUserクラスに追加してください。

app/crud/user.py

class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
    ...

    def is_admin(self, user: User) -> bool:
        return user.role == "Admin"

app/api/deps.pyget_current_admin_userを定義しましょう。app/api/deps.pyに以下の関数を追加してください。

app/api/deps.py

def get_current_admin_user(current_user: models.User = Depends(get_current_user)):
    if not crud.user.is_admin(current_user):
        raise HTTPException(
            status_code=400, detail="The user doesn't have enough privileges"
        )
    return current_user

管理者のみが使えるエンドポイントのパスオペレーション関数の依存関係を編集しましょう。編集内容は、簡単で、Depends(get_current_user)Depends(get_current_admin_user)に変更するだけです。

最終的にapp/api/ednpoints/users.pyは、以下のようになります。

from typing import List

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from app.api.deps import get_db, get_current_user, get_current_admin_user
from app import crud, schemas, models

router = APIRouter()


@router.post("", response_model=schemas.UserResponse)
def create_user(
    user_create: schemas.UserCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_admin_user),
):
    user = crud.user.read_by_signin_id(db, user_create.signin_id)
    if user:
        raise HTTPException(
            status_code=400,
            detail="The id already exists in the system.",
        )
    user = crud.user.create(db, user_create)
    return user


@router.get("", response_model=List[schemas.UserResponse])
def read_all_users(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    current_user: models.User = Depends(get_current_user),
):
    users = crud.user.read_multi(db, skip=skip, limit=limit)
    return users


@router.put("/{signin_id}", response_model=schemas.UserResponse)
def update_user(
    signin_id: str,
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_admin_user),
):
    db_obj = crud.user.read_by_signin_id(db, signin_id)
    if db_obj is None:
        raise HTTPException(status_code=404, detail="User not found")
    user = crud.user.update(db, user_update, db_obj)
    return user


@router.delete("/{signin_id}", response_model=None)
def delete_user(
    signin_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_admin_user),
):
    user = crud.user.read_by_signin_id(db, signin_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    crud.user.delete(db, user.id)

(付録)JWTをHttpOnlyなCookieに保存するパターン

JWTをフロントエンドのどこに保持するか?は、よく話題に上がりますが、その中でもHttpOnlyなCookieに保存するというパターンが多いように感じます。

HttpOnlyなCookieは、Javascriptから操作することはできないので、バックエンド側で対応が必要になってきます。

そこで、ここでは、FastAPIでJWTをHttpOnlyなCookieに保存させるエンドポイントの実装と、Cookieに保存されたJWTを取得する方法について説明します。

JWTをHttpOnlyなCookieに保存させるエンドポイントの実装

フロントエンドにCookieを保存させるためには、レスポンスヘッダーのSet-Cookieを用います。

FastAPIでレスポンスヘッダーを編集するには、パスオペレーション関数の引数にresponse: Responseを設定し、このresponseに対して操作を行うことで、編集することができます。

from fastapi import APIRouter, Response

router = APIRouter()

@router.get("/headers")
def get_headers(response: Response):
    # ヘッダーの設定
    response.headers["X-Cat-Dog"] = "alone in the world"
    # Set-Cookieヘッダーの設定
    response.set_cookie(key="access_token", value="sample token")
    return None

それでは、JWTをHttpOnly Cookieに保存させるエンドポイントを実装しましょう。app/api/endpoints/auth.pyに記述していきます。

また、Cookieには有効期限も設定できるので、JWTと同じ有効期限を設定します。

app/api/endpoints/auth.py

# 追加
from fastapi import Response

@router.post("/login", response_model=None)
def login_cookie(
    response: Response,
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
):
    user = crud.user.authenticate(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect id or password")
    payload = schemas.TokenPayload(sub=str(user.id))
    access_token, expire = security.create_access_token(payload, return_expire=True)
    response.set_cookie(
        key="access_token",
        value=f"Bearer {access_token}",
        httponly=True,
        expires=expire,
    )
    return None

サーバーを起動し、Swagger UIで動作を確認してみましょう。

authタグの/loginを実際に試してみてください。

loginのリクエストボディ

レスポンスは次のようになります。

loginのレスポンス

ここで、Chromeの検証ツールを使ってCookieの中身を見てみましょう。 検証ツールのApplicationタブ内でCookieの確認ができます。

クッキーの中身

Cookieに保存されたJWTの取得

次にCookieに保存されたJWTの取得方法です。HttpOnly Cookieに保存されたJWTは、リクエストヘッダーのCookieに設定されます。

FastAPIでリクエストヘッダーのCookieを読むためには、パスオペレーション関数の引数にrequest: Requestを設定し、request.cookiesから取得できます。また、request.cookiesdict[str, str]なので、request.cookies.get("access_token")とすることで、valueを取得できます。

@router.get("/cookie")
def cookie(request: Request):
    # cookiesはdict[str, str]なので、以下のように取得できる
    access_token = request.cookies.get("access_token")
    # access_token = "Bearer eyJhbGciOiJIUzI1NiIsInR5cC..."
    return None

またaccess_tokenは、f"{scheme} {param}"の形式で保存されており、JWTのデコードを行うのは、{param}の部分です。そのため、schemeparamに分解する必要があります。これは、FastAPIに実装があるので、それを使いましょう。

from fastapi.security.utils import get_authorization_scheme_param

@router.get("/cookie")
def cookie(request: Request):
    access_token = request.cookies.get("access_token")
    scheme, param = get_authorization_scheme_param(access_token)
    # scheme -> "Bearer", param -> "eyJhbGciOiJIUzI1NiIsInR5cC..."
    return None

認証が必要なエンドポイントで毎回この処理をやるわけにもいかないので、Depenedsの中に入れれる関数を作っていきましょう。また、access_tokenが取得できない時や、schemeBearerでない場合に、HTTPExceptionとなるようにしましょう。

app/api/deps.pyに以下の関数を追加してください。

app/api/deps.py

# 追加
from fastapi import Request
from fastapi.security.utils import get_authorization_scheme_param

def get_access_token_from_cookie(request: Request):
    access_token = request.cookies.get("access_token")
    scheme, param = get_authorization_scheme_param(access_token)
    if not access_token or scheme.lower() != "bearer":
        raise HTTPException(
            status_code=401,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return param

これまで、oauth2_schemeを使っていた関数get_current_userを書き換えてください。

app/api/deps.py

# 引数を変更
def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(get_access_token_from_cookie)
) -> models.User:
    ...

これで、Cookieに保存されたJWTを取得し、このJWTを使ってログインユーザーを取得するようになりました。

サーバーを起動し、Swagger UIで動作を確認してみましょう。

最初にログインを実行し、その後でusersタグのエンドポイントを試してみてください。

Cookieの削除(ログアウト処理)

次にログアウト処理として、Cookieからaccess_tokenを削除させるエンドポイントを作成しましょう。

このエンドポイントの実装は簡単です。Cookieに保存させるエンドポイントは、response.set_cookieで行いましたが、削除はresponse.delete_cookieで行います。

app/api/endpoints/auth.pyに以下のパスオペレーション関数を追加してください。

# 追加
from app.api.deps import get_current_user
from app import models

@router.post("/logout", response_model=None)
def logout(response: Response, current_user: models.User = Depends(get_current_user)):
    response.delete_cookie("access_token")
    return None

このエンドポイントも試してみて、Chromeの検証ツールでCookieが削除されているか確認してみましょう。

Swagger UIの動作を戻す

ここまで、JWTをHttpOnlyなCookieに保存させるパターンを実装してきました。実装後、Swagger UIを見て気づいた方もいるかもしれませんが、これまでSwagger UIにあったAuthorizationのボタンや鍵のアイコンが無くなっています。

これは、Swagger UIがJWTをHttpOnlyなCookieに保存させるパターンに対応していないためです。HttpOnly CookieはJavascriptからアクセスできないため、JWTの状態をUIに反映できず、対応ができません。

ここでは、Swagger UIのAuthorizationのボタンや鍵のアイコンを残しつつ、JWTをHttpOnlyなCookieに保存させるエンドポイントも残す実装を行なっていきます。

注意: この実装では、JWTがクッキーにある場合でも、Authorizationヘッダーにある場合でも動作するようになります。

これは、FastAPI公式のissue:Cookie based JWT tokens #480にあったコードを参考にしたものです。

WTをHttpOnlyなCookieに保存させるパターンの実装前、JWTは、OAuth2PasswordBearerを使って取得していました。このOAuth2PasswordBearerに似たようなクラスを定義し、元の動作にプラスして、cookieのJWTも取得するようにします。

app/core/security.pyに以下のクラスを追加してください。

# 追加
from typing import Optional
from fastapi import Request, HTTPException
from fastapi.security import OAuth2
from fastapi.security.utils import get_authorization_scheme_param
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel


class OAuth2PasswordBearerWithCookie(OAuth2):
    def __init__(
        self,
        tokenUrl: str,
        scheme_name: str = None,
        scopes: dict = None,
        auto_error: bool = True,
    ):
        if not scopes:
            scopes = {}
        flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
        super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)

    async def __call__(self, request: Request) -> Optional[str]:
        authorization: str = request.headers.get("Authorization")
        if not authorization:
            authorization: str = request.cookies.get("access_token")

        scheme, param = get_authorization_scheme_param(authorization)
        if not authorization or scheme.lower() != "bearer":
            if self.auto_error:
                raise HTTPException(
                    status_code=401,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None

        return param

app/api/deps.pyoauth2_schemeを以下のように変更しましょう。

from app.core.security import OAuth2PasswordBearerWithCookie

oauth2_scheme = OAuth2PasswordBearerWithCookie(tokenUrl="login/token")

次にapp/api/deps.pyget_current_userでJWTを取得する際に、oauth2_schemeを使うように変更しましょう。

def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
) -> models.User:
    ...

これで完了です。サーバーを起動し、Swagger UIで動作を確認してみましょう。

最初にCookieを削除するようにしてください。これでSwagger UIの認証も使えますし、CookieにJWTを保存するパターンでも使えます。

おまけ

一般ユーザーが自分自身のデータを取得するエンドポイントGET /users/myselfや、自分自身のデータを更新できるエンドポイントPUT /users/myselfを作成してみましょう。

答えは以下。


app/api/endpoints/users.pyに以下のパスオペレーション関数を追加

@router.get("/myself", response_model=schemas.UserResponse)
def read_myself(
    current_user: models.User = Depends(get_current_user),
):
    return current_user


@router.put("/myself", response_model=schemas.UserResponse)
def update_myself(
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
):
    user = crud.user.update(db, user_update, current_user)
    return user

Next: Chapter6 Alembicを使ったマイグレーション

Prev: Chapter4 DBとの連携