Compare commits

...

No commits in common. "1f80085ed08e2e3b3b309649525485843c0ab5eb" and "master" have entirely different histories.

32 changed files with 1810 additions and 40 deletions

3
.gitignore vendored
View File

@ -1 +1,2 @@
.env*
__pycache__
.env*

11
Dockerfile Normal file
View File

@ -0,0 +1,11 @@
FROM python:3.11.6-alpine3.18
WORKDIR /app
COPY requierements.txt /app/
COPY app /app/app
RUN pip install --upgrade pip && pip install -r requierements.txt
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

0
README.md Normal file
View File

0
app/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,37 @@
from fastapi.security import OAuth2
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi import Request
from fastapi.security.utils import get_authorization_scheme_param
from fastapi import HTTPException
from fastapi import status
from typing import Optional
from typing import Dict
class OAuth2PasswordBearerWithCookie(OAuth2):
def __init__(
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.cookies.get("access_token") #changed to accept access token from httpOnly Cookie
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return param

View File

@ -0,0 +1,7 @@
from pymongo import MongoClient
import os, redis
client = MongoClient("mongodb+srv://{0}:{1}@{2}/?retryWrites=true&w=majority&appName=AtlasApp".format(os.environ["MONGO_USER"], os.environ["MONGO_PASSWORD"], os.environ["MONGO_HOST"]))
database = client[os.environ["MONGO_DATABASE"]]
connect_redis = redis.Redis(host=os.environ["REDIS_URL"], port=os.environ["REDIS_PORT"], decode_responses=True)

14
app/dependencies/mail.py Normal file
View File

@ -0,0 +1,14 @@
from fastapi_mail import ConnectionConfig
from pathlib import Path
import os
conf = ConnectionConfig(
MAIL_USERNAME = os.environ["MAILER_USERNAME"],
MAIL_PASSWORD = os.environ["MAILER_PASSWORD"],
MAIL_FROM = os.environ["MAILER_FROM"],
MAIL_PORT = os.environ["MAILER_PORT"],
MAIL_SERVER = os.environ["MAILER_HOST"],
MAIL_STARTTLS = True,
MAIL_SSL_TLS = False,
TEMPLATE_FOLDER = Path(__file__).parents[1] / 'templates',
)

View File

@ -0,0 +1,18 @@
from ..dependencies import users_token
from fastapi import Depends, HTTPException, status
from ..models import users
class PermissionChecker:
def __init__(self, roles: list[str]) -> None:
self.roles = roles
def __call__(self, user: users.User = Depends(users_token.get_current_active_user)) -> bool:
for role in self.roles:
if role == user.roles:
return True
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Roles unauthorized'
)

View File

@ -0,0 +1,22 @@
from ..models import users
from ..dependencies import database
from passlib.context import CryptContext
from pydantic import EmailStr
def add(username="", password="", roles="User", status=1, email="test@toto.com"):
user_repository = users.UserRepository(database=database.database)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
result = user_repository.find_one_by({'username': username})
change = "added"
user = users.User(username=username, password=pwd_context.hash(password), roles=roles, status=status, email=email)
if result is not None:
result.password=pwd_context.hash(password)
result.roles=roles
result.status=status
result.email=email
user = result
change = "updated"
user_repository.save(user)
print("{0} {1}".format(username, change))

View File

@ -0,0 +1,87 @@
from datetime import datetime, timedelta
from typing import Annotated
from pymongo import MongoClient
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from ..models import users, token
from ..dependencies import database, cookie
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = cookie.OAuth2PasswordBearerWithCookie(tokenUrl="token")
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(username: str):
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({'username': username})
return user
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not verify_password(password, user.password):
return False
user.connected_at = datetime.today()
user_repository = users.UserRepository(database=database.database)
user_repository.save(user)
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token_str: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token_str, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = token.TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[users.User, Depends(get_current_user)]
):
if current_user.status == 0:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

48
app/main.py Normal file
View File

