342 lines
14 KiB
Python
342 lines
14 KiB
Python
# 所有AP开头的动作点
|
||
ap_action_points = ['AP151', 'AP152', 'AP153', 'AP154', 'AP155', 'AP166', 'AP167', 'AP168', 'AP169', 'AP171', 'AP172', 'AP173', 'AP1', 'AP2', 'AP3', 'AP4', 'AP5', 'AP6', 'AP7', 'AP8', 'AP9', 'AP94', 'AP96', 'AP98', 'AP99', 'AP100', 'AP10', 'AP11', 'AP12', 'AP129', 'AP174', 'AP142', 'AP176', 'AP177', 'AP178', 'AP179']
|
||
|
||
# 5个任务ID,按顺序循环使用
|
||
task_ids = ['01dc17c1-3849-46f5-ae7c-a30b7734eaf6',
|
||
'3f21c538-10a8-4d8a-bd25-5626d13271a8',
|
||
'3f66fd28-13b7-4452-8beb-46ca1a3d364d',
|
||
'4508c91f-9fde-406d-805e-49c1205a4f5e',
|
||
'4e529db3-507a-40ec-b5c6-f3c2b60ea34c',
|
||
'5a33fd9d-86f5-45c4-8a2f-5db60c80f777',
|
||
'61aae3d1-bd2e-477e-b768-01358ce10fde',
|
||
'80a967f4-9359-4766-9b5a-52163572855f',
|
||
'81a3d966-9327-4eae-867b-7253d0ce1d3b',
|
||
'9e3f18b6-605a-4b7d-a857-9383e91624d5',
|
||
'b470013e-a7c0-4d46-881f-37e32208657b',
|
||
'd5377f04-8881-42fb-870f-c9c34d4f6869',
|
||
'dd84ae2f-2223-4d29-bcd9-737166d97ff7']
|
||
|
||
print(f"总共有 {len(ap_action_points)} 个AP开头的动作点:")
|
||
for i, ap in enumerate(ap_action_points, 1):
|
||
print(f"{i}. {ap}")
|
||
|
||
print(f"\n任务ID列表 ({len(task_ids)} 个):")
|
||
for i, task_id in enumerate(task_ids, 1):
|
||
print(f"{i}. {task_id}")
|
||
|
||
# HTTP请求代码
|
||
import requests
|
||
import json
|
||
import time
|
||
import random
|
||
import threading
|
||
from typing import Dict, Any, Optional
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from datetime import datetime
|
||
|
||
class VWEDTaskManager:
|
||
"""VWED任务管理器 - 并发模式"""
|
||
|
||
def __init__(self, base_url: str = "http://192.168.189.80:8000", action_points: list = None, task_ids: list = None):
|
||
self.base_url = base_url
|
||
self.action_points = action_points or ap_action_points
|
||
self.task_ids = task_ids or []
|
||
self.headers = {
|
||
"Content-Type": "application/json",
|
||
"Accept": "application/json",
|
||
"x-tenant-id": "1000",
|
||
"x-access-token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NTEzMjU4MDksInVzZXJuYW1lIjoiYWRtaW4ifQ.VMDCQhLMqbFKyXVeEwWha_jcqjW7PSG6Wn7ROeovsfs"
|
||
}
|
||
|
||
# 并发控制
|
||
self.lock = threading.Lock()
|
||
self.running_tasks = {} # {任务ID: {task_record_id, start_time, start_point, end_point}}
|
||
self.completed_tasks = 0
|
||
self.failed_tasks = 0
|
||
self.total_tasks_to_run = 0
|
||
self.stop_execution = False
|
||
|
||
def generate_random_route(self) -> tuple:
|
||
"""生成随机路径 - 确保不在同一库区"""
|
||
# 定义库区
|
||
zone_a = [f'AP{i}' for i in range(1, 13)] # AP1-AP12 库区A
|
||
zone_b = [f'AP{i}' for i in range(166, 180)] # AP166-AP179 库区B
|
||
|
||
# 随机选择开始点
|
||
start_point = random.choice(self.action_points)
|
||
|
||
# 根据开始点所在库区选择结束点
|
||
if start_point in zone_a:
|
||
# 开始点在库区A,结束点必须在库区A之外
|
||
available_end_points = [ap for ap in self.action_points if ap not in zone_a]
|
||
elif start_point in zone_b:
|
||
# 开始点在库区B,结束点必须在库区B之外
|
||
available_end_points = [ap for ap in self.action_points if ap not in zone_b]
|
||
else:
|
||
# 开始点不在任何库区,结束点可以是任意点(但不能相同)
|
||
available_end_points = [ap for ap in self.action_points if ap != start_point]
|
||
|
||
# 如果没有可用的结束点,回退到原逻辑
|
||
if not available_end_points:
|
||
available_end_points = [ap for ap in self.action_points if ap != start_point]
|
||
|
||
end_point = random.choice(available_end_points)
|
||
|
||
return start_point, end_point
|
||
|
||
def send_task_request(self, start_point: str, end_point: str, task_id: str) -> Optional[Dict[str, Any]]:
|
||
"""发送VWED任务请求"""
|
||
|
||
url = f"{self.base_url}/api/vwed-task-edit/run"
|
||
|
||
data = {
|
||
"taskId": task_id,
|
||
"source_type": 1,
|
||
"source_system": "SYSTEM",
|
||
"source_device": "AB2223Ndsa1",
|
||
"params": [
|
||
{
|
||
"name": "start",
|
||
"type": "STRING",
|
||
"label": "开始点",
|
||
"defaultValue": start_point
|
||
},
|
||
{
|
||
"name": "end",
|
||
"type": "STRING",
|
||
"label": "结束点",
|
||
"defaultValue": end_point
|
||
}
|
||
]
|
||
}
|
||
|
||
try:
|
||
current_time = datetime.now().strftime("%H:%M:%S")
|
||
print(f"[{current_time}] 🚀 发送任务: {start_point} -> {end_point} (ID: {task_id[:8]}...)")
|
||
|
||
response = requests.post(url, headers=self.headers, json=data, timeout=30)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
task_record_id = result['data']['taskRecordId']
|
||
print(f"[{current_time}] ✅ 任务模板启动成功! 记录ID: {task_record_id} 任务ID: {task_id}")
|
||
return result
|
||
else:
|
||
print(f"[{current_time}] ❌ 任务模板启动失败,状态码: {response.status_code} 响应: {response.json()}")
|
||
return None
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"任务模板请求出错: [{current_time}] ❌ 请求出错: {e}")
|
||
return None
|
||
|
||
def get_task_detail(self, task_record_id: str) -> Optional[Dict[str, Any]]:
|
||
"""获取任务执行详情"""
|
||
|
||
url = f"{self.base_url}/api/vwed-task-record/detail/{task_record_id}"
|
||
|
||
try:
|
||
response = requests.get(url, headers=self.headers, timeout=30)
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
return None
|
||
except requests.exceptions.RequestException:
|
||
return None
|
||
|
||
def monitor_task(self, task_id: str, task_record_id: str, start_point: str, end_point: str):
|
||
"""监控单个任务的执行状态"""
|
||
|
||
current_time = datetime.now().strftime("%H:%M:%S")
|
||
print(f"[{current_time}] 🔍 开始监控任务 {task_id[:8]}... -> {task_record_id}")
|
||
|
||
# 记录任务信息
|
||
with self.lock:
|
||
self.running_tasks[task_id] = {
|
||
'task_record_id': task_record_id,
|
||
'start_time': time.time(),
|
||
'start_point': start_point,
|
||
'end_point': end_point
|
||
}
|
||
|
||
# 监控任务状态
|
||
max_wait_time = 300*5 # 5分钟超时
|
||
check_interval = 3 # 3秒检查一次
|
||
start_time = time.time()
|
||
|
||
while time.time() - start_time < max_wait_time and not self.stop_execution:
|
||
task_detail = self.get_task_detail(task_record_id)
|
||
|
||
if task_detail and task_detail.get('code') == 200:
|
||
data = task_detail['data']
|
||
status = data['status']
|
||
|
||
# 任务完成
|
||
if status == 1000: # 成功完成
|
||
with self.lock:
|
||
if task_id in self.running_tasks:
|
||
del self.running_tasks[task_id]
|
||
self.completed_tasks += 1
|
||
current_time = datetime.now().strftime("%H:%M:%S")
|
||
print(f"[{current_time}] ✅ 任务完成! {start_point}->{end_point} (ID: {task_id[:8]}...) 总计: {self.completed_tasks}成功/{self.failed_tasks}失败")
|
||
|
||
# 立即发送下一个任务
|
||
self.send_next_task(task_id)
|
||
return True
|
||
|
||
elif status in [2000, 2001]: # 失败或取消
|
||
with self.lock:
|
||
if task_id in self.running_tasks:
|
||
del self.running_tasks[task_id]
|
||
self.failed_tasks += 1
|
||
current_time = datetime.now().strftime("%H:%M:%S")
|
||
print(f"[{current_time}] ❌ 任务失败! {start_point}->{end_point} (ID: {task_id[:8]}...) 状态: {status} 详情: {task_detail}")
|
||
|
||
# 立即发送下一个任务
|
||
self.send_next_task(task_id)
|
||
return False
|
||
|
||
elif status in [1002, 1001]: # 运行中
|
||
time.sleep(check_interval)
|
||
continue
|
||
else:
|
||
time.sleep(check_interval)
|
||
continue
|
||
else:
|
||
time.sleep(check_interval)
|
||
continue
|
||
|
||
# 超时处理
|
||
with self.lock:
|
||
if task_id in self.running_tasks:
|
||
del self.running_tasks[task_id]
|
||
self.failed_tasks += 1
|
||
current_time = datetime.now().strftime("%H:%M:%S")
|
||
print(f"[{current_time}] ⏰ 任务超时! {start_point}->{end_point} (ID: {task_id[:8]}...)")
|
||
|
||
# 发送下一个任务
|
||
self.send_next_task(task_id)
|
||
return False
|
||
|
||
def send_next_task(self, task_id: str):
|
||
"""发送下一个任务"""
|
||
|
||
# 检查是否还需要继续执行
|
||
with self.lock:
|
||
if self.stop_execution or (self.completed_tasks + self.failed_tasks) >= self.total_tasks_to_run:
|
||
return
|
||
|
||
# 生成随机路径
|
||
start_point, end_point = self.generate_random_route()
|
||
|
||
# 发送任务请求
|
||
result = self.send_task_request(start_point, end_point, task_id)
|
||
|
||
if result:
|
||
task_record_id = result['data']['taskRecordId']
|
||
|
||
# 在新线程中监控这个任务
|
||
threading.Thread(
|
||
target=self.monitor_task,
|
||
args=(task_id, task_record_id, start_point, end_point),
|
||
daemon=True
|
||
).start()
|
||
else:
|
||
current_time = datetime.now().strftime("%H:%M:%S")
|
||
print(f"[{current_time}] ❌ 任务发送失败,将在3秒后重试 (ID: {task_id[:8]}...)")
|
||
|
||
# 3秒后重试
|
||
def retry_send():
|
||
time.sleep(3)
|
||
self.send_next_task(task_id)
|
||
|
||
threading.Thread(target=retry_send, daemon=True).start()
|
||
|
||
def execute_concurrent_tasks(self, total_tasks: int = 100):
|
||
"""并发执行任务 - 始终保持5个任务同时运行"""
|
||
|
||
# if len(self.task_ids) != 5:
|
||
# raise ValueError("必须有5个任务ID才能执行并发任务")
|
||
|
||
self.total_tasks_to_run = total_tasks
|
||
self.completed_tasks = 0
|
||
self.failed_tasks = 0
|
||
self.stop_execution = False
|
||
|
||
print("\n" + "="*80)
|
||
print("🚀 开始并发执行任务")
|
||
print(f"📊 目标任务数: {total_tasks}")
|
||
print(f"🔄 并发任务数: 5个 (每个任务ID一个)")
|
||
print(f"🎯 可用动作点: {len(self.action_points)} 个")
|
||
print("📋 执行策略: 任务完成后立即补充新任务,保持5个任务同时运行")
|
||
print("="*80)
|
||
|
||
# 初始发送5个任务
|
||
print("\n🎯 发送初始的" + str(len(self.task_ids)) + "个任务...")
|
||
for i, task_id in enumerate(self.task_ids):
|
||
start_point, end_point = self.generate_random_route()
|
||
result = self.send_task_request(start_point, end_point, task_id)
|
||
|
||
if result:
|
||
task_record_id = result['data']['taskRecordId']
|
||
|
||
# 在新线程中监控这个任务
|
||
threading.Thread(
|
||
target=self.monitor_task,
|
||
args=(task_id, task_record_id, start_point, end_point),
|
||
daemon=True
|
||
).start()
|
||
|
||
time.sleep(0.5) # 稍微延时避免请求过快
|
||
else:
|
||
print(f"❌ 初始任务 {i+1} 发送失败")
|
||
|
||
print(f"\n✅ 初始任务发送完成,开始监控执行...")
|
||
|
||
# 监控总体执行状态
|
||
try:
|
||
while True:
|
||
time.sleep(5) # 每5秒检查一次总体状态
|
||
|
||
with self.lock:
|
||
total_finished = self.completed_tasks + self.failed_tasks
|
||
running_count = len(self.running_tasks)
|
||
|
||
current_time = datetime.now().strftime("%H:%M:%S")
|
||
print(f"[{current_time}] 📊 状态更新: {self.completed_tasks}成功/{self.failed_tasks}失败/{total_finished}总完成 | 运行中: {running_count}个")
|
||
|
||
# 检查是否完成
|
||
if total_finished >= total_tasks:
|
||
self.stop_execution = True
|
||
print(f"\n🏁 所有任务执行完成!")
|
||
break
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n⚠️ 用户中断执行")
|
||
self.stop_execution = True
|
||
|
||
# 等待所有线程结束
|
||
time.sleep(2)
|
||
|
||
# 最终统计
|
||
print("\n" + "="*80)
|
||
print("🏁 任务执行完成!")
|
||
print(f"📊 最终统计:")
|
||
print(f" ✅ 成功任务: {self.completed_tasks}")
|
||
print(f" ❌ 失败任务: {self.failed_tasks}")
|
||
print(f" 📈 总计任务: {self.completed_tasks + self.failed_tasks}")
|
||
print(f" 📊 成功率: {(self.completed_tasks/(self.completed_tasks + self.failed_tasks))*100:.1f}%")
|
||
print("="*80)
|
||
|
||
# 创建任务管理器,传入任务ID列表
|
||
task_manager = VWEDTaskManager(action_points=ap_action_points, task_ids=task_ids)
|
||
|
||
# 调用函数执行并发随机任务
|
||
if __name__ == "__main__":
|
||
print("\n" + "="*80)
|
||
print("🚀 VWED任务管理系统 - 并发执行模式")
|
||
print("📋 特点:同时运行" + str(len(task_ids)) + "个任务,任务完成后立即补充新任务")
|
||
print("🔄 策略:保持" + str(len(task_ids)) + "个任务ID始终都有任务在运行,防止空闲")
|
||
|
||
# 并发执行任务
|
||
# 同时运行5个任务,总共执行50个任务
|
||
task_manager.execute_concurrent_tasks(total_tasks=len(task_ids)*10)
|
||
|