VWED_server/utils/alert_sync.py

300 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
告警同步服务模块
负责将本地告警信息同步到主系统
"""
import asyncio
import logging
import threading
import time
import hashlib
from queue import Queue, Empty, Full
from typing import Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
import requests
import json
from datetime import datetime
from config.tf_api_config import TF_API_TOKEN
from config.settings import settings
class AlertSyncService:
    """Forward qualifying log records to the main system as alerts.

    Records are converted to alert payloads and placed on a bounded queue by
    ``sync_alert``. A daemon worker thread drains the queue and hands each
    payload to a small thread pool, which POSTs it to ``sync_url`` with
    retries.

    Diagnostics inside this class are written with ``print`` rather than
    ``logging``.
    NOTE(review): presumably this avoids re-entering the alert pipeline from
    a log handler -- confirm against the handler that calls this service.
    """

    # Python logging level -> alert level used by the main system.
    LEVEL_MAPPING = {
        logging.WARNING: 2,   # warning
        logging.ERROR: 3,     # error
        logging.CRITICAL: 4,  # critical
    }

    # Alert category name -> alert type code used by the main system.
    TYPE_MAPPING = {
        'task': 3,    # task alert
        'system': 1,  # system alert
        'robot': 2,   # robot alert
    }

    # Substrings of a (lower-cased) logger name that select an alert category.
    _TASK_KEYWORDS = ('task', 'execution', 'scheduler', 'template')
    _ROBOT_KEYWORDS = ('robot', 'vehicle', 'amr')

    # Level names used when building an alert name.
    _LEVEL_NAMES = {
        logging.WARNING: "WARNING",
        logging.ERROR: "ERROR",
        logging.CRITICAL: "CRITICAL",
    }

    def __init__(self):
        """Read the sync configuration from ``settings`` and start the worker.

        The worker thread is only started when ``ALERT_SYNC_ENABLED`` is
        truthy; the queue and thread pool are created either way.
        """
        self.enabled = settings.ALERT_SYNC_ENABLED
        self.sync_url = settings.ALERT_SYNC_URL
        self.timeout = settings.ALERT_SYNC_TIMEOUT
        self.retry_count = settings.ALERT_SYNC_RETRY_COUNT
        self.retry_delay = settings.ALERT_SYNC_RETRY_DELAY

        min_level = getattr(logging, settings.ALERT_SYNC_MIN_LEVEL.upper(), logging.WARNING)
        # getattr may resolve to a non-level attribute of the logging module
        # (e.g. BASIC_FORMAT is a str); comparing a record level against that
        # would raise, so fall back to WARNING.
        self.min_level = min_level if isinstance(min_level, int) else logging.WARNING

        # Bounded queue plus a small pool that performs the HTTP calls.
        self.alert_queue = Queue(maxsize=settings.ALERT_SYNC_QUEUE_SIZE)
        self.executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="AlertSync")
        self.running = True

        # Start the background dispatcher only when syncing is enabled.
        if self.enabled:
            self.worker_thread = threading.Thread(target=self._worker, daemon=True)
            self.worker_thread.start()

    def _generate_alert_code(self, logger_name: str, message: str) -> int:
        """Return the numeric alert code for a logger name and message.

        Args:
            logger_name: Name of the logger that produced the record.
            message: Formatted alert message.

        Returns:
            The code from ``ErrorCodeMapping.get_error_code`` when that module
            can be imported; otherwise a hash-derived code in 5400-9999.
        """
        try:
            from config.error_code_mapping import ErrorCodeMapping
            return ErrorCodeMapping.get_error_code(logger_name, message)
        except ImportError:
            # Fallback: derive a stable code from an MD5 digest of the inputs.
            digest = hashlib.md5(f"{logger_name}:{message}".encode()).hexdigest()
            # First 4 hex digits folded into the 5400-9999 range.
            return int(digest[:4], 16) % 4600 + 5400

    def _determine_alert_type(self, logger_name: str) -> int:
        """Classify an alert by keywords found in the logger name.

        Args:
            logger_name: Name of the logger that produced the record.

        Returns:
            Alert type code: 1 = system, 2 = robot, 3 = task. Task keywords
            are checked before robot keywords; system is the default.
        """
        lowered = logger_name.lower()
        if any(keyword in lowered for keyword in self._TASK_KEYWORDS):
            return self.TYPE_MAPPING['task']
        if any(keyword in lowered for keyword in self._ROBOT_KEYWORDS):
            return self.TYPE_MAPPING['robot']
        return self.TYPE_MAPPING['system']

    def _generate_alert_name(self, logger_name: str, level: int) -> str:
        """Build an alert name of the form ``<MODULE>_<LEVEL>``.

        Args:
            logger_name: Name of the logger that produced the record.
            level: Numeric logging level of the record.

        Returns:
            The upper-cased last dotted component of ``logger_name`` joined to
            the level name; levels other than WARNING/ERROR/CRITICAL yield
            ``UNKNOWN``.
        """
        level_name = self._LEVEL_NAMES.get(level, "UNKNOWN")
        # rsplit()[-1] is the whole name when there is no dot.
        module_name = logger_name.rsplit('.', 1)[-1].upper()
        return f"{module_name}_{level_name}"

    def _format_alert_data(self, record: logging.LogRecord) -> Dict[str, Any]:
        """Convert a log record into the alert payload sent to the main system.

        Args:
            record: Log record to convert.

        Returns:
            Dict with the keys ``type``, ``level``, ``code``, ``name``,
            ``description`` and ``solution``.
        """
        message = record.getMessage()

        # Description is the message plus source location when available.
        description = message
        if hasattr(record, 'filename') and hasattr(record, 'lineno'):
            description += f" [文件: {record.filename}:{record.lineno}]"
        if hasattr(record, 'funcName'):
            description += f" [函数: {record.funcName}()]"

        return {
            "type": self._determine_alert_type(record.name),
            # Levels without a mapping are reported as warning (2).
            "level": self.LEVEL_MAPPING.get(record.levelno, 2),
            "code": self._generate_alert_code(record.name, message),
            "name": self._generate_alert_name(record.name, record.levelno),
            "description": description,
            "solution": "请检查相关日志文件获取详细的异常堆栈信息,并根据异常类型进行排查"
        }

    def _send_alert_to_main_system(self, alert_data: Dict[str, Any]) -> bool:
        """POST one alert payload to the main system, retrying on failure.

        Args:
            alert_data: Payload produced by ``_format_alert_data``.

        Returns:
            True when the main system answers HTTP 200 with a truthy
            ``success`` field or ``code == 200``; False once all attempts
            have failed.
        """
        # A configured retry count of 0 (or less) still gets one attempt.
        attempts = max(1, self.retry_count)
        headers = {
            'Content-Type': 'application/json',
            'x-access-token': TF_API_TOKEN,
        }
        for attempt in range(attempts):
            try:
                response = requests.post(
                    self.sync_url,
                    json=alert_data,
                    timeout=self.timeout,
                    headers=headers,
                )
                if response.status_code == 200:
                    result = response.json()
                    if result.get('success', False) or result.get('code') == 200:
                        return True
                    print(f"告警同步失败,主系统返回: {result.get('message', 'Unknown error')}")
                else:
                    print(f"告警同步HTTP错误: {response.status_code}")
            except requests.exceptions.RequestException as e:
                print(f"告警同步网络错误 (尝试 {attempt + 1}/{self.retry_count}): {e}")
            except Exception as e:
                # Best effort: a malformed response must not kill the pool thread.
                print(f"告警同步未知错误 (尝试 {attempt + 1}/{self.retry_count}): {e}")

            # Linearly increasing delay before the next attempt, if any.
            if attempt < attempts - 1:
                time.sleep(self.retry_delay * (attempt + 1))
        return False

    def _mark_alert_done(self, _future) -> None:
        """Done-callback: mark one queue item as processed."""
        self.alert_queue.task_done()

    def _worker(self):
        """Background loop: move queued alerts into the thread pool.

        Runs until ``self.running`` becomes False. The one-second ``get``
        timeout bounds how long the loop takes to notice that flag.
        """
        while self.running:
            try:
                alert_data = self.alert_queue.get(timeout=1.0)
            except Empty:
                continue

            try:
                future = self.executor.submit(self._send_alert_to_main_system, alert_data)
            except Exception as e:
                # Nothing was scheduled, so account for the item here and keep
                # the loop alive.
                self.alert_queue.task_done()
                print(f"告警同步工作线程错误: {e}")
            else:
                # task_done() only after the send finishes, so that
                # alert_queue.join() reflects delivery rather than hand-off.
                future.add_done_callback(self._mark_alert_done)

    def sync_alert(self, record: logging.LogRecord) -> bool:
        """Queue a log record for delivery to the main system.

        Never raises: failures are printed and reported through the return
        value so that alert syncing cannot disturb normal logging.

        Args:
            record: Log record to forward.

        Returns:
            True when the alert was queued; False when syncing is disabled,
            the record is below ``min_level``, the queue is full, or the
            record could not be formatted.
        """
        if not self.enabled:
            return False
        if record.levelno < self.min_level:
            return False

        try:
            alert_data = self._format_alert_data(record)
        except Exception as e:
            print(f"告警同步格式化错误: {e}")
            return False

        try:
            self.alert_queue.put_nowait(alert_data)
        except Full:
            # Back-pressure: drop the alert rather than block the logging caller.
            print("告警同步队列已满,告警已丢弃")
            return False
        return True

    def shutdown(self):
        """Stop the worker thread and wait for in-flight sends to finish.

        Alerts still sitting in the queue when the worker stops are not sent.
        """
        self.running = False
        # worker_thread only exists when syncing was enabled at construction.
        if hasattr(self, 'worker_thread'):
            self.worker_thread.join(timeout=5)
        self.executor.shutdown(wait=True)
# Module-wide alert sync service instance, created lazily on first use.
_alert_sync_service = None
# Guards first-time construction: AlertSyncService.__init__ may start a worker
# thread, so two racing callers must not each build their own instance.
# Re-entrant so that a nested call from the same thread cannot deadlock.
_alert_sync_service_lock = threading.RLock()


def get_alert_sync_service() -> AlertSyncService:
    """Return the shared AlertSyncService, creating it on the first call.

    Returns:
        The single module-wide ``AlertSyncService`` instance.
    """
    global _alert_sync_service
    if _alert_sync_service is None:
        with _alert_sync_service_lock:
            # Re-check under the lock: another thread may have won the race.
            if _alert_sync_service is None:
                _alert_sync_service = AlertSyncService()
    return _alert_sync_service
def sync_alert_from_record(record: logging.LogRecord) -> bool:
    """Forward a log record to the shared alert sync service.

    This is the entry point intended for external callers.

    Args:
        record: Log record to forward.

    Returns:
        Whatever ``AlertSyncService.sync_alert`` returns for this record.
    """
    return get_alert_sync_service().sync_alert(record)