Files
NetworkDeviceManagementSystem/backend/modules/monitor_config/service.py
QuangMinh_123 edc4fc44c5 monitorconfig
2026-05-29 11:10:51 +07:00

335 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import platform
# ============================================
# SERVICE LAYER: Business Logic cho Module Monitor Config
# ============================================
# Luồng đi tổng thể của module này:
#
# ┌──────────┐ ┌──────────┐ ┌──────────────┐ ┌──────────┐
# │ Controller│ ──→ │ Service │ ──→ │ Repository │ ──→ │ Database │
# └──────────┘ └──────────┘ └──────────────┘ └──────────┘
# │
# ├──→ Device Repository (kiểm tra thiết bị tồn tại)
# ├──→ Scheduler (cập nhật job giám sát khi config thay đổi)
# └──→ ICMP/SNMP Test (kiểm tra kết nối trực tiếp)
#
# Service chứa 3 chức năng chính:
# 1. Lấy cấu hình giám sát (GET)
# 2. Cập nhật cấu hình giám sát (PUT) + reschedule job
# 3. Test kết nối trực tiếp (POST /test) — Ping hoặc SNMP
# ============================================
from modules.monitor_config.repository import (
find_monitor_config_by_device_id,
update_monitor_config_db,
insert_default_monitor_config_db
)
from modules.monitor_config.exceptions import MonitorConfigNotFoundException
from modules.device.repository import find_device_by_id
from modules.device.exceptions import DeviceNotFoundException
from scheduler.scheduler import reschedule_device_monitoring_job
# ============================================
# CREATE DEFAULT: Tạo cấu hình giám sát mặc định (Service Layer)
# ============================================
def create_default_monitor_config_service(device_id, cursor=None):
"""
Tạo cấu hình giám sát mặc định cho thiết bị mới.
"""
return insert_default_monitor_config_db(device_id, cursor)
# ============================================
# Dynamic Import: icmplib và pysnmp
# ============================================
# Tại sao cần Dynamic Import?
# - icmplib cần quyền root/sudo trên một số hệ điều hành để gửi ICMP raw socket
# - pysnmp có thể chưa được cài đặt trên máy phát triển
# - Nếu import thất bại → app vẫn chạy được, chỉ fallback sang cách khác
# ============================================
try:
from icmplib import ping as icmp_ping
HAS_ICMPLIB = True
except ImportError:
HAS_ICMPLIB = False
try:
from pysnmp.hlapi import (
SnmpEngine, CommunityData, UdpTransportTarget,
ContextData, ObjectType, ObjectIdentity, getCmd
)
HAS_PYSNMP = True
except ImportError:
HAS_PYSNMP = False
# ============================================
# 1. GET: Lấy cấu hình giám sát của thiết bị
# ============================================
# Luồng đi:
# GET /api/devices/{device_id}/monitor-config
# → Controller gọi hàm này
# → Kiểm tra device tồn tại (DeviceNotFoundException nếu không)
# → Kiểm tra config tồn tại (MonitorConfigNotFoundException nếu không)
# → Trả về dict cấu hình
# ============================================
def get_monitor_config_service(device_id):
# Bước 1: Kiểm tra thiết bị có tồn tại trong hệ thống không
device = find_device_by_id(device_id)
if not device:
raise DeviceNotFoundException(device_id)
# Bước 2: Lấy cấu hình giám sát từ DB
# (Bản ghi monitor_config được tạo tự động khi tạo device trong module device)
config = find_monitor_config_by_device_id(device_id)
if not config:
raise MonitorConfigNotFoundException(device_id)
return config
# ============================================
# 2. PUT: Cập nhật cấu hình giám sát
# ============================================
# Luồng đi:
# PUT /api/devices/{device_id}/monitor-config
# → Controller validate body bằng UpdateMonitorConfigSchema
# → Service gọi hàm này
# → Kiểm tra device + config tồn tại
# → Cập nhật DB (Repository)
# → Gọi Scheduler reschedule_job để áp dụng tần suất mới NGAY LẬP TỨC
# → Trả về config đã cập nhật
#
# Ví dụ: Người dùng đổi ping_interval từ 60s → 30s
# → DB được cập nhật
# → Scheduler reschedule job: lần kiểm tra tiếp theo sẽ chạy sau 30s thay vì 60s
# → KHÔNG cần restart server
# ============================================
def update_monitor_config_service(device_id, data):
# Bước 1: Kiểm tra thiết bị tồn tại
device = find_device_by_id(device_id)
if not device:
raise DeviceNotFoundException(device_id)
# Bước 2: Kiểm tra config tồn tại
existing_config = find_monitor_config_by_device_id(device_id)
if not existing_config:
raise MonitorConfigNotFoundException(device_id)
# Bước 3: Cập nhật vào DB
updated_config = update_monitor_config_db(device_id, data)
# Bước 4: Thông báo cho Scheduler cập nhật lại job
# reschedule_device_monitoring_job sẽ đọc config mới từ DB
# và áp dụng interval mới cho job ping/snmp
reschedule_device_monitoring_job(device_id, updated_config)
return updated_config
# ============================================
# 3. POST: Test kết nối (Ping hoặc SNMP)
# ============================================
# Luồng đi:
# POST /api/devices/{device_id}/monitor-config/test
# → Controller validate body bằng TestConnectionSchema
# → Service gọi hàm này
# → Lấy IP thiết bị từ DB
# → Chạy Ping test (nếu test_ping=True)
# → Chạy SNMP test (nếu test_snmp=True)
# → Trả về kết quả chi tiết {ping_result, snmp_result}
#
# Cơ chế Fallback cho Ping:
# Ưu tiên 1: icmplib (thư viện Python, cần cài pip install icmplib)
# → async_ping() hoặc ping() với privileged=False
# Ưu tiên 2: Lệnh ping hệ điều hành (subprocess)
# → Chạy "ping -c 3 <ip>" (Linux/Mac) hoặc "ping -n 3 <ip>" (Windows)
# → Parse kết quả từ stdout
# ============================================
def test_connection_service(device_id, test_params):
# Bước 1: Kiểm tra thiết bị tồn tại và lấy IP
device = find_device_by_id(device_id)
if not device:
raise DeviceNotFoundException(device_id)
ip_address = device["ip_address"]
results = {}
# Bước 2: Test Ping nếu được yêu cầu
if test_params.get("test_ping", True):
ping_count = test_params.get("ping_count", 3)
ping_timeout = test_params.get("ping_timeout", 5)
results["ping_result"] = _run_ping_test(ip_address, ping_count, ping_timeout)
# Bước 3: Test SNMP nếu được yêu cầu
if test_params.get("test_snmp", False):
snmp_params = {
"community": test_params.get("snmp_community", "public"),
"version": test_params.get("snmp_version", "v2c"),
"port": test_params.get("snmp_port", 161),
"timeout": test_params.get("snmp_timeout", 5)
}
results["snmp_result"] = _run_snmp_test(ip_address, snmp_params)
return results
# ============================================
# PRIVATE: Hàm chạy Ping test
# ============================================
# Cơ chế:
# 1. Thử dùng icmplib trước (nhanh, chính xác, có RTT)
# → privileged=False: không cần quyền root, dùng UDP probe
# 2. Nếu icmplib không có hoặc lỗi → Fallback sang subprocess
# → Chạy lệnh ping của hệ điều hành qua subprocess.run()
# → Parse returncode: 0 = thành công, != 0 = thất bại
# ============================================
def _run_ping_test(ip_address, count, timeout):
# ─── Ưu tiên 1: Dùng icmplib ─────────────────────
if HAS_ICMPLIB:
try:
# privileged=False: Không cần quyền root
# Trên macOS/Linux không root, icmplib sẽ dùng UDP probe
result = icmp_ping(
ip_address,
count=count,
timeout=timeout,
privileged=False
)
return {
"status": "up" if result.is_alive else "down",
"method": "icmplib",
"packets_sent": result.packets_sent,
"packets_received": result.packets_received,
"packet_loss": result.packet_loss, # 0.0 ~ 1.0
"avg_rtt_ms": round(result.avg_rtt, 2), # Thời gian phản hồi trung bình (ms)
"min_rtt_ms": round(result.min_rtt, 2),
"max_rtt_ms": round(result.max_rtt, 2)
}
except Exception as e:
# icmplib có nhưng gặp lỗi (ví dụ: permission denied)
# → Fallback sang subprocess
pass
# ─── Ưu tiên 2: Fallback sang lệnh ping hệ điều hành ───
try:
# Xác định tham số theo hệ điều hành
# Windows dùng -n (number), Linux/macOS dùng -c (count)
param = "-n" if platform.system().lower() == "windows" else "-c"
timeout_param = "-w" if platform.system().lower() == "windows" else "-W"
cmd = ["ping", param, str(count), timeout_param, str(timeout), ip_address]
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout * count + 5 # Timeout tổng = timeout mỗi gói × số gói + buffer
)
is_alive = result.returncode == 0
return {
"status": "up" if is_alive else "down",
"method": "system_ping",
"detail": result.stdout.decode("utf-8", errors="replace")[:500]
# Giới hạn 500 ký tự đầu ra để tránh response quá lớn
}
except subprocess.TimeoutExpired:
return {
"status": "down",
"method": "system_ping",
"detail": f"Ping timeout after {timeout * count + 5} seconds"
}
except Exception as e:
return {
"status": "error",
"method": "system_ping",
"detail": str(e)
}
# ============================================
# PRIVATE: Hàm chạy SNMP test
# ============================================
# Cơ chế:
# 1. Dùng pysnmp gửi SNMP GET request lấy sysDescr (OID: 1.3.6.1.2.1.1.1.0)
# → Đây là OID tiêu chuẩn mà MỌI thiết bị SNMP đều hỗ trợ
# → Nếu lấy được giá trị → thiết bị Up + hỗ trợ SNMP
# 2. Nếu pysnmp không có → trả về lỗi yêu cầu cài đặt
#
# Tham số SNMP:
# - community: chuỗi xác thực (mặc định "public")
# - version: phiên bản SNMP (v1, v2c, v3)
# - port: cổng SNMP trên thiết bị (mặc định 161)
# - timeout: thời gian chờ phản hồi (giây)
# ============================================
def _run_snmp_test(ip_address, snmp_params):
if not HAS_PYSNMP:
return {
"status": "error",
"method": "snmp",
"detail": "pysnmp library is not installed. Run: pip install pysnmp"
}
try:
community = snmp_params.get("community", "public")
port = snmp_params.get("port", 161)
timeout_val = snmp_params.get("timeout", 5)
# Xác định mpModel (SNMP version) cho pysnmp
# v1 → mpModel=0, v2c → mpModel=1
version = snmp_params.get("version", "v2c")
mp_model = 0 if version == "v1" else 1
# Gửi SNMP GET request để lấy sysDescr
# OID: 1.3.6.1.2.1.1.1.0 = iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0
# Đây là mô tả hệ thống — mọi thiết bị SNMP đều phải trả lời OID này
error_indication, error_status, error_index, var_binds = next(
getCmd(
SnmpEngine(),
CommunityData(community, mpModel=mp_model),
UdpTransportTarget((ip_address, port), timeout=timeout_val, retries=1),
ContextData(),
ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0")) # sysDescr
)
)
# Phân tích kết quả
if error_indication:
# Lỗi transport: timeout, không kết nối được, v.v.
return {
"status": "down",
"method": "snmp",
"detail": str(error_indication)
}
elif error_status:
# Lỗi SNMP protocol: OID không tồn tại, quyền truy cập bị từ chối, v.v.
return {
"status": "down",
"method": "snmp",
"detail": f"SNMP error: {error_status.prettyPrint()} at {error_index}"
}
else:
# Thành công! Thiết bị phản hồi SNMP
snmp_data = {}
for var_bind in var_binds:
oid = str(var_bind[0])
value = str(var_bind[1])
snmp_data[oid] = value
return {
"status": "up",
"method": "snmp",
"snmp_data": snmp_data,
"detail": f"SNMP {version} response received successfully"
}
except Exception as e:
return {
"status": "error",
"method": "snmp",
"detail": f"SNMP test failed: {str(e)}"
}