create project
This commit is contained in:
57
app/src/routers/history_find.py
Normal file
57
app/src/routers/history_find.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from fastapi import (
|
||||
APIRouter,
|
||||
Depends,
|
||||
status,
|
||||
HTTPException
|
||||
)
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from src.settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
from ..models.history_find import *
|
||||
from typing import List
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from ..models.models import (
|
||||
UserModel,
|
||||
ShowUserModel,
|
||||
UpdateUserModel
|
||||
)
|
||||
from ..dependecies import (
|
||||
get_current_user,
|
||||
authenticate_user,
|
||||
create_access_token,
|
||||
get_password_hash
|
||||
)
|
||||
import re
|
||||
|
||||
history = APIRouter()
|
||||
##############################POST###############################################
|
||||
@history.post("/create_history", response_description="Add new user", response_model=HistoryFindModel)
|
||||
async def create_history(history: HistoryFindModel, current_user: ShowUserModel = Depends(get_current_user)):
|
||||
datetime_now = datetime.now()
|
||||
post.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
post = jsonable_encoder(post)
|
||||
new_post = await db["history_find"].insert_one(post)
|
||||
created = await db["history_find"].find_one({"_id": new_post.inserted_id})
|
||||
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created)
|
||||
|
||||
@history.get(
|
||||
"/list_history", response_description="List all posts", response_model=List[HistoryFindModel]
|
||||
)
|
||||
async def list_post(current_user: ShowUserModel = Depends(get_current_user)):
|
||||
history_find = await db["history_find"].find().to_list(1000)
|
||||
return history_find
|
||||
|
||||
|
||||
# @post.get(
|
||||
# "/get_post_by_name", response_description="Get a single posot", response_model=PostModel
|
||||
# )
|
||||
# async def show_post(id: str):
|
||||
# print(id)
|
||||
# post = await db["posts"].find_one({"_id": id})
|
||||
# print(post)
|
||||
# if post is not None:
|
||||
# return post
|
||||
# else:
|
||||
# return None
|
||||
202
app/src/routers/post.py
Normal file
202
app/src/routers/post.py
Normal file
@@ -0,0 +1,202 @@
|
||||
from email.policy import default
|
||||
from fastapi import APIRouter,Depends,status,HTTPException,UploadFile, File, Header
|
||||
from fastapi.responses import JSONResponse, FileResponse, StreamingResponse
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
import logging
|
||||
from ..dependecies import (
|
||||
get_current_user,
|
||||
authenticate_user,
|
||||
create_access_token,
|
||||
get_password_hash
|
||||
)
|
||||
from src.settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
from ..models.post import (
|
||||
PostModel,
|
||||
UpdatePostModel,
|
||||
ShowPostModel
|
||||
)
|
||||
from ..models.models import (
|
||||
UserModel,
|
||||
ShowUserModel,
|
||||
UpdateUserModel
|
||||
)
|
||||
from ..dependecies import (
|
||||
get_current_user,
|
||||
authenticate_user,
|
||||
create_access_token,
|
||||
get_password_hash
|
||||
)
|
||||
from ..models.history_find import *
|
||||
from typing import List
|
||||
from datetime import datetime
|
||||
import os
|
||||
import re
|
||||
post = APIRouter()
|
||||
##############################POST###############################################
|
||||
@post.post("/create_post", response_description="Add new user", response_model=PostModel)
|
||||
async def create_post(post: PostModel, current_user: UserModel = Depends(get_current_user)):
|
||||
try:
|
||||
datetime_now = datetime.now()
|
||||
post.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
post = jsonable_encoder(post)
|
||||
new_post = await db["posts"].insert_one(post)
|
||||
created_post = await db["posts"].find_one({"_id": new_post.inserted_id})
|
||||
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_post)
|
||||
except NameError:
|
||||
return NameError
|
||||
|
||||
@post.get(
|
||||
"/list_post", response_description="List all posts", response_model=List[ShowPostModel]
|
||||
)
|
||||
async def list_post(current_user: UserModel = Depends(get_current_user)):
|
||||
posts = await db["posts"].find().to_list(1000)
|
||||
print(posts)
|
||||
print(len(posts))
|
||||
return posts
|
||||
|
||||
@post.post(
|
||||
"/find_list_post", response_description="search list posts", response_model=List[ShowPostModel]
|
||||
)
|
||||
async def list_post(history: HistoryFindModel, current_user: UserModel = Depends(get_current_user)):
|
||||
|
||||
point_data=["point.less10",
|
||||
"point.form10to20",
|
||||
"point.form20to30",
|
||||
"point.form30to40",
|
||||
"point.form40to50",
|
||||
"point.form50to60",
|
||||
]
|
||||
age_sort = "point.total"
|
||||
history = jsonable_encoder(history)
|
||||
if history.get("age", None) != None:
|
||||
if history.get("age") >59:
|
||||
age_sort = "point.bigger60"
|
||||
else:
|
||||
age_sort = point_data[history.get("age")//10]
|
||||
posts = await db["posts"].find({"translation_post": { "$regex": history.get("key_find") } }).sort(age_sort, -1).to_list(100)
|
||||
|
||||
print(posts)
|
||||
return posts
|
||||
|
||||
|
||||
@post.post(
|
||||
"/get_post_by_name", response_description="Get a single post", response_model=PostModel
|
||||
)
|
||||
async def get_post_by_name(history: HistoryFindModel, current_user: UserModel = Depends(get_current_user)
|
||||
):
|
||||
datetime_now = datetime.now()
|
||||
history.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
post = await db["posts"].find_one({"_id": history.post_id})
|
||||
history = jsonable_encoder(history)
|
||||
new_his = await db["history_find"].insert_one(history)
|
||||
created = await db["history_find"].find_one({"_id": new_his.inserted_id})
|
||||
print(created)
|
||||
if post is not None:
|
||||
return post
|
||||
else:
|
||||
return None
|
||||
|
||||
@post.delete("/delete_post/{id}", response_description="Delete a post")
|
||||
async def delete_user(id: str, current_user: UserModel = Depends(get_current_user)):
|
||||
delete_result = await db["posts"].delete_one({"_id": id})
|
||||
if delete_result.deleted_count == 1:
|
||||
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
|
||||
raise HTTPException(status_code=404, detail=f"Post {id} not found")
|
||||
|
||||
@post.get("/score", response_description="score all post", response_model=List[ShowPostModel])
|
||||
async def score_all_post( current_user: UserModel = Depends(get_current_user)):
|
||||
posts = await db["posts"].find().to_list(1000)
|
||||
for dt_post in posts:
|
||||
print(dt_post)
|
||||
data_old = dt_post
|
||||
dt_post["point"]["less10"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
|
||||
"age":{
|
||||
'$gte': 0, '$lte': 9
|
||||
}})
|
||||
dt_post["point"]["form10to20"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
|
||||
"age":{
|
||||
'$gte': 10, '$lte': 19
|
||||
}})
|
||||
dt_post["point"]["form20to30"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
|
||||
"age":{
|
||||
'$gte': 20, '$lte': 29
|
||||
}})
|
||||
dt_post["point"]["form30to40"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
|
||||
"age":{
|
||||
'$gte': 30, '$lte': 39
|
||||
}})
|
||||
dt_post["point"]["form40to50"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
|
||||
"age":{
|
||||
'$gte': 40, '$lte': 49
|
||||
}})
|
||||
dt_post["point"]["form50to60"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
|
||||
"age":{
|
||||
'$gte': 50, '$lte': 59
|
||||
}})
|
||||
dt_post["point"]["bigger60"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
|
||||
"age":{
|
||||
'$gte': 60
|
||||
}})
|
||||
dt_post["point"]["total"] = await db["history_find"].count_documents({"post_id": dt_post["_id"]})
|
||||
# await db["posts"].update_one(data_old, dt_post)
|
||||
await db["posts"].update_one({"_id": dt_post["_id"]}, {"$set": {
|
||||
"point":{
|
||||
"less10": dt_post["point"]["less10"],
|
||||
"form10to20": dt_post["point"]["form10to20"],
|
||||
"form20to30": dt_post["point"]["form20to30"],
|
||||
"form30to40": dt_post["point"]["form30to40"],
|
||||
"form40to50": dt_post["point"]["form40to50"],
|
||||
"form50to60": dt_post["point"]["form50to60"],
|
||||
"bigger60": dt_post["point"]["bigger60"],
|
||||
"total": dt_post["point"]["total"]
|
||||
}
|
||||
}})
|
||||
|
||||
return posts
|
||||
|
||||
|
||||
|
||||
@post.post("/uploadfiles/")
|
||||
async def create_upload_files(
|
||||
files: List[UploadFile] = File(...), current_user: UserModel = Depends(get_current_user)
|
||||
):
|
||||
try:
|
||||
file_name =[]
|
||||
i = 0
|
||||
file_location = f"../media/"
|
||||
for file in files:
|
||||
now = datetime.now()
|
||||
current_time = now.strftime("%H:%M:%S_%d-%m-%Y_")
|
||||
file_save = file_location + current_time + str(i) + file.filename
|
||||
file_name.append(current_time + str(i) + file.filename)
|
||||
i = i + 1
|
||||
with open(file_save, "wb+") as file_object:
|
||||
file_object.write(file.file.read())
|
||||
return {"filenames": file_name}
|
||||
except Exception as e:
|
||||
return JSONResponse(
|
||||
status_code = status.HTTP_400_BAD_REQUEST,
|
||||
content = { 'message' : str(e) }
|
||||
)
|
||||
|
||||
@post.get("/showfile/{name}")
|
||||
async def show_file(name: str, current_user: UserModel = Depends(get_current_user)):
|
||||
file_path = f"../media/" + name
|
||||
if os.path.exists(file_path):
|
||||
return FileResponse(file_path)
|
||||
return {"erro": "File not found!"}
|
||||
|
||||
@post.get("/showvideo/{file_name}", response_class=FileResponse)
|
||||
async def main(file_name: str, current_user: UserModel = Depends(get_current_user)):
|
||||
file_path = f"../media/" + file_name
|
||||
return file_path
|
||||
|
||||
@post.get("/video")
|
||||
async def video_endpoint(video_name, current_user: UserModel = Depends(get_current_user)):
|
||||
def iterfile():
|
||||
video_path = f"../media/" + video_name
|
||||
with open(video_path, mode="rb") as file_like:
|
||||
yield from file_like
|
||||
|
||||
return StreamingResponse(iterfile(), media_type="video/mp4")
|
||||
158
app/src/routers/routers.py
Normal file
158
app/src/routers/routers.py
Normal file
@@ -0,0 +1,158 @@
|
||||
from fastapi import (
|
||||
APIRouter,
|
||||
Depends,
|
||||
status,
|
||||
HTTPException
|
||||
)
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from fastapi import File, UploadFile, FastAPI
|
||||
from ..models.models import (
|
||||
UserModel,
|
||||
ShowUserModel,
|
||||
UpdateUserModel
|
||||
)
|
||||
from ..dependecies import (
|
||||
get_current_user,
|
||||
authenticate_user,
|
||||
create_access_token,
|
||||
get_password_hash
|
||||
)
|
||||
from ..settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
|
||||
from typing import List
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import re
|
||||
from pydantic import BaseModel, Field
|
||||
class LoginRequest(BaseModel):
|
||||
username: str
|
||||
password: str
|
||||
router = APIRouter()
|
||||
# ============= Creating path operations ==============
|
||||
@router.post("/create_user", response_description="Add new user", response_model=UserModel)
|
||||
async def create_user(user: UserModel, file: UploadFile = File(...)):
|
||||
if re.match("admin|dev|simple mortal", user.role):
|
||||
datetime_now = datetime.now()
|
||||
user.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
user.password = get_password_hash(user.password)
|
||||
user = jsonable_encoder(user)
|
||||
file_location = f"../media/"
|
||||
current_time = datetime_now.strftime("%H:%M:%S_%d-%m-%Y_")
|
||||
file_save = file_location + current_time + file.filename
|
||||
user.avatar = current_time + file.filename
|
||||
with open(file_save, "wb+") as file_object:
|
||||
file_object.write(file.file.read())
|
||||
new_user = await db["users"].insert_one(user)
|
||||
await db["users"].update_one({"_id": new_user.inserted_id}, {
|
||||
"$rename": {"password": "hashed_pass"}})
|
||||
|
||||
created_user = await db["users"].find_one({"_id": new_user.inserted_id})
|
||||
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_user)
|
||||
|
||||
raise HTTPException(status_code=406, detail="User role not acceptable")
|
||||
|
||||
|
||||
|
||||
@router.post("/login")
|
||||
async def login_for_access_token(body: LoginRequest):
|
||||
print(body)
|
||||
user = await authenticate_user(body.username, body.password)
|
||||
print(body)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorect ID or password",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
access_token = create_access_token(
|
||||
data={"sub": user["username"]}, expires_delta=access_token_expires
|
||||
)
|
||||
await db["users"].update_one({"username": body.username}, {"$set": {
|
||||
"last_login": datetime.now().strftime("%m/%d/%y %H:%M:%S"),
|
||||
"is_active": "true"
|
||||
}})
|
||||
|
||||
return {"access_token": access_token, "token_type": "bearer"}
|
||||
|
||||
@router.post("/token")
|
||||
async def login_for_access_token_2(body: LoginRequest):
|
||||
print(body)
|
||||
user = await authenticate_user(body.username, body.password)
|
||||
print(body)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorect ID or password",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
access_token = create_access_token(
|
||||
data={"sub": user["username"]}, expires_delta=access_token_expires
|
||||
)
|
||||
await db["users"].update_one({"username": body.username}, {"$set": {
|
||||
"last_login": datetime.now().strftime("%m/%d/%y %H:%M:%S"),
|
||||
"is_active": "true"
|
||||
}})
|
||||
|
||||
return {"access_token": access_token, "token_type": "bearer"}
|
||||
|
||||
@router.get(
|
||||
"/list", response_description="List all users", response_model=List[ShowUserModel]
|
||||
)
|
||||
async def list_users(current_user: ShowUserModel = Depends(get_current_user)):
|
||||
users = await db["users"].find().to_list(1000)
|
||||
for user in users:
|
||||
user["is_active"] = "false"
|
||||
try:
|
||||
last_login = datetime.strptime(user["last_login"], "%m/%d/%y %H:%M:%S")
|
||||
my_delta = datetime.now() - last_login
|
||||
if my_delta <= timedelta(days=30):
|
||||
user["is_active"] = "true"
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return users
|
||||
|
||||
|
||||
|
||||
@router.get("/current", response_description="Current User", response_model=ShowUserModel)
|
||||
async def current_user(current_user: ShowUserModel = Depends(get_current_user)):
|
||||
return current_user
|
||||
|
||||
|
||||
|
||||
|
||||
@router.put("/admin/{user_id}", response_description="Update a user", response_model=UpdateUserModel)
|
||||
async def update_user(user_id: str, user: UpdateUserModel, current_user: UserModel = Depends(get_current_user)):
|
||||
if current_user["role"] == "admin":
|
||||
user = {k: v for k, v in user.dict().items() if v is not None}
|
||||
|
||||
|
||||
if len(user) >= 1:
|
||||
update_result = await db["users"].update_one({"_id": user_id}, {"$set": user})
|
||||
|
||||
if update_result.modified_count == 1:
|
||||
if (
|
||||
updated_user := await db["users"].find_one({"_id": user_id})
|
||||
) is not None:
|
||||
return updated_user
|
||||
|
||||
if (existing_user := await db["users"].find_one({"_id": user_id})) is not None:
|
||||
return existing_user
|
||||
|
||||
raise HTTPException(status_code=404, detail=f"User {user_id} not found")
|
||||
else:
|
||||
raise HTTPException(status_code=403, detail=f"Not having sufficient rights to modify the content")
|
||||
|
||||
|
||||
|
||||
@router.delete("/delete_user/{user_id}", response_description="Delete a user")
|
||||
async def delete_user(user_id: str, current_user: ShowUserModel = Depends(get_current_user)):
|
||||
delete_result = await db["users"].delete_one({"_id": user_id})
|
||||
if delete_result.deleted_count == 1:
|
||||
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
|
||||
|
||||
raise HTTPException(status_code=404, detail=f"User {user_id} not found")
|
||||
Reference in New Issue
Block a user