@ -0,0 +1,48 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import users, token, mail, events, tags, password
from .dependencies import user_add
import os
app = FastAPI()
origins = [
"http://localhost:8084",
"https://backend.valczeryba.ovh",
"https://facebook.com",
"https://fetlife.com",
"https://backoffice.valczeryba.ovh"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(users.router)
app.include_router(token.router)
app.include_router(mail.router)
app.include_router(events.router)
app.include_router(tags.router)
app.include_router(password.router)
@app.on_event("startup")
async def startup_event():
if os.environ.get("USERNAME_ADMIN") is not None and os.environ.get("PASSWORD_ADMIN") is not None:
user_add.add(username=os.environ["USERNAME_ADMIN"], password=os.environ["PASSWORD_ADMIN"], roles="Admin")
else:
print("User admin not added")
if os.environ.get("USERNAME_TEST") is not None and os.environ.get("PASSWORD_TEST") is not None:
user_add.add(username=os.environ["USERNAME_TEST"], password=os.environ["PASSWORD_TEST"], roles="User")
else:
print("User test not added")
@app.get("/")
async def root():
return {"message": "Hello World !"}

0
app/models/__init__.py Normal file
View File

7
app/models/email.py Normal file
View File

@ -0,0 +1,7 @@
from pydantic import BaseModel, EmailStr
from typing import List, Any, Dict
class EmailSchema(BaseModel):
email: List[EmailStr]
body: Dict[str, Any]

53
app/models/events.py Normal file
View File

@ -0,0 +1,53 @@
from pydantic import BaseModel, EmailStr
from pydantic_mongo import AbstractRepository, ObjectIdField
from datetime import datetime, date
class Event(BaseModel):
id: ObjectIdField = None
name: str
place: str
description: str
imgUrl: str | None = None
status: int = 0
latitude: float = 0.0
longitude: float = 0.0
organizers: list[str] = []
tags: list[str] = []
start_date: datetime | None = None
end_date: datetime | None = None
created_at: datetime = datetime.today()
updated_at: datetime | None = None
deleted_at: datetime | None = None
disabled_at: datetime | None = None
class EventOut(BaseModel):
id: ObjectIdField = None
name: str
place: str
description: str
imgUrl: str | None = None
status: int = 0
start_date: datetime | None = None
end_date: datetime | None = None
tags: list[str] = []
class EventIn(BaseModel):
name: str
place: str
description: str
imgUrl: str | None = None
status: int = 0
organizers: list[str] = []
tags: list[str] = []
start_date: datetime | None = None
end_date: datetime | None = None
latitude: float = 0.0
longitude: float = 0.0
class EventIDS(BaseModel):
ids: list[str]
class EventRepository(AbstractRepository[Event]):
class Meta:
collection_name = "events"

24
app/models/tags.py Normal file
View File

@ -0,0 +1,24 @@
from pydantic import BaseModel, EmailStr
from pydantic_mongo import AbstractRepository, ObjectIdField
from datetime import datetime, date
class Tags(BaseModel):
id: ObjectIdField = None
name: str
created_at: datetime = datetime.today()
class TagsOut(BaseModel):
id: ObjectIdField = None
name: str
class TagsIn(BaseModel):
name: str
class TagsIDS(BaseModel):
ids: list[str]
class TagsRepository(AbstractRepository[Tags]):
class Meta:
collection_name = "tags"

9
app/models/token.py Normal file
View File

@ -0,0 +1,9 @@
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None

60
app/models/users.py Normal file
View File

@ -0,0 +1,60 @@
from pydantic import BaseModel, EmailStr
from pydantic_mongo import AbstractRepository, ObjectIdField
from datetime import datetime, date
class User(BaseModel):
id: ObjectIdField = None
username: str
password: str
firstName: str = ""
name: str = ""
roles: str = "User"
status: int = 0
email: EmailStr
birth: datetime | None = None
created_at: datetime = datetime.today()
connected_at: datetime | None = None
updated_at: datetime | None = None
deleted_at: datetime | None = None
disabled_at: datetime | None = None
class UserOut(BaseModel):
id: ObjectIdField = None
username: str
roles: str
firstName: str
name: str
status: int = 0
email: EmailStr
class UserIn(BaseModel):
username: str
name: str
firstName: str
roles: str
password: str
birth: str
email: EmailStr
class UserCreate(BaseModel):
username: str
password: str
email: EmailStr
birth: str
firstName: str
name: str
class UserForgotPassword(BaseModel):
email: EmailStr
class UserInDB(User):
password: str
class UserIDS(BaseModel):
ids: list[str]
class UserRepository(AbstractRepository[User]):
class Meta:
collection_name = "users"

0
app/routers/__init__.py Normal file
View File

435
app/routers/events.py Normal file
View File

@ -0,0 +1,435 @@
from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.responses import JSONResponse
from datetime import datetime
from ..dependencies import users_token, permissions_checker, database
from ..models import events, users, tags
from pydantic import EmailStr
from typing import Annotated, Union
from bson import ObjectId
from datetime import datetime
router = APIRouter()
def build_location_filter(min_lat, max_lat, min_lon, max_lon):
"""Build location-based query filters."""
if min_lat is not None and max_lat is not None and min_lon is not None and max_lon is not None:
return [
{"latitude": {"$gte": min_lat}},
{"latitude": {"$lte": max_lat}},
{"longitude": {"$gte": min_lon}},
{"longitude": {"$lte": max_lon}},
]
return []
def build_datetime_filter(current_datetime):
"""Build filters for current datetime."""
if current_datetime:
return {
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{
"$and": [
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # Ongoing
{"end_date": None}, # No end date
]},
],
},
],
}
return None
def build_date_filter(start_date, end_date):
"""Build date range filters."""
if start_date and end_date:
return [
{"start_date": {"$gte": datetime.combine(start_date, datetime.min.time())}},
{"start_date": {"$lte": datetime.combine(end_date, datetime.max.time())}},
]
return []
def build_text_filter(item):
"""Build text-based search filters."""
if item:
return {
"$or": [
{"name": {"$regex": item, "$options": "i"}},
{"tags": {"$regex": item, "$options": "i"}},
{"organizers": {"$regex": item, "$options": "i"}},
]
}
return None
@router.get("/events", tags=["events"], response_model=list[events.EventOut])
async def read_events(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
id_event: str | None = None,
name: str | None = None,
status: int = 1,
tags: str | None = None,
organizers: str | None = None,
current_datetime: datetime | None = None,
date_event: datetime | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
):
# Validate `skip` and `limit`
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
)
skip = limit * skip
# Initialize filters
filters = []
# Add status filter
filters.append({"status": {"$eq": status}})
# Add date filters
if date_event:
start_of_day = datetime.combine(date_event, datetime.min.time())
end_of_day = datetime.combine(date_event, datetime.max.time())
filters.extend(build_date_filter(start_of_day, end_of_day))
elif start_date and end_date:
filters.extend(build_date_filter(start_date, end_date))
# Add current datetime filter
datetime_filter = build_datetime_filter(current_datetime)
if datetime_filter:
filters.append(datetime_filter)
# Add text-based filters
if name:
filters.append(build_text_filter(name))
if tags:
filters.append({"tags": {"$eq": tags}})
if organizers:
filters.append({"organizers": {"$eq": organizers}})
# Add ID filter
if id_event:
try:
event_id = ObjectId(id_event)
filters.append({"_id": {"$eq": event_id}})
except Exception:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid event ID format.")
# Combine all filters
object_search = {"$and": filters} if filters else {}
# Fetch and return results
event_repository = events.EventRepository(database=database.database)
list_events = []
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(
id=event_index.id,
tags=event_index.tags,
imgUrl=event_index.imgUrl,
name=event_index.name,
description=event_index.description,
place=event_index.place,
status=event_index.status,
start_date=event_index.start_date,
end_date=event_index.end_date,
)
list_events.append(event)
return list_events
@router.get("/events/search", tags=["events"], response_model=list[events.EventOut])
async def search_events(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
item: Union[str, None] = None,
status: int = 1,
min_lat: Union[float, None] = None,
max_lat: Union[float, None] = None,
min_lon: Union[float, None] = None,
max_lon: Union[float, None] = None,
current_datetime: Union[datetime, None] = None,
date_event: Union[datetime, None] = None,
start_date: Union[datetime, None] = None,
end_date: Union[datetime, None] = None,
tags: Union[str, None] = None,
):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
)
skip = limit * skip
# Initialize filters
filters = [{"status": {"$eq": status}}]
# Date filters
if date_event:
start_of_day = datetime.combine(date_event, datetime.min.time())
end_of_day = datetime.combine(date_event, datetime.max.time())
filters.extend(build_date_filter(start_of_day, end_of_day))
else:
filters.extend(build_date_filter(start_date, end_date))
# Add location filter
filters.extend(build_location_filter(min_lat, max_lat, min_lon, max_lon))
# Add datetime filter
datetime_filter = build_datetime_filter(current_datetime)
if datetime_filter:
filters.append(datetime_filter)
# Add text filter
text_filter = build_text_filter(item)
if text_filter:
filters.append(text_filter)
if tags is not None:
filters.append({"tags": {"$eq": tags}})
# Combine filters
object_search = {"$and": filters} if filters else {}
# Fetch and return results
event_repository = events.EventRepository(database=database.database)
list_events = []
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(
id=event_index.id,
tags=event_index.tags,
imgUrl=event_index.imgUrl,
name=event_index.name,
description=event_index.description,
place=event_index.place,
status=event_index.status,
start_date=event_index.start_date,
end_date=event_index.end_date,
)
list_events.append(event)
return list_events
@router.get("/events/me",tags=["events"])
async def read_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
event_repository = events.EventRepository(database=database.database)
listOrganizers = []
for event_index in event_repository.find_by({"organizers":{"$eq": current_user.username}}, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, name=event_index.name, tags=event_index.tags, imgUrl=event_index.imgUrl, description=event_index.description, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listOrganizers.append(event)
content = {"organizers":listOrganizers}
response = JSONResponse(content=content)
return response
@router.get("/events/count", tags=["events"])
async def read_events_count(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
count = database.database.get_collection("events").estimated_document_count()
content = {"count":count}
response = JSONResponse(content=content)
return response
@router.get("/events/{item_id}", tags=["events"], response_model=events.Event)
async def read_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
return event
@router.delete("/events/me/{item_id}", tags=["events"])
async def delete_event_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], remove: bool = False):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
if remove is True:
event.deleted_at = datetime.today()
event.status = -1
content = {"message": "event is deleted"}
else:
event.status = 0
event.deleted_at = datetime.today()
content = {"message": "event is disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.delete("/events/groups",tags=["events"])
async def delete_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove: bool = False, eventids: events.EventIDS | None = None):
if len(eventids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
event_repository = event.EventRepository(database=database.database)
for i in eventids.ids:
event = event_repository.find_one_by_id(ObjectId(i))
if remove is True:
event.status = -1
event.deleted_at = datetime.today()
content = {"message": "events are deleted "}
else:
event.status = 0
event.disabled_at = datetime.today()
content = {"message": "events are disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.delete("/events/{item_id}", tags=["events"])
async def delete_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove : bool = False):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
if remove is True:
event.status = -1
event.deleted_at = datetime.today()
content = {"message": "events are deleted"}
else:
event.status = 0
event.disabled_at = datetime.today()
content = {"message": "events are disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.put("/events/me/{item_id}",tags=["events"])
async def update_events_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], eventSingle: events.EventIn | None = None):
event_repository = event.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.name = eventSingle.name
event.description = eventSingle.description
event.place = eventSingle.place
event.start_date = eventSingle.start_date
event.tags = eventSingle.tags
event.end_date = eventSingle.end_date
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event.imgUrl = eventSingle.imgUrl
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
return response
@router.put("/events", tags=["events"], status_code=status.HTTP_201_CREATED)
async def update_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None):
if eventSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
tags_repository = tags.TagsRepository(database=database.database)
event = event_repository.find_one_by( {"$and": [
{"start_date": {"$eq": eventSingle.start_date}}, # Already started
{"name": {"$eq": eventSingle.name}},
]})
if event is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="name"
)
event = events.Event(name=eventSingle.name, description=eventSingle.description, place=eventSingle.place)
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.imgUrl = eventSingle.imgUrl
event.tags = eventSingle.tags
for tag_name in eventSingle.tags:
tag = tags_repository.find_one_by({"name": {'$eq': tag_name}})
if tag is None:
tag = tags.Tags(name=tag_name)
tags_repository.save(tag)
event.status = 1
event.created_at = datetime.today()
event_repository.save(event)
content = {"message": "event is created"}
response = JSONResponse(content=content, status_code=status.HTTP_201_CREATED)
return response
@router.put("/events/{item_id}", tags=["events"], status_code=status.HTTP_200_OK)
async def update_events_id(item_id: str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None, response: Response = Response):
if eventSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
tags_repository = tags.TagsRepository(database=database)
event = event_repository.find_one_by({"id": {'$eq': ObjectId(item_id)}})
if event is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
event.name = eventSingle.name
event.place = eventSingle.place
event.description = eventSingle.description
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.tags = eventSingle.tags
for tag_name in eventSingle:
tag = tags_repository.find_one_by({"name": {'$eq': tag_name}})
if tag is None:
tag = Tags(name=tag_name)
tags_repository.save(tag)
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event.imgUrl = eventSingle.imgUrl
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
return response
@router.patch("/events/groups",tags=["events"])
async def patch_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventids: events.EventIDS | None = None):
if len(eventids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
event_repository = events.EventRepository(database=database.database)
content = {"message": "events are enabled"}
for i in eventids.ids:
event = event_repository.find_one_by_id(ObjectId(i))
event.status = 1
event.disabled_at = None
event.deleted_at = None
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.patch("/events/{item_id}", tags=["events"])
async def patch_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.status = 1
event.disabled_at = None
event.deleted_at = None
event_repository.save(event)
content = {"message": "event is enabled"}
response = JSONResponse(content=content)
return response

72
app/routers/mail.py Normal file
View File

@ -0,0 +1,72 @@
from fastapi import APIRouter, HTTPException, status, Request
from fastapi.templating import Jinja2Templates
from ..dependencies import users_token, database, mail
from ..models import users, email
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi_mail import MessageSchema, MessageType, FastMail
import random, os
router = APIRouter()
# Assurer que le chemin vers "templates" est correct
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
@router.post("/mail",tags=["mail"])
async def create_user(userSingle: users.UserCreate | None = None):
if userSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({"$or": [{"username": {'$eq': userSingle.username}},{"email": {"$eq": userSingle.email}}] })
if user is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="User already exists"
)
fm = FastMail(mail.conf)
numberkey = str(random.Random())
key_hashed = users_token.get_password_hash(numberkey)
email_body = {"key":key_hashed, "username":userSingle.username}
email_schema = email.EmailSchema(email=[userSingle.email], body=email_body)
message = MessageSchema(
subject="Fastapi-Mail module",
recipients=email_schema.dict().get("email"),
template_body=email_schema.dict().get("body"),
subtype=MessageType.html,
)
await fm.send_message(message, template_name="mailer.html")
current_user = users.User(username=userSingle.username, password=users_token.get_password_hash(userSingle.password), email=userSingle.email, name=userSingle.name, firstName=userSingle.firstName)
current_user.status = 0
user_repository.save(current_user)
database.connect_redis.set(userSingle.username, key_hashed)
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "email has been sent"})
@router.get("/mail", tags=["mail"])
async def confirm_user(request: Request, key: str | None = None, username: str | None = None):
if key is None or username is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Parameter key or/and username is empty"
)
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({"username": {'$eq': username}})
key_hashed = database.connect_redis.get(username)
if key_hashed != key:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Key is invalid"
)
user.status = 1
user_repository.save(user)
# Rendre la page HTML avec Jinja2 et passer la variable username
return templates.TemplateResponse("confirm.html", {"request": request, "username": username})

