ngay 2-8-2022

This commit is contained in:
2022-08-02 14:25:51 +07:00
parent 9e3a18756c
commit f541dd9956
15 changed files with 963 additions and 325 deletions

View File

@@ -11,7 +11,11 @@ from src.settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
from ..models.history_find import *
from typing import List
from datetime import datetime, timedelta
from ..models.post import (
PostModel,
UpdatePostModel,
ShowPostModel
)
from ..models.models import (
UserModel,
ShowUserModel,
@@ -27,31 +31,49 @@ import re
history = APIRouter()
##############################POST###############################################
@history.post("/create_history", response_description="Add new user", response_model=HistoryFindModel)
async def create_history(history: HistoryFindModel, current_user: ShowUserModel = Depends(get_current_user)):
datetime_now = datetime.now()
post.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
post = jsonable_encoder(post)
new_post = await db["history_find"].insert_one(post)
created = await db["history_find"].find_one({"_id": new_post.inserted_id})
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created)
@history.post("/create_history", response_description="history", response_model=HistoryFindModel)
async def create_history(history: HistoryFindModel):
datetime_now = datetime.now()
post.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
post = jsonable_encoder(post)
new_post = await db["history_find"].insert_one(post)
created = await db["history_find"].find_one({"_id": new_post.inserted_id})
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created)
@history.get(
"/list_history", response_description="List all posts", response_model=List[HistoryFindModel]
)
async def list_post(current_user: ShowUserModel = Depends(get_current_user)):
async def list_post():
history_find = await db["history_find"].find().to_list(1000)
return history_find
# @post.get(
# "/get_post_by_name", response_description="Get a single posot", response_model=PostModel
# )
# async def show_post(id: str):
# print(id)
# post = await db["posts"].find_one({"_id": id})
# print(post)
# if post is not None:
# return post
# else:
# return None
@ history.get("/list_history_by_user", response_description="Get list Posts viewed", response_model=List[HistoryFindModel])
async def get_list_post_view_by_username(username: str, current_user: ShowUserModel = Depends(get_current_user)):
history_find = await db["history_find"].find({"username": current_user["username"]}).to_list(10)
return history_find
# @ history.get("/list_post_by_user", response_description="Get list Posts viewed", response_model=List[ShowPostModel])
# async def get_list_post_view_by_username(username: str, current_user: ShowUserModel = Depends(get_current_user)):
# history_find = await db["history_find"].find({"username": username}).to_list(10)
# post_view = []
# for dt in history_find:
# print(dt["post_id"])
# post = await db["posts"].find_one({"_id": dt["post_id"]})
# post_view.append(post)
# return post_view
@history.get("/list_post_by_user", response_description="Get list Posts viewed", response_model=List[HistoryFindByUserModel])
async def get_list_post_view_by_username(current_user: ShowUserModel = Depends(get_current_user)):
history_find = await db["history_find"].find({"username": current_user["username"]}).to_list(10)
post_view = []
for dt in history_find:
print(dt["post_id"])
post = await db["posts"].find_one({"_id": dt["post_id"]})
dt["post"] = post
return history_find

View File

