This commit is contained in:
QuangMinh_123
2026-05-27 13:50:27 +07:00
parent 7aebcf9567
commit 2683cdb882
30 changed files with 2091 additions and 17 deletions

View File

@@ -1,3 +1,4 @@
# pyrefly: ignore [missing-import]
from flask import Flask
from common.exceptions.handler import (
@@ -9,6 +10,14 @@ from modules.device_type.routes import (
device_type_bp
)
from modules.uploads.upload_routes import(
upload_bp
)
from modules.device.routes import (
device_bp
)
app = Flask(__name__)
# Register Global Exception Handlers
@@ -17,7 +26,17 @@ register_error_handlers(app)
# Register Blueprints
app.register_blueprint(
device_type_bp,
url_prefix="/api"
url_prefix="/api/device-types"
)
app.register_blueprint(
upload_bp,
url_prefix="/api/uploads"
)
app.register_blueprint(
device_bp,
url_prefix="/api/devices"
)
# @app.route("/")

View File

@@ -0,0 +1,109 @@
# pyrefly: ignore [missing-import]
from flask import request
from common.response.api_response import success_response
from common.constants.status_code import HTTP_CREATED
from modules.device.service import (
get_devices_service,
get_device_by_id_service,
create_device_service,
update_device_service,
delete_device_service
)
from modules.device.schemas import CreateDeviceSchema, UpdateDeviceSchema
def get_devices():
"""GET /api/devices — Lấy danh sách tất cả các thiết bị mạng"""
devices = get_devices_service()
return success_response(
data=devices,
message="Devices retrieved successfully"
)
def get_device_by_id(device_id):
"""GET /api/devices/<device_id> — Lấy chi tiết thiết bị theo ID"""
device = get_device_by_id_service(device_id)
return success_response(
data=device,
message="Device retrieved successfully"
)
def create_device():
"""
POST /api/devices — Thêm mới một thiết bị mạng
"""
body = request.get_json()
# Validate bằng schema
schema = CreateDeviceSchema()
data = schema.load(body)
# Gọi service xử lý logic nghiệp vụ
new_device = create_device_service(data)
return success_response(
data=new_device,
message="Device created successfully",
status_code=HTTP_CREATED
)
def update_device(device_id):
"""
PUT /api/devices/<device_id> — Cập nhật thông tin thiết bị mạng
"""
body = request.get_json()
# Validate bằng schema
schema = UpdateDeviceSchema()
data = schema.load(body)
# Gọi service để kiểm tra và cập nhật DB
updated_device = update_device_service(device_id, data)
return success_response(
data=updated_device,
message="Device updated successfully"
)
def delete_device(device_id):
"""
DELETE /api/devices/<device_id> — Xóa thiết bị mạng khỏi hệ thống
"""
delete_device_service(device_id)
return success_response(
message="Device deleted successfully"
)
# def upload_device_avatar_controller(device_id):
# """
# POST /api/devices/<device_id>/avatar — Upload ảnh đại diện thiết bị lên S3/MinIO
# và cập nhật URL vào database.
# """
# # 1. Kiểm tra tồn tại của thiết bị
# device = get_device_by_id_service(device_id)
# # 2. Lấy file ảnh từ request.files
# file = request.files.get("file")
# if not file:
# raise BadRequestException("Missing image file in request")
# try:
# # 3. Thực hiện upload lên S3/MinIO
# avatar_url = upload_device_avatar(file, device["name"])
# # 4. Cập nhật trường avatar_url của thiết bị trong DB
# updated_device = update_device_service(device_id, {"avatar_url": avatar_url})
# return success_response(
# data=updated_device,
# message="Device avatar uploaded successfully",
# status_code=HTTP_CREATED
# )
# except ValueError as e:
# # ValidationError trong quá trình validate loại file
# raise BadRequestException(str(e))

View File

@@ -0,0 +1,25 @@
from common.exceptions.app_exception import (
NotFoundException,
ConflictException
)
class DeviceNotFoundException(NotFoundException):
def __init__(self, device_id):
super().__init__(
message=f"Device not found with id={device_id}",
payload={"device_id": device_id}
)
class DeviceAlreadyExistsException(ConflictException):
def __init__(self, name):
super().__init__(
message=f"Device already exists with name={name}",
payload={"name": name}
)
class DeviceIPAlreadyExistsException(ConflictException):
def __init__(self, ip_address):
super().__init__(
message=f"Device already exists with IP address={ip_address}",
payload={"ip_address": ip_address}
)

View File

@@ -0,0 +1,293 @@
from config.database import get_connection, release_connection
def _row_to_dict(row):
"""
Chuyển đổi một dòng kết quả từ tuple (từ câu lệnh JOIN) thành dictionary.
"""
if not row:
return None
device_dict = {
"id": str(row[0]),
"device_type_id": str(row[1]),
"name": row[2],
"description": row[3],
"ip_address": row[4],
"port": row[5],
"latitude": row[6],
"longitude": row[7],
"color": row[8],
"avatar_url": row[9],
"is_active": row[10],
"created": row[11].isoformat() if row[11] else None,
"modified": row[12].isoformat() if row[12] else None
}
# Nếu câu truy vấn có JOIN với device_type
if len(row) > 13:
device_dict["device_type_name"] = row[13]
device_dict["device_type_icon"] = row[14]
return device_dict
def find_all_devices():
"""
Lấy danh sách tất cả các thiết bị trong database, JOIN với bảng device_type
để hiển thị tên loại thiết bị và icon đại diện.
"""
conn = get_connection()
cur = None
try:
cur = conn.cursor()
cur.execute("""
SELECT
d.id, d.device_type_id, d.name, d.description, d.ip_address, d.port,
d.latitude, d.longitude, d.color, d.avatar_url, d.is_active, d.created, d.modified,
dt.name as device_type_name, dt.icon_url as device_type_icon
FROM device d
JOIN device_type dt ON d.device_type_id = dt.id
ORDER BY d.created DESC
""")
rows = cur.fetchall()
devices = []
for row in rows:
devices.append(_row_to_dict(row))
return devices
finally:
if cur:
cur.close()
release_connection(conn)
def find_device_by_id(device_id):
"""
Tìm thiết bị theo ID, JOIN với device_type để lấy thông tin chi tiết.
"""
conn = get_connection()
cur = None
try:
cur = conn.cursor()
cur.execute("""
SELECT
d.id, d.device_type_id, d.name, d.description, d.ip_address, d.port,
d.latitude, d.longitude, d.color, d.avatar_url, d.is_active, d.created, d.modified,
dt.name as device_type_name, dt.icon_url as device_type_icon
FROM device d
JOIN device_type dt ON d.device_type_id = dt.id
WHERE d.id = %s
""", (device_id,))
row = cur.fetchone()
return _row_to_dict(row) if row else None
finally:
if cur:
cur.close()
release_connection(conn)
def find_device_by_name(name):
"""
Tìm thiết bị theo tên (không phân biệt hoa thường) để phục vụ validate trùng tên.
"""
conn = get_connection()
cur = None
try:
cur = conn.cursor()
cur.execute("""
SELECT id, device_type_id, name, description, ip_address, port, latitude, longitude, color, avatar_url, is_active, created, modified
FROM device
WHERE LOWER(name) = LOWER(%s)
""", (name,))
row = cur.fetchone()
return _row_to_dict(row) if row else None
finally:
if cur:
cur.close()
release_connection(conn)
def find_device_by_ip(ip_address):
"""
Tìm thiết bị theo địa chỉ IP để phục vụ validate tránh trùng lặp IP thiết bị.
"""
conn = get_connection()
cur = None
try:
cur = conn.cursor()
cur.execute("""
SELECT id, device_type_id, name, description, ip_address, port, latitude, longitude, color, avatar_url, is_active, created, modified
FROM device
WHERE ip_address = %s
""", (ip_address,))
row = cur.fetchone()
return _row_to_dict(row) if row else None
finally:
if cur:
cur.close()
release_connection(conn)
def insert_device(data):
"""
Tạo mới một thiết bị và cấu hình mặc định (MonitorConfig & AlertConfig)
trong cùng một database transaction.
"""
conn = get_connection()
cur = None
try:
cur = conn.cursor()
# 1. Thêm thiết bị vào bảng device
cur.execute("""
INSERT INTO device (device_type_id, name, description, ip_address, port, latitude, longitude, color, avatar_url, is_active)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id, device_type_id, name, description, ip_address, port, latitude, longitude, color, avatar_url, is_active, created, modified
""", (
data["device_type_id"],
data["name"],
data.get("description"),
data["ip_address"],
data.get("port"),
data["latitude"],
data["longitude"],
data["color"],
data.get("avatar_url"),
data.get("is_active", True)
))
device_row = cur.fetchone()
device_id = device_row[0]
# 2. Thêm cấu hình giám sát mặc định (MonitorConfig) cho thiết bị vừa tạo
# enable_ping mặc định bật True để thực hiện Ping giám sát
cur.execute("""
INSERT INTO monitor_config (device_id, enable_ping, ping_count, ping_timeout, ping_interval, enable_snmp)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
device_id,
True, # enable_ping
3, # ping_count
5, # ping_timeout (giây)
60, # ping_interval (giây)
False # enable_snmp
))
# 3. Thêm cấu hình cảnh báo mặc định (AlertConfig) cho thiết bị vừa tạo
cur.execute("""
INSERT INTO alert_config (device_id, is_enabled, fail_threshold, cooldown_minutes, notify_web, notify_email)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
device_id,
True, # is_enabled
3, # fail_threshold (số lần fail liên tiếp trước khi alert)
30, # cooldown_minutes (thời gian tránh spam alert)
True, # notify_web (hiển thị thông báo trên web)
False # notify_email (mặc định tắt gửi email)
))
# 4. Lấy lại thiết bị kèm thông tin DeviceType đã JOIN để trả về đầy đủ dữ liệu
cur.execute("""
SELECT
d.id, d.device_type_id, d.name, d.description, d.ip_address, d.port,
d.latitude, d.longitude, d.color, d.avatar_url, d.is_active, d.created, d.modified,
dt.name as device_type_name, dt.icon_url as device_type_icon
FROM device d
JOIN device_type dt ON d.device_type_id = dt.id
WHERE d.id = %s
""", (device_id,))
full_row = cur.fetchone()
# Commit toàn bộ transaction
conn.commit()
return _row_to_dict(full_row)
except Exception:
conn.rollback()
raise
finally:
if cur:
cur.close()
release_connection(conn)
def update_device_db(device_id, data):
"""
Cập nhật thông tin chi tiết của một thiết bị.
"""
conn = get_connection()
cur = None
try:
cur = conn.cursor()
update_fields = []
params = []
fields_list = [
"device_type_id", "name", "description",
"ip_address", "port", "latitude",
"longitude", "color", "avatar_url", "is_active"
]
for key in fields_list:
if key in data:
update_fields.append(f"{key} = %s")
params.append(data[key])
if not update_fields:
return find_device_by_id(device_id)
update_fields.append("modified = CURRENT_TIMESTAMP")
sql = f"""
UPDATE device
SET {', '.join(update_fields)}
WHERE id = %s
RETURNING id, device_type_id, name, description, ip_address, port, latitude, longitude, color, avatar_url, is_active, created, modified
"""
params.append(device_id)
cur.execute(sql, tuple(params))
row = cur.fetchone()
conn.commit()
if row:
# Lấy chi tiết có kèm JOIN device_type để trả về đầy đủ
return find_device_by_id(device_id)
return None
except Exception:
conn.rollback()
raise
finally:
if cur:
cur.close()
release_connection(conn)
def delete_device_db(device_id):
"""
Xóa thiết bị khỏi database. Do có khóa ngoại với Cascade Delete,
tất cả các dòng liên quan trong monitor_config, alert_config, device_status, alert_log sẽ tự động bị xóa.
"""
conn = get_connection()
cur = None
try:
cur = conn.cursor()
cur.execute("""
DELETE FROM device
WHERE id = %s
""", (device_id,))
conn.commit()
except Exception:
conn.rollback()
raise
finally:
if cur:
cur.close()
release_connection(conn)