137
app/routers/password.py Normal file
View File

@ -0,0 +1,137 @@
from fastapi import APIRouter, HTTPException, status, Request, Form
from fastapi.templating import Jinja2Templates
from ..dependencies import users_token, database, mail
from ..models import users, email
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi_mail import MessageSchema, MessageType, FastMail
import random, os, bcrypt
router = APIRouter()
# Assurer que le chemin vers "templates" est correct
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
@router.post("/password/forgot", tags=["password"])
async def forgot_password(userSingle: users.UserForgotPassword):
if not userSingle.email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email is required"
)
# Recherche de l'utilisateur dans la base de données
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({"email": {"$eq": userSingle.email}})
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Génération d'un token temporaire pour réinitialisation
reset_token = str(random.randint(100000, 999999))
key_hashed = users_token.get_password_hash(reset_token)
# Créer le lien de réinitialisation
reset_link = f"https://backend.valczeryba.ovh/password/reset?key={reset_token}&email={user.email}"
# Préparer les données à envoyer au template
email_body = {
"username": user.username,
"reset_link": reset_link
}
# Créer le message à envoyer
message = MessageSchema(
subject="Password Reset Request",
recipients=[user.email],
template_body=email_body,
subtype=MessageType.html,
)
# Utilisation de FastMail pour envoyer l'email
fm = FastMail(mail.conf)
await fm.send_message(message, template_name="forgot_password_email.html")
# Stockage du token temporaire dans Redis avec une expiration d'1 heure
database.connect_redis.setex(user.email, 3600, key_hashed)
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "Password reset email has been sent"})
@router.get("/password/reset", tags=["password"])
async def reset_password(request: Request, key: str | None = None, email: str | None = None):
if not key or not email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Parameters 'key' and 'email' are required"
)
# Récupérer la clé hachée depuis Redis
key_hashed = database.connect_redis.get(email)
if key_hashed is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired reset key"
)
# Redis stocke les valeurs en `bytes`, donc il faut décoder si nécessaire
if isinstance(key_hashed, bytes):
key_hashed = key_hashed.decode()
# Vérifier que la clé en clair correspond au hash stocké
if not bcrypt.checkpw(key.encode(), key_hashed.encode()):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid reset key"
)
# Afficher la page HTML de réinitialisation du mot de passe
return templates.TemplateResponse("reset_password.html", {"request": request, "email": email, "key": key})
@router.post("/password/update", tags=["password"])
async def update_password(request: Request, email: str = Form(...), key: str = Form(...), new_password: str = Form(...)): # Vérification du token dans Redis
# Récupérer la clé hachée depuis Redis
key_hashed = database.connect_redis.get(email)
if key_hashed is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired reset key"
)
# Redis stocke les valeurs en `bytes`, donc il faut décoder si nécessaire
if isinstance(key_hashed, bytes):
key_hashed = key_hashed.decode()
# Vérifier que la clé en clair correspond au hash stocké
if not bcrypt.checkpw(key.encode(), key_hashed.encode()):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid reset key"
)
# Recherche de l'utilisateur dans la base de données
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({"email": {"$eq": email}})
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Mise à jour du mot de passe de l'utilisateur
user.password = users_token.get_password_hash(new_password)
user_repository.save(user)
# Suppression du token temporaire dans Redis
database.connect_redis.delete(email)
# Afficher un message de succès dans une réponse HTML
return templates.TemplateResponse("password_update_success.html", {"request": request, "email": email})

