Merge pull request 'mongo' (#2) from mongo into master

Reviewed-on: #2
This commit is contained in:
v4l3n71n 2023-10-14 13:50:35 +00:00
commit f8f2c83d6d
8 changed files with 107 additions and 35 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
__pycache__
.env

View File

@ -0,0 +1,5 @@
from pymongo import MongoClient
import os
client = MongoClient("mongodb+srv://{0}:{1}@{2}/?retryWrites=true&w=majority&appName=AtlasApp".format(os.environ["MONGO_USER"], os.environ["MONGO_PASSWORD"], os.environ["MONGO_HOST"]))
database = client[os.environ["MONGO_DATABASE"]]

View File

@ -0,0 +1,20 @@
from ..models import users
from ..dependencies import database
from passlib.context import CryptContext
def add(username="", password="", roles="User", disabled=False):
user_repository = users.UserRepository(database=database.database)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
result = user_repository.find_one_by({'username': username})
change = "added"
user = users.User(username=username, password=pwd_context.hash(password), roles=roles, disabled=disabled)
if result is not None:
result.password=pwd_context.hash(password)
result.roles=roles
result.disabled=disabled
user = result
change = "updated"
user_repository.save(user)
print("{0} {1}".format(username, change))

View File

@ -1,27 +1,17 @@
from datetime import datetime, timedelta
from typing import Annotated
from pymongo import MongoClient
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from ..models import users, token
fake_users = [
# password foo
{'id': 1, 'username': 'admin', 'password': '$2b$12$N.i74Kle18n5Toxhas.rVOjZreVC2WM34fCidNDyhSNgxVlbKwX7i',
'roles': 'Admin', 'disabled': False
},
# password bar
{'id': 2, 'username': 'client', 'password': '$2b$12$KUgpw1m0LF/s9NS1ZB5rRO2cA5D13MqRm56ab7ik2ixftXW/aqEyq',
'roles':'User', 'disabled':False}
]
from ..dependencies import database
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@ -33,13 +23,14 @@ def verify_password(plain_password, hashed_password):
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
for user in db:
if username == user['username']:
return users.UserInDB(**user)
def get_user(username: str):
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({'username': username})
return user
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not verify_password(password, user.password):
@ -70,7 +61,8 @@ async def get_current_user(token_str: Annotated[str, Depends(oauth2_scheme)]):
token_data = token.TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users, username=token_data.username)
user = get_user(token_data.username)
if user is None:
raise credentials_exception
return user

View File

@ -1,13 +1,19 @@
from fastapi import FastAPI
from .routers import users, token
from .dependencies import user_add
app = FastAPI()
app.include_router(users.router)
app.include_router(token.router)
@app.on_event("startup")
async def startup_event():
user_add.add(username="Peter93", password="toto", roles="Admin")
user_add.add(username="Robert80", password="titi", roles="User")
@app.get("/")
async def root():
return {"message": "Hello World !"}

View File

@ -1,12 +1,22 @@
from pydantic import BaseModel
from pydantic_mongo import AbstractRepository, ObjectIdField
class User(BaseModel):
id: int
id: ObjectIdField = None
username: str
password: str
roles: str
disabled: bool
class UserOut(BaseModel):
id: ObjectIdField = None
username: str
roles: str
disabled: bool
class UserInDB(User):
password: str
class UserRepository(AbstractRepository[User]):
class Meta:
collection_name = "users"

View File

@ -7,20 +7,20 @@ from ..dependencies import users_active
from ..models import token
router = APIRouter()
ACCESS_TOKEN_EXPIRE_MINUTES = 30
@router.post("/token", response_model=token.Token, tags=["token"])
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
user = users_active.authenticate_user(users_active.fake_users, form_data.username, form_data.password)
user = users_active.authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=users_active.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = users_active.create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)

View File

@ -1,15 +1,53 @@
from fastapi import APIRouter, Depends
from ..dependencies import users_active, permissions_checker
from fastapi import APIRouter, Depends, HTTPException, status
from ..dependencies import users_active, permissions_checker, database
from ..models import users
from typing import Annotated
from bson import ObjectId
router = APIRouter()
@router.get("/users/", tags=["users"], response_model=list[users.User])
async def read_users(current_user: Annotated[users.User, Depends(users_active.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
return users_active.fake_users
@router.get("/users", tags=["users"], response_model=list[users.UserOut])
async def read_users(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], skip: int = 0, limit: int = 20):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
)
limit = limit + skip
listUsers = []
user_repository = users.UserRepository(database=database.database)
for user_index in user_repository.find_by({}, limit=limit, skip=skip):
user = users.UserOut(id=user_index.id, username=user_index.username, disabled=user_index.disabled, roles=user_index.roles)
listUsers.append(user)
return listUsers
@router.get("/users/me",tags=["users"], response_model=users.User)
@router.get("/users/search", tags=["users"], response_model=list[users.UserOut])
async def read_users_id(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, key: str | None = None, value: str | None= None):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
)
if key is None or value is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Key or/and value parameter is empty"
)
limit = limit + skip
listUsers = []
user_repository = users.UserRepository(database=database.database)
for user_index in user_repository.find_by({key: {'$regex': value}}, limit=limit, skip=skip):
user = users.UserOut(id=user_index.id, username=user_index.username, disabled=user_index.disabled, roles=user_index.roles)
listUsers.append(user)
return listUsers
@router.get("/users/me",tags=["users"], response_model=users.User, response_model_exclude=["password"])
async def read_users_me(current_user: Annotated[users.User, Depends(users_active.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
return current_user
@router.get("/users/{item_id}", tags=["users"], response_model=users.User)
async def read_users_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by_id(ObjectId(item_id))
return user