-
Notifications
You must be signed in to change notification settings - Fork 4
/
app.py
123 lines (93 loc) · 3.88 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""JWT in httpOnly cookies with OAuth2 password flow."""
from calendar import timegm
from datetime import datetime, timedelta, UTC
from typing import List, Optional
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from starlette.requests import Request
from starlette.responses import JSONResponse
from pydantic import BaseModel
class OAuth2PasswordCookie(OAuth2PasswordBearer):
    """OAuth2 password flow with token in a httpOnly cookie.

    Subclass of ``OAuth2PasswordBearer`` whose ``__call__`` reads the token
    from a request cookie.

    Args:
        *args: Positional arguments forwarded to ``OAuth2PasswordBearer``.
        token_name (str, optional): Name of the cookie that carries the JWT.
            Falls back to ``"my-jwt-token"`` when omitted or empty.
        **kwargs: Keyword arguments forwarded to ``OAuth2PasswordBearer``.
    """

    def __init__(self, *args, token_name: Optional[str] = None, **kwargs):
        super().__init__(*args, **kwargs)
        self._token_name = token_name or "my-jwt-token"

    @property
    def token_name(self) -> str:
        """Get the name of the token's cookie."""
        return self._token_name

    async def __call__(self, request: Request) -> Optional[str]:
        """Extract and return a JWT from the request cookies.

        Args:
            request (Request): Incoming request whose cookies are inspected.

        Returns:
            The cookie value, or ``None`` when the cookie is missing or empty
            and the scheme was created with ``auto_error=False``.

        Raises:
            HTTPException: 403 error if no token cookie is present and
                ``auto_error`` is enabled (the default).
        """
        token = request.cookies.get(self._token_name)
        if token:
            return token
        # Honour the parent's ``auto_error`` switch so the scheme can back
        # optional authentication; ``getattr`` keeps the historic "always
        # raise" behaviour if the attribute is not set by the base class.
        if getattr(self, "auto_error", True):
            raise HTTPException(status_code=403, detail="Not authenticated")
        return None
# Cookie-based security scheme: ``is_authenticated`` depends on it to obtain
# the token, and ``authenticate`` uses its ``token_name`` for the cookie.
oauth2_scheme = OAuth2PasswordCookie(  # pylint: disable=invalid-name
    scopes={"user": "User", "admin": "Administrator"},
    tokenUrl="/auth/token",
)

# Application instance that the route decorators below register against.
app = FastAPI()  # pylint: disable=invalid-name
def validate_jwt_payload(token: str) -> dict:
    """Decode and validate a JSON web token.

    The signature is checked with HS256 and the ``exp`` claim is mandatory.

    Args:
        token (str): JWT token.

    Returns:
        dict: The decoded claims of the token.

    Raises:
        HTTPException: 401 error if the credentials have expired or failed
            validation.
    """
    try:
        # NOTE(review): the signing key is hard-coded here and again in
        # ``authenticate``; it should be loaded from configuration.
        #
        # ``jwt.decode`` verifies ``exp`` itself, so no manual comparison is
        # needed; requiring the claim turns a token without ``exp`` into a
        # validation failure instead of an unhandled ``KeyError``.
        return jwt.decode(
            token,
            "i am a secret",
            algorithms=["HS256"],
            options={"require": ["exp"]},
        )
    except jwt.ExpiredSignatureError as err:
        # Must precede the generic handler: ExpiredSignatureError is a
        # PyJWTError subclass and would otherwise be reported generically.
        raise HTTPException(401, detail="Credentials have expired") from err
    except jwt.PyJWTError as err:
        raise HTTPException(401, detail="Could not validate credentials") from err
class UserData(BaseModel):
    """Details of a user: login name, numeric ID and granted scopes."""

    # Login name of the user.
    user: str
    # Numeric identifier of the user.
    user_id: int
    # Names of the scopes held by the user.
    scopes: list[str]
def is_authenticated(token: str = Security(oauth2_scheme)) -> UserData:
    """Dependency on user being authenticated.

    Args:
        token (str): JWT supplied by ``oauth2_scheme``.

    Returns:
        UserData: User name, ID and scopes taken from the token's claims.

    Raises:
        HTTPException: 401 error if the token fails validation or does not
            carry well-formed ``user``, ``user_id`` and ``scopes`` claims.
    """
    payload = validate_jwt_payload(token)
    try:
        return UserData(
            user=payload["user"],
            user_id=payload["user_id"],
            scopes=payload["scopes"],
        )
    except (KeyError, ValueError) as err:
        # KeyError: a claim is missing. ValueError: pydantic rejected a claim
        # value (its ValidationError derives from ValueError). Either way the
        # token is unusable, so answer 401 rather than letting it become a 500.
        raise HTTPException(401, detail="Could not validate credentials") from err
# Dummy users 'database', keyed by username.
# NOTE(review): passwords are stored in plain text; acceptable only because
# this is demo data.
_USER_DB = {
    "jeff": {"password": "secret", "scopes": ["user"], "user_id": 0},
    # Each user gets a distinct ID so tokens can be told apart by user_id.
    "pete": {"password": "secret", "scopes": ["user", "admin"], "user_id": 1},
}
@app.post("/auth/token")
async def authenticate(form_data: OAuth2PasswordRequestForm = Depends()):
    """Verify login details and issue JWT in httpOnly cookie.

    Args:
        form_data (OAuth2PasswordRequestForm): Submitted login form; its
            ``username`` and ``password`` are checked against ``_USER_DB``.

    Returns:
        JSONResponse: ``{"status": "authenticated"}`` with the signed JWT set
        as a httpOnly cookie named ``oauth2_scheme.token_name``.

    Raises:
        HTTPException: 401 error if username or password are not recognised.
    """
    # Imported locally so this function does not rely on a new file-level
    # import.
    from hmac import compare_digest

    user = _USER_DB.get(form_data.username)
    # Constant-time comparison avoids leaking how much of the password
    # matched. Operands are encoded because compare_digest only accepts
    # ASCII-only str, while bytes of any content are fine.
    if user is None or not compare_digest(
        user["password"].encode("utf-8"), form_data.password.encode("utf-8")
    ):
        raise HTTPException(
            status_code=401, detail="Incorrect username or password"
        )

    issued_at = datetime.now(UTC)
    # The token is valid for 15 minutes from the moment it is issued.
    expire = issued_at + timedelta(minutes=15)
    data = {
        "user": form_data.username,
        "user_id": user["user_id"],
        "scopes": user["scopes"],
        "exp": expire,
        "iat": issued_at,
        "sub": "jwt-cookies-test",
    }
    # NOTE(review): the signing key is hard-coded here and again in
    # ``validate_jwt_payload``; it should be loaded from configuration.
    encoded_jwt = jwt.encode(data, "i am a secret", algorithm="HS256")

    response = JSONResponse({"status": "authenticated"})
    # NOTE: we should also set secure=True to mark this as a `secure` cookie
    # https://tools.ietf.org/html/rfc6265#section-4.1.2.5
    response.set_cookie(oauth2_scheme.token_name, encoded_jwt, httponly=True)
    return response
@app.get("/")
async def home(_: Request, user: UserData = Depends(is_authenticated)):
    """Serve the sample JSON body, provided the caller is authenticated.

    The request argument is unused; ``user`` is supplied by the
    ``is_authenticated`` dependency and echoed back in the response.
    """
    body = {"status": "ok"}
    body["user"] = user
    return body