Files
medihome-dictionary-be/app/src/routers/routers.py

242 lines
8.6 KiB
Python
Raw Normal View History

2022-08-02 14:25:51 +07:00
from os import access
from urllib import response
2021-07-21 08:05:29 +03:00
from fastapi import (
APIRouter,
Depends,
status,
HTTPException
)
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from fastapi.security import OAuth2PasswordRequestForm
2022-07-26 13:46:19 +07:00
from fastapi import File, UploadFile, FastAPI
from ..models.models import (
2021-07-21 08:05:29 +03:00
UserModel,
ShowUserModel,
2022-08-04 14:48:50 +07:00
UpdateUserModel,
token_test
2021-07-21 08:05:29 +03:00
)
2022-08-04 14:48:50 +07:00
from ..models.post import *
2022-07-26 13:46:19 +07:00
from ..dependecies import (
2021-07-21 08:05:29 +03:00
get_current_user,
authenticate_user,
2022-08-02 14:25:51 +07:00
authenticate_user_oauth2,
2021-07-21 08:05:29 +03:00
create_access_token,
get_password_hash
)
2022-07-26 13:46:19 +07:00
from ..settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
2022-08-02 14:25:51 +07:00
import json
2021-07-21 08:05:29 +03:00
from typing import List, Optional
from fastapi.concurrency import run_in_threadpool
from datetime import datetime, timedelta
2022-08-02 14:25:51 +07:00
import requests
2021-07-21 08:05:29 +03:00
import re
2022-07-26 13:46:19 +07:00
from pydantic import BaseModel, Field
2022-08-02 14:25:51 +07:00
2022-07-26 13:46:19 +07:00
class LoginRequest(BaseModel):
    """JSON body accepted by the ``/login`` endpoint.

    All three fields are required; the ``/login`` handler forwards them
    unchanged as form data to the upstream OAuth token endpoint.
    """

    username: str = Field(...)
    password: str = Field(...)
    grant_type: str = Field(...)
class TokenModel(BaseModel):
    """Request body shared by the ``/current`` and ``/refresh_token`` endpoints.

    Every field is optional and defaults to ``None``. The annotations are
    spelled ``Optional[...]`` so they agree with those defaults instead of
    relying on implicit-Optional inference from a ``None`` default.
    """

    # Access token to validate; read by the ``/current`` handler.
    token: Optional[str] = None
    # Refresh token to exchange; read by the ``/refresh_token`` handler.
    refresh_token: Optional[str] = None
    # OAuth grant type forwarded together with the refresh token.
    grant_type: Optional[str] = None
    # Not read by any handler visible in this file.
    created_at: Optional[datetime] = None
2022-08-02 14:25:51 +07:00
2021-07-21 08:05:29 +03:00
router = APIRouter()
2021-07-21 14:38:05 +03:00
# ============= Creating path operations ==============
2021-07-21 08:05:29 +03:00
2022-08-02 14:25:51 +07:00
# @router.post("/create_user", response_description="Add new user", response_model=UserModel)
# async def create_user(user: UserModel
# # , file: UploadFile = File(...)
# ):
# if re.match("admin|user|editor", user.role):
# datetime_now = datetime.now()
# user.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
# user.password = get_password_hash(user.password)
# user = jsonable_encoder(user)
# new_user = await db["users"].insert_one(user)
# await db["users"].update_one({"_id": new_user.inserted_id}, {
# "$rename": {"password": "hashed_pass"}})
# created_user = await db["users"].find_one({"_id": new_user.inserted_id})
# return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_user)
# raise HTTPException(status_code=406, detail="User role not acceptable")
2021-07-21 08:05:29 +03:00
2022-07-26 13:46:19 +07:00
@router.post("/login")
async def login_for_access_token(body: LoginRequest):
    """Log a user in against the upstream OAuth server.

    The credentials in ``body`` are posted to the upstream ``/oauth/token``
    endpoint. The issued access token is then sent to ``/oauth/check_token``
    so the user's authorities can be attached to the response.

    Args:
        body: Username, password and grant type supplied by the client.

    Returns:
        The upstream token payload with an added ``"authorities"`` key
        (an empty list when the token check reports none), or a 401
        ``JSONResponse`` when the upstream server issues no access token or
        the token check reports no ``user_name``.

    Raises:
        HTTPException: 502 when the upstream server cannot be reached, times
            out, or answers with a body that is not valid JSON.
    """
    # NOTE(review): the client credentials and the session cookie below are
    # hard-coded in source; they should be loaded from configuration.
    basic_auth = 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0='
    session_cookie = 'JSESSIONID=node0gmjiiq3ht7kv1gesg74t1pxsb20316.node0; XSRF-TOKEN=0976f6e0-814e-4be9-b6fa-b8d0c0896315'
    upstream_timeout = 10  # seconds; without it a stalled upstream hangs the request forever

    try:
        # `requests` is blocking, so run it in the thread pool to keep the
        # event loop free for other requests.
        token_response = await run_in_threadpool(
            requests.post,
            "https://sandboxapi.ebacsi.com.vn/auth/oauth/token",
            headers={'Authorization': basic_auth, 'Cookie': session_cookie},
            data={
                'username': body.username,
                'password': body.password,
                'grant_type': body.grant_type,
            },
            timeout=upstream_timeout,
        )
        access_token = json.loads(token_response.text)

        # Rejected credentials yield an error payload without "access_token";
        # answer 401 instead of failing with a KeyError (HTTP 500).
        if not isinstance(access_token, dict) or "access_token" not in access_token:
            return JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={"message": "UNAUTHORIZED"},
            )

        check_response = await run_in_threadpool(
            requests.post,
            "https://sandboxapi.ebacsi.com.vn/auth/oauth/check_token",
            headers={'Authorization': basic_auth},
            data={'token': access_token["access_token"]},
            timeout=upstream_timeout,
        )
        token_info = json.loads(check_response.text)
    except (requests.RequestException, ValueError) as exc:
        # Network failure, timeout, or a non-JSON body from the upstream server.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Authentication service unavailable",
        ) from exc

    if not isinstance(token_info, dict) or token_info.get("user_name") is None:
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"message": "UNAUTHORIZED"},
        )

    access_token["authorities"] = token_info.get("authorities", [])
    return access_token
