Added app directory

This commit is contained in:
AtamanKit
2021-07-21 12:12:58 +03:00
parent b243b421dd
commit b8d5811936
12 changed files with 9 additions and 9 deletions

7
app/.dockerignore Normal file
View File

@@ -0,0 +1,7 @@
venv
.git
.gitignore
.idea
__pycache__

12
app/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM python:3.8-slim-buster
WORKDIR /app
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
EXPOSE 80
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]

0
app/__init__.py Normal file
View File

45
app/main.py Normal file
View File

@@ -0,0 +1,45 @@
#===================== Importing FastAPI necessary packages =============
from fastapi import (
FastAPI,
HTTPException,
status,
Request,
)
from src.dependecies import authenticate_user
from src.routers import router
import base64
import binascii
#------------------ FastAPI variable ----------------------------------
app = FastAPI()
# ================ Authentication Middleware =======================
#----------- Here authentication is based on basic scheme,
#----------- another authentication, based on bearer scheme, is used throughout
#---------- the application (as decribed in FastAPI oficial documentation)
@app.middleware("http")
async def authenticate(request: Request, call_next):
#-------------------- Authentication basic scheme -----------------------------
if "Authorization" in request.headers:
auth = request.headers["Authorization"]
try:
scheme, credentials = auth.split()
if scheme.lower() == 'basic':
decoded = base64.b64decode(credentials).decode("ascii")
username, _, password = decoded.partition(":")
request.state.user = await authenticate_user(username, password)
except (ValueError, UnicodeDecodeError, binascii.Error):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid basic auth credentials"
)
response = await call_next(request)
return response
app.include_router(router)

25
app/requirements.txt Normal file
View File

@@ -0,0 +1,25 @@
asgiref==3.4.1
bcrypt==3.2.0
cffi==1.14.5
click==8.0.1
colorama==0.4.4
cryptography==3.4.7
dnspython==2.1.0
ecdsa==0.17.0
email-validator==1.1.3
fastapi==0.66.0
h11==0.12.0
idna==3.2
motor==2.4.0
passlib==1.7.4
pyasn1==0.4.8
pycparser==2.20
pydantic==1.8.2
pymongo==3.11.4
python-jose==3.3.0
python-multipart==0.0.5
rsa==4.7.2
six==1.16.0
starlette==0.14.2
typing-extensions==3.10.0.0
uvicorn==0.14.0

0
app/src/__init__.py Normal file
View File

63
app/src/dependecies.py Normal file
View File

@@ -0,0 +1,63 @@
from fastapi import Depends, HTTPException, status
from jose import JWTError, jwt
from .settings import pwd_context, db, oauth2_scheme, SECRET_KEY, ALGORITHM
from datetime import datetime, timedelta
from typing import Optional
def get_password_hash(password):
return pwd_context.hash(password)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
async def get_user(id: str):
if (user := await db["users"].find_one({"_id": id})) is not None:
return user
async def authenticate_user(id: str, password: str):
user = await get_user(id)
if not user:
return False
if not verify_password(password, user["hashed_pass"]):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encode_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
# token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = await get_user(username)
if user is None:
raise credentials_exception
return user

92
app/src/models.py Normal file
View File

@@ -0,0 +1,92 @@
from bson import ObjectId
from pydantic import BaseModel, Field
from typing import Optional
class PyObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid objectid")
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
class UserModel(BaseModel):
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
first_name: str
last_name: str
role : str
is_active : str
created_at: Optional[str] = None
last_login: str
password: str
class Config:
allow_population_by_field_name = True
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}
schema_extra = {
"example": {
"first_name": "John",
"last_name": "Doe",
"role": "simple mortal",
"is_active": "false",
"created_at": "datetime",
"last_login": "datetime",
"password": "fakehashedsecret",
}
}
class UpdateUserModel(BaseModel):
first_name: Optional[str]
last_name: Optional[str]
role: Optional[str]
is_active: Optional[str]
created_at: Optional[str]
last_login: Optional[str]
class Config:
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}
schema_extra = {
"example": {
"first_name": "John",
"last_name": "Doe",
"role": "simple mortal",
"is_active": "false",
"created_at": "datetime",
"last_login": "datetime",
}
}
class ShowUserModel(BaseModel):
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
first_name: Optional[str]
last_name: Optional[str]
role: Optional[str]
is_active: Optional[str]
created_at: Optional[str]
last_login: Optional[str]
class Config:
arbitrary_types_allowed = True
json_encoders = {ObjectId: str}
schema_extra = {
"example": {
"first_name": "John",
"last_name": "Doe",
"role": "simple mortal",
"created_at": "datetime",
"last_login": "datetime",
}
}