View File

@@ -0,0 +1,20 @@
# pyrefly: ignore [missing-import]
from flask import Blueprint
from modules.device.controller import (
get_devices,
get_device_by_id,
create_device,
update_device,
delete_device
)
# Khởi tạo Blueprint cho Module Device
device_bp = Blueprint("device", __name__)
# Đăng ký các endpoints với controller tương ứng
device_bp.route("", methods=["GET"])(get_devices)
device_bp.route("/<device_id>", methods=["GET"])(get_device_by_id)
device_bp.route("", methods=["POST"])(create_device)
device_bp.route("/<device_id>", methods=["PUT"])(update_device)
device_bp.route("/<device_id>", methods=["DELETE"])(delete_device)

View File

@@ -0,0 +1,120 @@
import ipaddress
# pyrefly: ignore [missing-import]
from marshmallow import Schema, fields, validate, ValidationError
HEX_COLOR_REGEX = r"^#[0-9A-Fa-f]{6}$"
def validate_ip_address(val):
try:
ipaddress.ip_address(val)
except ValueError:
raise ValidationError("Invalid IP address format. Must be a valid IPv4 or IPv6 address.")
class CreateDeviceSchema(Schema):
device_type_id = fields.UUID(
required=True,
error_messages={"required": "Device type ID is required."}
)
name = fields.String(
required=True,
validate=validate.Length(min=1, max=200),
error_messages={"required": "Device name is required."}
)
description = fields.String(
required=False,
allow_none=True
)
ip_address = fields.String(
required=True,
validate=validate_ip_address,
error_messages={"required": "IP address is required."}
)
port = fields.Integer(
required=False,
allow_none=True,
validate=validate.Range(min=1, max=65535)
)
latitude = fields.Float(
required=True,
validate=validate.Range(min=-90.0, max=90.0),
error_messages={"required": "Latitude is required."}
)
longitude = fields.Float(
required=True,
validate=validate.Range(min=-180.0, max=180.0),
error_messages={"required": "Longitude is required."}
)
color = fields.String(
required=True,
validate=validate.Regexp(HEX_COLOR_REGEX),
error_messages={"required": "Color code (HEX) is required."}
)
avatar_url = fields.String(
required=False,
allow_none=True,
validate=validate.Length(max=512)
)
is_active = fields.Boolean(
required=False,
load_default=True
)
class UpdateDeviceSchema(Schema):
device_type_id = fields.UUID(
required=False
)
name = fields.String(
required=False,
validate=validate.Length(min=1, max=200)
)
description = fields.String(
required=False,
allow_none=True
)
ip_address = fields.String(
required=False,
validate=validate_ip_address
)
port = fields.Integer(
required=False,
allow_none=True,
validate=validate.Range(min=1, max=65535)
)
latitude = fields.Float(
required=False,
validate=validate.Range(min=-90.0, max=90.0)
)
longitude = fields.Float(
required=False,
validate=validate.Range(min=-180.0, max=180.0)
)
color = fields.String(
required=False,
validate=validate.Regexp(HEX_COLOR_REGEX)
)
avatar_url = fields.String(
required=False,
allow_none=True,
validate=validate.Length(max=512)
)
is_active = fields.Boolean(
required=False
)

View File