@@ -1,8 +1,9 @@
from email.policy import default
from fastapi import APIRouter,Depends,status,HTTPException,UploadFile, File, Header
from fastapi import APIRouter, Depends, status, HTTPException, UploadFile, File, Header, Request
from fastapi.responses import JSONResponse, FileResponse, StreamingResponse
from fastapi.encoders import jsonable_encoder
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
import logging
from ..dependecies import (
get_current_user,
@@ -32,137 +33,238 @@ from typing import List
from datetime import datetime
import os
import re
from pathlib import Path
import codecs
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
post = APIRouter()
##############################POST###############################################
@post.post("/create_post", response_description="Add new user", response_model=PostModel)
async def create_post(post: PostModel, current_user: UserModel = Depends(get_current_user)):
@post.get("/post_html", response_class=HTMLResponse)
async def post_html(content: str, request: Request):
templates = Jinja2Templates(directory="post")
return templates.TemplateResponse(content, {"request": request})
@post.post("/create_post", response_description="Add new post", response_model=PostModel)
async def create_post(post: PostModel,
# current_user: UserModel = Depends(get_current_user)
):
try:
print(post)
# if current_user["role"] == "user":
# return JSONResponse(content="Role User không được phép tạo bài viết")
datetime_now = datetime.now()
post.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
post = jsonable_encoder(post)
if(post.get("point", None) == None):
post["point"] = {
"less10": 0,
"form10to20": 0,
"form20to30": 0,
"form30to40": 0,
"form40to50": 0,
"form50to60": 0,
"bigger60": 0,
"total": 0
}
new_post = await db["posts"].insert_one(post)
created_post = await db["posts"].find_one({"_id": new_post.inserted_id})
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_post)
except NameError:
return NameError
@post.get(
"/list_post", response_description="List all posts", response_model=List[ShowPostModel]
"/list_post", response_description="List all posts"
)
async def list_post(current_user: UserModel = Depends(get_current_user)):
async def list_post():
posts = await db["posts"].find().to_list(1000)
print(posts)
print(len(posts))
return posts
@post.post(
"/find_list_post", response_description="search list posts", response_model=List[ShowPostModel]
)
async def list_post(history: HistoryFindModel, current_user: UserModel = Depends(get_current_user)):
async def list_post(key_find: str, data: str = None):
point_data=["point.less10",
"point.form10to20",
"point.form20to30",
"point.form30to40",
"point.form40to50",
"point.form50to60",
]
# point_data = ["point.less10",
# "point.form10to20",
# "point.form20to30",
# "point.form30to40",
# "point.form40to50",
# "point.form50to60",
# ]
age_sort = "point.total"
history = jsonable_encoder(history)
if history.get("age", None) != None:
if history.get("age") >59:
age_sort = "point.bigger60"
else:
age_sort = point_data[history.get("age")//10]
posts = await db["posts"].find({"translation_post": { "$regex": history.get("key_find") } }).sort(age_sort, -1).to_list(100)
# history = jsonable_encoder(history)
# if history.get("age", None) != None:
# if history.get("age") > 59:
# age_sort = "point.bigger60"
# else:
# age_sort = point_data[history.get("age")//10]
posts = await db["posts"].find({"translation_post": {"$regex": key_find}}).sort(age_sort, -1).to_list(100)
print(posts)
return posts
@post.post(
"/get_post_by_name", response_description="Get a single post", response_model=PostModel
"/get_post_by_name"
# , response_description="Get a single post", response_model=PostModel
)
async def get_post_by_name(history: HistoryFindModel, current_user: UserModel = Depends(get_current_user)
):
datetime_now = datetime.now()
history.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
post = await db["posts"].find_one({"_id": history.post_id})
history = jsonable_encoder(history)
new_his = await db["history_find"].insert_one(history)
created = await db["history_find"].find_one({"_id": new_his.inserted_id})
print(created)
if post is not None:
return post
else:
return None
async def get_post_by_name(history: HistoryFindModel):
try:
history = jsonable_encoder(history)
token = history.get("token", None)
if token == '':
datetime_now = datetime.now()
history["created_at"] = datetime_now.strftime("%m/%d/%y %H:%M:%S")
post = await db["posts"].find_one({"_id": history["post_id"]})
print(post)
for dt in post["data"]:
if dt["level"] != ["*"]:
dt["content"] = "Bạn không có quyền xem nội dung này, vui lòng sử dụng tải khoản được cấp quyền để xem nội dung"
history = jsonable_encoder(history)
new_his = await db["history_find"].insert_one(history)
created = await db["history_find"].find_one({"_id": new_his.inserted_id})
if post is not None:
return post
else:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={'message': "Bài viết không tồn tại"}
)
data_token = await get_current_user(history["token"])
data = data_token.get("user_name", None)
user_type = data_token.get("user_type", None)
if data == None:
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
else:
datetime_now = datetime.now()
history["created_at"] = datetime_now.strftime("%m/%d/%y %H:%M:%S")
post = await db["posts"].find_one({"_id": history["post_id"]})
for dt in post["data"]:
if dt["level"] != ["*"]:
if not user_type in dt["level"]:
dt["content"] = "Bạn không có quyền xem nội dung này, vui lòng sử dụng tải khoản được cấp quyền để xem nội dung"
history = jsonable_encoder(history)
new_his = await db["history_find"].insert_one(history)
created = await db["history_find"].find_one({"_id": new_his.inserted_id})
if post is not None:
return post
else:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={'message': "Bài viết không tồn tại"}
)
except Exception as e:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={'message': str(e)}
)
@post.post(
"/get_post_edit"
)
async def get_post_edit(history: HistoryFindModel):
try:
datetime_now = datetime.now()
history = jsonable_encoder(history)
history["created_at"] = datetime_now.strftime("%m/%d/%y %H:%M:%S")
post = await db["posts"].find_one({"_id": history["post_id"]})
new_his = await db["history_find"].insert_one(history)
created = await db["history_find"].find_one({"_id": new_his.inserted_id})
if post is not None:
return post
else:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={'message': "Bài viết không tồn tại"}
)
except Exception as e:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={'message': str(e)}
)
@post.delete("/delete_post/{id}", response_description="Delete a post")
async def delete_user(id: str, current_user: UserModel = Depends(get_current_user)):
async def delete_user(id: str,
# current_user: UserModel = Depends(get_current_user)
):
delete_result = await db["posts"].delete_one({"_id": id})
if delete_result.deleted_count == 1:
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
raise HTTPException(status_code=404, detail=f"Post {id} not found")
@post.get("/score", response_description="score all post", response_model=List[ShowPostModel])
async def score_all_post( current_user: UserModel = Depends(get_current_user)):
async def score_all_post(
# current_user: UserModel = Depends(get_current_user)
):
posts = await db["posts"].find().to_list(1000)
for dt_post in posts:
print(dt_post)
data_old = dt_post
dt_post["point"]["less10"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
"age":{
'$gte': 0, '$lte': 9
}})
"age": {
'$gte': 0, '$lte': 9
}})
dt_post["point"]["form10to20"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
"age":{
'$gte': 10, '$lte': 19
}})
"age": {
'$gte': 10, '$lte': 19
}})
dt_post["point"]["form20to30"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
"age":{
'$gte': 20, '$lte': 29
}})
"age": {
'$gte': 20, '$lte': 29
}})
dt_post["point"]["form30to40"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
"age":{
'$gte': 30, '$lte': 39
}})
"age": {
'$gte': 30, '$lte': 39
}})
dt_post["point"]["form40to50"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
"age":{
'$gte': 40, '$lte': 49
}})
"age": {
'$gte': 40, '$lte': 49
}})
dt_post["point"]["form50to60"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
"age":{
'$gte': 50, '$lte': 59
}})
"age": {
'$gte': 50, '$lte': 59
}})
dt_post["point"]["bigger60"] = await db["history_find"].count_documents({"post_id": dt_post["_id"],
"age":{
'$gte': 60
}})
"age": {
'$gte': 60
}})
dt_post["point"]["total"] = await db["history_find"].count_documents({"post_id": dt_post["_id"]})
# await db["posts"].update_one(data_old, dt_post)
await db["posts"].update_one({"_id": dt_post["_id"]}, {"$set": {
"point":{
"less10": dt_post["point"]["less10"],
"form10to20": dt_post["point"]["form10to20"],
"form20to30": dt_post["point"]["form20to30"],
"form30to40": dt_post["point"]["form30to40"],
"form40to50": dt_post["point"]["form40to50"],
"form50to60": dt_post["point"]["form50to60"],
"bigger60": dt_post["point"]["bigger60"],
"total": dt_post["point"]["total"]
}
"point": {
"less10": dt_post["point"]["less10"],
"form10to20": dt_post["point"]["form10to20"],
"form20to30": dt_post["point"]["form20to30"],
"form30to40": dt_post["point"]["form30to40"],
"form40to50": dt_post["point"]["form40to50"],
"form50to60": dt_post["point"]["form50to60"],
"bigger60": dt_post["point"]["bigger60"],
"total": dt_post["point"]["total"]
}
}})
return posts
return posts
@post.post("/uploadfiles/")
async def create_upload_files(
files: List[UploadFile] = File(...), current_user: UserModel = Depends(get_current_user)
files: List[UploadFile] = File(...),
# current_user: UserModel = Depends(get_current_user)
):
try:
file_name =[]
file_name = []
i = 0
file_location = f"../media/"
for file in files:
@@ -172,31 +274,107 @@ async def create_upload_files(
file_name.append(current_time + str(i) + file.filename)
i = i + 1
with open(file_save, "wb+") as file_object:
file_object.write(file.file.read())
file_object.write(file.file.read())
return {"filenames": file_name}
except Exception as e:
return JSONResponse(
status_code = status.HTTP_400_BAD_REQUEST,
content = { 'message' : str(e) }
)
status_code=status.HTTP_400_BAD_REQUEST,
content={'message': str(e)}
)
@post.post("/upload_post/")
async def create_upload_post(
post: List[UploadFile] = File(...),
image: List[UploadFile] = File(...),
# current_user: UserModel = Depends(get_current_user)
):
try:
file_name = []
i = 0
now = datetime.now()
current_time = now.strftime("%H_%M_%S_%d-%m-%Y_")
folder_save = f"./post/" + current_time + str(i)
Path(folder_save).mkdir(parents=True)
for file in post:
file_save = folder_save + "/" + file.filename
file_name.append(current_time + str(i) + "/" + file.filename)
i = i + 1
with open(file_save, "wb+") as file_object:
file_object.write(file.file.read())
folder_save = folder_save + "/images"
Path(folder_save).mkdir(parents=True, exist_ok=True)
for file in image:
file_save = folder_save + "/" + file.filename
i = i + 1
with open(file_save, "wb+") as file_object:
file_object.write(file.file.read())
return {"filenames": file_name}
except Exception as e:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={'message': str(e)}
)
# @post.get("/show_post")
# async def show_file(name: str):
# try:
# f = codecs.open(
# "/app/post/04_52_34_27-07-2022_0/formatted.html", "r", "utf-8")
# f.read()
# return {"data": f.read()}
# except Exception as e:
# return JSONResponse(
# status_code=status.HTTP_400_BAD_REQUEST,
# content={'message': str(e)}
# )
@post.get("/showfile/{name}")
async def show_file(name: str, current_user: UserModel = Depends(get_current_user)):
async def show_file(name: str
# , current_user: UserModel = Depends(get_current_user)
):
file_path = f"../media/" + name
if os.path.exists(file_path):
return FileResponse(file_path)
return {"erro": "File not found!"}
@post.get("/showvideo/{file_name}", response_class=FileResponse)
async def main(file_name: str, current_user: UserModel = Depends(get_current_user)):
async def main(file_name: str
# , current_user: UserModel = Depends(get_current_user)
):
file_path = f"../media/" + file_name
return file_path
@post.get("/video")
async def video_endpoint(video_name, current_user: UserModel = Depends(get_current_user)):
async def video_endpoint(video_name
# , current_user: UserModel = Depends(get_current_user)
):
def iterfile():
video_path = f"../media/" + video_name
with open(video_path, mode="rb") as file_like:
yield from file_like
return StreamingResponse(iterfile(), media_type="video/mp4")
return StreamingResponse(iterfile(), media_type="video/mp4")
@post.post("/edit_post/{id}", response_description="score all post", response_model=UpdatePostModel)
async def edit_post(id: str, post: UpdatePostModel):
try:
post = jsonable_encoder(post)
await db["posts"].update_one({"_id": id}, {"$set":
post
})
created_post = await db["posts"].find_one({"_id": id})
return JSONResponse(status_code=status.HTTP_200_OK, content=created_post)
except Exception as e:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={'message': str(e)}
)

