from datetime import datetime, timedelta from typing import Annotated from fastapi import Depends, FastAPI, HTTPException, status, APIRouter from fastapi.responses import JSONResponse from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from ..dependencies import users_token, permissions_checker from ..models import token, users router = APIRouter() ACCESS_TOKEN_EXPIRE_MINUTES = 30 @router.post("/token", tags=["token"]) async def login_for_access_token( form_data: Annotated[OAuth2PasswordRequestForm, Depends()]): user = users_token.authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = users_token.create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) content = {"roles":user.roles,"message": "Access token generated"} response = JSONResponse(content=content) response.set_cookie(key="access_token", value="Bearer {0}".format(access_token), httponly=True) return response @router.get("/token",tags=["token"]) async def check_token(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]): content = {"message": "Check token"} response = JSONResponse(content=content) return response @router.delete("/token",tags=["token"]) async def check_token(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]): content = {"message": "Token deleted"} response = JSONResponse(content=content) response.set_cookie(key="access_token", value="", httponly=True) return response