Compare commits

..

2 Commits

Author SHA1 Message Date
928fe6578c add models and routes for ip address 2024-03-29 23:19:45 +01:00
7fd9c6ff1a add ip address models 2024-03-29 23:03:23 +01:00
5 changed files with 282 additions and 284 deletions

View File

@@ -10,10 +10,7 @@ app = FastAPI()
origins = [ origins = [
"http://localhost:8084", "http://localhost:8084",
"https://backend.valczeryba.ovh", "https://backend.valczeryba.ovh"
"https://facebook.com",
"https://fetlife.com",
"https://backoffice.valczeryba.ovh"
] ]
app.add_middleware( app.add_middleware(

View File

@@ -6,13 +6,10 @@ class Event(BaseModel):
id: ObjectIdField = None id: ObjectIdField = None
name: str name: str
place: str place: str
description: str
imgUrl: str | None = None
status: int = 0 status: int = 0
latitude: float = 0.0 latitude: float = 0.0
longitude: float = 0.0 longitude: float = 0.0
organizers: list[str] = [] organizers: list[str] = []
tags: list[str] = []
start_date: datetime | None = None start_date: datetime | None = None
end_date: datetime | None = None end_date: datetime | None = None
created_at: datetime = datetime.today() created_at: datetime = datetime.today()
@@ -24,21 +21,15 @@ class EventOut(BaseModel):
id: ObjectIdField = None id: ObjectIdField = None
name: str name: str
place: str place: str
description: str
imgUrl: str | None = None
status: int = 0 status: int = 0
start_date: datetime | None = None start_date: datetime | None = None
end_date: datetime | None = None end_date: datetime | None = None
tags: list[str] = []
class EventIn(BaseModel): class EventIn(BaseModel):
name: str name: str
place: str place: str
description: str
imgUrl: str | None = None
status: int = 0 status: int = 0
organizers: list[str] = [] organizers: list[str] = []
tags: list[str] = []
start_date: datetime | None = None start_date: datetime | None = None
end_date: datetime | None = None end_date: datetime | None = None
latitude: float = 0.0 latitude: float = 0.0

33
app/models/ipaddress.py Normal file
View File

@@ -0,0 +1,33 @@
from pydantic import BaseModel, EmailStr
from pydantic_mongo import AbstractRepository, ObjectIdField
from datetime import datetime, date
from pydantic.networks import IPvAnyAddress
class IpAddress(BaseModel):
id: ObjectIdField = None
username: str
ip: IPvAnyAddress
socialnetwork: List[str] | None = None
status: int = 0
created_at: datetime = datetime.today()
updated_at: datetime | None = None
deleted_at: datetime | None = None
disabled_at: datetime | None = None
class IpAddressOut(BaseModel):
id: ObjectIdField = None
username: str
ip: IPvAnyAddress
socialnetwork: List[str] | None = None
status: int = 0
class IpAddressIn(BaseModel):
username: str
ip: IPvAnyAddress
socialnetwork: List[str] | None = None
status: int = 0
class IpAddressRepository(AbstractRepository[IpAdress]):
class Meta:
collection_name = "ipaddress"

View File

@@ -6,13 +6,12 @@ from ..models import events, users
from pydantic import EmailStr from pydantic import EmailStr
from typing import Annotated from typing import Annotated
from bson import ObjectId from bson import ObjectId
from datetime import datetime
router = APIRouter() router = APIRouter()
@router.get("/events", tags=["events"], response_model=list[events.EventOut]) @router.get("/events", tags=["events"], response_model=list[events.EventOut])
async def read_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, id_event: str | None = None, name: str | None = None, status: int = 1, tags: str | None = None, organizers: str | None = None, current_datetime: datetime | None = None): async def read_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, id_event: str | None = None, name: str | None = None, status: int | None = None):
if limit < 1 or skip < 0 or limit < skip: if limit < 1 or skip < 0 or limit < skip:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
@@ -22,268 +21,21 @@ async def read_events(authorize: Annotated[bool, Depends(permissions_checker.Per
listEvents = [] listEvents = []
event_repository = events.EventRepository(database=database.database) event_repository = events.EventRepository(database=database.database)
object_search = {} object_search = {}
if status is not 1: if status is not None:
object_search = {"status":{"$eq": status}} object_search = {"status":{"$eq": status}}
if current_datetime is not None:
object_search = { "$and": [ {"status":{"$eq": status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
] }
if tags is not None:
object_search = {"$and":[{"tags":{"$eq": tags}}, {"status":{"$eq":status}}]}
if current_datetime is not None:
object_search = {"$and":[{"tags":{"$eq": tags}},
{"status":{"$eq":status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]
}
if organizers is not None:
object_search = {"$and":[{"organizers":{"$eq": organizers}}, {"status":{"$eq":status}}]}
if current_datetime is not None:
object_search = {"$and":[{"organizers":{"$eq": organizers}},
{"status":{"$eq":status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]}
if id_event is not None: if id_event is not None:
eventid = ObjectId(id_event) eventid = ObjectId(id_event)
object_search = {"id": {"$regex": userid}}
if status is not None:
object_search = {"$and":[{"id":{"$regex": eventid}}, {"status":{"$eq":status}}]} object_search = {"$and":[{"id":{"$regex": eventid}}, {"status":{"$eq":status}}]}
if current_datetime is not None:
object_search = {"$and":[{"id":{"$regex": eventid}},
{"status":{"$eq":status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]}
if name is not None: if name is not None:
object_search = {"name": {"$regex": name}}
if status is not None:
object_search = {"$and":[{"name":{"$regex": name}}, {"status":{"$eq":status}}]} object_search = {"$and":[{"name":{"$regex": name}}, {"status":{"$eq":status}}]}
if current_datetime is not None:
object_search = {"$and":[{"name":{"$regex": name}},
{"status":{"$eq":status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]}
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip): for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, tags=event_index.tags, imgUrl=event_index.imgUrl, name=event_index.name, description=event_index.description, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date) event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listEvents.append(event)
return listEvents
@router.get("/events/search", tags=["events"], response_model=list[events.EventOut])
async def search_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, item: str | None = None, status: int = 1, min_lat: float | None = None, max_lat: float | None = None, min_lon: float | None = None, max_lon: float | None = None, current_datetime: datetime | None = None):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
)
limit = limit + skip
listEvents = []
event_repository = events.EventRepository(database=database.database)
object_search = {}
if min_lat is not None and max_lat is not None and min_lon is not None and max_lon is not None:
object_search = {
"$and": [
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
}
]
}
if current_datetime is not None:
object_search = {
"$and": [
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]
}
if item is not None:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}}
]
}
if current_datetime is not None:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]
}
if min_lat is not None and max_lat is not None and min_lon is not None and max_lon is not None:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
}
]
}
if current_datetime is not None:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]
}
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, tags=event_index.tags, imgUrl=event_index.imgUrl, name=event_index.name, description=event_index.description, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listEvents.append(event) listEvents.append(event)
return listEvents return listEvents
@@ -294,7 +46,7 @@ async def read_users_me(current_user: Annotated[users.User, Depends(users_token.
listOrganizers = [] listOrganizers = []
for event_index in event_repository.find_by({"organizers":{"$eq": current_user.username}}, limit=limit, skip=skip): for event_index in event_repository.find_by({"organizers":{"$eq": current_user.username}}, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, name=event_index.name, tags=event_index.tags, imgUrl=event_index.imgUrl, description=event_index.description, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date) event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listOrganizers.append(event) listOrganizers.append(event)
content = {"organizers":listOrganizers} content = {"organizers":listOrganizers}
@@ -376,16 +128,13 @@ async def delete_events_id(item_id : str, authorize: Annotated[bool, Depends(per
async def update_events_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], eventSingle: events.EventIn | None = None): async def update_events_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], eventSingle: events.EventIn | None = None):
event_repository = event.EventRepository(database=database.database) event_repository = event.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id)) event = event_repository.find_one_by_id(ObjectId(item_id))
event.name = eventSingle.name event.name = EventSingle.name
event.description = eventSingle.description event.place = EventSingle.place
event.place = eventSingle.place
event.start_date = eventSingle.start_date event.start_date = eventSingle.start_date
event.tags = eventSingle.tags
event.end_date = eventSingle.end_date event.end_date = eventSingle.end_date
event.latitude = eventSingle.latitude event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude event.longitude = eventSingle.longitude
event.updated_at = datetime.today() event.updated_at = datetime.today()
event.imgUrl = eventSingle.imgUrl
event_repository.save(event) event_repository.save(event)
content = {"message": "event is updated"} content = {"message": "event is updated"}
response = JSONResponse(content=content) response = JSONResponse(content=content)
@@ -408,15 +157,12 @@ async def update_events(authorize: Annotated[bool, Depends(permissions_checker.P
) )
event = events.Event(name=eventSingle.name, description=eventSingle.description, place=eventSingle.place) event = events.Event(name=eventSingle.name, place=eventSingle.place)
event.start_date = eventSingle.start_date event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers event.organizers = eventSingle.organizers
event.latitude = eventSingle.latitude event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude event.longitude = eventSingle.longitude
event.imgUrl = eventSingle.imgUrl
event.tags = eventSingle.tags
event.status = 1
event.created_at = datetime.today() event.created_at = datetime.today()
event_repository.save(event) event_repository.save(event)
content = {"message": "event is created"} content = {"message": "event is created"}
@@ -442,15 +188,12 @@ async def update_events_id(item_id: str, authorize: Annotated[bool, Depends(perm
event.name = eventSingle.name event.name = eventSingle.name
event.place = eventSingle.place event.place = eventSingle.place
event.description = eventSingle.description
event.start_date = eventSingle.start_date event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers event.organizers = eventSingle.organizers
event.tags = eventSingle.tags
event.latitude = eventSingle.latitude event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude event.longitude = eventSingle.longitude
event.updated_at = datetime.today() event.updated_at = datetime.today()
event.imgUrl = eventSingle.imgUrl
event_repository.save(event) event_repository.save(event)
content = {"message": "event is updated"} content = {"message": "event is updated"}
response = JSONResponse(content=content) response = JSONResponse(content=content)

234
app/routers/ipaddress.py Normal file
View File

@@ -0,0 +1,234 @@
from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.responses import JSONResponse
from datetime import datetime
from ..dependencies import users_token, permissions_checker, database
from ..models import events, users
from pydantic import EmailStr
from typing import Annotated
from bson import ObjectId
router = APIRouter()
@router.get("/ipaddress", tags=["ipaddress"], response_model=list[ipaddress.IpAddressOut])
async def read_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, id_event: str | None = None, name: str | None = None, status: int | None = None):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
)
limit = limit + skip
listEvents = []
event_repository = events.EventRepository(database=database.database)
object_search = {}
if status is not None:
object_search = {"status":{"$eq": status}}
if id_event is not None:
eventid = ObjectId(id_event)
object_search = {"id": {"$regex": userid}}
if status is not None:
object_search = {"$and":[{"id":{"$regex": eventid}}, {"status":{"$eq":status}}]}
if name is not None:
object_search = {"name": {"$regex": name}}
if status is not None:
object_search = {"$and":[{"name":{"$regex": name}}, {"status":{"$eq":status}}]}
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listEvents.append(event)
return listEvents
@router.get("/events/me",tags=["events"])
async def read_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
event_repository = events.EventRepository(database=database.database)
listOrganizers = []
for event_index in event_repository.find_by({"organizers":{"$eq": current_user.username}}, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listOrganizers.append(event)
content = {"organizers":listOrganizers}
response = JSONResponse(content=content)
return response
@router.get("/events/count", tags=["events"])
async def read_events_count(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
count = database.database.get_collection("events").estimated_document_count()
content = {"count":count}
response = JSONResponse(content=content)
return response
@router.get("/events/{item_id}", tags=["events"], response_model=events.Event)
async def read_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
return event
@router.delete("/events/me/{item_id}", tags=["events"])
async def delete_event_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], remove: bool = False):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
if remove is True:
event.deleted_at = datetime.today()
event.status = -1
content = {"message": "event is deleted"}
else:
event.status = 0
event.deleted_at = datetime.today()
content = {"message": "event is disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.delete("/events/groups",tags=["events"])
async def delete_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove: bool = False, eventids: events.EventIDS | None = None):
if len(eventids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
event_repository = event.EventRepository(database=database.database)
for i in eventids.ids:
event = event_repository.find_one_by_id(ObjectId(i))
if remove is True:
event.status = -1
event.deleted_at = datetime.today()
content = {"message": "events are deleted "}
else:
event.status = 0
event.disabled_at = datetime.today()
content = {"message": "events are disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.delete("/events/{item_id}", tags=["events"])
async def delete_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove : bool = False):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
if remove is True:
event.status = -1
event.deleted_at = datetime.today()
content = {"message": "events are deleted"}
else:
event.status = 0
event.disabled_at = datetime.today()
content = {"message": "events are disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.put("/events/me/{item_id}",tags=["events"])
async def update_events_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], eventSingle: events.EventIn | None = None):
event_repository = event.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.name = EventSingle.name
event.place = EventSingle.place
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
return response
@router.put("/events", tags=["events"], status_code=status.HTTP_201_CREATED)
async def update_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None):
if eventSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by({"name": {'$eq': eventSingle.name}})
if event is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="name"
)
event = events.Event(name=eventSingle.name, place=eventSingle.place)
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.created_at = datetime.today()
event_repository.save(event)
content = {"message": "event is created"}
response = JSONResponse(content=content, status_code=status.HTTP_201_CREATED)
return response
@router.put("/events/{item_id}", tags=["events"], status_code=status.HTTP_200_OK)
async def update_events_id(item_id: str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None, response: Response = Response):
if eventSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by({"id": {'$eq': ObjectId(item_id)}})
if event is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
event.name = eventSingle.name
event.place = eventSingle.place
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
return response
@router.patch("/events/groups",tags=["events"])
async def patch_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventids: events.EventIDS | None = None):
if len(eventids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
event_repository = events.EventRepository(database=database.database)
content = {"message": "events are enabled"}
for i in eventids.ids:
event = event_repository.find_one_by_id(ObjectId(i))
event.status = 1
event.disabled_at = None
event.deleted_at = None
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.patch("/events/{item_id}", tags=["events"])
async def patch_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.status = 1
event.disabled_at = None
event.deleted_at = None
event_repository.save(event)
content = {"message": "event is enabled"}
response = JSONResponse(content=content)
return response