# @router.post("/token")
# async def login_for_access_token(body: OAuth2PasswordRequestForm = Depends()):
# url = "https://sandboxapi.ebacsi.com.vn/auth/oauth/token"
# payload = {'username': body.username,
# 'password': body.password,
# 'grant_type': body.grant_type}
# files = [
# ]
# headers = {
# 'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0=',
# 'Cookie': 'JSESSIONID=node0gmjiiq3ht7kv1gesg74t1pxsb20316.node0; XSRF-TOKEN=0976f6e0-814e-4be9-b6fa-b8d0c0896315'
# }
# response = requests.request(
# "POST", url, headers=headers, data=payload, files=files)
# data = json.loads(response.text)
# del data["refresh_token"]
# del data["expires_in"]
# del data["scope"]
# del data["user_type"]
# del data["user_key"]
# return data
2021-07-21 08:05:29 +03:00
2022-07-26 13:46:19 +07:00
@router.post("/token")
async def login_for_access_token_2(body: OAuth2PasswordRequestForm = Depends()):
    """Issue a bearer token for OAuth2 password-form credentials.

    Args:
        body: Form data carrying the ``username`` and ``password`` to check.

    Returns:
        A dict with the ``access_token`` taken from the authenticated user
        record and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when ``authenticate_user_oauth2`` returns a falsy
            result.
    """
    account = await authenticate_user_oauth2(body.username, body.password)
    if account:
        return {"access_token": account["access_token"], "token_type": "bearer"}

    # A falsy result from the authenticator is reported to the client as 401.
    raise HTTPException(
        headers={"WWW-Authenticate": "Bearer"},
        detail="Incorect ID or password",
        status_code=status.HTTP_401_UNAUTHORIZED,
    )