119
app/routers/tags.py Normal file
View File

@ -0,0 +1,119 @@
from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.responses import JSONResponse
from datetime import datetime
from ..dependencies import users_token, permissions_checker, database
from ..models import tags, users
from pydantic import EmailStr
from typing import Annotated, Union
from bson import ObjectId
from datetime import datetime
router = APIRouter()
@router.get("/tags", tags=["tags"], response_model=list[tags.TagsOut])
async def read_tags(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
id_tags: str | None = None,
name: str | None = None
):
# Validate `skip` and `limit`
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
)
limit = limit + skip
# Initialize filters
filters = []
if name:
filters.append({"name": {"$regex": name, "$options": "i"}})
# Add ID filter
if id_tags:
try:
tags_id = ObjectId(id_tags)
filters.append({"_id": {"$eq": event_id}})
except Exception:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid event ID format.")
# Combine all filters
object_search = {"$and": filters} if filters else {}
# Fetch and return results
tags_repository = tags.TagsRepository(database=database.database)
list_tags = []
for tag_index in tags_repository.find_by(object_search, limit=limit, skip=skip):
tag = tags.TagsOut(
id=tag_index.id,
name=tag_index.name
)
list_tags.append(tag)
return list_tags
@router.get("/tags/count", tags=["tags"])
async def read_tags_count(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
count = database.database.get_collection("tags").estimated_document_count()
content = {"count":count}
response = JSONResponse(content=content)
return response
@router.get("/tags/{item_id}", tags=["tags"], response_model=tags.Tags)
async def read_tags_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
tags_repository = tags.TagsRepository(database=database.database)
tag = tags_repository.find_one_by_id(ObjectId(item_id))
return tag
@router.delete("/tags/groups",tags=["tags"])
async def delete_tags_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], tagsids: tags.TagsIDS | None = None):
if len(tagsids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
tag_repository = tags.TagsRepository(database=database.database)
for i in tagsids.ids:
tag = tag_repository.find_one_by_id(ObjectId(i))
tag_repository.delete_one(tag)
content = {"message": "tags removed"}
response = JSONResponse(content=content)
return response
@router.delete("/tags/{item_id}", tags=["tags"])
async def delete_tags_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
tag_repository = tags.TagsRepository(database=database.database)
tag = tag_repository.find_one_by_id(ObjectId(item_id))
event_repository.delete_one(event)
content = {"message": "tags delete"}
response = JSONResponse(content=content)
return responsed
@router.put("/tags", tags=["tags"], status_code=status.HTTP_201_CREATED)
async def update_tags(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], tagSingle: tags.TagsIn | None = None):
if tagSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
tag_repository = tags.TagsRepository(database=database.database)
tag = tag_repository.find_one_by({"name": {'$eq': tagSingle.name}})
if tag is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="name"
)
tag = tags.Tags(name=tagSingle.name)
tag.created_at = datetime.today()
tag_repository.save(event)
content = {"message": "tags created"}
response = JSONResponse(content=content, status_code=status.HTTP_201_CREATED)
return response

