109 lines
5.7 KiB
Python
Raw Normal View History

2023-10-14 15:16:38 +02:00
from fastapi import APIRouter, Depends, HTTPException, status
2023-10-14 17:34:34 +02:00
from ..dependencies import users_token, permissions_checker, database
2023-10-12 00:04:18 +02:00
from ..models import users
2023-10-12 00:14:50 +02:00
from typing import Annotated
2023-10-14 15:48:45 +02:00
from bson import ObjectId
2023-10-14 21:28:21 +02:00
from passlib.context import CryptContext
2023-10-11 23:45:12 +02:00
2023-10-10 22:13:47 +02:00
router = APIRouter()
2023-10-14 17:41:34 +02:00
2023-10-13 22:35:04 +02:00
@router.get("/users", tags=["users"], response_model=list[users.UserOut])
2023-10-14 15:16:38 +02:00
async def read_users(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], skip: int = 0, limit: int = 20):
if limit < 1 or skip < 0 or limit < skip:
2023-10-14 11:53:31 +02:00
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
2023-10-14 15:16:38 +02:00
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
2023-10-14 11:53:31 +02:00
)
limit = limit + skip
2023-10-13 14:59:57 +02:00
listUsers = []
2023-10-13 22:55:52 +02:00
user_repository = users.UserRepository(database=database.database)
2023-10-14 11:53:31 +02:00
for user_index in user_repository.find_by({}, limit=limit, skip=skip):
2023-10-14 18:20:08 +02:00
user = users.UserOut(id=user_index.id, username=user_index.username, disabled=user_index.disabled, roles=user_index.roles, removed=user_index.removed, confirmed=user_index.confirmed)
2023-10-13 14:59:57 +02:00
listUsers.append(user)
return listUsers
2023-10-12 00:14:50 +02:00
2023-10-14 15:48:45 +02:00
@router.get("/users/search", tags=["users"], response_model=list[users.UserOut])
async def read_users_id(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, key: str | None = None, value: str | None= None):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
)
if key is None or value is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Key or/and value parameter is empty"
)
limit = limit + skip
listUsers = []
user_repository = users.UserRepository(database=database.database)
for user_index in user_repository.find_by({key: {'$regex': value}}, limit=limit, skip=skip):
2023-10-14 18:20:08 +02:00
user = users.UserOut(id=user_index.id, username=user_index.username, disabled=user_index.disabled, roles=user_index.roles, removed=user_index.removed, confirmed=user_index.confirmed)
2023-10-14 15:48:45 +02:00
listUsers.append(user)
return listUsers
2023-10-14 17:41:34 +02:00
@router.get("/users/me",tags=["users"], response_model=users.User, response_model_exclude=["id", "password", "roles", "disabled"])
2023-10-14 17:34:34 +02:00
async def read_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
2023-10-14 15:48:45 +02:00
return current_user
@router.get("/users/{item_id}", tags=["users"], response_model=users.User)
async def read_users_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by_id(ObjectId(item_id))
return user
2023-10-14 18:20:08 +02:00
2023-10-14 18:33:05 +02:00
@router.delete("/users/me",tags=["users"], response_model=users.User, response_model_exclude=["id", "password", "roles", "disabled"])
2023-10-14 18:20:08 +02:00
async def read_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], remove: bool = False):
user_repository = users.UserRepository(database=database.database)
current_user.disabled = True
if remove is True:
current_user.removed = True
2023-10-14 18:29:07 +02:00
user_repository.save(current_user)
2023-10-14 18:20:08 +02:00
return current_user
@router.delete("/users/{item_id}", tags=["users"], response_model=users.User)
async def read_users_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove : bool = False):
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by_id(ObjectId(item_id))
user.disabled = True
if remove is True:
user.removed = True
2023-10-14 18:29:07 +02:00
user_repository.save(user)
2023-10-14 18:20:08 +02:00
return user
2023-10-14 21:28:21 +02:00
@router.put("/users/me",tags=["users"], response_model=users.User, response_model_exclude=["id", "password", "roles", "disabled"])
async def read_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], userSingle: users.UserIn | None = None):
user_repository = users.UserRepository(database=database.database)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
current_user.username = userSingle.username
current_user.password = pwd_context.hash(userSingle.password)
current_user.roles = userSingle.roles
user_repository.save(current_user)
return current_user
@router.put("/users", tags=["users"], response_model=users.User, status_code=status.HTTP_200_OK)
async def read_users_id(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], userSingle: users.UserIn | None = None):
user_repository = users.UserRepository(database=database.database)
if userSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
user = user_repository.find_one_by({"username": {'$eq': userSingle.username}})
if user is None:
response.status_code = status.HTTP_201_CREATED
user = users.User()
user.username = userSingle.username
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
user.password = pwd_context.hash(userSingle.password)
user.roles = userSingle.roles
user_repository.save(user)
return user