View File

@@ -0,0 +1,85 @@
from os import access
from urllib import response
from fastapi import (
APIRouter,
Depends,
status,
HTTPException
)
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from fastapi.security import OAuth2PasswordRequestForm
from fastapi import File, UploadFile, FastAPI
from ..models.models import (
UserModel,
ShowUserModel,
UpdateUserModel
)
from ..models.save_post import *
from ..dependecies import (
get_current_user,
authenticate_user,
authenticate_user_oauth2,
create_access_token,
get_password_hash
)
from ..settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
import json
from typing import List
from datetime import datetime, timedelta
import requests
import re
from pydantic import BaseModel, Field
post_save = APIRouter()
# ============= Creating path operations ==============
@post_save.post("/save_post", response_description="save new post", response_model=SavePostModel)
async def create_post(post_save: SavePost):
try:
url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/check_token"
payload = {'token': post_save.token}
headers = {
'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0='
}
response = requests.request(
"POST", url, headers=headers, data=payload)
data_output = json.loads(response.text)
data = data_output.get("user_name", None)
if data == None:
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
datetime_now = datetime.now()
post_save.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
post_save = jsonable_encoder(post_save)
del post_save["token"]
post_save["username"] = data
new_post = await db["post_save"].insert_one(post_save)
created_post = await db["post_save"].find_one({"_id": new_post.inserted_id})
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_post)
except NameError:
return NameError
@post_save.get(
"/list_save_post_by_user", response_description="List save posts", response_model=SavePostModel
)
async def list_post(post_save: SavePost):
url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/check_token"
payload = {'token': post_save.token}
headers = {
'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0='
}
response = requests.request(
"POST", url, headers=headers, data=payload)
data_output = json.loads(response.text)
data = data_output.get("user_name", None)
if data == None:
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
posts = await db["post_save"].find({"username": data, "is_active": "true"}).to_list(1000)
print(posts)
print(len(posts))
return posts

