398 lines
14 KiB
Python
Raw Permalink Normal View History

2025-04-30 16:57:46 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
工作线程管理模块
负责工作线程的创建监控和调整
"""
import asyncio
import logging
import time
from typing import Dict, List, Any, Optional, Set, Callable, Coroutine
from datetime import datetime, timedelta
import psutil
from utils.logger import get_logger
# 获取日志记录器
logger = get_logger("services.enhanced_scheduler.worker_manager")
class WorkerManager:
"""
工作线程管理器
负责管理和监控工作线程池
"""
def __init__(self,
min_workers: int = 5,
max_workers: int = 20,
worker_heartbeat_interval: int = 30,
auto_scale_interval: int = 60,
cpu_threshold: float = 80.0,
2025-05-12 15:43:21 +08:00
memory_threshold: float = 80.0,
queue_manager = None):
2025-04-30 16:57:46 +08:00
"""
初始化工作线程管理器
Args:
min_workers: 最小工作线程数
max_workers: 最大工作线程数
worker_heartbeat_interval: 工作线程心跳间隔
auto_scale_interval: 自动扩缩容间隔
cpu_threshold: CPU使用率阈值百分比
memory_threshold: 内存使用率阈值百分比
"""
self.min_workers = min_workers
self.max_workers = max_workers
self.worker_heartbeat_interval = worker_heartbeat_interval
self.auto_scale_interval = auto_scale_interval
self.cpu_threshold = cpu_threshold
self.memory_threshold = memory_threshold
self.workers = {} # 工作线程字典 {worker_id: worker_task}
self.worker_status = {} # 工作线程状态 {worker_id: status_dict}
self.worker_heartbeats = {} # 工作线程心跳 {worker_id: last_heartbeat_time}
self.worker_factory = None # 工作线程工厂函数
self.is_running = False
self.monitor_task = None # 监控任务
self.last_auto_scale_time = datetime.now()
2025-05-12 15:43:21 +08:00
self.queue_manager = queue_manager
2025-04-30 16:57:46 +08:00
logger.info(f"初始化工作线程管理器: min={min_workers}, max={max_workers}, "
f"心跳间隔={worker_heartbeat_interval}秒, 自动扩缩容间隔={auto_scale_interval}")
def set_worker_factory(self, factory: Callable[[int], Coroutine]):
"""
设置工作线程工厂函数
Args:
factory: 工作线程工厂函数接受worker_id参数返回协程对象
"""
self.worker_factory = factory
async def start(self, initial_workers: int = None) -> None:
"""
启动工作线程管理器
Args:
initial_workers: 初始工作线程数默认为min_workers
"""
if self.is_running:
logger.warning("工作线程管理器已经在运行中")
return
if not self.worker_factory:
raise ValueError("未设置工作线程工厂函数")
self.is_running = True
# 启动初始工作线程
initial_count = initial_workers if initial_workers is not None else self.min_workers
initial_count = max(self.min_workers, min(initial_count, self.max_workers))
for i in range(initial_count):
await self.add_worker()
# 启动监控任务
2025-05-12 15:43:21 +08:00
self.monitor_task = asyncio.create_task(self._monitor_workers())
2025-04-30 16:57:46 +08:00
logger.info(f"工作线程管理器启动成功,初始工作线程数: {initial_count}")
async def stop(self) -> None:
"""
停止工作线程管理器
"""
if not self.is_running:
logger.warning("工作线程管理器未在运行")
return
self.is_running = False
# 取消监控任务
if self.monitor_task:
self.monitor_task.cancel()
try:
await self.monitor_task
except asyncio.CancelledError:
pass
self.monitor_task = None
# 取消所有工作线程
for worker_id, worker in list(self.workers.items()):
await self.remove_worker(worker_id)
logger.info("工作线程管理器已停止")
async def add_worker(self) -> int:
"""
添加工作线程
Returns:
int: 工作线程ID
"""
if len(self.workers) >= self.max_workers:
logger.warning(f"已达到最大工作线程数 {self.max_workers}")
return -1
# 生成新的工作线程ID
worker_id = 0
while worker_id in self.workers:
worker_id += 1
# 创建工作线程
worker = asyncio.create_task(self.worker_factory(worker_id))
# 记录工作线程
self.workers[worker_id] = worker
self.worker_status[worker_id] = {
"state": "running",
"start_time": datetime.now(),
"last_activity": datetime.now(),
"task_count": 0
}
self.worker_heartbeats[worker_id] = datetime.now()
logger.info(f"添加工作线程 {worker_id}, 当前工作线程数: {len(self.workers)}")
return worker_id
async def remove_worker(self, worker_id: int) -> bool:
"""
移除工作线程
Args:
worker_id: 工作线程ID
Returns:
bool: 是否成功移除
"""
if worker_id not in self.workers:
logger.warning(f"工作线程 {worker_id} 不存在")
return False
# 取消工作线程
worker = self.workers.pop(worker_id)
worker.cancel()
try:
await worker
except asyncio.CancelledError:
pass
# 移除状态记录
self.worker_status.pop(worker_id, None)
self.worker_heartbeats.pop(worker_id, None)
logger.info(f"移除工作线程 {worker_id}, 当前工作线程数: {len(self.workers)}")
return True
def update_worker_heartbeat(self, worker_id: int) -> None:
"""
更新工作线程心跳
Args:
worker_id: 工作线程ID
"""
if worker_id in self.worker_heartbeats:
self.worker_heartbeats[worker_id] = datetime.now()
self.worker_status[worker_id]["last_activity"] = datetime.now()
def update_worker_status(self, worker_id: int, status_update: Dict[str, Any]) -> None:
"""
更新工作线程状态
Args:
worker_id: 工作线程ID
status_update: 状态更新字典
"""
if worker_id in self.worker_status:
self.worker_status[worker_id].update(status_update)
self.update_worker_heartbeat(worker_id)
async def _monitor_workers(self) -> None:
"""
监控工作线程
检查心跳资源使用情况自动扩缩容
"""
logger.info("工作线程监控任务启动")
while self.is_running:
try:
# 检查工作线程心跳
now = datetime.now()
for worker_id, last_heartbeat in list(self.worker_heartbeats.items()):
if (now - last_heartbeat).total_seconds() > self.worker_heartbeat_interval * 2:
2025-05-12 15:43:21 +08:00
if self.worker_status[worker_id].get("current_task",None) is None:
logger.info(f"工作线程 {worker_id} 心跳超时,重启中...")
# 重启工作线程
await self.remove_worker(worker_id)
await self.add_worker()
else:
logger.info(f"工作线程 {worker_id} 心跳超时,但当前有任务,不重启")
2025-04-30 16:57:46 +08:00
# 自动扩缩容
if (now - self.last_auto_scale_time).total_seconds() > self.auto_scale_interval:
await self._auto_scale()
self.last_auto_scale_time = now
# 休眠一段时间
2025-05-12 15:43:21 +08:00
await asyncio.sleep(self.worker_heartbeat_interval)
2025-04-30 16:57:46 +08:00
except asyncio.CancelledError:
# 取消异常,退出循环
logger.info("工作线程监控任务被取消")
break
except Exception as e:
logger.error(f"工作线程监控任务异常: {str(e)}")
# 出现异常时短暂休眠,避免频繁错误
await asyncio.sleep(5.0)
logger.info("工作线程监控任务结束")
2025-05-12 15:43:21 +08:00
async def _check_workers(self, flag: bool = False) -> List[int]:
"""
检查指定的工作线程
"""
worker_ids = []
unused_worker_ids = []
for worker_id, _ in list(self.worker_heartbeats.items()):
if self.worker_status[worker_id].get("current_task",None) is None:
unused_worker_ids.append(worker_id)
else:
worker_ids.append(worker_id)
if flag:
return unused_worker_ids
else:
return worker_ids
2025-04-30 16:57:46 +08:00
2025-05-12 15:43:21 +08:00
async def _delete_unused_workers(self) -> bool:
"""
删除空闲工作线程
"""
import random
worker_ids = await self._check_workers(flag=True)
if len(worker_ids) > 0:
worker_id = random.choice(worker_ids)
await self.remove_worker(worker_id)
logger.info(f"移除未使用的空闲工作线程 {worker_id}")
return True
else:
logger.info("没有未使用的空闲工作线程")
return False
def _calculate_threads_to_add(self, queue_size: int, current_workers: int) -> int:
"""
根据队列大小和当前工作线程数计算需要增加的线程数
Args:
queue_size: 队列中的任务数量
current_workers: 当前正在运行任务的线程数
Returns:
int: 需要增加的线程数
"""
total_tasks = queue_size + current_workers
current_pool_size = len(self.workers)
remaining_capacity = self.max_workers - current_pool_size
if remaining_capacity <= 0:
return 0
# 计算任务与线程的比率
task_thread_ratio = total_tasks / max(1, current_pool_size)
# 根据任务与线程比率动态计算增加的线程数
if task_thread_ratio > 3.0:
# 负载非常高,尝试增加更多线程
threads_to_add = min(remaining_capacity, max(3, total_tasks // 3))
elif task_thread_ratio > 2.0:
# 中等负载,适量增加线程
threads_to_add = min(remaining_capacity, max(2, total_tasks // 4))
else:
# 轻微负载,小幅增加线程
threads_to_add = min(remaining_capacity, 1)
return threads_to_add
2025-04-30 16:57:46 +08:00
async def _auto_scale(self) -> None:
"""
自动扩缩容
根据系统资源使用情况和任务队列长度自动调整工作线程数量
"""
try:
2025-05-12 15:43:21 +08:00
current_workers = len(await self._check_workers(flag=False))
2025-04-30 16:57:46 +08:00
cpu_percent = psutil.cpu_percent()
2025-05-12 15:43:21 +08:00
# 获取系统资源使用情况
memory_percent = psutil.virtual_memory().percent # 获取系统的虚拟内存(包括物理内存和交换内存)的使用率
2025-04-30 16:57:46 +08:00
# 获取任务队列长度(需要外部传入或通过其他方式获取)
queue_size = self.get_queue_size()
# 根据资源使用情况和队列长度决定是否调整工作线程数量
if cpu_percent > self.cpu_threshold or memory_percent > self.memory_threshold:
# 资源使用率过高,减少工作线程
2025-05-12 15:43:21 +08:00
# 移除最后添加的工作线程
if len(self.workers) > self.min_workers:
if await self._delete_unused_workers():
logger.info(f"资源使用率过高(CPU:{cpu_percent}%, MEM:{memory_percent}%), 减少工作线程至 {current_workers-1}")
elif queue_size+current_workers > len(self.workers):
# 队列任务较多,动态增加工作线程
threads_to_add = self._calculate_threads_to_add(queue_size, current_workers)
if threads_to_add > 0:
for _ in range(threads_to_add):
await self.add_worker()
logger.info(f"队列任务较多(size:{queue_size}, running:{current_workers}), "
f"动态增加{threads_to_add}个工作线程至 {len(self.workers)}")
elif queue_size == 0 and len(self.workers) > self.min_workers:
2025-04-30 16:57:46 +08:00
# 队列为空,减少工作线程
2025-05-12 15:43:21 +08:00
if await self._delete_unused_workers():
logger.info(f"队列为空,减少工作线程至 {current_workers-1}")
2025-04-30 16:57:46 +08:00
except Exception as e:
logger.error(f"自动扩缩容异常: {str(e)}")
def get_queue_size(self) -> int:
"""
获取任务队列长度
默认实现需要在子类中重写或设置外部获取方法
Returns:
int: 队列长度
"""
# 此方法应由子类重写或设置外部方法
2025-05-12 15:43:21 +08:00
return sum(self.queue_manager.get_queue_sizes())
2025-04-30 16:57:46 +08:00
def set_queue_size_getter(self, getter: Callable[[], int]) -> None:
"""
设置队列长度获取方法
Args:
getter: 队列长度获取函数
"""
self.get_queue_size = getter
def get_worker_count(self) -> int:
"""
获取当前工作线程数量
Returns:
int: 工作线程数量
"""
return len(self.workers)
def get_worker_status(self) -> Dict[str, Any]:
"""
获取工作线程状态信息
Returns:
Dict[str, Any]: 工作线程状态
"""
return {
"worker_count": len(self.workers),
"min_workers": self.min_workers,
"max_workers": self.max_workers,
"workers": self.worker_status,
"cpu_usage": psutil.cpu_percent(),
"memory_usage": psutil.virtual_memory().percent
}