44 lines
2.0 KiB
Python

from datetime import datetime, timedelta
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status, APIRouter
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from ..dependencies import users_token, permissions_checker
from ..models import token, users
router = APIRouter()
ACCESS_TOKEN_EXPIRE_MINUTES = 30
@router.post("/token", tags=["token"])
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = users_token.authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = users_token.create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
content = {"roles":user.roles,"message": "Access token generated"}
response = JSONResponse(content=content)
response.set_cookie(key="access_token", value="Bearer {0}".format(access_token), httponly=True)
return response
@router.get("/token",tags=["token"])
async def check_token(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
content = {"message": "Check token"}
response = JSONResponse(content=content)
return response
@router.delete("/token",tags=["token"])
async def check_token(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
content = {"message": "Token deleted"}
response = JSONResponse(content=content)
response.set_cookie(key="access_token", value="", httponly=True)
return response