48
app/routers/token.py Normal file
View File

@ -0,0 +1,48 @@
from datetime import datetime, timedelta
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status, APIRouter, Form
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from ..dependencies import users_token, permissions_checker
from ..models import token, users
router = APIRouter()
ACCESS_TOKEN_EXPIRE_MINUTES = 30
@router.post("/token", tags=["token"])
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
remember_me: bool = Form(False)):
user = users_token.authenticate_user(form_data.username, form_data.password)
expires_access_token_time = ACCESS_TOKEN_EXPIRE_MINUTES
if remember_me:
expires_access_token_time=120
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=expires_access_token_time)
access_token = users_token.create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
content = {"roles":user.roles,"message": "Access token generated"}
response = JSONResponse(content=content)
response.set_cookie(key="access_token", value="Bearer {0}".format(access_token), httponly=True)
return response
@router.get("/token",tags=["token"])
async def check_token(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
content = {"message": "Check token"}
response = JSONResponse(content=content)
return response
@router.delete("/token",tags=["token"])
async def check_token(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
content = {"message": "Token deleted"}
response = JSONResponse(content=content)
response.set_cookie(key="access_token", value="", httponly=True)
return response

254
app/routers/users.py Normal file
View File

@ -0,0 +1,254 @@
from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.responses import JSONResponse
from datetime import datetime
from ..dependencies import users_token, permissions_checker, database
from ..models import users
from pydantic import EmailStr
from typing import Annotated
from bson import ObjectId
router = APIRouter()
@router.get("/users", tags=["users"], response_model=list[users.UserOut])
async def read_users(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, id_user: str | None = None, roles: str | None = None, status: int | None = None, email: EmailStr | None = None, name: str | None = None):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
)
limit = limit + skip
listUsers = []
user_repository = users.UserRepository(database=database.database)
object_search = {}
if status is not None and roles is not None:
object_search = {"$and":[{"roles":{"$eq": roles}}, {"status":{"$eq":status}}]}
else:
if status is not None:
object_search = {"status":{"$eq": status}}
if roles is not None:
object_search = {"roles":{"$eq":roles}}
if id_user is not None:
userid = ObjectId(id_user)
object_search = {"id": {"$regex": userid}}
if status is not None and roles is not None:
object_search = {"$and":[{"id":{"$regex": userid}}, {"roles":{"$eq": roles}}, {"status":{"$eq":status}}]}
else:
if status is not None:
object_search = {"$and":[{"id":{"$regex": userid}}, {"status":{"$eq":status}}]}
if roles is not None:
object_search = {"$and":[{"id":{"$regex": userid}}, {"roles":{"$eq":roles}}]}
if email is not None:
object_search = {"email": {"$eq": email}}
if status is not None and roles is not None:
object_search = {"$and":[{"email":{"$eq": email}}, {"roles":{"$eq": roles}}, {"status":{"$eq":status}}]}
else:
if status is not None:
object_search = {"$and":[{"email":{"$eq": email}}, {"status":{"$eq":status}}]}
if roles is not None:
object_search = {"$and":[{"email":{"$eq": email}}, {"roles":{"$eq":roles}}]}
if name is not None:
object_search = {"username": {"$regex": name}}
if status is not None and roles is not None:
object_search = {"$and":[{"username":{"$regex": name}}, {"roles":{"$eq": roles}}, {"status":{"$eq":status}}]}
else:
if status is not None:
object_search = {"$and":[{"username":{"$regex": name}}, {"status":{"$eq":status}}]}
if roles is not None:
object_search = {"$and":[{"username":{"$regex": name}}, {"roles":{"$eq":roles}}]}
for user_index in user_repository.find_by(object_search, limit=limit, skip=skip):
user = users.UserOut(id=user_index.id, username=user_index.username, email=user_index.email, status=user_index.status, roles=user_index.roles, firstName=user_index.firstName, name=user_index.name)
listUsers.append(user)
return listUsers
@router.get("/users/me",tags=["users"], response_model=users.User, response_model_exclude=["id", "password", "roles", "status"])
async def read_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
return current_user
@router.get("/users/count", tags=["users"])
async def read_users_count(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
count = database.database.get_collection("users").estimated_document_count()
content = {"count":count}
response = JSONResponse(content=content)
return response
@router.get("/users/{item_id}", tags=["users"], response_model=users.User)
async def read_users_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by_id(ObjectId(item_id))
return user
@router.delete("/users/me",tags=["users"])
async def delete_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], remove: bool = False):
user_repository = users.UserRepository(database=database.database)
if remove is True:
current_user.deleted_at = datetime.today()
current_user.status = -1
content = {"message": "users are deleted"}
else:
current_user.status = 0
current_user.deleted_at = datetime.today()
content = {"message": "users are disabled"}
user_repository.save(current_user)
response = JSONResponse(content=content)
return response
@router.delete("/users/groups",tags=["users"])
async def delete_users_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove: bool = False, userids: users.UserIDS | None = None):
if len(userids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="userids should be greater than 0"
)
user_repository = users.UserRepository(database=database.database)
for i in userids.ids:
user = user_repository.find_one_by_id(ObjectId(i))
if remove is True:
user.status = -1
user.deleted_at = datetime.today()
content = {"message": "users are deleted "}
else:
user.status = 0
user.disabled_at = datetime.today()
content = {"message": "users are disabled"}
user_repository.save(user)
response = JSONResponse(content=content)
return response
@router.delete("/users/{item_id}", tags=["users"])
async def delete_users_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove : bool = False):
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by_id(ObjectId(item_id))
if remove is True:
user.status = -1
user.deleted_at = datetime.today()
content = {"message": "users are deleted"}
else:
user.status = 0
user.disabled_at = datetime.today()
content = {"message": "users are disabled"}
user_repository.save(user)
response = JSONResponse(content=content)
return response
@router.put("/users/me",tags=["users"])
async def update_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], userSingle: users.UserIn | None = None):
user_repository = users.UserRepository(database=database.database)
current_user.username = userSingle.username
if len(userSingle.password) > 0:
current_user.password = user_token.get_password_hash(userSingle.password)
current_user.email = userSingle.email
current_user.name = userSingle.name
current_user.firstName = userSingle.firstName
current_user.birth = userSingle.birth
user_repository.save(current_user)
content = {"message": "user is updated"}
response = JSONResponse(content=content)
return current_user
@router.put("/users", tags=["users"], status_code=status.HTTP_201_CREATED)
async def update_users(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], userSingle: users.UserIn | None = None):
if userSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({"$or":[{"username": {'$eq': userSingle.username}}, {"email": {"$eq": userSingle.email}}]})
if user is not None:
if user.username == userSingle.username:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="username"
)
if user.email == userSingle.email:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="email"
)
user = users.User(username=userSingle.username, password=users_token.get_password_hash(userSingle.password), email=userSingle.email)
user.roles = userSingle.roles
user.firstName = userSingle.firstName
user.name = userSingle.name
user.birth = userSingle.birth
user.created_at = datetime.today()
user_repository.save(user)
content = {"message": "user is created"}
response = JSONResponse(content=content, status_code=status.HTTP_201_CREATED)
return response
@router.put("/users/{item_id}", tags=["users"], status_code=status.HTTP_200_OK)
async def update_users_id(item_id: str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], userSingle: users.UserIn | None = None, response: Response = Response):
if userSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({"id": {'$eq': ObjectId(item_id)}})
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
user.username = userSingle.username
if len(userSingle.password) > 0:
user.password = users_token.get_password_hash(userSingle.password)
user.roles = userSingle.roles
user.email = userSingle.email
user.firstName = userSingle.firstName
user.name = userSingle.name
user.birth = userSingle.birth
user.updated_at = datetime.today()
user_repository.save(user)
content = {"message": "user is updated"}
response = JSONResponse(content=content)
return response
@router.patch("/users/groups",tags=["users"])
async def patch_users_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], userids: users.UserIDS | None = None):
if len(userids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="userids should be greater than 0"
)
user_repository = users.UserRepository(database=database.database)
content = {"message": "users are enabled"}
for i in userids.ids:
user = user_repository.find_one_by_id(ObjectId(i))
user.status = 1
user.disabled_at = None
user.deleted_at = None
user_repository.save(user)
response = JSONResponse(content=content)
return response
@router.patch("/users/{item_id}", tags=["users"], response_model=users.User)
async def patch_users_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by_id(ObjectId(item_id))
user.status = 1
user.disabled_at = None
user.deleted_at = None
user_repository.save(user)
content = {"message": "user is enabled"}
response = JSONResponse(content=content)
return response