2021-07-21 08:05:29 +03:00
2022-08-04 14:48:50 +07:00
# @router.get(
# "/list", response_description="List all users", response_model=List[ShowUserModel]
# )
# async def list_users():
# users = await db["users"].find().to_list(1000)
# for user in users:
# user["is_active"] = "false"
# try:
# last_login = datetime.strptime(
# user["last_login"], "%m/%d/%y %H:%M:%S")
# my_delta = datetime.now() - last_login
# if my_delta <= timedelta(days=30):
# user["is_active"] = "true"
# except ValueError:
# pass
2021-07-21 08:05:29 +03:00
2022-08-02 14:25:51 +07:00
@router.post("/current", response_description="Current User")
async def current_user(token: TokenModel):
    """Return the upstream token-check payload for the supplied access token.

    ``token.token`` is posted to the upstream ``/oauth/check_token`` endpoint.

    Args:
        token: Request body; only its ``token`` field is read.

    Returns:
        The decoded JSON payload from the token check when it contains a
        ``user_name``, otherwise a 401 ``JSONResponse``.

    Raises:
        HTTPException: 502 when the upstream server cannot be reached, times
            out, or answers with a body that is not valid JSON. (Previously a
            non-JSON body was swallowed and the endpoint answered ``null``
            with HTTP 200.)
    """
    # NOTE(review): the client credentials are hard-coded in source; they
    # should be loaded from configuration.
    headers = {
        'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0='
    }

    try:
        # `requests` is blocking, so run it in the thread pool to keep the
        # event loop free; the timeout bounds a stalled upstream.
        response = await run_in_threadpool(
            requests.post,
            "https://sandboxapi.ebacsi.com.vn/auth/oauth/check_token",
            headers=headers,
            data={'token': token.token},
            timeout=10,
        )
        token_info = json.loads(response.text)
    except (requests.RequestException, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Authentication service unavailable",
        ) from exc

    if not isinstance(token_info, dict) or token_info.get("user_name") is None:
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"message": "UNAUTHORIZED"},
        )
    return token_info
# @router.get("/current", response_description="Current User", response_model=ShowUserModel)
# async def current_user(current_user: ShowUserModel = Depends(get_current_user)):
# return current_user
@router.post("/refresh_token", response_description="refresh token")
async def refresh_token(refresh_token: TokenModel):
    """Exchange a refresh token for a new token via the upstream OAuth server.

    Args:
        refresh_token: Request body; its ``refresh_token`` and ``grant_type``
            fields are forwarded as form data to the upstream
            ``/oauth/token`` endpoint.

    Returns:
        The decoded JSON body of the upstream response, unchanged.

    Raises:
        HTTPException: 502 when the upstream server cannot be reached, times
            out, or answers with a body that is not valid JSON.
    """
    # NOTE(review): the client credentials and the session cookie are
    # hard-coded in source; they should be loaded from configuration.
    headers = {
        'Authorization': 'Basic RGljdGlvbmFyeU1lZGlob21lOlJ4aXR6ZnZvaWFmZmNtb2l0ZW0=',
        'Cookie': 'JSESSIONID=node0oxtulscdyhrr1ij9hfhpjl76825093.node0; XSRF-TOKEN=79789f05-27c4-4b2a-a0dc-4491894046ec'
    }
    payload = {
        'refresh_token': refresh_token.refresh_token,
        'grant_type': refresh_token.grant_type,
    }

    try:
        # `requests` is blocking, so run it in the thread pool to keep the
        # event loop free; the timeout bounds a stalled upstream.
        response = await run_in_threadpool(
            requests.post,
            "https://sandboxapi.ebacsi.com.vn/auth/oauth/token",
            headers=headers,
            data=payload,
            timeout=10,
        )
        # NOTE(review): the upstream HTTP status is not propagated; an upstream
        # error payload is still returned to the client with status 200.
        return json.loads(response.text)
    except (requests.RequestException, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Authentication service unavailable",
        ) from exc
2021-07-21 08:05:29 +03:00
2022-08-04 14:48:50 +07:00
# @router.put("/admin/{user_id}", response_description="Update a user", response_model=UpdateUserModel)
# async def update_user(user_id: str, user: UpdateUserModel):
# if current_user["role"] == "admin":
# user = {k: v for k, v in user.dict().items() if v is not None}
# if len(user) >= 1:
# update_result = await db["users"].update_one({"_id": user_id}, {"$set": user})
# if update_result.modified_count == 1:
# if (
# updated_user := await db["users"].find_one({"_id": user_id})
# ) is not None:
# return updated_user
# if (existing_user := await db["users"].find_one({"_id": user_id})) is not None:
# return existing_user
# raise HTTPException(
# status_code=404, detail=f"User {user_id} not found")
# else:
# raise HTTPException(
# status_code=403, detail=f"Not having sufficient rights to modify the content")
# @router.delete("/delete_user/{user_id}", response_description="Delete a user")
# async def delete_user(user_id: str):
# delete_result = await db["users"].delete_one({"_id": user_id})
# if delete_result.deleted_count == 1:
# return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
# raise HTTPException(status_code=404, detail=f"User {user_id} not found")
# @router.get(
# "/list1111", response_description="List all users", response_model=List[UpdatePostModel]
# )
# async def list_users(token: token_test = Depends(get_current_user)):
# print(token)
# users = await db["posts"].find().to_list(1000)
# return users