View File

@@ -1,3 +1,5 @@
from os import access
from urllib import response
from fastapi import (
APIRouter,
Depends,
@@ -16,71 +18,120 @@ from ..models.models import (
from ..dependecies import (
get_current_user,
authenticate_user,
authenticate_user_oauth2,
create_access_token,
get_password_hash
)
from ..settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
import json
from typing import List
from datetime import datetime, timedelta
import requests
import re
from pydantic import BaseModel, Field
class LoginRequest(BaseModel):
username: str
password: str
grant_type: str
class TokenModel(BaseModel):
token: str = None
refresh_token: str = None
grant_type: str = None
router = APIRouter()
# ============= Creating path operations ==============
@router.post("/create_user", response_description="Add new user", response_model=UserModel)
async def create_user(user: UserModel, file: UploadFile = File(...)):
if re.match("admin|dev|simple mortal", user.role):
datetime_now = datetime.now()
user.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
user.password = get_password_hash(user.password)
user = jsonable_encoder(user)
file_location = f"../media/"
current_time = datetime_now.strftime("%H:%M:%S_%d-%m-%Y_")
file_save = file_location + current_time + file.filename
user.avatar = current_time + file.filename
with open(file_save, "wb+") as file_object:
file_object.write(file.file.read())
new_user = await db["users"].insert_one(user)
await db["users"].update_one({"_id": new_user.inserted_id}, {
"$rename": {"password": "hashed_pass"}})
created_user = await db["users"].find_one({"_id": new_user.inserted_id})
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_user)
raise HTTPException(status_code=406, detail="User role not acceptable")
# @router.post("/create_user", response_description="Add new user", response_model=UserModel)
# async def create_user(user: UserModel
# # , file: UploadFile = File(...)
# ):
# if re.match("admin|user|editor", user.role):
# datetime_now = datetime.now()
# user.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
# user.password = get_password_hash(user.password)
# user = jsonable_encoder(user)
# new_user = await db["users"].insert_one(user)
# await db["users"].update_one({"_id": new_user.inserted_id}, {
# "$rename": {"password": "hashed_pass"}})
# created_user = await db["users"].find_one({"_id": new_user.inserted_id})
# return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_user)
# raise HTTPException(status_code=406, detail="User role not acceptable")
@router.post("/login")
async def login_for_access_token(body: LoginRequest):
print(body)
user = await authenticate_user(body.username, body.password)
print(body)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorect ID or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user["username"]}, expires_delta=access_token_expires
)
await db["users"].update_one({"username": body.username}, {"$set": {
"last_login": datetime.now().strftime("%m/%d/%y %H:%M:%S"),
"is_active": "true"
}})
url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/token"
payload = {'username': body.username,
'password': body.password,
'grant_type': body.grant_type}
files = [
]
headers = {
'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0=',
'Cookie': 'JSESSIONID=node0gmjiiq3ht7kv1gesg74t1pxsb20316.node0; XSRF-TOKEN=0976f6e0-814e-4be9-b6fa-b8d0c0896315'
}
response = requests.request(
"POST", url, headers=headers, data=payload, files=files)
access_token = json.loads(response.text)
url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/check_token"
payload = {'token': access_token["access_token"]}
headers = {
'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0='
}
response = requests.request(
"POST", url, headers=headers, data=payload)
data_output = json.loads(response.text)
data = data_output.get("user_name", None)
if data == None:
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
user = json.loads(response.text)
access_token["authorities"] = user["authorities"]
return access_token
# @router.post("/token")
# async def login_for_access_token(body: OAuth2PasswordRequestForm = Depends()):
# url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/token"
# payload = {'username': body.username,
# 'password': body.password,
# 'grant_type': body.grant_type}
# files = [
# ]
# headers = {
# 'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0=',
# 'Cookie': 'JSESSIONID=node0gmjiiq3ht7kv1gesg74t1pxsb20316.node0; XSRF-TOKEN=0976f6e0-814e-4be9-b6fa-b8d0c0896315'
# }
# response = requests.request(
# "POST", url, headers=headers, data=payload, files=files)
# data = json.loads(response.text)
# del data["refresh_token"]
# del data["expires_in"]
# del data["scope"]
# del data["user_type"]
# del data["user_key"]
# return data
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/token")
async def login_for_access_token_2(body: LoginRequest):
print(body)
user = await authenticate_user(body.username, body.password)
async def login_for_access_token_2(body: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user_oauth2(body.username, body.password)
print(body)
if not user:
raise HTTPException(
@@ -88,69 +139,99 @@ async def login_for_access_token_2(body: LoginRequest):
detail="Incorect ID or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user["username"]}, expires_delta=access_token_expires
)
await db["users"].update_one({"username": body.username}, {"$set": {
"last_login": datetime.now().strftime("%m/%d/%y %H:%M:%S"),
"is_active": "true"
}})
# access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# access_token = create_access_token(
# data={"sub": user["username"]}, expires_delta=access_token_expires
# )
# await db["users"].update_one({"username": body.username}, {"$set": {
# "last_login": datetime.now().strftime("%m/%d/%y %H:%M:%S"),
# "is_active": "true"
# }})
return {"access_token": user["access_token"], "token_type": "bearer"}
return {"access_token": access_token, "token_type": "bearer"}
@router.get(
"/list", response_description="List all users", response_model=List[ShowUserModel]
)
async def list_users(current_user: ShowUserModel = Depends(get_current_user)):
async def list_users():
users = await db["users"].find().to_list(1000)
for user in users:
user["is_active"] = "false"
try:
last_login = datetime.strptime(user["last_login"], "%m/%d/%y %H:%M:%S")
last_login = datetime.strptime(
user["last_login"], "%m/%d/%y %H:%M:%S")
my_delta = datetime.now() - last_login
if my_delta <= timedelta(days=30):
user["is_active"] = "true"
except ValueError:
pass
return users
@router.post("/current", response_description="Current User")
async def current_user(token: TokenModel):
try:
url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/check_token"
payload = {'token': token.token}
headers = {
'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0='
}
response = requests.request(
"POST", url, headers=headers, data=payload)
data_output = json.loads(response.text)
data = data_output.get("user_name", None)
if data == None:
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "UNAUTHORIZED"})
else:
return json.loads(response.text)
except ValueError:
pass
@router.get("/current", response_description="Current User", response_model=ShowUserModel)
async def current_user(current_user: ShowUserModel = Depends(get_current_user)):
return current_user
# @router.get("/current", response_description="Current User", response_model=ShowUserModel)
# async def current_user(current_user: ShowUserModel = Depends(get_current_user)):
# return current_user
@router.post("/refresh_token", response_description="refresh token")
async def refresh_token(refresh_token: TokenModel):
url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/token"
payload = {'refresh_token': refresh_token.refresh_token,
'grant_type': refresh_token.grant_type}
files = [
]
headers = {
'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0=',
'Cookie': 'JSESSIONID=node0oxtulscdyhrr1ij9hfhpjl76825093.node0; XSRF-TOKEN=79789f05-27c4-4b2a-a0dc-4491894046ec'
}
response = requests.request(
"POST", url, headers=headers, data=payload, files=files)
return json.loads(response.text)
@router.put("/admin/{user_id}", response_description="Update a user", response_model=UpdateUserModel)
async def update_user(user_id: str, user: UpdateUserModel, current_user: UserModel = Depends(get_current_user)):
async def update_user(user_id: str, user: UpdateUserModel):
if current_user["role"] == "admin":
user = {k: v for k, v in user.dict().items() if v is not None}
if len(user) >= 1:
update_result = await db["users"].update_one({"_id": user_id}, {"$set": user})
if update_result.modified_count == 1:
if (
updated_user := await db["users"].find_one({"_id": user_id})
) is not None:
return updated_user
if (existing_user := await db["users"].find_one({"_id": user_id})) is not None:
return existing_user
raise HTTPException(status_code=404, detail=f"User {user_id} not found")
raise HTTPException(
status_code=404, detail=f"User {user_id} not found")
else:
raise HTTPException(status_code=403, detail=f"Not having sufficient rights to modify the content")
raise HTTPException(
status_code=403, detail=f"Not having sufficient rights to modify the content")
@router.delete("/delete_user/{user_id}", response_description="Delete a user")
async def delete_user(user_id: str, current_user: ShowUserModel = Depends(get_current_user)):
async def delete_user(user_id: str):
delete_result = await db["users"].delete_one({"_id": user_id})
if delete_result.deleted_count == 1:
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)