Compare commits

..

2 Commits

Author SHA1 Message Date
928fe6578c add models and routes for ip address 2024-03-29 23:19:45 +01:00
7fd9c6ff1a add ip address models 2024-03-29 23:03:23 +01:00
10 changed files with 306 additions and 405 deletions

View File

@@ -1,7 +1,7 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import users, token, mail, events, tags
from .routers import users, token, mail, events
from .dependencies import user_add
import os
@@ -10,10 +10,7 @@ app = FastAPI()
origins = [
"http://localhost:8084",
"https://backend.valczeryba.ovh",
"https://facebook.com",
"https://fetlife.com",
"https://backoffice.valczeryba.ovh"
"https://backend.valczeryba.ovh"
]
app.add_middleware(
@@ -29,7 +26,6 @@ app.include_router(users.router)
app.include_router(token.router)
app.include_router(mail.router)
app.include_router(events.router)
app.include_router(tags.router)
@app.on_event("startup")
@@ -45,4 +41,4 @@ async def startup_event():
@app.get("/")
async def root():
return {"message": "Hello World !"}
return {"message": "Hello World !"}

View File

@@ -6,13 +6,10 @@ class Event(BaseModel):
id: ObjectIdField = None
name: str
place: str
description: str
imgUrl: str | None = None
status: int = 0
latitude: float = 0.0
longitude: float = 0.0
organizers: list[str] = []
tags: list[str] = []
start_date: datetime | None = None
end_date: datetime | None = None
created_at: datetime = datetime.today()
@@ -24,21 +21,15 @@ class EventOut(BaseModel):
id: ObjectIdField = None
name: str
place: str
description: str
imgUrl: str | None = None
status: int = 0
start_date: datetime | None = None
end_date: datetime | None = None
tags: list[str] = []
class EventIn(BaseModel):
name: str
place: str
description: str
imgUrl: str | None = None
status: int = 0
organizers: list[str] = []
tags: list[str] = []
start_date: datetime | None = None
end_date: datetime | None = None
latitude: float = 0.0

33
app/models/ipaddress.py Normal file
View File

@@ -0,0 +1,33 @@
from pydantic import BaseModel, EmailStr
from pydantic_mongo import AbstractRepository, ObjectIdField
from datetime import datetime, date
from pydantic.networks import IPvAnyAddress
class IpAddress(BaseModel):
id: ObjectIdField = None
username: str
ip: IPvAnyAddress
socialnetwork: List[str] | None = None
status: int = 0
created_at: datetime = datetime.today()
updated_at: datetime | None = None
deleted_at: datetime | None = None
disabled_at: datetime | None = None
class IpAddressOut(BaseModel):
id: ObjectIdField = None
username: str
ip: IPvAnyAddress
socialnetwork: List[str] | None = None
status: int = 0
class IpAddressIn(BaseModel):
username: str
ip: IPvAnyAddress
socialnetwork: List[str] | None = None
status: int = 0
class IpAddressRepository(AbstractRepository[IpAdress]):
class Meta:
collection_name = "ipaddress"

View File

@@ -1,24 +0,0 @@
from pydantic import BaseModel, EmailStr
from pydantic_mongo import AbstractRepository, ObjectIdField
from datetime import datetime, date
class Tags(BaseModel):
id: ObjectIdField = None
name: str
created_at: datetime = datetime.today()
class TagsOut(BaseModel):
id: ObjectIdField = None
name: str
class TagsIn(BaseModel):
name: str
class TagsIDS(BaseModel):
ids: list[str]
class TagsRepository(AbstractRepository[Tags]):
class Meta:
collection_name = "tags"

View File

@@ -11,7 +11,7 @@ class User(BaseModel):
roles: str = "User"
status: int = 0
email: EmailStr
birth: datetime | None = None
birth: str | None = None
created_at: datetime = datetime.today()
connected_at: datetime | None = None
updated_at: datetime | None = None
@@ -42,9 +42,7 @@ class UserCreate(BaseModel):
username: str
password: str
email: EmailStr
birth: str
firstName: str
name: str
class UserInDB(User):
password: str

View File

@@ -2,221 +2,42 @@ from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.responses import JSONResponse
from datetime import datetime
from ..dependencies import users_token, permissions_checker, database
from ..models import events, users, tags
from ..models import events, users
from pydantic import EmailStr
from typing import Annotated, Union
from typing import Annotated
from bson import ObjectId
from datetime import datetime
router = APIRouter()
def build_location_filter(min_lat, max_lat, min_lon, max_lon):
"""Build location-based query filters."""
if min_lat is not None and max_lat is not None and min_lon is not None and max_lon is not None:
return [
{"latitude": {"$gte": min_lat}},
{"latitude": {"$lte": max_lat}},
{"longitude": {"$gte": min_lon}},
{"longitude": {"$lte": max_lon}},
]
return []
def build_datetime_filter(current_datetime):
"""Build filters for current datetime."""
if current_datetime:
return {
"$or": [
{"start_date": {"$gte": current_datetime}}, # Upcoming events
{
"$and": [
{"start_date": {"$lte": current_datetime}}, # Already started
{"$or": [
{"end_date": {"$gte": current_datetime}}, # Ongoing
{"end_date": None}, # No end date
]},
],
},
],
}
return None
def build_date_filter(start_date, end_date):
"""Build date range filters."""
if start_date and end_date:
return [
{"start_date": {"$gte": datetime.combine(start_date, datetime.min.time())}},
{"start_date": {"$lte": datetime.combine(end_date, datetime.max.time())}},
]
return []
def build_text_filter(item):
"""Build text-based search filters."""
if item:
return {
"$or": [
{"name": {"$regex": item, "$options": "i"}},
{"tags": {"$regex": item, "$options": "i"}},
{"organizers": {"$regex": item, "$options": "i"}},
]
}
return None
@router.get("/events", tags=["events"], response_model=list[events.EventOut])
async def read_events(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
id_event: str | None = None,
name: str | None = None,
status: int = 1,
tags: str | None = None,
organizers: str | None = None,
current_datetime: datetime | None = None,
date_event: datetime | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
):
# Validate `skip` and `limit`
async def read_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, id_event: str | None = None, name: str | None = None, status: int | None = None):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
)
limit = limit + skip
# Initialize filters
filters = []
# Add status filter
filters.append({"status": {"$eq": status}})
# Add date filters
if date_event:
start_of_day = datetime.combine(date_event, datetime.min.time())
end_of_day = datetime.combine(date_event, datetime.max.time())
filters.extend(build_date_filter(start_of_day, end_of_day))
elif start_date and end_date:
filters.extend(build_date_filter(start_date, end_date))
# Add current datetime filter
datetime_filter = build_datetime_filter(current_datetime)
if datetime_filter:
filters.append(datetime_filter)
# Add text-based filters
if name:
filters.append(build_text_filter(name))
if tags:
filters.append({"tags": {"$eq": tags}})
if organizers:
filters.append({"organizers": {"$eq": organizers}})
# Add ID filter
if id_event:
try:
event_id = ObjectId(id_event)
filters.append({"_id": {"$eq": event_id}})
except Exception:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid event ID format.")
# Combine all filters
object_search = {"$and": filters} if filters else {}
# Fetch and return results
listEvents = []
event_repository = events.EventRepository(database=database.database)
list_events = []
object_search = {}
if status is not None:
object_search = {"status":{"$eq": status}}
if id_event is not None:
eventid = ObjectId(id_event)
object_search = {"id": {"$regex": userid}}
if status is not None:
object_search = {"$and":[{"id":{"$regex": eventid}}, {"status":{"$eq":status}}]}
if name is not None:
object_search = {"name": {"$regex": name}}
if status is not None:
object_search = {"$and":[{"name":{"$regex": name}}, {"status":{"$eq":status}}]}
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(
id=event_index.id,
tags=event_index.tags,
imgUrl=event_index.imgUrl,
name=event_index.name,
description=event_index.description,
place=event_index.place,
status=event_index.status,
start_date=event_index.start_date,
end_date=event_index.end_date,
)
list_events.append(event)
return list_events
@router.get("/events/search", tags=["events"], response_model=list[events.EventOut])
async def search_events(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
item: Union[str, None] = None,
status: int = 1,
min_lat: Union[float, None] = None,
max_lat: Union[float, None] = None,
min_lon: Union[float, None] = None,
max_lon: Union[float, None] = None,
current_datetime: Union[datetime, None] = None,
date_event: Union[datetime, None] = None,
start_date: Union[datetime, None] = None,
end_date: Union[datetime, None] = None,
tags: Union[str, None] = None,
):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
)
limit = limit + skip
# Initialize filters
filters = [{"status": {"$eq": status}}]
# Date filters
if date_event:
start_of_day = datetime.combine(date_event, datetime.min.time())
end_of_day = datetime.combine(date_event, datetime.max.time())
filters.extend(build_date_filter(start_of_day, end_of_day))
else:
filters.extend(build_date_filter(start_date, end_date))
# Add location filter
filters.extend(build_location_filter(min_lat, max_lat, min_lon, max_lon))
# Add datetime filter
datetime_filter = build_datetime_filter(current_datetime)
if datetime_filter:
filters.append(datetime_filter)
# Add text filter
text_filter = build_text_filter(item)
if text_filter:
filters.append(text_filter)
if tags is not None:
filters.append({"tags": {"$eq": tags}})
# Combine filters
object_search = {"$and": filters} if filters else {}
# Fetch and return results
event_repository = events.EventRepository(database=database.database)
list_events = []
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(
id=event_index.id,
tags=event_index.tags,
imgUrl=event_index.imgUrl,
name=event_index.name,
description=event_index.description,
place=event_index.place,
status=event_index.status,
start_date=event_index.start_date,
end_date=event_index.end_date,
)
list_events.append(event)
return list_events
event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listEvents.append(event)
return listEvents
@router.get("/events/me",tags=["events"])
@@ -225,7 +46,7 @@ async def read_users_me(current_user: Annotated[users.User, Depends(users_token.
listOrganizers = []
for event_index in event_repository.find_by({"organizers":{"$eq": current_user.username}}, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, name=event_index.name, tags=event_index.tags, imgUrl=event_index.imgUrl, description=event_index.description, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listOrganizers.append(event)
content = {"organizers":listOrganizers}
@@ -307,16 +128,13 @@ async def delete_events_id(item_id : str, authorize: Annotated[bool, Depends(per
async def update_events_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], eventSingle: events.EventIn | None = None):
event_repository = event.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.name = eventSingle.name
event.description = eventSingle.description
event.place = eventSingle.place
event.name = EventSingle.name
event.place = EventSingle.place
event.start_date = eventSingle.start_date
event.tags = eventSingle.tags
event.end_date = eventSingle.end_date
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event.imgUrl = eventSingle.imgUrl
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
@@ -330,12 +148,8 @@ async def update_events(authorize: Annotated[bool, Depends(permissions_checker.P
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
tags_repository = tags.TagsRepository(database=database.database)
event = event_repository.find_one_by( {"$and": [
{"start_date": {"$eq": eventSingle.start_date}}, # Already started
{"name": {"$eq": eventSingle.name}},
]})
event = event_repository.find_one_by({"name": {'$eq': eventSingle.name}})
if event is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
@@ -343,20 +157,12 @@ async def update_events(authorize: Annotated[bool, Depends(permissions_checker.P
)
event = events.Event(name=eventSingle.name, description=eventSingle.description, place=eventSingle.place)
event = events.Event(name=eventSingle.name, place=eventSingle.place)
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.imgUrl = eventSingle.imgUrl
event.tags = eventSingle.tags
for tag_name in eventSingle.tags:
tag = tags_repository.find_one_by({"name": {'$eq': tag_name}})
if tag is None:
tag = tags.Tags(name=tag_name)
tags_repository.save(tag)
event.status = 1
event.created_at = datetime.today()
event_repository.save(event)
content = {"message": "event is created"}
@@ -372,7 +178,7 @@ async def update_events_id(item_id: str, authorize: Annotated[bool, Depends(perm
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
tags_repository = tags.TagsRepository(database=database)
event = event_repository.find_one_by({"id": {'$eq': ObjectId(item_id)}})
if event is None:
raise HTTPException(
@@ -382,20 +188,12 @@ async def update_events_id(item_id: str, authorize: Annotated[bool, Depends(perm
event.name = eventSingle.name
event.place = eventSingle.place
event.description = eventSingle.description
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.tags = eventSingle.tags
for tag_name in eventSingle:
tag = tags_repository.find_one_by({"name": {'$eq': tag_name}})
if tag is None:
tag = Tags(name=tag_name)
tags_repository.save(tag)
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event.imgUrl = eventSingle.imgUrl
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)

234
app/routers/ipaddress.py Normal file
View File

@@ -0,0 +1,234 @@
from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.responses import JSONResponse
from datetime import datetime
from ..dependencies import users_token, permissions_checker, database
from ..models import events, users
from pydantic import EmailStr
from typing import Annotated
from bson import ObjectId
router = APIRouter()
@router.get("/ipaddress", tags=["ipaddress"], response_model=list[ipaddress.IpAddressOut])
async def read_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], skip: int = 0, limit: int = 20, id_event: str | None = None, name: str | None = None, status: int | None = None):
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="skip should be greater than 0 and limit should be greater than 1. Limit should be greater than skip"
)
limit = limit + skip
listEvents = []
event_repository = events.EventRepository(database=database.database)
object_search = {}
if status is not None:
object_search = {"status":{"$eq": status}}
if id_event is not None:
eventid = ObjectId(id_event)
object_search = {"id": {"$regex": userid}}
if status is not None:
object_search = {"$and":[{"id":{"$regex": eventid}}, {"status":{"$eq":status}}]}
if name is not None:
object_search = {"name": {"$regex": name}}
if status is not None:
object_search = {"$and":[{"name":{"$regex": name}}, {"status":{"$eq":status}}]}
for event_index in event_repository.find_by(object_search, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listEvents.append(event)
return listEvents
@router.get("/events/me",tags=["events"])
async def read_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
event_repository = events.EventRepository(database=database.database)
listOrganizers = []
for event_index in event_repository.find_by({"organizers":{"$eq": current_user.username}}, limit=limit, skip=skip):
event = events.EventOut(id=event_index.id, name=event_index.name, place=event_index.place, status=event_index.status, start_date=event_index.start_date, end_date=event_index.end_date)
listOrganizers.append(event)
content = {"organizers":listOrganizers}
response = JSONResponse(content=content)
return response
@router.get("/events/count", tags=["events"])
async def read_events_count(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
count = database.database.get_collection("events").estimated_document_count()
content = {"count":count}
response = JSONResponse(content=content)
return response
@router.get("/events/{item_id}", tags=["events"], response_model=events.Event)
async def read_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
return event
@router.delete("/events/me/{item_id}", tags=["events"])
async def delete_event_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], remove: bool = False):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
if remove is True:
event.deleted_at = datetime.today()
event.status = -1
content = {"message": "event is deleted"}
else:
event.status = 0
event.deleted_at = datetime.today()
content = {"message": "event is disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.delete("/events/groups",tags=["events"])
async def delete_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove: bool = False, eventids: events.EventIDS | None = None):
if len(eventids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
event_repository = event.EventRepository(database=database.database)
for i in eventids.ids:
event = event_repository.find_one_by_id(ObjectId(i))
if remove is True:
event.status = -1
event.deleted_at = datetime.today()
content = {"message": "events are deleted "}
else:
event.status = 0
event.disabled_at = datetime.today()
content = {"message": "events are disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.delete("/events/{item_id}", tags=["events"])
async def delete_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], remove : bool = False):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
if remove is True:
event.status = -1
event.deleted_at = datetime.today()
content = {"message": "events are deleted"}
else:
event.status = 0
event.disabled_at = datetime.today()
content = {"message": "events are disabled"}
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.put("/events/me/{item_id}",tags=["events"])
async def update_events_me(item_id: str, current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], eventSingle: events.EventIn | None = None):
event_repository = event.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.name = EventSingle.name
event.place = EventSingle.place
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
return response
@router.put("/events", tags=["events"], status_code=status.HTTP_201_CREATED)
async def update_events(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None):
if eventSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by({"name": {'$eq': eventSingle.name}})
if event is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="name"
)
event = events.Event(name=eventSingle.name, place=eventSingle.place)
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.created_at = datetime.today()
event_repository.save(event)
content = {"message": "event is created"}
response = JSONResponse(content=content, status_code=status.HTTP_201_CREATED)
return response
@router.put("/events/{item_id}", tags=["events"], status_code=status.HTTP_200_OK)
async def update_events_id(item_id: str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventSingle: events.EventIn | None = None, response: Response = Response):
if eventSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by({"id": {'$eq': ObjectId(item_id)}})
if event is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
event.name = eventSingle.name
event.place = eventSingle.place
event.start_date = eventSingle.start_date
event.end_date = eventSingle.end_date
event.organizers = eventSingle.organizers
event.latitude = eventSingle.latitude
event.longitude = eventSingle.longitude
event.updated_at = datetime.today()
event_repository.save(event)
content = {"message": "event is updated"}
response = JSONResponse(content=content)
return response
@router.patch("/events/groups",tags=["events"])
async def patch_events_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], eventids: events.EventIDS | None = None):
if len(eventids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
event_repository = events.EventRepository(database=database.database)
content = {"message": "events are enabled"}
for i in eventids.ids:
event = event_repository.find_one_by_id(ObjectId(i))
event.status = 1
event.disabled_at = None
event.deleted_at = None
event_repository.save(event)
response = JSONResponse(content=content)
return response
@router.patch("/events/{item_id}", tags=["events"])
async def patch_events_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
event_repository = events.EventRepository(database=database.database)
event = event_repository.find_one_by_id(ObjectId(item_id))
event.status = 1
event.disabled_at = None
event.deleted_at = None
event_repository.save(event)
content = {"message": "event is enabled"}
response = JSONResponse(content=content)
return response

View File

@@ -19,7 +19,7 @@ async def create_user(userSingle: users.UserCreate | None = None):
if user is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="User already exists"
detail="User is already exist"
)
fm = FastMail(mail.conf)
@@ -35,8 +35,7 @@ async def create_user(userSingle: users.UserCreate | None = None):
)
await fm.send_message(message, template_name="mailer.html")
current_user = users.User(username=userSingle.username, password=users_token.get_password_hash(userSingle.password), email=userSingle.email, name=userSingle.name, firstName=userSingle.firstName)
current_user.status = 0
current_user = users.User(username=userSingle.username, password=users_token.get_password_hash(userSingle.password), email=userSingle.email)
user_repository.save(current_user)
database.connect_redis.set(userSingle.username, key_hashed)
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "email has been sent"})
@@ -56,6 +55,6 @@ async def confirm_user(key: str | None = None, username: str | None = None):
status_code=status.HTTP_400_BAD_REQUEST,
detail="Key is invalid"
)
user.status = 1
user.confirmed = True
user_repository.save(user)
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "user account confirmed"})

View File

@@ -1,119 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.responses import JSONResponse
from datetime import datetime
from ..dependencies import users_token, permissions_checker, database
from ..models import tags, users
from pydantic import EmailStr
from typing import Annotated, Union
from bson import ObjectId
from datetime import datetime
router = APIRouter()
@router.get("/tags", tags=["tags"], response_model=list[tags.TagsOut])
async def read_tags(
authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))],
skip: int = 0,
limit: int = 20,
id_tags: str | None = None,
name: str | None = None
):
# Validate `skip` and `limit`
if limit < 1 or skip < 0 or limit < skip:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="`skip` should be >= 0 and `limit` should be > 0 and greater than `skip`.",
)
limit = limit + skip
# Initialize filters
filters = []
if name:
filters.append({"name": {"$regex": name, "$options": "i"}})
# Add ID filter
if id_tags:
try:
tags_id = ObjectId(id_tags)
filters.append({"_id": {"$eq": event_id}})
except Exception:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid event ID format.")
# Combine all filters
object_search = {"$and": filters} if filters else {}
# Fetch and return results
tags_repository = tags.TagsRepository(database=database.database)
list_tags = []
for tag_index in tags_repository.find_by(object_search, limit=limit, skip=skip):
tag = tags.TagsOut(
id=tag_index.id,
name=tag_index.name
)
list_tags.append(tag)
return list_tags
@router.get("/tags/count", tags=["tags"])
async def read_tags_count(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))]):
count = database.database.get_collection("tags").estimated_document_count()
content = {"count":count}
response = JSONResponse(content=content)
return response
@router.get("/tags/{item_id}", tags=["tags"], response_model=tags.Tags)
async def read_tags_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
tags_repository = tags.TagsRepository(database=database.database)
tag = tags_repository.find_one_by_id(ObjectId(item_id))
return tag
@router.delete("/tags/groups",tags=["tags"])
async def delete_tags_groups(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], tagsids: tags.TagsIDS | None = None):
if len(tagsids.ids) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="eventids should be greater than 0"
)
tag_repository = tags.TagsRepository(database=database.database)
for i in tagsids.ids:
tag = tag_repository.find_one_by_id(ObjectId(i))
tag_repository.delete_one(tag)
content = {"message": "tags removed"}
response = JSONResponse(content=content)
return response
@router.delete("/tags/{item_id}", tags=["tags"])
async def delete_tags_id(item_id : str, authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))]):
tag_repository = tags.TagsRepository(database=database.database)
tag = tag_repository.find_one_by_id(ObjectId(item_id))
event_repository.delete_one(event)
content = {"message": "tags delete"}
response = JSONResponse(content=content)
return responsed
@router.put("/tags", tags=["tags"], status_code=status.HTTP_201_CREATED)
async def update_tags(authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin"]))], tagSingle: tags.TagsIn | None = None):
if tagSingle is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Body request is empty"
)
tag_repository = tags.TagsRepository(database=database.database)
tag = tag_repository.find_one_by({"name": {'$eq': tagSingle.name}})
if tag is not None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT,
detail="name"
)
tag = tags.Tags(name=tagSingle.name)
tag.created_at = datetime.today()
tag_repository.save(event)
content = {"message": "tags created"}
response = JSONResponse(content=content, status_code=status.HTTP_201_CREATED)
return response

