Compare commits

...

15 Commits

Author SHA1 Message Date
ca983f1199 add routers tags 2024-12-21 21:38:14 +01:00
1afcd626ea add models tags 2024-12-21 20:59:43 +01:00
0f8a5b32b0 Merge pull request 'add fix tags' (#35) from hotfix/fix-tags into master
Reviewed-on: #35
2024-12-21 20:49:14 +01:00
c2c7b034b6 add fix tags 2024-12-21 20:47:33 +01:00
f880b1e6a4 Merge pull request 'fix list' (#34) from feature/tags-search into master
Reviewed-on: #34
2024-12-18 23:53:28 +01:00
32f571103d fix list 2024-12-18 23:52:42 +01:00
94d6db0866 Merge pull request 'fix list' (#33) from feature/tags-search into master
Reviewed-on: #33
2024-12-18 23:48:18 +01:00
be17ec9202 fix list 2024-12-18 23:47:40 +01:00
9cf1c64ef9 Merge pull request 'feature/tags-search' (#32) from feature/tags-search into master
Reviewed-on: #32
2024-12-18 22:11:35 +01:00
1d7ec61982 add in 2024-12-16 23:53:46 +01:00
3189b08b09 refactor code 2024-12-16 23:51:26 +01:00
81be2a08d1 Merge pull request 'add options i' (#31) from feature/options-case into master
Reviewed-on: #31
2024-11-28 23:19:15 +01:00
7f34e835e3 add options i 2024-11-28 23:17:19 +01:00
721d7a3a5d Merge pull request 'fix end_date' (#30) from feature/searchByDate into master
Reviewed-on: #30
2024-11-23 21:06:36 +01:00
f08bdd5f0a fix end_date 2024-11-23 21:03:44 +01:00
3 changed files with 443 additions and 383 deletions

24
app/models/tags.py Normal file
View File

@@ -0,0 +1,24 @@
from pydantic import BaseModel, EmailStr
from pydantic_mongo import AbstractRepository, ObjectIdField
from datetime import datetime, date
class Tags(BaseModel):
id: ObjectIdField = None
name: str
created_at: datetime = datetime.today()
class TagsOut(BaseModel):
id: ObjectIdField = None
name: str
class TagsIn(BaseModel):
name: str
class EventIDS(BaseModel):
ids: list[str]
class TagsRepository(AbstractRepository[Tags]):
class Meta:
collection_name = "tags"

View File

@@ -4,413 +4,219 @@ from datetime import datetime
from ..dependencies import users_token, permissions_checker, database from ..dependencies import users_token, permissions_checker, database
from ..models import events, users from ..models import events, users
from pydantic import EmailStr from pydantic import EmailStr
from typing import Annotated from typing import Annotated, Union
from bson import ObjectId from bson import ObjectId
from datetime import datetime from datetime import datetime
router = APIRouter() router = APIRouter()
def build_location_filter(min_lat, max_lat, min_lon, max_lon):
"""Build location-based query filters."""
if min_lat is not None and max_lat is not None and min_lon is not None and max_lon is not None:
return [
{"latitude": {"$gte": min_lat}},
{"latitude": {"$lte": max_lat}},
{"longitude": {"$gte": min_lon}},
{"longitude": {"$lte": max_lon}},
]
return []
def build_datetime_filter(current_datetime):
"""Build filters for current datetime."""
if current_datetime:
return {
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{
"$and": [
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # Ongoing
{"end_date": None}, # No end date
]},
],
},
],
}
return None
def build_date_filter(start_date, end_date):
"""Build date range filters."""
if start_date and end_date:
return [
{"start_date": {"$gte": datetime.combine(start_date, datetime.min.time())}},
{"start_date": {"$lte": datetime.combine(end_date, datetime.max.time())}},
]
return []
def build_text_filter(item):
"""Build text-based search filters."""
if item:
return {
"$or": [
{"name": {"$regex": item, "$options": "i"}},
{"tags": {"$regex": item, "$options": "i"}},
{"organizers": {"$regex": item, "$options": "i"}},
]
}
return None
@router.get("/events", tags=["events"], response_model=list[events.EventOut]) @router.get("/events", tags=["events"], response_model=list[events.EventOut])
async def read_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, id_event: str | None = None, name: str | None = None, status: int = 1, tags: str | None = None, organizers: str | None = None, current_datetime: datetime | None = None, date_event: datetime |None = None, start_date: datetime | None = None, end_date: datetime | None = None): async def read_events(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
id_event: str | None = None,
name: str | None = None,
status: int = 1,
tags: str | None = None,
organizers: str | None = None,
current_datetime: datetime | None = None,
date_event: datetime | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
):
# Validate `skip` and `limit`
if limit < 1 or skip < 0 or limit < skip: if limit < 1 or skip < 0 or limit < skip:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip" detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
) )
limit = limit + skip limit = limit + skip
listEvents = []
event_repository = events.EventRepository(database=database.database)
object_search = {}
date_selected: bool = False
if date_event is not None:
start_of_day = datetime.combine(date_event, datetime.min.time()) # 2024-11-23 00:00:00
end_of_day = datetime.combine(date_event, datetime.max.time())
date_selected = True
if start_date is not None and end_date is not None:
start_of_day = datetime.combine(start_date, datetime.min.time()) # 2024-11-23 00:00:00
end_of_day = datetime.combine(end_date, datetime.max.time())
date_selected = True
if current_datetime is not None:
object_search ={
"$or": [{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
if date_selected is True:
object_search ={
"$and": [{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}} # Upcoming events
]
}
if status is not 1:
object_search = {"status":{"$eq": status}}
if current_datetime is not None:
object_search = { "$and": [ {"status":{"$eq": status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]}
if date_selected is True:
object_search = { "$and": [ {"status":{"$eq": status}},
{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}}
]}
if tags is not None:
object_search = {"$and":[{"tags":{"$eq": tags}}, {"status":{"$eq":status}}]}
if current_datetime is not None:
object_search = {"$and":[{"tags":{"$eq": tags}},
{"status":{"$eq":status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]
}
if date_selected is True:
object_search = {"$and":[{"tags":{"$eq": tags}},
{"status":{"$eq":status}},
{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}}
]
}
if organizers is not None:
object_search = {"$and":[{"organizers":{"$eq": organizers}}, {"status":{"$eq":status}}]}
if current_datetime is not None:
object_search = {"$and":[{"organizers":{"$eq": organizers}},
{"status":{"$eq":status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]}
if date_selected is True:
object_search = {"$and":[{"organizers":{"$eq": organizers}},
{"status":{"$eq":status}},
{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}}
]}
if id_event is not None:
eventid = ObjectId(id_event)
object_search = {"$and":[{"id":{"$regex": eventid}}, {"status":{"$eq":status}}]}
if current_datetime is not None:
object_search = {"$and":[{"id":{"$regex": eventid}},
{"status":{"$eq":status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]}
if date_selected is True:
object_search = {"$and":[{"id":{"$regex": eventid}},
{"status":{"$eq":status}},
{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}}
]}
if name is not None:
object_search = {"$and":[{"name":{"$regex": name}}, {"status":{"$eq":status}}]}
if current_datetime is not None:
object_search = {"$and":[{"name":{"$regex": name}},
{"status":{"$eq":status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]}
if date_selected is True:
object_search = {"$and":[{"name":{"$regex": name}},
{"status":{"$eq":status}},
{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}}
]}
# Initialize filters
filters = []
# Add status filter
filters.append({"status": {"$eq": status}})
# Add date filters
if date_event:
start_of_day = datetime.combine(date_event, datetime.min.time())
end_of_day = datetime.combine(date_event, datetime.max.time())
filters.extend(build_date_filter(start_of_day, end_of_day))
elif start_date and end_date:
filters.extend(build_date_filter(start_date, end_date))
# Add current datetime filter
datetime_filter = build_datetime_filter(current_datetime)
if datetime_filter:
filters.append(datetime_filter)
# Add text-based filters
if name:
filters.append(build_text_filter(name))
if tags:
filters.append({"tags": {"$eq": tags}})
if organizers:
filters.append({"organizers": {"$eq": organizers}})
# Add ID filter
if id_event:
try:
event_id = ObjectId(id_event)
filters.append({"_id": {"$eq": event_id}})
except Exception:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid event ID format.")
# Combine all filters
object_search = {"$and": filters} if filters else {}
# Fetch and return results
event_repository = events.EventRepository(database=database.database)
list_events = []
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip): for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, tags=event_index.tags, imgUrl=event_index.imgUrl, name=event_index.name, description=event_index.description, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date) event = events.EventOut(
listEvents.append(event) id=event_index.id,
return listEvents tags=event_index.tags,
imgUrl=event_index.imgUrl,
name=event_index.name,
description=event_index.description,
place=event_index.place,
status=event_index.status,
start_date=event_index.start_date,
end_date=event_index.end_date,
)
list_events.append(event)
return list_events
@router.get("/events/search", tags=["events"], response_model=list[events.EventOut]) @router.get("/events/search", tags=["events"], response_model=list[events.EventOut])
async def search_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, item: str | None = None, status: int = 1, min_lat: float | None = None, max_lat: float | None = None, min_lon: float | None = None, max_lon: float | None = None, current_datetime: datetime | None = None, date_event: datetime | None = None, start_date: datetime | None = None, end_date: datetime | None = None): async def search_events(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
item: Union[str, None] = None,
status: int = 1,
min_lat: Union[float, None] = None,
max_lat: Union[float, None] = None,
min_lon: Union[float, None] = None,
max_lon: Union[float, None] = None,
current_datetime: Union[datetime, None] = None,
date_event: Union[datetime, None] = None,
start_date: Union[datetime, None] = None,
end_date: Union[datetime, None] = None,
tags: Union[str, None] = None,
):
if limit < 1 or skip < 0 or limit < skip: if limit < 1 or skip < 0 or limit < skip:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip" detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
) )
limit = limit + skip limit = limit + skip
listEvents = []
event_repository = events.EventRepository(database=database.database) # Initialize filters
object_search = {} filters = [{"status": {"$eq": status}}]
date_selected: bool = False
if date_event is not None: # Date filters
start_of_day = datetime.combine(date_event, datetime.min.time()) # 2024-11-23 00:00:00 if date_event:
start_of_day = datetime.combine(date_event, datetime.min.time())
end_of_day = datetime.combine(date_event, datetime.max.time()) end_of_day = datetime.combine(date_event, datetime.max.time())
date_selected = True filters.extend(build_date_filter(start_of_day, end_of_day))
if start_date is not None and end_date is not None: else:
start_of_day = datetime.combine(start_date, datetime.min.time()) # 2024-11-23 00:00:00 filters.extend(build_date_filter(start_date, end_date))
end_of_day = datetime.combine(dateend_date_event, datetime.max.time())
date_selected = True # Add location filter
if min_lat is not None and max_lat is not None and min_lon is not None and max_lon is not None: filters.extend(build_location_filter(min_lat, max_lat, min_lon, max_lon))
object_search = {
"$and": [ # Add datetime filter
{"status": {"$eq": status}}, datetime_filter = build_datetime_filter(current_datetime)
{ if datetime_filter:
"latitude": {"$gte": min_lat}, # Minimum latitude filters.append(datetime_filter)
},
{ # Add text filter
"latitude": {"$lte": max_lat}, # Maximum latitude text_filter = build_text_filter(item)
}, if text_filter:
{ filters.append(text_filter)
"longitude": {"$gte": min_lon}, # Minimum longitude
}, if tags is not None:
{ filters.append({"tags": {"$eq": tags}})
"longitude": {"$lte": max_lon}, # Maximum longitude
} # Combine filters
] object_search = {"$and": filters} if filters else {}
}
if current_datetime is not None: # Fetch and return results
object_search = { event_repository = events.EventRepository(database=database.database)
"$and": [ list_events = []
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]
}
if date_selected is True:
object_search = {
"$and": [
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
},
{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}}
]
}
if item is not None:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}}
]
}
if current_datetime is not None:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]
}
if date_selected is True:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}},
{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}}
]
}
if min_lat is not None and max_lat is not None and min_lon is not None and max_lon is not None:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
}
]
}
if current_datetime is not None:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
},
{
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{"$and": [ # Ongoing events
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # End date in the future
{"end_date": None} # No end date set
]}
]}
]
}
]
}
if date_selected is True:
object_search = {
"$and": [
{
"$or": [
{"name": {"$regex": item}},
{"tags": {"$regex": item}},
{"organizers": {"$regex": item}}
]
},
{"status": {"$eq": status}},
{
"latitude": {"$gte": min_lat}, # Minimum latitude
},
{
"latitude": {"$lte": max_lat}, # Maximum latitude
},
{
"longitude": {"$gte": min_lon}, # Minimum longitude
},
{
"longitude": {"$lte": max_lon}, # Maximum longitude
},
{"start_date": {"$gte": start_of_day}},
{"start_date": {"$lte": end_of_day}}
]
}
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip): for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, tags=event_index.tags, imgUrl=event_index.imgUrl, name=event_index.name, description=event_index.description, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date) event = events.EventOut(
listEvents.append(event) id=event_index.id,
return listEvents tags=event_index.tags,
imgUrl=event_index.imgUrl,
name=event_index.name,
description=event_index.description,
place=event_index.place,
status=event_index.status,
start_date=event_index.start_date,
end_date=event_index.end_date,
)
list_events.append(event)
return list_events
@router.get("/events/me",tags=["events"]) @router.get("/events/me",tags=["events"])