View File

@ -0,0 +1,58 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Votre compte est activé</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}
.container {
max-width: 600px;
margin: 20px auto;
background: #ffffff;
padding: 20px;
border-radius: 10px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
text-align: center;
}
h2 {
color: #333;
}
p {
color: #666;
font-size: 16px;
}
.button {
display: inline-block;
background: #007BFF;
color: white;
padding: 12px 20px;
text-decoration: none;
border-radius: 5px;
font-size: 16px;
font-weight: bold;
margin-top: 20px;
}
.button:hover {
background: #0056b3;
}
.footer {
margin-top: 20px;
font-size: 12px;
color: #888;
}
</style>
</head>
<body>
<div class="container">
<h2>Félicitations, {{ username }} ! 🎉</h2>
<p>Votre compte a été activé avec succès.</p>
<p>Vous pouvez maintenant vous connecter et profiter pleinement de nos services.</p>
<p class="footer">Si vous avez des questions, n'hésitez pas à nous contacter.</p>
</div>
</body>
</html>

View File

@ -0,0 +1,69 @@
<!-- forgot_password_email.html -->
<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Réinitialisation de votre mot de passe</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f4f9;
color: #333;
margin: 0;
padding: 0;
}
.container {
width: 100%;
max-width: 600px;
margin: 50px auto;
padding: 20px;
background-color: white;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
border-radius: 8px;
}
h1 {
color: #4CAF50;
text-align: center;
}
p {
font-size: 16px;
line-height: 1.5;
text-align: center;
}
.cta-button {
display: inline-block;
background-color: #4CAF50;
color: white;
padding: 10px 20px;
text-decoration: none;
border-radius: 4px;
font-weight: bold;
text-align: center;
}
.cta-button:hover {
background-color: #45a049;
}
.footer {
margin-top: 20px;
font-size: 14px;
color: #888;
text-align: center;
}
</style>
</head>
<body>
<div class="container">
<h1>Demande de réinitialisation du mot de passe</h1>
<p>Bonjour {{ username }},</p>
<p>Nous avons reçu une demande pour réinitialiser votre mot de passe sur notre site. Si vous n'êtes pas à l'origine de cette demande, vous pouvez ignorer cet email.</p>
<p>Pour réinitialiser votre mot de passe, cliquez sur le lien ci-dessous :</p>
<p><a href="{{ reset_link }}" class="cta-button">Réinitialiser mon mot de passe</a></p>
<p>Le lien est valable pendant une heure. Si vous ne pouvez pas cliquer dessus, copiez et collez-le dans votre navigateur.</p>
</div>
<div class="footer">
<p>&copy; Covas - Tous droits réservés.</p>
</div>
</body>
</html>