View File

@@ -139,17 +139,13 @@ async def delete_users_id(item_id : str, authorize: Annotated[bool, Depends(perm
response = JSONResponse(content=content)
return response
@router.put("/users/me",tags=["users"])
async def update_users_me(current_user: Annotated[users.User, Depends(users_token.get_current_active_user)], authorize: Annotated[bool, Depends(permissions_checker.PermissionChecker(roles=["Admin", "User"]))], userSingle: users.UserIn | None = None):
user_repository = users.UserRepository(database=database.database)
current_user.username = userSingle.username
if len(userSingle.password) > 0:
current_user.password = user_token.get_password_hash(userSingle.password)
current_user.password = user_token.get_password_hash(userSingle.password)
current_user.roles = userSingle.roles
current_user.email = userSingle.email
current_user.name = userSingle.name
current_user.firstName = userSingle.firstName
current_user.birth = userSingle.birth
user_repository.save(current_user)
content = {"message": "user is updated"}
response = JSONResponse(content=content)
@@ -206,8 +202,7 @@ async def update_users_id(item_id: str, authorize: Annotated[bool, Depends(permi
)
user.username = userSingle.username
if len(userSingle.password) > 0:
user.password = users_token.get_password_hash(userSingle.password)
user.password = users_token.get_password_hash(userSingle.password)
user.roles = userSingle.roles
user.email = userSingle.email
user.firstName = userSingle.firstName