save
This commit is contained in:
70
app/src/routers/history.py
Normal file
70
app/src/routers/history.py
Normal file
@@ -0,0 +1,70 @@
|
||||
from fastapi import (
|
||||
APIRouter,
|
||||
Depends,
|
||||
status,
|
||||
HTTPException
|
||||
)
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from src.settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
# from ..models.history_find import *
|
||||
from ..models.history import *
|
||||
from typing import List
|
||||
from datetime import datetime, timedelta
|
||||
from ..models.post import (
|
||||
PostModel,
|
||||
UpdatePostModel,
|
||||
ShowPostModel
|
||||
)
|
||||
from ..models.models import (
|
||||
UserModel,
|
||||
ShowUserModel,
|
||||
UpdateUserModel
|
||||
)
|
||||
from ..dependecies import (
|
||||
get_current_user,
|
||||
authenticate_user,
|
||||
create_access_token,
|
||||
get_password_hash
|
||||
)
|
||||
import re
|
||||
import pytz
|
||||
import datetime
|
||||
tz = pytz.timezone('Asia/Ho_Chi_Minh')
|
||||
history = APIRouter()
|
||||
##############################POST###############################################
|
||||
|
||||
|
||||
@history.post("/create_history", response_description="history", response_model=HistoryByUserModel)
|
||||
async def create_history(username: str, status: str, note: str, history: HistoryByUserModel = None):
|
||||
history.created_at = datetime.datetime.now(tz=tz)
|
||||
history = jsonable_encoder(history)
|
||||
history["user_name"] = username
|
||||
history["status"] = status
|
||||
history["note"] = note
|
||||
print(history)
|
||||
history_new = await db["history"].insert_one(history)
|
||||
create_history = await db["history"].find_one({"_id": history_new.inserted_id})
|
||||
create_history = jsonable_encoder(create_history)
|
||||
print(type(create_history))
|
||||
# return JSONResponse(status_code=status.HTTP_201_CREATED, content=create_history)
|
||||
return create_history
|
||||
|
||||
|
||||
@history.get(
|
||||
"/list_history", response_description="List all posts", response_model=List[HistoryByUserModel]
|
||||
)
|
||||
async def list_post():
|
||||
history_find = await db["history"].find().to_list(1000)
|
||||
return history_find
|
||||
|
||||
|
||||
@ history.get("/user_history", response_description="Get list Posts viewed")
|
||||
async def get_list_post_view_by_username(token: str):
|
||||
data_token = await get_current_user(token)
|
||||
user_name = data_token.get("user_name", None)
|
||||
if user_name == None:
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
|
||||
history_find = await db["history"].find({"user_name": user_name}).to_list(100)
|
||||
return history_find
|
||||
@@ -28,30 +28,31 @@ from ..dependecies import (
|
||||
get_password_hash
|
||||
)
|
||||
import re
|
||||
|
||||
history = APIRouter()
|
||||
import pytz
|
||||
import datetime
|
||||
tz = pytz.timezone('Asia/Ho_Chi_Minh')
|
||||
history_find = APIRouter()
|
||||
##############################POST###############################################
|
||||
|
||||
|
||||
@history.post("/create_history", response_description="history", response_model=HistoryFindModel)
|
||||
async def create_history(history: HistoryFindModel):
|
||||
datetime_now = datetime.now()
|
||||
post.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
@history_find.post("/create_history_find", response_description="history", response_model=HistoryFindModel)
|
||||
async def create_history_find(history: HistoryFindModel):
|
||||
post.created_at = datetime.datetime.now(tz=tz)
|
||||
post = jsonable_encoder(post)
|
||||
new_post = await db["history_find"].insert_one(post)
|
||||
created = await db["history_find"].find_one({"_id": new_post.inserted_id})
|
||||
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created)
|
||||
|
||||
|
||||
@history.get(
|
||||
"/list_history", response_description="List all posts", response_model=List[HistoryFindModel]
|
||||
@history_find.get(
|
||||
"/list_history_find", response_description="List all posts", response_model=List[HistoryFindModel]
|
||||
)
|
||||
async def list_post():
|
||||
async def list_post_in_history_find():
|
||||
history_find = await db["history_find"].find().to_list(1000)
|
||||
return history_find
|
||||
|
||||
|
||||
@ history.get("/list_history_by_user", response_description="Get list Posts viewed", response_model=List[HistoryFindModel])
|
||||
@ history_find.get("/list_history_by_user_find", response_description="Get list Posts viewed", response_model=List[HistoryFindModel])
|
||||
async def get_list_post_view_by_username(username: str, current_user: ShowUserModel = Depends(get_current_user)):
|
||||
history_find = await db["history_find"].find({"username": current_user["username"]}).to_list(10)
|
||||
return history_find
|
||||
@@ -68,7 +69,7 @@ async def get_list_post_view_by_username(username: str, current_user: ShowUserMo
|
||||
# return post_view
|
||||
|
||||
|
||||
@history.get("/list_post_by_user", response_description="Get list Posts viewed", response_model=List[HistoryFindByUserModel])
|
||||
@history_find.get("/list_post_by_user", response_description="Get list Posts viewed", response_model=List[HistoryFindByUserModel])
|
||||
async def get_list_post_view_by_username(current_user: ShowUserModel = Depends(get_current_user)):
|
||||
history_find = await db["history_find"].find({"username": current_user["username"]}).to_list(10)
|
||||
post_view = []
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from email.policy import default
|
||||
import imp
|
||||
from fastapi import APIRouter, Depends, status, HTTPException, UploadFile, File, Header, Request
|
||||
from fastapi.responses import JSONResponse, FileResponse, StreamingResponse
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
@@ -17,6 +18,7 @@ from ..models.post import (
|
||||
UpdatePostModel,
|
||||
ShowPostModel
|
||||
)
|
||||
from ..routers.history import *
|
||||
from ..models.models import (
|
||||
UserModel,
|
||||
ShowUserModel,
|
||||
@@ -28,15 +30,21 @@ from ..dependecies import (
|
||||
create_access_token,
|
||||
get_password_hash
|
||||
)
|
||||
# from history import *
|
||||
from ..settings import *
|
||||
from ..models.history_find import *
|
||||
from ..routers.routers import *
|
||||
from typing import List
|
||||
from datetime import datetime
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
import codecs
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from fastapi.responses import HTMLResponse
|
||||
import pytz
|
||||
import datetime
|
||||
tz = pytz.timezone('Asia/Ho_Chi_Minh')
|
||||
|
||||
post = APIRouter()
|
||||
|
||||
##############################POST###############################################
|
||||
@@ -51,13 +59,17 @@ async def post_html(content: str, request: Request):
|
||||
@post.post("/create_post", response_description="Add new post", response_model=PostModel)
|
||||
async def create_post(post: PostModel,
|
||||
# current_user: UserModel = Depends(get_current_user)
|
||||
# token: TokenModel
|
||||
):
|
||||
try:
|
||||
print(post)
|
||||
# if current_user["role"] == "user":
|
||||
# return JSONResponse(content="Role User không được phép tạo bài viết")
|
||||
datetime_now = datetime.now()
|
||||
post.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
# print(post)
|
||||
# data_token = await get_current_user(token)
|
||||
# data = data_token.get("user_name", None)
|
||||
# user_type = data_token.get("user_type", None)
|
||||
# if data == None:
|
||||
# return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
|
||||
|
||||
post.created_at = datetime.datetime.now(tz=tz)
|
||||
post = jsonable_encoder(post)
|
||||
if(post.get("point", None) == None):
|
||||
post["point"] = {
|
||||
@@ -70,27 +82,60 @@ async def create_post(post: PostModel,
|
||||
"bigger60": 0,
|
||||
"total": 0
|
||||
}
|
||||
# create_history()
|
||||
new_post = await db["posts"].insert_one(post)
|
||||
created_post = await db["posts"].find_one({"_id": new_post.inserted_id})
|
||||
# aaa = create_history(username=data, status="tạo bài viết",
|
||||
# note=new_post.inserted_id, history=None)
|
||||
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_post)
|
||||
except NameError:
|
||||
return NameError
|
||||
|
||||
|
||||
@post.get(
|
||||
@ post.post(
|
||||
"/list_post", response_description="List all posts"
|
||||
)
|
||||
async def list_post():
|
||||
async def list_post(
|
||||
token: TokenModel = None
|
||||
):
|
||||
posts = await db["posts"].find().to_list(1000)
|
||||
print(posts)
|
||||
print(len(posts))
|
||||
return posts
|
||||
output = []
|
||||
if token.token != None:
|
||||
posts = jsonable_encoder(posts)
|
||||
for post in posts:
|
||||
url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/check_token"
|
||||
|
||||
payload = {'token': token.token}
|
||||
|
||||
headers = {
|
||||
'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0='
|
||||
}
|
||||
|
||||
response = requests.request(
|
||||
"POST", url, headers=headers, data=payload)
|
||||
data_token = json.loads(response.text)
|
||||
|
||||
data = data_token.get("user_name", None)
|
||||
user_type = data_token.get("user_type", None)
|
||||
if data == None:
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
|
||||
post = jsonable_encoder(post)
|
||||
|
||||
count = await db["post_save"].count_documents({"post_id": post["_id"], "username": data})
|
||||
if count != 0:
|
||||
post["post_save"] = True
|
||||
else:
|
||||
post["post_save"] = False
|
||||
output.append(post)
|
||||
return output
|
||||
else:
|
||||
return posts
|
||||
|
||||
|
||||
@post.post(
|
||||
"/find_list_post", response_description="search list posts", response_model=List[ShowPostModel]
|
||||
@ post.post(
|
||||
"/find_list_post", response_description="search list posts"
|
||||
)
|
||||
async def list_post(key_find: str, data: str = None):
|
||||
async def list_post(key_find: str, token: TokenModel = None):
|
||||
|
||||
# point_data = ["point.less10",
|
||||
# "point.form10to20",
|
||||
@@ -106,28 +151,55 @@ async def list_post(key_find: str, data: str = None):
|
||||
# age_sort = "point.bigger60"
|
||||
# else:
|
||||
# age_sort = point_data[history.get("age")//10]
|
||||
posts = await db["posts"].find({"translation_post": {"$regex": key_find}}).sort(age_sort, -1).to_list(100)
|
||||
# posts = await db["posts"].find("$or": [
|
||||
# {"translation_post": {"$regex": key_find}},
|
||||
# {"translation_post": {"$regex": key_find}},
|
||||
|
||||
print(posts)
|
||||
return posts
|
||||
# ]).sort(age_sort, -1).to_list(100)
|
||||
|
||||
posts = await db["posts"].find({
|
||||
"$or": [
|
||||
{"translation_post": {"$regex": key_find, "$options": 'i'}},
|
||||
{"data.content": {"$regex": key_find, "$options": 'i'}},
|
||||
{"original_post": {"$regex": key_find, "$options": 'i'}},
|
||||
{"summary": {"$regex": key_find, "$options": 'i'}},
|
||||
]
|
||||
}).sort(age_sort, -1).to_list(100)
|
||||
output = []
|
||||
if token.token != None:
|
||||
posts = jsonable_encoder(posts)
|
||||
for post in posts:
|
||||
data_token = await get_current_user(token.token)
|
||||
data = data_token.get("user_name", None)
|
||||
user_type = data_token.get("user_type", None)
|
||||
if data == None:
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
|
||||
post = jsonable_encoder(post)
|
||||
|
||||
count = await db["post_save"].count_documents({"post_id": post["_id"], "username": data})
|
||||
if count != 0:
|
||||
post["post_save"] = True
|
||||
else:
|
||||
post["post_save"] = False
|
||||
output.append(post)
|
||||
return output
|
||||
|
||||
|
||||
@post.post(
|
||||
@ post.post(
|
||||
"/get_post_by_name"
|
||||
# , response_description="Get a single post", response_model=PostModel
|
||||
)
|
||||
async def get_post_by_name(history: HistoryFindModel):
|
||||
try:
|
||||
history.created_at = datetime.datetime.now(tz=tz)
|
||||
history = jsonable_encoder(history)
|
||||
token = history.get("token", None)
|
||||
if token == '':
|
||||
datetime_now = datetime.now()
|
||||
history["created_at"] = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
post = await db["posts"].find_one({"_id": history["post_id"]})
|
||||
print(post)
|
||||
for dt in post["data"]:
|
||||
if dt["level"] != ["*"]:
|
||||
dt["content"] = "Bạn không có quyền xem nội dung này, vui lòng sử dụng tải khoản được cấp quyền để xem nội dung"
|
||||
if not ROLE_PUBLIC in dt["level"]:
|
||||
if dt["level"] != ["*"]:
|
||||
post["data"].remove(dt)
|
||||
history = jsonable_encoder(history)
|
||||
new_his = await db["history_find"].insert_one(history)
|
||||
created = await db["history_find"].find_one({"_id": new_his.inserted_id})
|
||||
@@ -138,22 +210,22 @@ async def get_post_by_name(history: HistoryFindModel):
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
content={'message': "Bài viết không tồn tại"}
|
||||
)
|
||||
data_token = await get_current_user(history["token"])
|
||||
data_token = await get_current_user(token)
|
||||
data = data_token.get("user_name", None)
|
||||
user_type = data_token.get("user_type", None)
|
||||
if data == None:
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
|
||||
else:
|
||||
datetime_now = datetime.now()
|
||||
history["created_at"] = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
post = await db["posts"].find_one({"_id": history["post_id"]})
|
||||
for dt in post["data"]:
|
||||
if dt["level"] != ["*"]:
|
||||
if not user_type in dt["level"]:
|
||||
dt["content"] = "Bạn không có quyền xem nội dung này, vui lòng sử dụng tải khoản được cấp quyền để xem nội dung"
|
||||
if not ROLE_PUBLIC in dt["level"]:
|
||||
if not user_type in dt["level"]:
|
||||
post["data"].remove(dt)
|
||||
history = jsonable_encoder(history)
|
||||
new_his = await db["history_find"].insert_one(history)
|
||||
created = await db["history_find"].find_one({"_id": new_his.inserted_id})
|
||||
print(post)
|
||||
if post is not None:
|
||||
return post
|
||||
else:
|
||||
@@ -173,9 +245,8 @@ async def get_post_by_name(history: HistoryFindModel):
|
||||
)
|
||||
async def get_post_edit(history: HistoryFindModel):
|
||||
try:
|
||||
datetime_now = datetime.now()
|
||||
history.created_at = datetime.datetime.now(tz=tz)
|
||||
history = jsonable_encoder(history)
|
||||
history["created_at"] = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
post = await db["posts"].find_one({"_id": history["post_id"]})
|
||||
new_his = await db["history_find"].insert_one(history)
|
||||
created = await db["history_find"].find_one({"_id": new_his.inserted_id})
|
||||
@@ -210,7 +281,6 @@ async def score_all_post(
|
||||
posts = await db["posts"].find().to_list(1000)
|
||||
|
||||
for dt_post in posts:
|
||||
print(dt_post)
|
||||
data_old = dt_post
|
||||
dt_post["point"]["less10"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
|
||||
"age": {
|
||||
@@ -268,8 +338,7 @@ async def create_upload_files(
|
||||
i = 0
|
||||
file_location = f"../media/"
|
||||
for file in files:
|
||||
now = datetime.now()
|
||||
current_time = now.strftime("%H:%M:%S_%d-%m-%Y_")
|
||||
current_time = datetime.datetime.now(tz=tz)
|
||||
file_save = file_location + current_time + str(i) + file.filename
|
||||
file_name.append(current_time + str(i) + file.filename)
|
||||
i = i + 1
|
||||
@@ -293,7 +362,7 @@ async def create_upload_post(
|
||||
|
||||
file_name = []
|
||||
i = 0
|
||||
now = datetime.now()
|
||||
now = datetime.datetime.now(tz=tz)
|
||||
current_time = now.strftime("%H_%M_%S_%d-%m-%Y_")
|
||||
folder_save = f"./post/" + current_time + str(i)
|
||||
Path(folder_save).mkdir(parents=True)
|
||||
@@ -366,13 +435,15 @@ async def video_endpoint(video_name
|
||||
@post.post("/edit_post/{id}", response_description="score all post", response_model=UpdatePostModel)
|
||||
async def edit_post(id: str, post: UpdatePostModel):
|
||||
try:
|
||||
post.updated_at = datetime.datetime.now(tz=tz)
|
||||
post = jsonable_encoder(post)
|
||||
created_post = await db["posts"].find_one({"_id": id})
|
||||
post["created_at"] = created_post["created_at"]
|
||||
await db["posts"].update_one({"_id": id}, {"$set":
|
||||
post
|
||||
})
|
||||
created_post = await db["posts"].find_one({"_id": id})
|
||||
|
||||
return JSONResponse(status_code=status.HTTP_200_OK, content=created_post)
|
||||
return JSONResponse(status_code=status.HTTP_200_OK, content=post)
|
||||
except Exception as e:
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
|
||||
@@ -31,7 +31,10 @@ from datetime import datetime, timedelta
|
||||
import requests
|
||||
import re
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
import pytz
|
||||
import datetime
|
||||
tz = pytz.timezone('Asia/Ho_Chi_Minh')
|
||||
history = APIRouter()
|
||||
|
||||
post_save = APIRouter()
|
||||
# ============= Creating path operations ==============
|
||||
@@ -51,12 +54,16 @@ async def create_post(post_save: SavePost):
|
||||
data = data_output.get("user_name", None)
|
||||
if data == None:
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
|
||||
|
||||
datetime_now = datetime.now()
|
||||
post_save.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
|
||||
count = await db["posts"].count_documents({"post_id": post_save.post_id, "username": data})
|
||||
if count == 0:
|
||||
return f"Post {post_save.post_id} not found"
|
||||
post_save.created_at = datetime.datetime.now(tz=tz)
|
||||
post_save = jsonable_encoder(post_save)
|
||||
del post_save["token"]
|
||||
post_save["username"] = data
|
||||
created_post = await db["post_save"].count_documents({"post_id": post_save["post_id"], "username": data})
|
||||
if created_post == 1:
|
||||
return {"message": f'Post {post_save["post_id"]} saved'}
|
||||
new_post = await db["post_save"].insert_one(post_save)
|
||||
created_post = await db["post_save"].find_one({"_id": new_post.inserted_id})
|
||||
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_post)
|
||||
@@ -64,7 +71,7 @@ async def create_post(post_save: SavePost):
|
||||
return NameError
|
||||
|
||||
|
||||
@post_save.get(
|
||||
@post_save.post(
|
||||
"/list_save_post_by_user", response_description="List save posts", response_model=SavePostModel
|
||||
)
|
||||
async def list_post(post_save: SavePost):
|
||||
@@ -79,7 +86,34 @@ async def list_post(post_save: SavePost):
|
||||
data = data_output.get("user_name", None)
|
||||
if data == None:
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
|
||||
posts = await db["post_save"].find({"username": data, "is_active": "true"}).to_list(1000)
|
||||
posts = await db["post_save"].find({"username": data}).to_list(1000)
|
||||
print(posts)
|
||||
print(len(posts))
|
||||
return posts
|
||||
return JSONResponse(status_code=status.HTTP_200_OK, content=posts)
|
||||
|
||||
|
||||
@post_save.delete("/delete_save_post", response_description="Delete save post")
|
||||
async def delete_save_post(post_save: SavePost):
|
||||
# delete_result = await db["post_save"].delete_one({"_id": user_id})
|
||||
try:
|
||||
url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/check_token"
|
||||
payload = {'token': post_save.token}
|
||||
headers = {
|
||||
'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0='
|
||||
}
|
||||
response = requests.request(
|
||||
"POST", url, headers=headers, data=payload)
|
||||
data_output = json.loads(response.text)
|
||||
data = data_output.get("user_name", None)
|
||||
if data == None:
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
|
||||
|
||||
post_save = jsonable_encoder(post_save)
|
||||
delete_result = await db["post_save"].delete_one({"post_id": post_save["post_id"], "username": data})
|
||||
if delete_result.deleted_count == 1:
|
||||
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "Delete post save successful"})
|
||||
|
||||
raise HTTPException(
|
||||
status_code=404, detail=f'Post save {post_save["post_id"]} not found')
|
||||
except NameError:
|
||||
return NameError
|
||||
|
||||
@@ -13,8 +13,10 @@ from fastapi import File, UploadFile, FastAPI
|
||||
from ..models.models import (
|
||||
UserModel,
|
||||
ShowUserModel,
|
||||
UpdateUserModel
|
||||
UpdateUserModel,
|
||||
token_test
|
||||
)
|
||||
from ..models.post import *
|
||||
from ..dependecies import (
|
||||
get_current_user,
|
||||
authenticate_user,
|
||||
@@ -132,40 +134,30 @@ async def login_for_access_token(body: LoginRequest):
|
||||
@router.post("/token")
|
||||
async def login_for_access_token_2(body: OAuth2PasswordRequestForm = Depends()):
|
||||
user = await authenticate_user_oauth2(body.username, body.password)
|
||||
print(body)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorect ID or password",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
# access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
# access_token = create_access_token(
|
||||
# data={"sub": user["username"]}, expires_delta=access_token_expires
|
||||
# )
|
||||
# await db["users"].update_one({"username": body.username}, {"$set": {
|
||||
# "last_login": datetime.now().strftime("%m/%d/%y %H:%M:%S"),
|
||||
# "is_active": "true"
|
||||
# }})
|
||||
|
||||
return {"access_token": user["access_token"], "token_type": "bearer"}
|
||||
|
||||
|
||||
@router.get(
|
||||
"/list", response_description="List all users", response_model=List[ShowUserModel]
|
||||
)
|
||||
async def list_users():
|
||||
users = await db["users"].find().to_list(1000)
|
||||
for user in users:
|
||||
user["is_active"] = "false"
|
||||
try:
|
||||
last_login = datetime.strptime(
|
||||
user["last_login"], "%m/%d/%y %H:%M:%S")
|
||||
my_delta = datetime.now() - last_login
|
||||
if my_delta <= timedelta(days=30):
|
||||
user["is_active"] = "true"
|
||||
except ValueError:
|
||||
pass
|
||||
# @router.get(
|
||||
# "/list", response_description="List all users", response_model=List[ShowUserModel]
|
||||
# )
|
||||
# async def list_users():
|
||||
# users = await db["users"].find().to_list(1000)
|
||||
# for user in users:
|
||||
# user["is_active"] = "false"
|
||||
# try:
|
||||
# last_login = datetime.strptime(
|
||||
# user["last_login"], "%m/%d/%y %H:%M:%S")
|
||||
# my_delta = datetime.now() - last_login
|
||||
# if my_delta <= timedelta(days=30):
|
||||
# user["is_active"] = "true"
|
||||
# except ValueError:
|
||||
# pass
|
||||
|
||||
|
||||
@router.post("/current", response_description="Current User")
|
||||
@@ -210,30 +202,39 @@ async def refresh_token(refresh_token: TokenModel):
|
||||
return json.loads(response.text)
|
||||
|
||||
|
||||
@router.put("/admin/{user_id}", response_description="Update a user", response_model=UpdateUserModel)
|
||||
async def update_user(user_id: str, user: UpdateUserModel):
|
||||
if current_user["role"] == "admin":
|
||||
user = {k: v for k, v in user.dict().items() if v is not None}
|
||||
if len(user) >= 1:
|
||||
update_result = await db["users"].update_one({"_id": user_id}, {"$set": user})
|
||||
if update_result.modified_count == 1:
|
||||
if (
|
||||
updated_user := await db["users"].find_one({"_id": user_id})
|
||||
) is not None:
|
||||
return updated_user
|
||||
if (existing_user := await db["users"].find_one({"_id": user_id})) is not None:
|
||||
return existing_user
|
||||
raise HTTPException(
|
||||
status_code=404, detail=f"User {user_id} not found")
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=403, detail=f"Not having sufficient rights to modify the content")
|
||||
# @router.put("/admin/{user_id}", response_description="Update a user", response_model=UpdateUserModel)
|
||||
# async def update_user(user_id: str, user: UpdateUserModel):
|
||||
# if current_user["role"] == "admin":
|
||||
# user = {k: v for k, v in user.dict().items() if v is not None}
|
||||
# if len(user) >= 1:
|
||||
# update_result = await db["users"].update_one({"_id": user_id}, {"$set": user})
|
||||
# if update_result.modified_count == 1:
|
||||
# if (
|
||||
# updated_user := await db["users"].find_one({"_id": user_id})
|
||||
# ) is not None:
|
||||
# return updated_user
|
||||
# if (existing_user := await db["users"].find_one({"_id": user_id})) is not None:
|
||||
# return existing_user
|
||||
# raise HTTPException(
|
||||
# status_code=404, detail=f"User {user_id} not found")
|
||||
# else:
|
||||
# raise HTTPException(
|
||||
# status_code=403, detail=f"Not having sufficient rights to modify the content")
|
||||
|
||||
|
||||
@router.delete("/delete_user/{user_id}", response_description="Delete a user")
|
||||
async def delete_user(user_id: str):
|
||||
delete_result = await db["users"].delete_one({"_id": user_id})
|
||||
if delete_result.deleted_count == 1:
|
||||
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
|
||||
# @router.delete("/delete_user/{user_id}", response_description="Delete a user")
|
||||
# async def delete_user(user_id: str):
|
||||
# delete_result = await db["users"].delete_one({"_id": user_id})
|
||||
# if delete_result.deleted_count == 1:
|
||||
# return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
|
||||
|
||||
raise HTTPException(status_code=404, detail=f"User {user_id} not found")
|
||||
# raise HTTPException(status_code=404, detail=f"User {user_id} not found")
|
||||
|
||||
|
||||
# @router.get(
|
||||
# "/list1111", response_description="List all users", response_model=List[UpdatePostModel]
|
||||
# )
|
||||
# async def list_users(token: token_test = Depends(get_current_user)):
|
||||
# print(token)
|
||||
# users = await db["posts"].find().to_list(1000)
|
||||
# return users
|
||||
|
||||
Reference in New Issue
Block a user