@@ -0,0 +1,138 @@
from modules.device.repository import (
find_all_devices,
find_device_by_id,
find_device_by_name,
find_device_by_ip,
insert_device,
update_device_db,
delete_device_db
)
from modules.device.exceptions import (
DeviceNotFoundException,
DeviceAlreadyExistsException,
DeviceIPAlreadyExistsException
)
from modules.device_type.repository import find_device_type_by_id
from modules.device_type.exceptions import DeviceTypeNotFoundException
from scheduler.scheduler import (
add_device_monitoring_job,
remove_device_monitoring_job,
reschedule_device_monitoring_job
)
def get_devices_service():
"""Lấy danh sách tất cả thiết bị"""
return find_all_devices()
def get_device_by_id_service(device_id):
"""
Lấy thông tin chi tiết một thiết bị theo ID.
Ném lỗi DeviceNotFoundException nếu không tồn tại.
"""
device = find_device_by_id(device_id)
if not device:
raise DeviceNotFoundException(device_id)
return device
def create_device_service(data):
"""
Tạo mới một thiết bị mạng.
Các bước xử lý:
1. Kiểm tra loại thiết bị (device_type_id) có tồn tại trong hệ thống hay không.
2. Kiểm tra trùng lặp tên thiết bị (case-insensitive).
3. Kiểm tra trùng lặp địa chỉ IP.
4. Thêm thiết bị và cấu hình mặc định (monitor & alert) vào database.
5. Đăng ký công việc giám sát vào Scheduler nền.
"""
# 1. Kiểm tra DeviceType
device_type = find_device_type_by_id(data["device_type_id"])
if not device_type:
raise DeviceTypeNotFoundException(data["device_type_id"])
# 2. Kiểm tra trùng tên
existing_name = find_device_by_name(data["name"])
if existing_name:
raise DeviceAlreadyExistsException(data["name"])
# 3. Kiểm tra trùng IP
existing_ip = find_device_by_ip(data["ip_address"])
if existing_ip:
raise DeviceIPAlreadyExistsException(data["ip_address"])
# 4. Insert DB
new_device = insert_device(data)
# 5. Kích hoạt Job giám sát trên Background Scheduler
# Truyền kèm thông tin cấu hình mặc định (enable_ping=True, v.v...)
add_device_monitoring_job(new_device["id"], None)
return new_device
def update_device_service(device_id, data):
"""
Cập nhật thông tin thiết bị mạng.
Các bước xử lý:
1. Kiểm tra sự tồn tại của thiết bị.
2. Nếu cập nhật device_type_id -> kiểm tra xem có tồn tại không.
3. Nếu cập nhật name -> kiểm tra xem có bị trùng với thiết bị khác không.
4. Nếu cập nhật ip_address -> kiểm tra xem có bị trùng với thiết bị khác không.
5. Cập nhật dữ liệu vào DB.
6. Cập nhật lại cấu hình giám sát trong Scheduler.
"""
# 1. Kiểm tra thiết bị có tồn tại hay không
existing = find_device_by_id(device_id)
if not existing:
raise DeviceNotFoundException(device_id)
# 2. Kiểm tra loại thiết bị nếu có truyền vào
new_device_type_id = data.get("device_type_id")
if new_device_type_id and new_device_type_id != existing["device_type_id"]:
device_type = find_device_type_by_id(new_device_type_id)
if not device_type:
raise DeviceTypeNotFoundException(new_device_type_id)
# 3. Kiểm tra trùng tên khi tên bị thay đổi
new_name = data.get("name")
if new_name and new_name.lower() != existing["name"].lower():
conflict_name = find_device_by_name(new_name)
if conflict_name and conflict_name["id"] != device_id:
raise DeviceAlreadyExistsException(new_name)
# 4. Kiểm tra trùng IP khi IP bị thay đổi
new_ip = data.get("ip_address")
if new_ip and new_ip != existing["ip_address"]:
conflict_ip = find_device_by_ip(new_ip)
if conflict_ip and conflict_ip["id"] != device_id:
raise DeviceIPAlreadyExistsException(new_ip)
# 5. Cập nhật DB
updated_device = update_device_db(device_id, data)
# 6. Cập nhật lại thông tin giám sát trong Scheduler
reschedule_device_monitoring_job(device_id, None)
return updated_device
def delete_device_service(device_id):
"""
Xóa thiết bị mạng.
Các bước xử lý:
1. Kiểm tra sự tồn tại của thiết bị.
2. Thực hiện xóa khỏi DB (tự động xóa cấu hình và lịch sử liên quan).
3. Hủy bỏ job giám sát khỏi Scheduler.
"""
# 1. Kiểm tra tồn tại
existing = find_device_by_id(device_id)
if not existing:
raise DeviceNotFoundException(device_id)
# 2. Xóa khỏi DB
delete_device_db(device_id)
# 3. Hủy công việc giám sát trong Scheduler
remove_device_monitoring_job(device_id)

View File

@@ -0,0 +1 @@
{"name":"Local: controller","url":"/Users/danghongquangminh/Downloads/NetworkDeviceManagementSystem_Project/backend/modules/device_type/controller.py","tests":[{"id":1779345290807,"input":"","output":""}],"interactive":false,"memoryLimit":1024,"timeLimit":3000,"srcPath":"/Users/danghongquangminh/Downloads/NetworkDeviceManagementSystem_Project/backend/modules/device_type/controller.py","group":"local","local":true}

View File

@@ -8,14 +8,19 @@
# 3. Gọi service → trả response
# ============================================
# pyrefly: ignore [missing-import]
from flask import request
from common.response.api_response import success_response
from common.constants.status_code import HTTP_CREATED
from modules.device_type.service import (
get_device_types_service,
create_device_type_service
create_device_type_service,
get_device_type_by_id_service,
update_device_type_service,
delete_device_type_service
)
from modules.device_type.schemas import CreateDeviceTypeSchema
from modules.device_type.schemas import CreateDeviceTypeSchema, UpdateDeviceTypeSchema
def get_device_types():
@@ -29,8 +34,11 @@ def get_device_types():
def get_device_type_by_id(device_type_id):
"""GET /api/device-types/<device_type_id> — Lấy chi tiết theo ID"""
# TODO: Triển khai sau
pass
device_type = get_device_type_by_id_service(device_type_id)
return success_response(
data=device_type,
message="Device type retrieved successfully"
)
def create_device_type():
@@ -73,11 +81,20 @@ def create_device_type():
def update_device_type(device_type_id):
"""PUT /api/device-types/<device_type_id> — Cập nhật"""
# TODO: Triển khai sau
pass
body = request.get_json()
schema = UpdateDeviceTypeSchema()
data = schema.load(body)
updated_device_type = update_device_type_service(device_type_id, data)
return success_response(
data=updated_device_type,
message="Device type updated successfully"
)
def delete_device_type(device_type_id):
"""DELETE /api/device-types/<device_type_id> — Xóa"""
# TODO: Triển khai sau
pass
delete_device_type_service(device_type_id)
return success_response(
message="Device type deleted successfully"
)

View File

