From 928fe6578c78a1c1bad0cd5956944cc857f157fa Mon Sep 17 00:00:00 2001 From: Valentin CZERYBA Date: Fri, 29 Mar 2024 23:19:45 +0100 Subject: [PATCH] add models and routes for ip address --- app/models/ipaddress.py | 9 +- app/routers/ipaddress.py | 234 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 app/routers/ipaddress.py diff --git a/app/models/ipaddress.py b/app/models/ipaddress.py index cc8e1a9..e418c8e 100644 --- a/app/models/ipaddress.py +++ b/app/models/ipaddress.py @@ -8,19 +8,24 @@ class IpAddress(BaseModel): username: str ip: IPvAnyAddress socialnetwork: List[str] | None = None + status: int = 0 + created_at: datetime = datetime.today() + updated_at: datetime | None = None + deleted_at: datetime | None = None + disabled_at: datetime | None = None class IpAddressOut(BaseModel): id: ObjectIdField = None username: str ip: IPvAnyAddress socialnetwork: List[str] | None = None - + status: int = 0 class IpAddressIn(BaseModel): username: str ip: IPvAnyAddress socialnetwork: List[str] | None = None - + status: int = 0 class IpAddressRepository(AbstractRepository[IpAdress]): diff --git a/app/routers/ipaddress.py b/app/routers/ipaddress.py new file mode 100644 index 0000000..74a9969 --- /dev/null +++ b/app/routers/ipaddress.py @@ -0,0 +1,234 @@ +from fastapi import APIRouter, Depends, HTTPException, status, Response +from fastapi.responses import JSONResponse +from datetime import datetime +from ..dependencies import users_token, permissions_checker, database +from ..models import events, users +from pydantic import EmailStr +from typing import Annotated +from bson import ObjectId +router = APIRouter() + + + +@router.get("/ipaddress", tags=["ipaddress"], response_model=list[ipaddress.IpAddressOut]) +async def read_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, id_event: str | None = None, name: str | None = None, status: int | None = None): + if limit < 1 or skip < 0 or limit < skip: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip" + ) + limit = limit + skip + listEvents = [] + event_repository = events.EventRepository(database=database.database) + object_search = {} + if status is not None: + object_search = {"status":{"$eq": status}} + if id_event is not None: + eventid = ObjectId(id_event) + object_search = {"id": {"$regex": userid}} + if status is not None: + object_search = {"$and":[{"id":{"$regex": eventid}}, {"status":{"$eq":status}}]} + if name is not None: + object_search = {"name": {"$regex": name}} + if status is not None: + object_search = {"$and":[{"name":{"$regex": name}}, {"status":{"$eq":status}}]} + + + for event_index in event_repository.find_by(object_search, limit=limit, skip=skip): + event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date) + listEvents.append(event) + return listEvents + + +@router.get("/events/me",tags=["events"]) +async def read_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]): + event_repository = events.EventRepository(database=database.database) + listOrganizers = [] + + for event_index in event_repository.find_by({"organizers":{"$eq": current_user.username}}, limit=limit, skip=skip): + event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date) + listOrganizers.append(event) + + content = {"organizers":listOrganizers} + response = JSONResponse(content=content) + return response + +@router.get("/events/count", tags=["events"]) +async def read_events_count(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]): + count = database.database.get_collection("events").estimated_document_count() + content = {"count":count} + response = JSONResponse(content=content) + return response + + +@router.get("/events/{item_id}", tags=["events"], response_model=events.Event) +async def read_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]): + event_repository = events.EventRepository(database=database.database) + event = event_repository.find_one_by_id(ObjectId(item_id)) + return event + + +@router.delete("/events/me/{item_id}", tags=["events"]) +async def delete_event_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], remove: bool = False): + event_repository = events.EventRepository(database=database.database) + event = event_repository.find_one_by_id(ObjectId(item_id)) + if remove is True: + event.deleted_at = datetime.today() + event.status = -1 + content = {"message": "event is deleted"} + else: + event.status = 0 + event.deleted_at = datetime.today() + content = {"message": "event is disabled"} + event_repository.save(event) + response = JSONResponse(content=content) + return response + +@router.delete("/events/groups",tags=["events"]) +async def delete_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove: bool = False, eventids: events.EventIDS | None = None): + if len(eventids.ids) == 0: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="eventids should be greater than 0" + ) + + event_repository = event.EventRepository(database=database.database) + for i in eventids.ids: + event = event_repository.find_one_by_id(ObjectId(i)) + if remove is True: + event.status = -1 + event.deleted_at = datetime.today() + content = {"message": "events are deleted "} + else: + event.status = 0 + event.disabled_at = datetime.today() + content = {"message": "events are disabled"} + event_repository.save(event) + + response = JSONResponse(content=content) + return response + +@router.delete("/events/{item_id}", tags=["events"]) +async def delete_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove : bool = False): + event_repository = events.EventRepository(database=database.database) + event = event_repository.find_one_by_id(ObjectId(item_id)) + if remove is True: + event.status = -1 + event.deleted_at = datetime.today() + content = {"message": "events are deleted"} + else: + event.status = 0 + event.disabled_at = datetime.today() + content = {"message": "events are disabled"} + event_repository.save(event) + response = JSONResponse(content=content) + return response + +@router.put("/events/me/{item_id}",tags=["events"]) +async def update_events_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], eventSingle: events.EventIn | None = None): + event_repository = event.EventRepository(database=database.database) + event = event_repository.find_one_by_id(ObjectId(item_id)) + event.name = EventSingle.name + event.place = EventSingle.place + event.start_date = eventSingle.start_date + event.end_date = eventSingle.end_date + event.latitude = eventSingle.latitude + event.longitude = eventSingle.longitude + event.updated_at = datetime.today() + event_repository.save(event) + content = {"message": "event is updated"} + response = JSONResponse(content=content) + return response + +@router.put("/events", tags=["events"], status_code=status.HTTP_201_CREATED) +async def update_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None): + if eventSingle is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Body request is empty" + ) + event_repository = events.EventRepository(database=database.database) + + event = event_repository.find_one_by({"name": {'$eq': eventSingle.name}}) + if event is not None: + raise HTTPException( + status_code=status.HTTP_204_NO_CONTENT, + detail="name" + ) + + + event = events.Event(name=eventSingle.name, place=eventSingle.place) + event.start_date = eventSingle.start_date + event.end_date = eventSingle.end_date + event.organizers = eventSingle.organizers + event.latitude = eventSingle.latitude + event.longitude = eventSingle.longitude + event.created_at = datetime.today() + event_repository.save(event) + content = {"message": "event is created"} + response = JSONResponse(content=content, status_code=status.HTTP_201_CREATED) + return response + + +@router.put("/events/{item_id}", tags=["events"], status_code=status.HTTP_200_OK) +async def update_events_id(item_id: str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None, response: Response = Response): + if eventSingle is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Body request is empty" + ) + event_repository = events.EventRepository(database=database.database) + + event = event_repository.find_one_by({"id": {'$eq': ObjectId(item_id)}}) + if event is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Event not found" + ) + + event.name = eventSingle.name + event.place = eventSingle.place + event.start_date = eventSingle.start_date + event.end_date = eventSingle.end_date + event.organizers = eventSingle.organizers + event.latitude = eventSingle.latitude + event.longitude = eventSingle.longitude + event.updated_at = datetime.today() + event_repository.save(event) + content = {"message": "event is updated"} + response = JSONResponse(content=content) + return response + + +@router.patch("/events/groups",tags=["events"]) +async def patch_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventids: events.EventIDS | None = None): + if len(eventids.ids) == 0: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="eventids should be greater than 0" + ) + + event_repository = events.EventRepository(database=database.database) + content = {"message": "events are enabled"} + for i in eventids.ids: + event = event_repository.find_one_by_id(ObjectId(i)) + event.status = 1 + event.disabled_at = None + event.deleted_at = None + event_repository.save(event) + + + response = JSONResponse(content=content) + return response + +@router.patch("/events/{item_id}", tags=["events"]) +async def patch_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]): + event_repository = events.EventRepository(database=database.database) + event = event_repository.find_one_by_id(ObjectId(item_id)) + event.status = 1 + event.disabled_at = None + event.deleted_at = None + event_repository.save(event) + content = {"message": "event is enabled"} + response = JSONResponse(content=content) + return response \ No newline at end of file