129
app/src/routers.py Normal file
View File

@@ -0,0 +1,129 @@
from fastapi import (
APIRouter,
Depends,
status,
HTTPException
)
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from fastapi.security import OAuth2PasswordRequestForm
from .models import (
UserModel,
ShowUserModel,
UpdateUserModel
)
from .dependecies import (
get_current_user,
authenticate_user,
create_access_token,
get_password_hash
)
from src.settings import db, ACCESS_TOKEN_EXPIRE_MINUTES
from typing import List
from datetime import datetime, timedelta
import re
router = APIRouter()
# ============= Creating path operation ==============
@router.post("/", response_description="Add new user", response_model=UserModel)
async def create_user(user: UserModel):
if re.match("admin|dev|simple mortal", user.role):
datetime_now = datetime.now()
user.created_at = datetime_now.strftime("%m/%d/%y %H:%M:%S")
user.password = get_password_hash(user.password)
user = jsonable_encoder(user)
new_user = await db["users"].insert_one(user)
await db["users"].update_one({"_id": new_user.inserted_id}, {
"$rename": {"password": "hashed_pass"}})
created_user = await db["users"].find_one({"_id": new_user.inserted_id})
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_user)
raise HTTPException(status_code=406, detail="User role not acceptable")
@router.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorect ID or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user["_id"]}, expires_delta=access_token_expires
)
await db["users"].update_one({"_id": form_data.username}, {"$set": {
"last_login": datetime.now().strftime("%m/%d/%y %H:%M:%S"),
"is_active": "true"
}})
return {"access_token": access_token, "token_type": "bearer"}
@router.get(
"/list", response_description="List all users", response_model=List[ShowUserModel]
)
async def list_users():
users = await db["users"].find().to_list(1000)
for user in users:
user["is_active"] = "false"
try:
last_login = datetime.strptime(user["last_login"], "%m/%d/%y %H:%M:%S")
my_delta = datetime.now() - last_login
if my_delta <= timedelta(days=30):
user["is_active"] = "true"
except ValueError:
pass
return users
@router.get("/current", response_description="Current User", response_model=ShowUserModel)
async def current_user(current_user: ShowUserModel = Depends(get_current_user)):
return current_user
@router.put("/admin/{user_id}", response_description="Update a user", response_model=UpdateUserModel)
async def update_user(user_id: str, user: UpdateUserModel, current_user: UserModel = Depends(get_current_user)):
if current_user["role"] == "admin":
user = {k: v for k, v in user.dict().items() if v is not None}
if len(user) >= 1:
update_result = await db["users"].update_one({"_id": user_id}, {"$set": user})
if update_result.modified_count == 1:
if (
updated_user := await db["users"].find_one({"_id": user_id})
) is not None:
return updated_user
if (existing_user := await db["users"].find_one({"_id": user_id})) is not None:
return existing_user
raise HTTPException(status_code=404, detail=f"User {user_id} not found")
else:
raise HTTPException(status_code=403, detail=f"Not having sufficient rights to modify the content")
@router.delete("/{user_id}", response_description="Delete a user")
async def delete_user(user_id: str):
delete_result = await db["users"].delete_one({"_id": user_id})
if delete_result.deleted_count == 1:
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
raise HTTPException(status_code=404, detail=f"User {user_id} not found")

20
app/src/settings.py Normal file
View File

@@ -0,0 +1,20 @@
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
import os
import motor.motor_asyncio
# ================= Creating necessary variables ========================
#------------------ Token, authentication variables ---------------------
SECRET_KEY = "4ab5be85c8c56eecdd547f7831979be83de58a6768d10a314f54cda4e4d67ffe"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
#----------------- Database variables (MongoDB) --------------------------
client = motor.motor_asyncio.AsyncIOMotorClient(os.environ["DB_URL"])
db = client.myTestDB