300 lines
9.7 KiB
Python
300 lines
9.7 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
告警同步服务模块
|
||
负责将本地告警信息同步到主系统
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import threading
|
||
import time
|
||
import hashlib
|
||
from queue import Queue, Empty
|
||
from typing import Dict, Any, Optional
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
import requests
|
||
import json
|
||
from datetime import datetime
|
||
from config.tf_api_config import TF_API_TOKEN
|
||
from config.settings import settings
|
||
|
||
|
||
|
||
class AlertSyncService:
    """
    Forward local log records to the main system as alerts, asynchronously.

    Records at or above ``min_level`` are converted into the main system's
    alert payload, buffered in a bounded queue, and POSTed to
    ``settings.ALERT_SYNC_URL`` by a background dispatcher thread that hands
    each payload to a small sender thread pool.
    """

    # Python logging level -> main-system alert level.
    LEVEL_MAPPING = {
        logging.WARNING: 2,   # warning
        logging.ERROR: 3,     # error
        logging.CRITICAL: 4,  # critical
    }

    # Alert category -> main-system alert type code.
    TYPE_MAPPING = {
        'task': 3,    # task alert
        'system': 1,  # system alert
        'robot': 2,   # robot alert
    }

    # Number of concurrent HTTP sender threads. This is also the maximum number
    # of alerts that may be outside ``alert_queue`` (in flight) at any moment.
    MAX_SEND_WORKERS = 2

    # Substrings matched against the lower-cased logger name to pick the alert
    # type; task keywords are checked first, then robot keywords.
    _TASK_LOGGER_KEYWORDS = ('task', 'execution', 'scheduler', 'template')
    _ROBOT_LOGGER_KEYWORDS = ('robot', 'vehicle', 'amr')

    def __init__(self):
        """Read the ALERT_SYNC_* settings and, if enabled, start the dispatcher thread."""
        self.enabled = settings.ALERT_SYNC_ENABLED
        self.sync_url = settings.ALERT_SYNC_URL
        self.timeout = settings.ALERT_SYNC_TIMEOUT
        self.retry_count = settings.ALERT_SYNC_RETRY_COUNT
        self.retry_delay = settings.ALERT_SYNC_RETRY_DELAY
        self.min_level = self._resolve_level(settings.ALERT_SYNC_MIN_LEVEL)

        # Bounded buffer between sync_alert() callers and the dispatcher thread.
        self.alert_queue = Queue(maxsize=settings.ALERT_SYNC_QUEUE_SIZE)
        self.executor = ThreadPoolExecutor(
            max_workers=self.MAX_SEND_WORKERS, thread_name_prefix="AlertSync"
        )
        # One permit per sender thread. ThreadPoolExecutor's own work queue is
        # unbounded, so without these permits the dispatcher would move alerts
        # out of alert_queue as fast as it can and the configured queue size
        # would never take effect while the main system is slow or unreachable.
        self._send_slots = threading.BoundedSemaphore(self.MAX_SEND_WORKERS)
        self.running = True

        if self.enabled:
            self.worker_thread = threading.Thread(target=self._worker, daemon=True)
            self.worker_thread.start()

    @staticmethod
    def _resolve_level(level_setting) -> int:
        """
        Convert a configured minimum level into a numeric logging level.

        Args:
            level_setting: a level name such as ``"error"`` (case-insensitive)
                or an already-numeric level.

        Returns:
            The numeric level. Unknown names fall back to ``logging.WARNING``.
        """
        if isinstance(level_setting, int):
            return level_setting
        level = getattr(logging, str(level_setting).upper(), logging.WARNING)
        # getattr may hit a non-level module attribute (e.g. BASIC_FORMAT).
        return level if isinstance(level, int) else logging.WARNING

    def _generate_alert_code(self, logger_name: str, message: str) -> int:
        """
        Produce the 4-digit alert code for a record.

        Uses ``config.error_code_mapping.ErrorCodeMapping.get_error_code`` when
        that module can be imported. Otherwise it derives a stable code in the
        range 5400-9999 from an MD5 digest of ``"<logger_name>:<message>"``.

        Args:
            logger_name: name of the logger that emitted the record.
            message: the fully formatted log message.

        Returns:
            A 4-digit alert code.
        """
        try:
            # Imported lazily; a failed import is simply retried on the next call.
            from config.error_code_mapping import ErrorCodeMapping
            return ErrorCodeMapping.get_error_code(logger_name, message)
        except ImportError:
            # surrogatepass: messages may carry lone surrogates (e.g. paths
            # decoded with surrogateescape); plain .encode() would raise here.
            # Well-formed text encodes to exactly the same bytes as before.
            hash_input = f"{logger_name}:{message}".encode("utf-8", "surrogatepass")
            digest = hashlib.md5(hash_input).hexdigest()
            # Fold the first 16 bits of the digest into 5400..9999 (4600 values).
            return int(digest[:4], 16) % 4600 + 5400

    def _determine_alert_type(self, logger_name: str) -> int:
        """
        Classify an alert by keywords found in its logger name.

        Args:
            logger_name: name of the logger that emitted the record.

        Returns:
            Alert type: 1 = system alert, 2 = robot alert, 3 = task alert.
        """
        logger_lower = logger_name.lower()

        if any(keyword in logger_lower for keyword in self._TASK_LOGGER_KEYWORDS):
            return self.TYPE_MAPPING['task']
        if any(keyword in logger_lower for keyword in self._ROBOT_LOGGER_KEYWORDS):
            return self.TYPE_MAPPING['robot']
        return self.TYPE_MAPPING['system']

    def _generate_alert_name(self, logger_name: str, level: int) -> str:
        """
        Build an alert name of the form ``<LAST_LOGGER_SEGMENT>_<LEVEL>``.

        Args:
            logger_name: dotted logger name; only its last segment is used.
            level: numeric logging level.

        Returns:
            For example ``"EXECUTOR_ERROR"``. Levels other than WARNING, ERROR
            and CRITICAL are rendered as ``UNKNOWN``.
        """
        level_names = {
            logging.WARNING: "WARNING",
            logging.ERROR: "ERROR",
            logging.CRITICAL: "CRITICAL",
        }
        level_name = level_names.get(level, "UNKNOWN")
        module_name = logger_name.rsplit('.', 1)[-1].upper()
        return f"{module_name}_{level_name}"

    def _format_alert_data(self, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Convert a log record into the payload the main system expects.

        Args:
            record: the log record to convert.

        Returns:
            A dict with keys ``type``, ``level``, ``code``, ``name``,
            ``description`` and ``solution``. Levels missing from
            LEVEL_MAPPING are sent as level 2.
        """
        message = record.getMessage()

        # Append the source location, when available, to help triage.
        description = message
        if hasattr(record, 'filename') and hasattr(record, 'lineno'):
            description += f" [文件: {record.filename}:{record.lineno}]"
        if hasattr(record, 'funcName'):
            description += f" [函数: {record.funcName}()]"

        return {
            "type": self._determine_alert_type(record.name),
            "level": self.LEVEL_MAPPING.get(record.levelno, 2),
            "code": self._generate_alert_code(record.name, message),
            "name": self._generate_alert_name(record.name, record.levelno),
            "description": description,
            "solution": "请检查相关日志文件获取详细的异常堆栈信息,并根据异常类型进行排查",
        }

    def _send_alert_to_main_system(self, alert_data: Dict[str, Any]) -> bool:
        """
        POST one alert to the main system, retrying with increasing delays.

        A send counts as successful when the response is HTTP 200 and the JSON
        body has a truthy ``success`` or ``code == 200``. Failures are reported
        with print(); the code does not route them through logging. Presumably
        this avoids feeding the service's own errors back into it; confirm with
        the handler that calls sync_alert_from_record.

        Args:
            alert_data: payload produced by ``_format_alert_data``.

        Returns:
            True if the main system accepted the alert, False after all
            attempts fail.
        """
        # Always make at least one attempt, even if retry_count is set to 0 or
        # a negative value.
        attempts = max(1, int(self.retry_count))
        headers = {
            'Content-Type': 'application/json',
            'x-access-token': TF_API_TOKEN,
        }

        for attempt in range(attempts):
            try:
                response = requests.post(
                    self.sync_url,
                    json=alert_data,
                    timeout=self.timeout,
                    headers=headers,
                )
                if response.status_code == 200:
                    result = response.json()
                    if result.get('success', False) or result.get('code') == 200:
                        return True
                    print(f"告警同步失败,主系统返回: {result.get('message', 'Unknown error')}")
                else:
                    print(f"告警同步HTTP错误: {response.status_code}")
            except requests.exceptions.RequestException as e:
                print(f"告警同步网络错误 (尝试 {attempt + 1}/{attempts}): {e}")
            except Exception as e:
                print(f"告警同步未知错误 (尝试 {attempt + 1}/{attempts}): {e}")

            # Linear back-off before every attempt except the last.
            if attempt < attempts - 1:
                time.sleep(self.retry_delay * (attempt + 1))

        return False

    def _worker(self):
        """
        Dispatcher loop: move queued alerts into the sender thread pool.

        A send permit is reserved *before* an alert is taken off
        ``alert_queue``, so at most MAX_SEND_WORKERS alerts are ever outside
        the bounded queue. Every wait uses a 1-second timeout so the loop
        notices ``running`` turning False.
        """
        while self.running:
            if not self._send_slots.acquire(timeout=1.0):
                continue
            try:
                alert_data = self.alert_queue.get(timeout=1.0)
            except Empty:
                self._send_slots.release()
                continue

            try:
                future = self.executor.submit(self._send_alert_to_main_system, alert_data)
            except Exception as e:
                # e.g. RuntimeError once the executor has been shut down.
                self._send_slots.release()
                print(f"告警同步工作线程错误: {e}")
            else:
                future.add_done_callback(self._release_send_slot)
            finally:
                self.alert_queue.task_done()

    def _release_send_slot(self, _future) -> None:
        """Return a send permit when a submitted send finishes, whatever its outcome."""
        self._send_slots.release()

    def sync_alert(self, record: logging.LogRecord) -> bool:
        """
        Queue a log record for delivery to the main system.

        Never raises: failures are reported with print() and return False, so
        alert syncing cannot disturb normal logging.

        Args:
            record: the log record to forward.

        Returns:
            True if the alert was queued. False if syncing is disabled, the
            service has been shut down, the record is below ``min_level``,
            formatting failed, or the queue is full.
        """
        from queue import Full  # the file header imports only Queue and Empty

        if not self.enabled or not self.running:
            return False
        if record.levelno < self.min_level:
            return False

        try:
            alert_data = self._format_alert_data(record)
        except Exception as e:
            print(f"告警同步格式化错误: {e}")
            return False

        try:
            self.alert_queue.put_nowait(alert_data)
        except Full:
            # Backlog is at capacity (main system slow or unreachable): drop it.
            print(f"告警同步队列已满,丢弃告警: {alert_data.get('name')}")
            return False
        return True

    def shutdown(self):
        """
        Stop the dispatcher thread and wait for in-flight sends to finish.

        Alerts still waiting in ``alert_queue`` are not sent.
        """
        self.running = False
        worker = getattr(self, 'worker_thread', None)
        if worker is not None:
            worker.join(timeout=5)
        self.executor.shutdown(wait=True)
|
||
|
||
|
||
# Process-wide AlertSyncService instance, created lazily by get_alert_sync_service().
_alert_sync_service = None
# Guards the lazy creation above. Each AlertSyncService starts its own
# dispatcher thread and thread pool, so concurrent first calls must not build
# two instances.
_alert_sync_service_lock = threading.Lock()


def get_alert_sync_service() -> AlertSyncService:
    """
    Return the shared AlertSyncService, creating it on first use.

    Uses double-checked locking: once the instance exists, calls take the
    lock-free fast path.

    Returns:
        The single AlertSyncService instance for this process.
    """
    global _alert_sync_service
    if _alert_sync_service is None:
        with _alert_sync_service_lock:
            if _alert_sync_service is None:
                _alert_sync_service = AlertSyncService()
    return _alert_sync_service
|
||
|
||
def sync_alert_from_record(record: logging.LogRecord) -> bool:
    """
    Public entry point: hand a log record to the shared alert sync service.

    Args:
        record: the log record to forward.

    Returns:
        The result of ``AlertSyncService.sync_alert``, i.e. whether the record
        was queued for delivery.
    """
    return get_alert_sync_service().sync_alert(record)