-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
144 lines (122 loc) · 5.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# `/login` endpoint is responsible for authenticating the user and generating the JWT access token. The `admin_only` dependency checks whether the current user has admin privileges, restricting access to certain endpoints accordingly. The `get_current_user` dependency validates the provided token and retrieves the user information for authorization purposes.
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from pydantic import BaseModel
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Sample data storage
data = []
# Define request body models
class Item(BaseModel):
name: str
price: float
class User(BaseModel):
username: str
password: str
is_admin: bool
# Hash and verify password
def get_password_hash(password):
return pwd_context.hash(password)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# Authenticate user
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
# Encode JWT token
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# Verify JWT token
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials")
token_data = TokenData(username=username)
return token_data
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials")
# Mock user database
users_db = [
{
"username": "admin",
"password": get_password_hash("admin123"),
"is_admin": True
},
{
"username": "user",
"password": get_password_hash("user123"),
"is_admin": False
}
]
# Get user by username
def get_user(username):
for user in users_db:
if user["username"] == username:
return User(**user)
return None
# Authenticate and generate access token
async def login(username: str, password: str):
user = authenticate_user(username, password)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
token = create_access_token({"sub": user.username}, expires_delta=access_token_expires)
return {"access_token": token, "token_type": "bearer"}
# Dependency function to validate and retrieve token credentials
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token_data = verify_token(credentials.credentials)
return get_user(token_data.username)
# Authorization check for admin-only endpoints
async def admin_only(current_user: User = Depends(get_current_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough privileges")
# GET request to retrieve all items
@app.get("/items")
async def get_items():
return data
# POST request to add a new item
@app.post("/items")
async def add_item(item: Item, current_user: User = Depends(get_current_user)):
if current_user.is_admin:
data.append(item)
return {"message": "Item added successfully"}
raise HTTPExcepätion(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough privileges")
# PUT request to update an existing item
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item, current_user: User = Depends(get_current_user)):
if current_user.is_admin:
if item_id < len(data):
data[item_id] = item
return {"message": "Item updated successfully"}
raise HTTPExcepion(status_code=status.HTTP_404_NOT_FOUND, detail="Item not found")
raise HTTPExcepion(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough privileges")
# DELETE request to remove an item
@app.delete("/items/{item_id}")
async def delete_item(item_id: int, current_user: User = Depends(get_current_user)):
if current_user.is_admin:
if item_id < len(data):
del data[item_id]
return {"message": "Item deleted successfully"}
raise HTTPExcepion(status_code=status.HTTP_404_NOT_FOUND, detail="Item not found")
raise HTTPExcepion(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough privileges")
# Login endpoint to generate access token
@app.post("/login")
async def login_route(username: str, password: str):
return await login(username, password)