diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/dependencies/__init__.py b/app/dependencies/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/dependencies/permissions_checker.py b/app/dependencies/permissions_checker.py new file mode 100644 index 0000000..896679f --- /dev/null +++ b/app/dependencies/permissions_checker.py @@ -0,0 +1,18 @@ +from ..dependencies import users_active +from fastapi import Depends, HTTPException, status +from ..models import users + + +class PermissionChecker: + + def __init__(self, roles: list[str]) -> None: + self.roles = roles + + def __call__(self, user: users.User = Depends(users_active.get_current_active_user)) -> bool: + for role in self.roles: + if role == user.roles: + return True + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail='Roles unauthorized' + ) diff --git a/app/dependencies/users_active.py b/app/dependencies/users_active.py new file mode 100644 index 0000000..9ca2b78 --- /dev/null +++ b/app/dependencies/users_active.py @@ -0,0 +1,83 @@ +from datetime import datetime, timedelta +from typing import Annotated + +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer +from jose import JWTError, jwt +from passlib.context import CryptContext + +from ..models import users, token + + +fake_users = [ + # password foo + {'id': 1, 'username': 'admin', 'password': '$2b$12$N.i74Kle18n5Toxhas.rVOjZreVC2WM34fCidNDyhSNgxVlbKwX7i', + 'roles': 'Admin', 'disabled': False + }, + # password bar + {'id': 2, 'username': 'client', 'password': '$2b$12$KUgpw1m0LF/s9NS1ZB5rRO2cA5D13MqRm56ab7ik2ixftXW/aqEyq', + 'roles':'User', 'disabled':False} +] + +SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + +def get_password_hash(password): + return pwd_context.hash(password) + +def get_user(db, username: str): + for user in db: + if username == user['username']: + return users.UserInDB(**user) + +def authenticate_user(fake_db, username: str, password: str): + user = get_user(fake_db, username) + if not user: + return False + if not verify_password(password, user.password): + return False + return user + +def create_access_token(data: dict, expires_delta: timedelta | None = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +async def get_current_user(token_str: Annotated[str, Depends(oauth2_scheme)]): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token_str, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = token.TokenData(username=username) + except JWTError: + raise credentials_exception + user = get_user(fake_users, username=token_data.username) + if user is None: + raise credentials_exception + return user + +async def get_current_active_user( + current_user: Annotated[users.User, Depends(get_current_user)] +): + if current_user.disabled: + raise HTTPException(status_code=400, detail="Inactive user") + return current_user \ No newline at end of file diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..28a1294 --- /dev/null +++ b/app/main.py @@ -0,0 +1,13 @@ +from fastapi import FastAPI + +from .routers import users, token + + +app = FastAPI() + +app.include_router(users.router) +app.include_router(token.router) + +@app.get("/") +async def root(): + return {"message": "Hello World !"} \ No newline at end of file diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/models/token.py b/app/models/token.py new file mode 100644 index 0000000..c3c0548 --- /dev/null +++ b/app/models/token.py @@ -0,0 +1,9 @@ +from pydantic import BaseModel + +class Token(BaseModel): + access_token: str + token_type: str + + +class TokenData(BaseModel): + username: str | None = None \ No newline at end of file diff --git a/app/models/users.py b/app/models/users.py new file mode 100644 index 0000000..de11fcd --- /dev/null +++ b/app/models/users.py @@ -0,0 +1,12 @@ +from pydantic import BaseModel + + +class User(BaseModel): + id: int + username: str + password: str + roles: str + disabled: bool + +class UserInDB(User): + password: str \ No newline at end of file diff --git a/app/routers/__init__.py b/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routers/events.py b/app/routers/events.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routers/token.py b/app/routers/token.py new file mode 100644 index 0000000..4d99f28 --- /dev/null +++ b/app/routers/token.py @@ -0,0 +1,27 @@ +from datetime import datetime, timedelta + +from typing import Annotated +from fastapi import Depends, FastAPI, HTTPException, status, APIRouter +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from ..dependencies import users_active +from ..models import token + +router = APIRouter() + + +@router.post("/token", response_model=token.Token, tags=["token"]) +async def login_for_access_token( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()] +): + user = users_active.authenticate_user(users_active.fake_users, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=users_active.ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = users_active.create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + return {"access_token": access_token, "token_type": "bearer"} \ No newline at end of file diff --git a/app/routers/users.py b/app/routers/users.py new file mode 100644 index 0000000..0ec1ccf --- /dev/null +++ b/app/routers/users.py @@ -0,0 +1,15 @@ +from fastapi import APIRouter, Depends +from ..dependencies import users_active, permissions_checker +from ..models import users +from typing import Annotated + + +router = APIRouter() + +@router.get("/users/", tags=["users"], response_model=list[users.User]) +async def read_users(current_user: Annotated[users.User, Depends(users_active.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]): + return users_active.fake_users + +@router.get("/users/me",tags=["users"], response_model=users.User) +async def read_users_me(current_user: Annotated[users.User, Depends(users_active.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]): + return current_user \ No newline at end of file diff --git a/requierements.txt b/requierements.txt new file mode 100644 index 0000000..5bd6d38 --- /dev/null +++ b/requierements.txt @@ -0,0 +1,6 @@ +fastapi +uvicorn[standard] +pydantic-mongo +python-jose[cryptography] +passlib[bcrypt] +python-multipart \ No newline at end of file