59
app/templates/mailer.html Normal file
View File

@ -0,0 +1,59 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Confirmation de votre compte</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}
.container {
max-width: 600px;
margin: 20px auto;
background: #ffffff;
padding: 20px;
border-radius: 10px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
text-align: center;
}
h2 {
color: #333;
}
p {
color: #666;
font-size: 16px;
}
.button {
display: inline-block;
background: #28a745;
color: white;
padding: 12px 20px;
text-decoration: none;
border-radius: 5px;
font-size: 16px;
font-weight: bold;
margin-top: 20px;
}
.button:hover {
background: #218838;
}
.footer {
margin-top: 20px;
font-size: 12px;
color: #888;
}
</style>
</head>
<body>
<div class="container">
<h2>Bienvenue, {{ username }} ! 🎉</h2>
<p>Merci de vous être inscrit sur notre plateforme.</p>
<p>Pour finaliser votre inscription, veuillez confirmer votre compte en cliquant sur le bouton ci-dessous :</p>
<a href="https://backend.valczeryba.ovh/mail?key={{ key }}&username={{ username }}" class="button">Confirmer mon compte</a>
<p class="footer">Si vous n'êtes pas à l'origine de cette inscription, ignorez simplement cet email.</p>
</div>
</body>
</html>

View File

@ -0,0 +1,65 @@
<!-- password_update_success.html -->
<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Mot de Passe Mis à Jour</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f4f9;
color: #333;
margin: 0;
padding: 0;
}
.container {
width: 100%;
max-width: 600px;
margin: 50px auto;
padding: 20px;
background-color: white;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
border-radius: 8px;
}
h1 {
color: #4CAF50;
text-align: center;
}
p {
font-size: 16px;
line-height: 1.5;
text-align: center;
}
.cta-button {
display: inline-block;
background-color: #4CAF50;
color: white;
padding: 10px 20px;
text-decoration: none;
border-radius: 4px;
font-weight: bold;
text-align: center;
}
.cta-button:hover {
background-color: #45a049;
}
.footer {
margin-top: 20px;
font-size: 14px;
color: #888;
text-align: center;
}
</style>
</head>
<body>
<div class="container">
<h1>Votre mot de passe a été mis à jour avec succès</h1>
<p>Votre mot de passe a été réinitialisé avec succès. Vous pouvez maintenant utiliser votre nouveau mot de passe pour vous connecter.</p>
</div>
<div class="footer">
<p>&copy; {{ current_year }} Covas - Tous droits réservés.</p>
</div>
</body>
</html>

