"""User endpoints: listing, search, lookup, flag-based delete and upsert."""

from typing import Annotated

from bson import ObjectId
from bson.errors import InvalidId
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi.responses import JSONResponse

from ..dependencies import users_token, permissions_checker, database
from ..models import users

router = APIRouter()

# NOTE: several handlers below intentionally keep their original (reused)
# function names. Routes are registered by the decorators, so the shadowing is
# harmless at runtime; the names are left untouched to avoid changing
# module-level identifiers that other code or tooling may rely on.

_PAGINATION_ERROR = (
    "skip should be greater than 0 and limit should be greater than 1. "
    "Limit should be greater than skip"
)

# Fields that must never be searchable: a regex match against the stored
# password hash would let any authenticated user probe it character by character.
_FORBIDDEN_SEARCH_KEYS = frozenset({"password"})


def _page_window(skip: int, limit: int) -> int:
    """Validate pagination parameters and return the limit given to the repository.

    Raises:
        HTTPException: 400 when ``limit < 1``, ``skip < 0`` or ``limit < skip``.
    """
    if limit < 1 or skip < 0 or limit < skip:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_PAGINATION_ERROR,
        )
    # NOTE(review): the repository receives ``limit + skip`` as its limit, as in
    # the original code. If ``find_by`` applies skip before limit (the usual
    # MongoDB cursor behaviour) this returns more than ``limit`` documents and
    # the ``limit < skip`` check rejects valid pages - confirm against
    # UserRepository.find_by before changing.
    return limit + skip


def _to_user_out(user) -> users.UserOut:
    """Project a stored user onto the public ``UserOut`` model."""
    return users.UserOut(
        id=user.id,
        username=user.username,
        email=user.email,
        disabled=user.disabled,
        roles=user.roles,
        removed=user.removed,
        confirmed=user.confirmed,
    )


def _load_user_or_404(user_repository, item_id: str):
    """Fetch a user by its string id.

    Raises:
        HTTPException: 400 when ``item_id`` is not a valid ObjectId,
            404 when no user is stored under that id.
    """
    try:
        object_id = ObjectId(item_id)
    except InvalidId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid user id",
        ) from None
    user = user_repository.find_one_by_id(object_id)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user


@router.get("/users", tags=["users"], response_model=list[users.UserOut])
async def read_users(
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))],
    skip: int = 0,
    limit: int = 20,
):
    """Return a page of users (Admin only)."""
    limit = _page_window(skip, limit)
    user_repository = users.UserRepository(database=database.database)
    return [
        _to_user_out(stored)
        for stored in user_repository.find_by({}, limit=limit, skip=skip)
    ]


@router.get("/users/search", tags=["users"], response_model=list[users.UserOut])
async def read_users_id(
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
    skip: int = 0,
    limit: int = 20,
    key: str | None = None,
    value: str | None = None,
):
    """Return users whose field ``key`` matches the regular expression ``value``."""
    limit = _page_window(skip, limit)
    if key is None or value is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Key or/and value parameter is empty",
        )
    # ``key`` comes straight from the query string: refuse query operators and
    # sensitive fields instead of forwarding them to the database.
    if key.startswith("$") or key in _FORBIDDEN_SEARCH_KEYS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Searching by this key is not allowed",
        )
    user_repository = users.UserRepository(database=database.database)
    matches = user_repository.find_by({key: {'$regex': value}}, limit=limit, skip=skip)
    return [_to_user_out(stored) for stored in matches]


@router.get(
    "/users/me",
    tags=["users"],
    response_model=users.User,
    response_model_exclude={"id", "password", "roles", "disabled"},
)
async def read_users_me(
    current_user: Annotated[users.User, Depends(users_token.get_current_active_user)],
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
):
    """Return the authenticated user's own record."""
    return current_user


@router.get("/users/count", tags=["users"])
async def read_users_count(
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))],
):
    """Return the estimated number of documents in the ``users`` collection."""
    count = database.database.get_collection("users").estimated_document_count()
    return JSONResponse(content={"count": count})


@router.get("/users/{item_id}", tags=["users"], response_model=users.User)
async def read_users_id(
    item_id: str,
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))],
):
    """Return a single user by id (Admin only); 400 on a bad id, 404 if absent."""
    user_repository = users.UserRepository(database=database.database)
    return _load_user_or_404(user_repository, item_id)


@router.delete(
    "/users/me",
    tags=["users"],
    response_model=users.User,
    response_model_exclude={"id", "password", "roles", "disabled"},
)
async def read_users_me(
    current_user: Annotated[users.User, Depends(users_token.get_current_active_user)],
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
    remove: bool = False,
):
    """Disable the authenticated user; also flag it as removed when ``remove`` is true."""
    user_repository = users.UserRepository(database=database.database)
    current_user.disabled = True
    if remove:
        current_user.removed = True
    user_repository.save(current_user)
    return current_user


@router.delete("/users/{item_id}", tags=["users"], response_model=users.User)
async def read_users_id(
    item_id: str,
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))],
    remove: bool = False,
):
    """Disable a user by id (Admin only); also flag it as removed when ``remove`` is true."""
    user_repository = users.UserRepository(database=database.database)
    user = _load_user_or_404(user_repository, item_id)
    user.disabled = True
    if remove:
        user.removed = True
    user_repository.save(user)
    return user


@router.put(
    "/users/me",
    tags=["users"],
    response_model=users.User,
    response_model_exclude={"id", "password", "roles", "disabled"},
)
async def read_users_me(
    current_user: Annotated[users.User, Depends(users_token.get_current_active_user)],
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
    userSingle: users.UserIn | None = None,
):
    """Overwrite the authenticated user's username, password, roles and email."""
    # The body is optional in the signature, so guard before dereferencing it
    # (same response as PUT /users).
    if userSingle is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Body request is empty",
        )
    user_repository = users.UserRepository(database=database.database)
    current_user.username = userSingle.username
    current_user.password = users_token.get_password_hash(userSingle.password)
    # NOTE(review): a caller with only the "User" role can assign itself any
    # roles here - looks like a privilege-escalation path; confirm the intended
    # policy before restricting it.
    current_user.roles = userSingle.roles
    current_user.email = userSingle.email
    user_repository.save(current_user)
    return current_user


@router.put("/users", tags=["users"], response_model=users.User, status_code=status.HTTP_200_OK)
async def read_users_id(
    authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))],
    userSingle: users.UserIn | None = None,
    *,
    response: Response,
):
    """Create or update a user matched by username (Admin only).

    Responds with 201 when a new user is created and 200 when an existing one
    is updated. ``response`` is injected by FastAPI and is keyword-only so the
    positional order of the existing parameters is unchanged.
    """
    if userSingle is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Body request is empty",
        )
    user_repository = users.UserRepository(database=database.database)
    user = user_repository.find_one_by({"username": {'$eq': userSingle.username}})
    if user is None:
        # No user with that username yet: report creation instead of update.
        response.status_code = status.HTTP_201_CREATED
        user = users.User()
    user.username = userSingle.username
    user.password = users_token.get_password_hash(userSingle.password)
    user.roles = userSingle.roles
    user.email = userSingle.email
    user_repository.save(user)
    return user