2025-02-12 22:18:57 +01:00

100 lines
3.9 KiB
Python

from fastapi import APIRouter, HTTPException, status, Request, Form
from fastapi.templating import Jinja2Templates
from ..dependencies import users_token, database, mail
from ..models import users, email
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi_mail import MessageSchema, MessageType, FastMail
import random, os
router = APIRouter()
# Assurer que le chemin vers "templates" est correct
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
@router.post("/password/forgot", tags=["password"])
async def forgot_password(userSingle: users.UserForgotPassword):
if not userSingle.email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email is required"
)
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({"email": {"$eq": userSingle.email}})
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Génération d'un token temporaire
reset_token = str(random.randint(100000, 999999))
key_hashed = users_token.get_password_hash(reset_token)
email_body = {"key": reset_token, "username": user.username}
email_schema = email.EmailSchema(email=[user.email], body=email_body)
message = MessageSchema(
subject="Password Reset Request",
recipients=email_schema.dict().get("email"),
template_body=email_schema.dict().get("body"),
subtype=MessageType.html,
)
fm = FastMail(mail.conf)
await fm.send_message(message, template_name="reset_password.html")
# Stockage du token temporaire dans Redis avec une expiration
database.connect_redis.setex(user.email, 3600, key_hashed) # Expire dans 1 heure
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "Password reset email has been sent"})
@router.get("/password/reset", tags=["password"])
async def reset_password(request: Request, key: str | None = None, email: str | None = None):
if key is None or email is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Parameter key or/and email is empty"
)
key_hashed = database.connect_redis.get(email)
if key_hashed is None or key_hashed.decode() != key:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Key is invalid or expired"
)
return templates.TemplateResponse("reset_password.html", {"request": request, "email": email, "key": key})
@router.post("/password/update", tags=["password"])
async def update_password(email: str = Form(...), key: str = Form(...), new_password: str = Form(...), request: Request):
# Vérification du token dans Redis
key_hashed = database.connect_redis.get(email)
if key_hashed is None or key_hashed.decode() != key:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Key is invalid or expired"
)
# Recherche de l'utilisateur dans la base de données
user_repository = users.UserRepository(database=database.database)
user = user_repository.find_one_by({"email": {"$eq": email}})
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Mise à jour du mot de passe de l'utilisateur
user.password = users_token.get_password_hash(new_password)
user_repository.save(user)
# Suppression du token temporaire dans Redis
database.connect_redis.delete(email)
# Renvoyer une réponse HTML après la mise à jour réussie
return templates.TemplateResponse("password_update_success.html", {"request": request, "email": email})