from fastapi import APIRouter, HTTPException, status, Request, Form from fastapi.templating import Jinja2Templates from ..dependencies import users_token, database, mail from ..models import users, email from fastapi.responses import JSONResponse, HTMLResponse from fastapi_mail import MessageSchema, MessageType, FastMail import random, os, bcrypt router = APIRouter() # Assurer que le chemin vers "templates" est correct BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) @router.post("/password/forgot", tags=["password"]) async def forgot_password(userSingle: users.UserForgotPassword): if not userSingle.email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email is required" ) # Recherche de l'utilisateur dans la base de données user_repository = users.UserRepository(database=database.database) user = user_repository.find_one_by({"email": {"$eq": userSingle.email}}) if user is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Génération d'un token temporaire pour réinitialisation reset_token = str(random.randint(100000, 999999)) key_hashed = users_token.get_password_hash(reset_token) # Créer le lien de réinitialisation reset_link = f"https://backend.valczeryba.ovh/password/reset?key={reset_token}&email={user.email}" # Préparer les données à envoyer au template email_body = { "username": user.username, "reset_link": reset_link } # Créer le message à envoyer message = MessageSchema( subject="Password Reset Request", recipients=[user.email], template_body=email_body, subtype=MessageType.html, ) # Utilisation de FastMail pour envoyer l'email fm = FastMail(mail.conf) await fm.send_message(message, template_name="forgot_password_email.html") # Stockage du token temporaire dans Redis avec une expiration d'1 heure database.connect_redis.setex(user.email, 3600, key_hashed) return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "Password reset email has been sent"}) @router.get("/password/reset", tags=["password"]) async def reset_password(request: Request, key: str | None = None, email: str | None = None): if not key or not email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Parameters 'key' and 'email' are required" ) # Récupérer la clé hachée depuis Redis key_hashed = database.connect_redis.get(email) if key_hashed is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired reset key" ) # Redis stocke les valeurs en `bytes`, donc il faut décoder si nécessaire if isinstance(key_hashed, bytes): key_hashed = key_hashed.decode() # Vérifier que la clé en clair correspond au hash stocké if not bcrypt.checkpw(key.encode(), key_hashed.encode()): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid reset key" ) # Afficher la page HTML de réinitialisation du mot de passe return templates.TemplateResponse("reset_password.html", {"request": request, "email": email, "key": key}) @router.post("/password/update", tags=["password"]) async def update_password(request: Request, email: str = Form(...), key: str = Form(...), new_password: str = Form(...)): # Vérification du token dans Redis # Récupérer la clé hachée depuis Redis key_hashed = database.connect_redis.get(email) if key_hashed is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired reset key" ) # Redis stocke les valeurs en `bytes`, donc il faut décoder si nécessaire if isinstance(key_hashed, bytes): key_hashed = key_hashed.decode() # Vérifier que la clé en clair correspond au hash stocké if not bcrypt.checkpw(key.encode(), key_hashed.encode()): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid reset key" ) # Recherche de l'utilisateur dans la base de données user_repository = users.UserRepository(database=database.database) user = user_repository.find_one_by({"email": {"$eq": email}}) if user is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Mise à jour du mot de passe de l'utilisateur user.password = users_token.get_password_hash(new_password) user_repository.save(user) # Suppression du token temporaire dans Redis database.connect_redis.delete(email) # Afficher un message de succès dans une réponse HTML return templates.TemplateResponse("password_update_success.html", {"request": request, "email": email})