230
app/routers/tags.py Normal file
View File

@@ -0,0 +1,230 @@
from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.responses import JSONResponse
from datetime import datetime
from ..dependencies import users_token, permissions_checker, database
from ..models import tags, users
from pydantic import EmailStr
from typing import Annotated, Union
from bson import ObjectId
from datetime import datetime
router = APIRouter()
@router.get("/tags", tags=["tags"], response_model=list[tags.TagsOut])
async def read_tags(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
id_tags: str | None = None,
name: str | None = None
):
# Validate `skip` and `limit`
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
)
limit = limit + skip
# Initialize filters
filters = []
if name:
filters.append({"name": {"$eq": name, "$options": "i"}})
# Add ID filter
if id_tags:
try:
tags_id = ObjectId(id_tags)
filters.append({"_id": {"$eq": event_id}})
except Exception:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid event ID format.")
# Combine all filters
object_search = {"$and": filters} if filters else {}
# Fetch and return results
tags_repository = tags.TagsRepository(database=database.database)
list_tags = []
for tag_index in tags_repository.find_by(object_search, limit=limit, skip=skip):
tag = tags.TagsOut(
id=tag_index.id,
name=tag_index.name
)
list_tags.append(event)
return list_tags
@router.get("/tags/count", tags=["tags"])
async def read_tags_count(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
count = database.database.get_collection("tags").estimated_document_count()
content = {"count":count}
response = JSONResponse(content=content)
return response
@router.get("/tags/{item_id}", tags=["tags"], response_model=tags.Tags)
async def read_tags_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
tags_repository = tags.TagsRepository(database=database.database)
tag = tags_repository.find_one_by_id(ObjectId(item_id))
return tag
@router.delete("/tags/groups",tags=["tags"])
async def delete_tags_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove: bool = False, tagsids: tags.TagsIDS | None = None):
if len(eventids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
event_repository = event.EventRepository(database=database.database)
for i in eventids.ids:
event = event_repository.find_one_by_id(ObjectId(i))
if remove is True:
event.status = -1
event.deleted_at = datetime.today()
content = {"message": "events are deleted "}
else:
event.status = 0
event.disabled_at = datetime.today()
content = {"message": "events are disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.delete("/events/{item_id}", tags=["events"])
async def delete_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove : bool = False):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
if remove is True:
event.status = -1
event.deleted_at = datetime.today()
content = {"message": "events are deleted"}
else:
event.status = 0
event.disabled_at = datetime.today()
content = {"message": "events are disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.put("/events/me/{item_id}",tags=["events"])
async def update_events_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], eventSingle: events.EventIn | None = None):
event_repository = event.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.name = eventSingle.name
event.description = eventSingle.description
event.place = eventSingle.place
event.start_date = eventSingle.start_date
event.tags = eventSingle.tags
event.end_date = eventSingle.end_date
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event.imgUrl = eventSingle.imgUrl
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
return response
@router.put("/events", tags=["events"], status_code=status.HTTP_201_CREATED)
async def update_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None):
if eventSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by({"name": {'$eq': eventSingle.name}})
if event is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="name"
)
event = events.Event(name=eventSingle.name, description=eventSingle.description, place=eventSingle.place)
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.imgUrl = eventSingle.imgUrl
event.tags = eventSingle.tags
event.status = 1
event.created_at = datetime.today()
event_repository.save(event)
content = {"message": "event is created"}
response = JSONResponse(content=content, status_code=status.HTTP_201_CREATED)
return response
@router.put("/events/{item_id}", tags=["events"], status_code=status.HTTP_200_OK)
async def update_events_id(item_id: str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None, response: Response = Response):
if eventSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by({"id": {'$eq': ObjectId(item_id)}})
if event is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
event.name = eventSingle.name
event.place = eventSingle.place
event.description = eventSingle.description
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.tags = eventSingle.tags
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event.imgUrl = eventSingle.imgUrl
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
return response
@router.patch("/events/groups",tags=["events"])
async def patch_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventids: events.EventIDS | None = None):
if len(eventids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
event_repository = events.EventRepository(database=database.database)
content = {"message": "events are enabled"}
for i in eventids.ids:
event = event_repository.find_one_by_id(ObjectId(i))
event.status = 1
event.disabled_at = None
event.deleted_at = None
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.patch("/events/{item_id}", tags=["events"])
async def patch_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.status = 1
event.disabled_at = None
event.deleted_at = None
event_repository.save(event)
content = {"message": "event is enabled"}
response = JSONResponse(content=content)
return response