@@ -29,7 +29,7 @@ def find_all_device_types():
cur = conn.cursor()
cur.execute("""
SELECT id, name, description, icon_url, color, sort_order, is_active, created, modified
FROM device_types
FROM device_type
ORDER BY sort_order ASC, name ASC
""")
@@ -63,7 +63,7 @@ def find_device_type_by_name(name):
# VD: "Router" và "router" coi như trùng
cur.execute("""
SELECT id, name, description, icon_url, color, sort_order, is_active, created, modified
FROM device_types
FROM device_type
WHERE LOWER(name) = LOWER(%s)
""", (name,))
# ⚠️ LƯU Ý: (name,) có dấu phẩy → tạo tuple 1 phần tử
@@ -90,7 +90,7 @@ def insert_device_type(data):
try:
cur = conn.cursor()
cur.execute("""
INSERT INTO device_types (name, description, icon_url, color, sort_order, is_active)
INSERT INTO device_type (name, description, icon_url, color, sort_order, is_active)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id, name, description, icon_url, color, sort_order, is_active, created, modified
""", (
@@ -114,6 +114,129 @@ def insert_device_type(data):
conn.rollback() # Nếu có lỗi → rollback để hủy transaction, tránh dữ liệu bẩn
raise # raise lại exception để tầng trên (service/controller) xử lý
finally:
if cur:
cur.close()
release_connection(conn)
# ============================================
# FIND BY ID: Tìm device type theo ID
# Trả về dict nếu tìm thấy, None nếu không
# ============================================
def find_device_type_by_id(device_type_id):
conn = get_connection()
cur = None
try:
cur = conn.cursor()
cur.execute("""
SELECT id, name, description, icon_url, color, sort_order, is_active, created, modified
FROM device_type
WHERE id = %s
""", (device_type_id,))
row = cur.fetchone()
if row:
return _row_to_dict(row)
return None
finally:
if cur:
cur.close()
release_connection(conn)
# ============================================
# CHECK IN USE: Kiểm tra xem device type có đang được dùng bởi thiết bị nào không ?
# Trả về True nếu có, False nếu không
# ============================================
def is_device_type_in_use(device_type_id):
conn = get_connection()
cur = None
try:
cur = conn.cursor()
# Kiểm tra sự tồn tại trong bảng devices
cur.execute("""
SELECT EXISTS(
SELECT 1 FROM device
WHERE device_type_id = %s
)
""", (device_type_id,))
row = cur.fetchone()
return row[0] if row else False
finally:
if cur:
cur.close()
release_connection(conn)
# ============================================
# UPDATE: Cập nhật thông tin device type
# Trả về dict của device type sau khi cập nhật
# ============================================
def update_device_type_db(device_type_id, data):
conn = get_connection()
cur = None
try:
cur = conn.cursor()
# Xây dựng câu lệnh UPDATE động tùy vào các trường có trong data
update_fields = []
params = []
for key in ["name", "description", "icon_url", "color", "sort_order", "is_active"]:
if key in data:
update_fields.append(f"{key} = %s")
params.append(data[key])
if not update_fields:
# Không có trường nào cần cập nhật
return find_device_type_by_id(device_type_id)
# Thêm cập nhật thời gian sửa đổi (modified)
update_fields.append("modified = CURRENT_TIMESTAMP")
sql = f"""
UPDATE device_type
SET {', '.join(update_fields)}
WHERE id = %s
RETURNING id, name, description, icon_url, color, sort_order, is_active, created, modified
"""
params.append(device_type_id)
cur.execute(sql, tuple(params))
row = cur.fetchone()
conn.commit()
if row:
return _row_to_dict(row)
return None
except Exception:
conn.rollback()
raise
finally:
if cur:
cur.close()
release_connection(conn)
# ============================================
# DELETE: Xóa 1 device type khỏi database
# ============================================
def delete_device_type_db(device_type_id):
conn = get_connection()
cur = None
try:
cur = conn.cursor()
cur.execute("""
DELETE FROM device_type
WHERE id = %s
""", (device_type_id,))
conn.commit()
except Exception:
conn.rollback()
raise
finally:
if cur:
cur.close()

View File

@@ -11,7 +11,6 @@ from modules.device_type.controller import (
device_type_bp = Blueprint(
"device_type",
__name__,
url_prefix="/device-types"
)
device_type_bp.route("", methods=["GET"])(get_device_types)

View File

@@ -9,9 +9,17 @@
from modules.device_type.repository import (
find_all_device_types,
find_device_type_by_name,
insert_device_type
insert_device_type,
find_device_type_by_id,
is_device_type_in_use,
update_device_type_db,
delete_device_type_db
)
from modules.device_type.exceptions import (
DeviceTypeAlreadyExistsException,
DeviceTypeNotFoundException,
DeviceTypeInUseException
)
from modules.device_type.exceptions import DeviceTypeAlreadyExistsException
def get_device_types_service():
@@ -19,6 +27,19 @@ def get_device_types_service():
return find_all_device_types()
def get_device_type_by_id_service(device_type_id):
"""
Lấy chi tiết device type theo ID.
Business logic:
1. Tìm device type theo ID
2. Nếu không thấy → throw NotFoundException
"""
device_type = find_device_type_by_id(device_type_id)
if not device_type:
raise DeviceTypeNotFoundException(device_type_id)
return device_type
def create_device_type_service(data):
"""
Tạo mới device type.
@@ -35,4 +56,52 @@ def create_device_type_service(data):
# Bước 2: Insert vào DB
new_device_type = insert_device_type(data)
return new_device_type
return new_device_type
def update_device_type_service(device_type_id, data):
"""
Cập nhật device type.
Business logic:
1. Kiểm tra sự tồn tại của device_type_id. Nếu không có → throw NotFoundException
2. Nếu có đổi tên (name) → kiểm tra xem tên mới đã tồn tại ở bản ghi khác chưa
- Nếu trùng → throw ConflictException
3. Cập nhật vào DB
"""
# Bước 1: Check sự tồn tại
existing = find_device_type_by_id(device_type_id)
if not existing:
raise DeviceTypeNotFoundException(device_type_id)
# Bước 2: Check trùng tên nếu tên thay đổi
new_name = data.get("name")
if new_name and new_name.lower() != existing["name"].lower():
name_conflict = find_device_type_by_name(new_name)
if name_conflict and name_conflict["id"] != device_type_id:
raise DeviceTypeAlreadyExistsException(new_name)
# Bước 3: Cập nhật DB
updated_device_type = update_device_type_db(device_type_id, data)
return updated_device_type
def delete_device_type_service(device_type_id):
"""
Xóa device type.
Business logic:
1. Kiểm tra sự tồn tại. Nếu không có → throw NotFoundException
2. Kiểm tra xem có đang được gán cho thiết bị nào không (FK constraint check)
- Nếu có → throw DeviceTypeInUseException
3. Thực hiện xóa khỏi DB
"""
# Bước 1: Check sự tồn tại
existing = find_device_type_by_id(device_type_id)
if not existing:
raise DeviceTypeNotFoundException(device_type_id)
# Bước 2: Check xem có đang được sử dụng không
if is_device_type_in_use(device_type_id):
raise DeviceTypeInUseException(device_type_id)
# Bước 3: Xóa
delete_device_type_db(device_type_id)

View File

@@ -0,0 +1,149 @@
# pyrefly: ignore [missing-import]
from flask import request
from common.response.api_response import success_response
from modules.monitor_config.service import (
get_monitor_config_service,
update_monitor_config_service,
test_connection_service
)
from modules.monitor_config.schemas import (
UpdateMonitorConfigSchema,
TestConnectionSchema
)
# ============================================
# CONTROLLER: Nhận request → Validate → Gọi Service → Trả response
# ============================================
# Luồng đi tổng thể:
#
# Client (Frontend/Postman)
# │
# ├── GET /api/devices/{device_id}/monitor-config → get_monitor_config()
# ├── PUT /api/devices/{device_id}/monitor-config → update_monitor_config()
# └── POST /api/devices/{device_id}/monitor-config/test → test_connection()
# │
# ▼
# Controller (file này)
# │ 1. Lấy dữ liệu từ request (params, body)
# │ 2. Validate bằng Marshmallow Schema
# │ 3. Gọi Service xử lý logic
# │
# ▼
# Service → Repository → Database
#
# Controller KHÔNG chứa business logic (check tồn tại, tính toán...)
# Controller CHỈ là cầu nối giữa HTTP request và Service layer
# ============================================
def get_monitor_config(device_id):
"""
GET /api/devices/<device_id>/monitor-config
Lấy cấu hình giám sát hiện tại của thiết bị.
Luồng đi:
1. Nhận device_id từ URL path
2. Gọi service → service check thiết bị tồn tại → trả config
3. Trả response thành công kèm data
"""
config = get_monitor_config_service(device_id)
return success_response(
data=config,
message="Monitor config retrieved successfully"
)
def update_monitor_config(device_id):
"""
PUT /api/devices/<device_id>/monitor-config
Cập nhật cấu hình giám sát của thiết bị.
Luồng đi:
1. Nhận device_id từ URL path + JSON body từ request
2. Validate body bằng UpdateMonitorConfigSchema
→ Marshmallow kiểm tra: kiểu dữ liệu, range, enum...
→ Nếu sai → ném ValidationError → Global handler trả 400
→ Nếu đúng → loại bỏ trường thừa, áp dụng defaults
3. Gọi service → service check tồn tại → cập nhật DB → reschedule job
4. Trả response thành công kèm config đã cập nhật
Ví dụ request body:
{
"enable_ping": true,
"ping_interval": 30,
"ping_count": 5,
"enable_snmp": true,
"snmp_community": "public",
"snmp_version": "v2c",
"snmp_port": 161
}
"""
body = request.get_json()
# Validate bằng schema — loại bỏ trường không hợp lệ, check range/enum
schema = UpdateMonitorConfigSchema()
data = schema.load(body)
# Gọi service để xử lý logic nghiệp vụ
updated_config = update_monitor_config_service(device_id, data)
return success_response(
data=updated_config,
message="Monitor config updated successfully"
)
def test_connection(device_id):
"""
POST /api/devices/<device_id>/monitor-config/test
Kiểm tra kết nối tới thiết bị (Ping và/hoặc SNMP).
Luồng đi:
1. Nhận device_id từ URL path + JSON body chứa tham số test
2. Validate body bằng TestConnectionSchema
3. Gọi service → service lấy IP thiết bị → chạy Ping/SNMP test
4. Trả kết quả kiểm tra (Up/Down, RTT, chi tiết SNMP)
Ví dụ request body (test cả Ping và SNMP):
{
"test_ping": true,
"ping_count": 3,
"ping_timeout": 5,
"test_snmp": true,
"snmp_community": "public",
"snmp_version": "v2c",
"snmp_port": 161,
"snmp_timeout": 5
}
Ví dụ response:
{
"success": true,
"data": {
"ping_result": {
"status": "up",
"method": "icmplib",
"avg_rtt_ms": 12.5,
"packet_loss": 0.0
},
"snmp_result": {
"status": "up",
"method": "snmp",
"snmp_data": {"1.3.6.1.2.1.1.1.0": "Cisco IOS XR Software..."}
}
}
}
"""
body = request.get_json() or {}
# Validate tham số test
schema = TestConnectionSchema()
test_params = schema.load(body)
# Gọi service chạy test kết nối
results = test_connection_service(device_id, test_params)
return success_response(
data=results,
message="Connection test completed"
)

View File

@@ -0,0 +1,12 @@
from common.exceptions.app_exception import NotFoundException
class MonitorConfigNotFoundException(NotFoundException):
"""
Exception ném ra khi không tìm thấy cấu hình giám sát (MonitorConfig) của thiết bị.
Kế thừa từ NotFoundException để trả về HTTP status 404 cho client.
"""
def __init__(self, device_id):
super().__init__(
message=f"Monitor config not found for device with id={device_id}",
payload={"device_id": device_id}
)

View File

@@ -0,0 +1,147 @@
import json
from config.database import get_connection, release_connection
# ============================================
# REPOSITORY LAYER: Tương tác trực tiếp với bảng monitor_config trong PostgreSQL
# ============================================
# Luồng đi tổng thể:
# Controller → Service → Repository → Database
# Repository CHỈ chịu trách nhiệm thực thi SQL, KHÔNG chứa logic nghiệp vụ.
#
# Bảng monitor_config có quan hệ 1-1 với bảng device:
# - Mỗi device có đúng 1 bản ghi monitor_config (tạo tự động khi tạo device)
# - Khóa ngoại: device_id → device(id) ON DELETE CASCADE
# - Ràng buộc UNIQUE trên device_id → đảm bảo không trùng
# ============================================
def _row_to_dict(row):
"""
Chuyển đổi một dòng kết quả (tuple) từ câu query thành dictionary.
Thứ tự các cột phải khớp với SELECT trong các hàm bên dưới.
"""
if not row:
return None
return {
"id": str(row[0]),
"device_id": str(row[1]),
"enable_ping": row[2],
"ping_count": row[3],
"ping_timeout": row[4],
"ping_interval": row[5],
"enable_snmp": row[6],
"snmp_version": row[7],
"snmp_community": row[8],
"snmp_port": row[9],
"snmp_interval": row[10],
"snmp_timeout": row[11],
"snmp_custom_oids": row[12], # PostgreSQL JSONB → Python dict (tự động)
"created": row[13].isoformat() if row[13] else None,
"modified": row[14].isoformat() if row[14] else None
}
# ============================================
# FIND BY DEVICE ID: Lấy cấu hình giám sát của một thiết bị
# ============================================
# Luồng đi:
# GET /api/devices/{device_id}/monitor-config
# → Controller → Service gọi hàm này
# → Trả về dict nếu tìm thấy, None nếu không
# ============================================
def find_monitor_config_by_device_id(device_id):
conn = get_connection()
cur = None
try:
cur = conn.cursor()
cur.execute("""
SELECT id, device_id, enable_ping, ping_count, ping_timeout, ping_interval,
enable_snmp, snmp_version, snmp_community, snmp_port, snmp_interval,
snmp_timeout, snmp_custom_oids, created, modified
FROM monitor_config
WHERE device_id = %s
""", (device_id,))
row = cur.fetchone()
return _row_to_dict(row) if row else None
finally:
if cur:
cur.close()
release_connection(conn)
# ============================================
# UPDATE: Cập nhật cấu hình giám sát của thiết bị
# ============================================
# Luồng đi:
# PUT /api/devices/{device_id}/monitor-config
# → Controller validate body → Service kiểm tra tồn tại → Repository cập nhật DB
#
# Cách hoạt động:
# - Dùng Dynamic UPDATE: chỉ cập nhật các trường có trong data (không ghi đè toàn bộ)
# - Trường snmp_custom_oids cần chuyển sang JSON string trước khi lưu vào JSONB
# - Tự động cập nhật cột modified = CURRENT_TIMESTAMP
# - RETURNING: trả về toàn bộ row sau khi update (tránh phải SELECT lại)
# ============================================
def update_monitor_config_db(device_id, data):
conn = get_connection()
cur = None
try:
cur = conn.cursor()
# Xây dựng câu lệnh UPDATE động — chỉ SET các trường có trong data
update_fields = []
params = []
# Danh sách các trường được phép cập nhật
allowed_fields = [
"enable_ping", "ping_count", "ping_timeout", "ping_interval",
"enable_snmp", "snmp_version", "snmp_community", "snmp_port",
"snmp_interval", "snmp_timeout", "snmp_custom_oids"
]
for key in allowed_fields:
if key in data:
value = data[key]
# ⚠️ XỬ LÝ ĐẶC BIỆT: snmp_custom_oids là JSONB trong PostgreSQL
# Python dict cần chuyển sang JSON string trước khi INSERT/UPDATE
# Ví dụ: {"sysName": "1.3.6.1.2.1.1.5.0"} → '{"sysName": "1.3.6.1.2.1.1.5.0"}'
if key == "snmp_custom_oids" and isinstance(value, dict):
value = json.dumps(value)
update_fields.append(f"{key} = %s")
params.append(value)
if not update_fields:
# Không có trường nào cần cập nhật → trả về config hiện tại
return find_monitor_config_by_device_id(device_id)
# Thêm cập nhật thời gian modified
update_fields.append("modified = CURRENT_TIMESTAMP")
sql = f"""
UPDATE monitor_config
SET {', '.join(update_fields)}
WHERE device_id = %s
RETURNING id, device_id, enable_ping, ping_count, ping_timeout, ping_interval,
enable_snmp, snmp_version, snmp_community, snmp_port, snmp_interval,
snmp_timeout, snmp_custom_oids, created, modified
"""
params.append(device_id)
cur.execute(sql, tuple(params))
row = cur.fetchone()
conn.commit()
return _row_to_dict(row) if row else None
except Exception:
conn.rollback()
raise
finally:
if cur:
cur.close()
release_connection(conn)

View File

@@ -0,0 +1,164 @@
# pyrefly: ignore [missing-import]
from marshmallow import Schema, fields, validate
# ============================================
# SCHEMAS: Validate dữ liệu đầu vào cho Monitor Config
# ============================================
# Luồng đi: Client gửi JSON body → Controller nhận → Schema validate → Service xử lý
# Nếu dữ liệu không hợp lệ → Marshmallow ném ValidationError → Global handler trả 400
# ============================================
# Danh sách các SNMP version hệ thống hỗ trợ
VALID_SNMP_VERSIONS = ["v1", "v2c", "v3"]
class UpdateMonitorConfigSchema(Schema):
"""
Schema để validate dữ liệu khi cập nhật cấu hình giám sát.
Tất cả các trường đều optional vì người dùng có thể chỉ cập nhật một phần.
Ví dụ request body:
{
"enable_ping": true,
"ping_interval": 30,
"enable_snmp": true,
"snmp_community": "public",
"snmp_version": "v2c"
}
"""
# ─── Cấu hình Ping (ICMP) ────────────────────────
enable_ping = fields.Boolean(required=False)
ping_count = fields.Integer(
required=False,
validate=validate.Range(min=1, max=10)
# Số lượng gói tin ping gửi mỗi lần kiểm tra
# min=1: ít nhất 1 gói, max=10: tránh gửi quá nhiều gây tải mạng
)
ping_timeout = fields.Integer(
required=False,
validate=validate.Range(min=1, max=30)
# Thời gian chờ phản hồi tối đa (giây)
# Nếu thiết bị không phản hồi trong khoảng này → coi như timeout
)
ping_interval = fields.Integer(
required=False,
validate=validate.Range(min=5, max=86400)
# Tần suất kiểm tra (giây): min=5s (thiết bị quan trọng), max=86400s (1 ngày)
# APScheduler sẽ chạy job Ping theo interval này
)
# ─── Cấu hình SNMP ───────────────────────────────
enable_snmp = fields.Boolean(required=False)
snmp_version = fields.String(
required=False,
allow_none=True,
validate=validate.OneOf(VALID_SNMP_VERSIONS)
# Chỉ chấp nhận: "v1", "v2c", "v3"
)
snmp_community = fields.String(
required=False,
allow_none=True,
validate=validate.Length(max=256)
# Community string dùng để xác thực SNMP (ví dụ: "public", "private")
)
snmp_port = fields.Integer(
required=False,
allow_none=True,
validate=validate.Range(min=1, max=65535)
# Port SNMP mặc định là 161, nhưng cho phép tùy chỉnh
)
snmp_interval = fields.Integer(
required=False,
allow_none=True,
validate=validate.Range(min=5, max=86400)
# Tần suất kiểm tra SNMP (giây), tương tự ping_interval
)
snmp_timeout = fields.Integer(
required=False,
allow_none=True,
validate=validate.Range(min=1, max=30)
# Thời gian chờ phản hồi SNMP tối đa (giây)
)
snmp_custom_oids = fields.Dict(
required=False,
allow_none=True
# Danh sách OID tùy chỉnh dưới dạng JSON object
# Ví dụ: {"sysName": "1.3.6.1.2.1.1.5.0", "ifNumber": "1.3.6.1.2.1.2.1.0"}
# Lưu vào PostgreSQL dưới dạng JSONB
)
class TestConnectionSchema(Schema):
"""
Schema để validate dữ liệu khi người dùng bấm nút "Test kết nối".
Cho phép test trước khi lưu cấu hình — client truyền lên cấu hình tạm để test.
Luồng đi:
1. Người dùng nhập cấu hình trên giao diện (chưa bấm Lưu)
2. Bấm nút "Test" → Client gửi POST /api/devices/{id}/monitor-config/test
3. Server nhận cấu hình tạm → Chạy Ping/SNMP test ngay lập tức
4. Trả kết quả (Up/Down, RTT, chi tiết) → Hiển thị trên giao diện
5. Nếu OK → Người dùng mới bấm "Lưu" để PUT cập nhật cấu hình thực
"""
# Test ping hay snmp?
test_ping = fields.Boolean(
required=False,
load_default=True
# Mặc định test ping
)
test_snmp = fields.Boolean(
required=False,
load_default=False
)
# ─── Tham số Ping (dùng cho test) ─────────────────
ping_count = fields.Integer(
required=False,
load_default=3,
validate=validate.Range(min=1, max=10)
)
ping_timeout = fields.Integer(
required=False,
load_default=5,
validate=validate.Range(min=1, max=30)
)
# ─── Tham số SNMP (dùng cho test) ─────────────────
snmp_version = fields.String(
required=False,
allow_none=True,
validate=validate.OneOf(VALID_SNMP_VERSIONS)
)
snmp_community = fields.String(
required=False,
allow_none=True,
validate=validate.Length(max=256)
)
snmp_port = fields.Integer(
required=False,
allow_none=True,
load_default=161,
validate=validate.Range(min=1, max=65535)
)
snmp_timeout = fields.Integer(
required=False,
allow_none=True,
load_default=5,
validate=validate.Range(min=1, max=30)
)

View File

@@ -0,0 +1,323 @@
import subprocess
import platform
# ============================================
# SERVICE LAYER: Business Logic cho Module Monitor Config
# ============================================
# Luồng đi tổng thể của module này:
#
# ┌──────────┐ ┌──────────┐ ┌──────────────┐ ┌──────────┐
# │ Controller│ ──→ │ Service │ ──→ │ Repository │ ──→ │ Database │
# └──────────┘ └──────────┘ └──────────────┘ └──────────┘
# │
# ├──→ Device Repository (kiểm tra thiết bị tồn tại)
# ├──→ Scheduler (cập nhật job giám sát khi config thay đổi)
# └──→ ICMP/SNMP Test (kiểm tra kết nối trực tiếp)
#
# Service chứa 3 chức năng chính:
# 1. Lấy cấu hình giám sát (GET)
# 2. Cập nhật cấu hình giám sát (PUT) + reschedule job
# 3. Test kết nối trực tiếp (POST /test) — Ping hoặc SNMP
# ============================================
from modules.monitor_config.repository import (
find_monitor_config_by_device_id,
update_monitor_config_db
)
from modules.monitor_config.exceptions import MonitorConfigNotFoundException
from modules.device.repository import find_device_by_id
from modules.device.exceptions import DeviceNotFoundException
from scheduler.scheduler import reschedule_device_monitoring_job
# ============================================
# Dynamic Import: icmplib và pysnmp
# ============================================
# Tại sao cần Dynamic Import?
# - icmplib cần quyền root/sudo trên một số hệ điều hành để gửi ICMP raw socket
# - pysnmp có thể chưa được cài đặt trên máy phát triển
# - Nếu import thất bại → app vẫn chạy được, chỉ fallback sang cách khác
# ============================================
try:
from icmplib import ping as icmp_ping
HAS_ICMPLIB = True
except ImportError:
HAS_ICMPLIB = False
try:
from pysnmp.hlapi import (
SnmpEngine, CommunityData, UdpTransportTarget,
ContextData, ObjectType, ObjectIdentity, getCmd
)
HAS_PYSNMP = True
except ImportError:
HAS_PYSNMP = False
# ============================================
# 1. GET: Lấy cấu hình giám sát của thiết bị
# ============================================
# Luồng đi:
# GET /api/devices/{device_id}/monitor-config
# → Controller gọi hàm này
# → Kiểm tra device tồn tại (DeviceNotFoundException nếu không)
# → Kiểm tra config tồn tại (MonitorConfigNotFoundException nếu không)
# → Trả về dict cấu hình
# ============================================
def get_monitor_config_service(device_id):
# Bước 1: Kiểm tra thiết bị có tồn tại trong hệ thống không
device = find_device_by_id(device_id)
if not device:
raise DeviceNotFoundException(device_id)
# Bước 2: Lấy cấu hình giám sát từ DB
# (Bản ghi monitor_config được tạo tự động khi tạo device trong module device)
config = find_monitor_config_by_device_id(device_id)
if not config:
raise MonitorConfigNotFoundException(device_id)
return config
# ============================================
# 2. PUT: Cập nhật cấu hình giám sát
# ============================================
# Luồng đi:
# PUT /api/devices/{device_id}/monitor-config
# → Controller validate body bằng UpdateMonitorConfigSchema
# → Service gọi hàm này
# → Kiểm tra device + config tồn tại
# → Cập nhật DB (Repository)
# → Gọi Scheduler reschedule_job để áp dụng tần suất mới NGAY LẬP TỨC
# → Trả về config đã cập nhật
#
# Ví dụ: Người dùng đổi ping_interval từ 60s → 30s
# → DB được cập nhật
# → Scheduler reschedule job: lần kiểm tra tiếp theo sẽ chạy sau 30s thay vì 60s
# → KHÔNG cần restart server
# ============================================
def update_monitor_config_service(device_id, data):
# Bước 1: Kiểm tra thiết bị tồn tại
device = find_device_by_id(device_id)
if not device:
raise DeviceNotFoundException(device_id)
# Bước 2: Kiểm tra config tồn tại
existing_config = find_monitor_config_by_device_id(device_id)
if not existing_config:
raise MonitorConfigNotFoundException(device_id)
# Bước 3: Cập nhật vào DB
updated_config = update_monitor_config_db(device_id, data)
# Bước 4: Thông báo cho Scheduler cập nhật lại job
# reschedule_device_monitoring_job sẽ đọc config mới từ DB
# và áp dụng interval mới cho job ping/snmp
reschedule_device_monitoring_job(device_id, updated_config)
return updated_config
# ============================================
# 3. POST: Test kết nối (Ping hoặc SNMP)
# ============================================
# Luồng đi:
# POST /api/devices/{device_id}/monitor-config/test
# → Controller validate body bằng TestConnectionSchema
# → Service gọi hàm này
# → Lấy IP thiết bị từ DB
# → Chạy Ping test (nếu test_ping=True)
# → Chạy SNMP test (nếu test_snmp=True)
# → Trả về kết quả chi tiết {ping_result, snmp_result}
#
# Cơ chế Fallback cho Ping:
# Ưu tiên 1: icmplib (thư viện Python, cần cài pip install icmplib)
# → async_ping() hoặc ping() với privileged=False
# Ưu tiên 2: Lệnh ping hệ điều hành (subprocess)
# → Chạy "ping -c 3 <ip>" (Linux/Mac) hoặc "ping -n 3 <ip>" (Windows)
# → Parse kết quả từ stdout
# ============================================
def test_connection_service(device_id, test_params):
# Bước 1: Kiểm tra thiết bị tồn tại và lấy IP
device = find_device_by_id(device_id)
if not device:
raise DeviceNotFoundException(device_id)
ip_address = device["ip_address"]
results = {}
# Bước 2: Test Ping nếu được yêu cầu
if test_params.get("test_ping", True):
ping_count = test_params.get("ping_count", 3)
ping_timeout = test_params.get("ping_timeout", 5)
results["ping_result"] = _run_ping_test(ip_address, ping_count, ping_timeout)
# Bước 3: Test SNMP nếu được yêu cầu
if test_params.get("test_snmp", False):
snmp_params = {
"community": test_params.get("snmp_community", "public"),
"version": test_params.get("snmp_version", "v2c"),
"port": test_params.get("snmp_port", 161),
"timeout": test_params.get("snmp_timeout", 5)
}
results["snmp_result"] = _run_snmp_test(ip_address, snmp_params)
return results
# ============================================
# PRIVATE: Hàm chạy Ping test
# ============================================
# Cơ chế:
# 1. Thử dùng icmplib trước (nhanh, chính xác, có RTT)
# → privileged=False: không cần quyền root, dùng UDP probe
# 2. Nếu icmplib không có hoặc lỗi → Fallback sang subprocess
# → Chạy lệnh ping của hệ điều hành qua subprocess.run()
# → Parse returncode: 0 = thành công, != 0 = thất bại
# ============================================
def _run_ping_test(ip_address, count, timeout):
# ─── Ưu tiên 1: Dùng icmplib ─────────────────────
if HAS_ICMPLIB:
try:
# privileged=False: Không cần quyền root
# Trên macOS/Linux không root, icmplib sẽ dùng UDP probe
result = icmp_ping(
ip_address,
count=count,
timeout=timeout,
privileged=False
)
return {
"status": "up" if result.is_alive else "down",
"method": "icmplib",
"packets_sent": result.packets_sent,
"packets_received": result.packets_received,
"packet_loss": result.packet_loss, # 0.0 ~ 1.0
"avg_rtt_ms": round(result.avg_rtt, 2), # Thời gian phản hồi trung bình (ms)
"min_rtt_ms": round(result.min_rtt, 2),
"max_rtt_ms": round(result.max_rtt, 2)
}
except Exception as e:
# icmplib có nhưng gặp lỗi (ví dụ: permission denied)
# → Fallback sang subprocess
pass
# ─── Ưu tiên 2: Fallback sang lệnh ping hệ điều hành ───
try:
# Xác định tham số theo hệ điều hành
# Windows dùng -n (number), Linux/macOS dùng -c (count)
param = "-n" if platform.system().lower() == "windows" else "-c"
timeout_param = "-w" if platform.system().lower() == "windows" else "-W"
cmd = ["ping", param, str(count), timeout_param, str(timeout), ip_address]
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout * count + 5 # Timeout tổng = timeout mỗi gói × số gói + buffer
)
is_alive = result.returncode == 0
return {
"status": "up" if is_alive else "down",
"method": "system_ping",
"detail": result.stdout.decode("utf-8", errors="replace")[:500]
# Giới hạn 500 ký tự đầu ra để tránh response quá lớn
}
except subprocess.TimeoutExpired:
return {
"status": "down",
"method": "system_ping",
"detail": f"Ping timeout after {timeout * count + 5} seconds"
}
except Exception as e:
return {
"status": "error",
"method": "system_ping",
"detail": str(e)
}
# ============================================
# PRIVATE: Hàm chạy SNMP test
# ============================================
# Cơ chế:
# 1. Dùng pysnmp gửi SNMP GET request lấy sysDescr (OID: 1.3.6.1.2.1.1.1.0)
# → Đây là OID tiêu chuẩn mà MỌI thiết bị SNMP đều hỗ trợ
# → Nếu lấy được giá trị → thiết bị Up + hỗ trợ SNMP
# 2. Nếu pysnmp không có → trả về lỗi yêu cầu cài đặt
#
# Tham số SNMP:
# - community: chuỗi xác thực (mặc định "public")
# - version: phiên bản SNMP (v1, v2c, v3)
# - port: cổng SNMP trên thiết bị (mặc định 161)
# - timeout: thời gian chờ phản hồi (giây)
# ============================================
def _run_snmp_test(ip_address, snmp_params):
if not HAS_PYSNMP:
return {
"status": "error",
"method": "snmp",
"detail": "pysnmp library is not installed. Run: pip install pysnmp"
}
try:
community = snmp_params.get("community", "public")
port = snmp_params.get("port", 161)
timeout_val = snmp_params.get("timeout", 5)
# Xác định mpModel (SNMP version) cho pysnmp
# v1 → mpModel=0, v2c → mpModel=1
version = snmp_params.get("version", "v2c")
mp_model = 0 if version == "v1" else 1
# Gửi SNMP GET request để lấy sysDescr
# OID: 1.3.6.1.2.1.1.1.0 = iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0
# Đây là mô tả hệ thống — mọi thiết bị SNMP đều phải trả lời OID này
error_indication, error_status, error_index, var_binds = next(
getCmd(
SnmpEngine(),
CommunityData(community, mpModel=mp_model),
UdpTransportTarget((ip_address, port), timeout=timeout_val, retries=1),
ContextData(),
ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0")) # sysDescr
)
)
# Phân tích kết quả
if error_indication:
# Lỗi transport: timeout, không kết nối được, v.v.
return {
"status": "down",
"method": "snmp",
"detail": str(error_indication)
}
elif error_status:
# Lỗi SNMP protocol: OID không tồn tại, quyền truy cập bị từ chối, v.v.
return {
"status": "down",
"method": "snmp",
"detail": f"SNMP error: {error_status.prettyPrint()} at {error_index}"
}
else:
# Thành công! Thiết bị phản hồi SNMP
snmp_data = {}
for var_bind in var_binds:
oid = str(var_bind[0])
value = str(var_bind[1])
snmp_data[oid] = value
return {
"status": "up",
"method": "snmp",
"snmp_data": snmp_data,
"detail": f"SNMP {version} response received successfully"
}
except Exception as e:
return {
"status": "error",
"method": "snmp",
"detail": f"SNMP test failed: {str(e)}"
}

View File

@@ -0,0 +1,60 @@
from flask import request, jsonify
from storage.storage_service import (
upload_device_type_icon,
upload_device_avatar,
)
def upload_device_type_icon_controller():
try:
file = request.files.get("file")
name = request.form.get("name")
icon_url = upload_device_type_icon(file, name)
return jsonify({
"success": True,
"message": "Device type icon uploaded successfully",
"url": icon_url
}), 201
except ValueError as e:
return jsonify({
"success": False,
"message": str(e)
}), 400
except Exception as e:
return jsonify({
"success": False,
"message": "Failed to upload device type icon",
"error": str(e)
}), 500
def upload_device_avatar_controller():
try:
file = request.files.get("file")
name = request.form.get("name")
avatar_url = upload_device_avatar(file, name)
return jsonify({
"success": True,
"message": "Device avatar uploaded successfully",
"url": avatar_url
}), 201
except ValueError as e:
return jsonify({
"success": False,
"message": str(e)
}), 400
except Exception as e:
return jsonify({
"success": False,
"message": "Failed to upload device avatar",
"error": str(e)
}), 500

View File

@@ -0,0 +1,11 @@
from flask import Blueprint
from modules.uploads.upload_controller import (
upload_device_type_icon_controller,
upload_device_avatar_controller,
)
upload_bp = Blueprint("uploads", __name__)
upload_bp.route("/device-type-icons", methods=["POST"])(upload_device_type_icon_controller)
upload_bp.route("/device-avatars", methods=["POST"])(upload_device_avatar_controller)

View File

@@ -11,4 +11,7 @@ SQLAlchemy==2.0.49
typing_extensions==4.15.0
Werkzeug==3.1.8
marshmallow==3.21.2
APScheduler==3.10.4
APScheduler==3.10.4
boto3==2.0.0
icmplib==3.0.4
pysnmp==4.4.12

View File

@@ -0,0 +1 @@
# Init file for scheduler package

View File

@@ -0,0 +1,32 @@
from apscheduler.schedulers.background import BackgroundScheduler
# Khởi tạo background scheduler
# Thư viện APScheduler sẽ chạy các job ping/snmp định kỳ trong nền của Flask app
scheduler = BackgroundScheduler()
def start_scheduler():
"""Khởi chạy scheduler nếu nó chưa chạy"""
if not scheduler.running:
scheduler.start()
print("⏰ Background scheduler started.")
def add_device_monitoring_job(device_id, monitor_config):
"""
Thêm các job Ping/SNMP cho thiết bị mới hoặc được kích hoạt.
Sẽ được triển khai chi tiết ở module monitor_config/device_status.
"""
print(f"⏰ Stub: Added monitoring job for device {device_id}")
def remove_device_monitoring_job(device_id):
"""
Xóa toàn bộ các job Ping/SNMP liên quan đến thiết bị.
Sẽ được triển khai chi tiết ở module monitor_config/device_status.
"""
print(f"⏰ Stub: Removed monitoring job for device {device_id}")
def reschedule_device_monitoring_job(device_id, monitor_config):
"""
Cập nhật lại tần suất chạy (interval) hoặc phương thức chạy khi cấu hình giám sát thay đổi.
Sẽ được triển khai chi tiết ở module monitor_config/device_status.
"""
print(f"⏰ Stub: Rescheduled monitoring job for device {device_id}")

View File

@@ -0,0 +1,49 @@
import os
import boto3
from dotenv import load_dotenv
load_dotenv()
def get_s3_client():
"""
Tạo và trả về S3 client dùng để làm việc với MinIO/S3.
MinIO tương thích với S3 API, nên mình dùng boto3 như khi làm với AWS S3.
Hàm này chỉ chịu trách nhiệm tạo kết nối, không xử lý upload hay nghiệp vụ file.
"""
return boto3.client(
"s3",
endpoint_url=os.getenv("S3_ENDPOINT_URL"),
aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
region_name=os.getenv("S3_REGION", "us-east-1"),
)
def get_bucket_name():
"""
Lấy tên bucket từ biến môi trường.
Project này nên dùng một bucket chung, ví dụ: ndms.
Sau đó phân loại file bằng object key:
- device-types/router-uuid.svg
- devices/avatar-uuid.png
"""
return os.getenv("S3_BUCKET_NAME")
def get_public_url():
"""
Lấy public URL của MinIO/S3.
Ví dụ khi chạy local:
S3_PUBLIC_URL=http://localhost:9000
URL cuối cùng sẽ có dạng:
http://localhost:9000/ndms/device-types/router-uuid.svg
"""
return os.getenv("S3_PUBLIC_URL")

View File

@@ -0,0 +1,141 @@
import re
import uuid
from werkzeug.utils import secure_filename
from storage.s3_client import (
get_s3_client,
get_bucket_name,
get_public_url,
)
from storage.validators import validate_image_file
def slugify(text):
"""
Chuyển text thành dạng an toàn để dùng trong tên file.
Ví dụ:
- "Router" -> "router"
- "Core Switch 01" -> "core-switch-01"
- "Thiết bị định tuyến" -> "thi-t-b-nh-tuy-n"
Lưu ý:
Hàm này đơn giản, chưa xử lý tiếng Việt hoàn hảo.
Nếu muốn slug tiếng Việt đẹp hơn, sau này có thể dùng thư viện python-slugify.
"""
if text is None or text.strip() == "":
return "file"
text = text.lower().strip()
# Thay toàn bộ ký tự không phải chữ/số bằng dấu gạch ngang.
text = re.sub(r"[^a-z0-9]+", "-", text)
# Xóa dấu gạch ngang thừa ở đầu/cuối.
text = text.strip("-")
return text or "file"
def build_object_key(folder, business_name, extension):
"""
Tạo object key để lưu file trong bucket.
MinIO/S3 không có folder thật như filesystem.
Folder ở đây chỉ là prefix trong object key.
Ví dụ:
folder = "device-types"
business_name = "Router"
extension = "svg"
Kết quả:
device-types/router-550e8400.svg
Dùng UUID để tránh trùng tên file.
"""
safe_name = slugify(business_name)
unique_id = uuid.uuid4()
return f"{folder}/{safe_name}-{unique_id}.{extension}"
def upload_image(file, folder, business_name): # Hàm Upload dùng chung cho device_types và devices
"""
Upload một file ảnh lên MinIO/S3 và trả về public URL.
Tham số:
- file: file lấy từ request.files
- folder: nhóm lưu trữ, ví dụ "device-types" hoặc "devices"
- business_name: tên nghiệp vụ để đặt tên file đẹp hơn
Ví dụ:
+ DeviceType name = Router
+ Device name = Switch tầng 2
Flow:
1. Validate file
2. Tạo object key
3. Upload file lên bucket
4. Trả về URL public để lưu vào database
"""
extension = validate_image_file(file)
object_key = build_object_key(
folder=folder,
business_name=business_name,
extension=extension,
)
s3_client = get_s3_client()
bucket_name = get_bucket_name()
public_url = get_public_url()
s3_client.upload_fileobj(
file,
bucket_name,
object_key,
ExtraArgs={
"ContentType": file.content_type
},
)
return f"{public_url}/{bucket_name}/{object_key}"
def upload_device_type_icon(file, device_type_name):
"""
Upload icon cho DeviceType.
File sẽ được lưu trong bucket với prefix:
device-types/
Ví dụ URL:
http://localhost:9000/ndms/device-types/router-uuid.svg
"""
return upload_image( # Ở đoạn này có thể trả về một hàm luôn á ? Mà không cần phải gọi hàm đó vào bên trong và truyền tham số à ?
file=file,
folder="device-types",
business_name=device_type_name,
)
def upload_device_avatar(file, device_name):
"""
Upload avatar cho Device.
File sẽ được lưu trong bucket với prefix:
devices/
Ví dụ URL:
http://localhost:9000/ndms/devices/switch-tang-2-uuid.png
"""
return upload_image(
file=file,
folder="devices",
business_name=device_name,
)

View File

@@ -0,0 +1,49 @@
ALLOWED_IMAGE_EXTENSIONS = {"svg", "png", "jpg", "jpeg", "webp"}
MAX_FILE_SIZE_MB = 5
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
def validate_image_file(file):
"""
Validate file ảnh trước khi upload lên MinIO/S3.
Hàm này kiểm tra:
1. Có file được gửi lên không
2. File có tên không
3. File có extension không
4. Extension có hợp lệ không
5. File có vượt quá dung lượng cho phép không
Trả về:
- extension của file nếu hợp lệ
Raise:
- ValueError nếu file không hợp lệ
"""
if file is None:
raise ValueError("No file uploaded")
if file.filename == "":
raise ValueError("Filename is empty")
if "." not in file.filename:
raise ValueError("File must have an extension")
extension = file.filename.rsplit(".", 1)[1].lower()
if extension not in ALLOWED_IMAGE_EXTENSIONS:
raise ValueError("Invalid image type. Allowed: svg, png, jpg, jpeg, webp")
# Di chuyển con trỏ file tới cuối để đo dung lượng file.
file.seek(0, 2)
file_size = file.tell()
# Đưa con trỏ file về đầu để lát nữa upload không bị mất dữ liệu.
file.seek(0)
if file_size > MAX_FILE_SIZE_BYTES:
raise ValueError(f"File size must be less than {MAX_FILE_SIZE_MB}MB")
return extension