View File

@ -0,0 +1,87 @@
<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Réinitialisation du mot de passe</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 0;
background-color: #f4f7fc;
color: #333;
}
.container {
width: 100%;
max-width: 600px;
margin: 50px auto;
padding: 20px;
background-color: #ffffff;
border-radius: 8px;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
}
h2 {
text-align: center;
color: #2a9d8f;
}
.form-group {
margin-bottom: 20px;
}
label {
font-weight: bold;
display: block;
margin-bottom: 5px;
}
input[type="password"], input[type="email"] {
width: 100%;
padding: 10px;
margin: 5px 0 10px 0;
border: 1px solid #ddd;
border-radius: 4px;
}
input[type="submit"] {
background-color: #2a9d8f;
color: white;
border: none;
padding: 10px 20px;
font-size: 16px;
cursor: pointer;
border-radius: 4px;
width: 100%;
}
input[type="submit"]:hover {
background-color: #1e7c68;
}
.message {
text-align: center;
margin-top: 20px;
}
.message a {
color: #2a9d8f;
text-decoration: none;
}
</style>
</head>
<body>
<div class="container">
<h2>Réinitialisation du mot de passe</h2>
<form method="post" action="/password/update">
<input type="hidden" name="email" value="{{ email }}">
<input type="hidden" name="key" value="{{ key }}">
<div class="form-group">
<label for="new_password">Nouveau mot de passe :</label>
<input type="password" id="new_password" name="new_password" required>
</div>
<input type="submit" value="Mettre à jour le mot de passe">
</form>
<div class="message">
<p>Vous avez des questions ? <a href="mailto:support@votresite.com">Contactez-nous</a></p>
</div>
</div>
</body>
</html>

View File

@ -1,39 +0,0 @@
version: "3.3"
services:
redis:
image: redis/redis-stack-server:latest
backend:
build:
context: ../backend-collector
dockerfile: Dockerfile
depends_on:
- redis
env_file:
- .env-backend
ports:
- "8083:8000"
frontend:
depends_on:
- backend
image: nginx:1.23.0
ports:
- "8084:80"
volumes:
- "../backoffice/web:/usr/share/nginx/html:ro"
client:
depends_on:
- backend
image: nginx:1.23.0
ports:
- "8085:80"
volumes:
- "../client:/usr/share/nginx/html:ro"
volumes:
app-db-data:

8
requierements.txt Normal file
View File

@ -0,0 +1,8 @@
fastapi
uvicorn[standard]
pydantic-mongo
python-jose[cryptography]
passlib[bcrypt]
python-multipart
fastapi-mail
redis