2025-07-14 10:29:37 +08:00

342 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 所有AP开头的动作点
ap_action_points = ['AP151', 'AP152', 'AP153', 'AP154', 'AP155', 'AP166', 'AP167', 'AP168', 'AP169', 'AP171', 'AP172', 'AP173', 'AP1', 'AP2', 'AP3', 'AP4', 'AP5', 'AP6', 'AP7', 'AP8', 'AP9', 'AP94', 'AP96', 'AP98', 'AP99', 'AP100', 'AP10', 'AP11', 'AP12', 'AP129', 'AP174', 'AP142', 'AP176', 'AP177', 'AP178', 'AP179']
# 5个任务ID按顺序循环使用
task_ids = ['01dc17c1-3849-46f5-ae7c-a30b7734eaf6',
'3f21c538-10a8-4d8a-bd25-5626d13271a8',
'3f66fd28-13b7-4452-8beb-46ca1a3d364d',
'4508c91f-9fde-406d-805e-49c1205a4f5e',
'4e529db3-507a-40ec-b5c6-f3c2b60ea34c',
'5a33fd9d-86f5-45c4-8a2f-5db60c80f777',
'61aae3d1-bd2e-477e-b768-01358ce10fde',
'80a967f4-9359-4766-9b5a-52163572855f',
'81a3d966-9327-4eae-867b-7253d0ce1d3b',
'9e3f18b6-605a-4b7d-a857-9383e91624d5',
'b470013e-a7c0-4d46-881f-37e32208657b',
'd5377f04-8881-42fb-870f-c9c34d4f6869',
'dd84ae2f-2223-4d29-bcd9-737166d97ff7']
print(f"总共有 {len(ap_action_points)} 个AP开头的动作点:")
for i, ap in enumerate(ap_action_points, 1):
print(f"{i}. {ap}")
print(f"\n任务ID列表 ({len(task_ids)} 个):")
for i, task_id in enumerate(task_ids, 1):
print(f"{i}. {task_id}")
# HTTP请求代码
import requests
import json
import time
import random
import threading
from typing import Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
class VWEDTaskManager:
"""VWED任务管理器 - 并发模式"""
def __init__(self, base_url: str = "http://192.168.189.80:8000", action_points: list = None, task_ids: list = None):
self.base_url = base_url
self.action_points = action_points or ap_action_points
self.task_ids = task_ids or []
self.headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"x-tenant-id": "1000",
"x-access-token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3NTEzMjU4MDksInVzZXJuYW1lIjoiYWRtaW4ifQ.VMDCQhLMqbFKyXVeEwWha_jcqjW7PSG6Wn7ROeovsfs"
}
# 并发控制
self.lock = threading.Lock()
self.running_tasks = {} # {任务ID: {task_record_id, start_time, start_point, end_point}}
self.completed_tasks = 0
self.failed_tasks = 0
self.total_tasks_to_run = 0
self.stop_execution = False
def generate_random_route(self) -> tuple:
"""生成随机路径 - 确保不在同一库区"""
# 定义库区
zone_a = [f'AP{i}' for i in range(1, 13)] # AP1-AP12 库区A
zone_b = [f'AP{i}' for i in range(166, 180)] # AP166-AP179 库区B
# 随机选择开始点
start_point = random.choice(self.action_points)
# 根据开始点所在库区选择结束点
if start_point in zone_a:
# 开始点在库区A结束点必须在库区A之外
available_end_points = [ap for ap in self.action_points if ap not in zone_a]
elif start_point in zone_b:
# 开始点在库区B结束点必须在库区B之外
available_end_points = [ap for ap in self.action_points if ap not in zone_b]
else:
# 开始点不在任何库区,结束点可以是任意点(但不能相同)
available_end_points = [ap for ap in self.action_points if ap != start_point]
# 如果没有可用的结束点,回退到原逻辑
if not available_end_points:
available_end_points = [ap for ap in self.action_points if ap != start_point]
end_point = random.choice(available_end_points)
return start_point, end_point
def send_task_request(self, start_point: str, end_point: str, task_id: str) -> Optional[Dict[str, Any]]:
"""发送VWED任务请求"""
url = f"{self.base_url}/api/vwed-task-edit/run"
data = {
"taskId": task_id,
"source_type": 1,
"source_system": "SYSTEM",
"source_device": "AB2223Ndsa1",
"params": [
{
"name": "start",
"type": "STRING",
"label": "开始点",
"defaultValue": start_point
},
{
"name": "end",
"type": "STRING",
"label": "结束点",
"defaultValue": end_point
}
]
}
try:
current_time = datetime.now().strftime("%H:%M:%S")
print(f"[{current_time}] 🚀 发送任务: {start_point} -> {end_point} (ID: {task_id[:8]}...)")
response = requests.post(url, headers=self.headers, json=data, timeout=30)
if response.status_code == 200:
result = response.json()
task_record_id = result['data']['taskRecordId']
print(f"[{current_time}] ✅ 任务模板启动成功! 记录ID: {task_record_id} 任务ID: {task_id}")
return result
else:
print(f"[{current_time}] ❌ 任务模板启动失败,状态码: {response.status_code} 响应: {response.json()}")
return None
except requests.exceptions.RequestException as e:
print(f"任务模板请求出错: [{current_time}] ❌ 请求出错: {e}")
return None
def get_task_detail(self, task_record_id: str) -> Optional[Dict[str, Any]]:
"""获取任务执行详情"""
url = f"{self.base_url}/api/vwed-task-record/detail/{task_record_id}"
try:
response = requests.get(url, headers=self.headers, timeout=30)
if response.status_code == 200:
return response.json()
return None
except requests.exceptions.RequestException:
return None
def monitor_task(self, task_id: str, task_record_id: str, start_point: str, end_point: str):
"""监控单个任务的执行状态"""
current_time = datetime.now().strftime("%H:%M:%S")
print(f"[{current_time}] 🔍 开始监控任务 {task_id[:8]}... -> {task_record_id}")
# 记录任务信息
with self.lock:
self.running_tasks[task_id] = {
'task_record_id': task_record_id,
'start_time': time.time(),
'start_point': start_point,
'end_point': end_point
}
# 监控任务状态
max_wait_time = 300*5 # 5分钟超时
check_interval = 3 # 3秒检查一次
start_time = time.time()
while time.time() - start_time < max_wait_time and not self.stop_execution:
task_detail = self.get_task_detail(task_record_id)
if task_detail and task_detail.get('code') == 200:
data = task_detail['data']
status = data['status']
# 任务完成
if status == 1000: # 成功完成
with self.lock:
if task_id in self.running_tasks:
del self.running_tasks[task_id]
self.completed_tasks += 1
current_time = datetime.now().strftime("%H:%M:%S")
print(f"[{current_time}] ✅ 任务完成! {start_point}->{end_point} (ID: {task_id[:8]}...) 总计: {self.completed_tasks}成功/{self.failed_tasks}失败")
# 立即发送下一个任务
self.send_next_task(task_id)
return True
elif status in [2000, 2001]: # 失败或取消
with self.lock:
if task_id in self.running_tasks:
del self.running_tasks[task_id]
self.failed_tasks += 1
current_time = datetime.now().strftime("%H:%M:%S")
print(f"[{current_time}] ❌ 任务失败! {start_point}->{end_point} (ID: {task_id[:8]}...) 状态: {status} 详情: {task_detail}")
# 立即发送下一个任务
self.send_next_task(task_id)
return False
elif status in [1002, 1001]: # 运行中
time.sleep(check_interval)
continue
else:
time.sleep(check_interval)
continue
else:
time.sleep(check_interval)
continue
# 超时处理
with self.lock:
if task_id in self.running_tasks:
del self.running_tasks[task_id]
self.failed_tasks += 1
current_time = datetime.now().strftime("%H:%M:%S")
print(f"[{current_time}] ⏰ 任务超时! {start_point}->{end_point} (ID: {task_id[:8]}...)")
# 发送下一个任务
self.send_next_task(task_id)
return False
def send_next_task(self, task_id: str):
"""发送下一个任务"""
# 检查是否还需要继续执行
with self.lock:
if self.stop_execution or (self.completed_tasks + self.failed_tasks) >= self.total_tasks_to_run:
return
# 生成随机路径
start_point, end_point = self.generate_random_route()
# 发送任务请求
result = self.send_task_request(start_point, end_point, task_id)
if result:
task_record_id = result['data']['taskRecordId']
# 在新线程中监控这个任务
threading.Thread(
target=self.monitor_task,
args=(task_id, task_record_id, start_point, end_point),
daemon=True
).start()
else:
current_time = datetime.now().strftime("%H:%M:%S")
print(f"[{current_time}] ❌ 任务发送失败将在3秒后重试 (ID: {task_id[:8]}...)")
# 3秒后重试
def retry_send():
time.sleep(3)
self.send_next_task(task_id)
threading.Thread(target=retry_send, daemon=True).start()
def execute_concurrent_tasks(self, total_tasks: int = 100):
"""并发执行任务 - 始终保持5个任务同时运行"""
# if len(self.task_ids) != 5:
# raise ValueError("必须有5个任务ID才能执行并发任务")
self.total_tasks_to_run = total_tasks
self.completed_tasks = 0
self.failed_tasks = 0
self.stop_execution = False
print("\n" + "="*80)
print("🚀 开始并发执行任务")
print(f"📊 目标任务数: {total_tasks}")
print(f"🔄 并发任务数: 5个 (每个任务ID一个)")
print(f"🎯 可用动作点: {len(self.action_points)}")
print("📋 执行策略: 任务完成后立即补充新任务保持5个任务同时运行")
print("="*80)
# 初始发送5个任务
print("\n🎯 发送初始的" + str(len(self.task_ids)) + "个任务...")
for i, task_id in enumerate(self.task_ids):
start_point, end_point = self.generate_random_route()
result = self.send_task_request(start_point, end_point, task_id)
if result:
task_record_id = result['data']['taskRecordId']
# 在新线程中监控这个任务
threading.Thread(
target=self.monitor_task,
args=(task_id, task_record_id, start_point, end_point),
daemon=True
).start()
time.sleep(0.5) # 稍微延时避免请求过快
else:
print(f"❌ 初始任务 {i+1} 发送失败")
print(f"\n✅ 初始任务发送完成,开始监控执行...")
# 监控总体执行状态
try:
while True:
time.sleep(5) # 每5秒检查一次总体状态
with self.lock:
total_finished = self.completed_tasks + self.failed_tasks
running_count = len(self.running_tasks)
current_time = datetime.now().strftime("%H:%M:%S")
print(f"[{current_time}] 📊 状态更新: {self.completed_tasks}成功/{self.failed_tasks}失败/{total_finished}总完成 | 运行中: {running_count}")
# 检查是否完成
if total_finished >= total_tasks:
self.stop_execution = True
print(f"\n🏁 所有任务执行完成!")
break
except KeyboardInterrupt:
print("\n⚠️ 用户中断执行")
self.stop_execution = True
# 等待所有线程结束
time.sleep(2)
# 最终统计
print("\n" + "="*80)
print("🏁 任务执行完成!")
print(f"📊 最终统计:")
print(f" ✅ 成功任务: {self.completed_tasks}")
print(f" ❌ 失败任务: {self.failed_tasks}")
print(f" 📈 总计任务: {self.completed_tasks + self.failed_tasks}")
print(f" 📊 成功率: {(self.completed_tasks/(self.completed_tasks + self.failed_tasks))*100:.1f}%")
print("="*80)
# 创建任务管理器传入任务ID列表
task_manager = VWEDTaskManager(action_points=ap_action_points, task_ids=task_ids)
# 调用函数执行并发随机任务
if __name__ == "__main__":
print("\n" + "="*80)
print("🚀 VWED任务管理系统 - 并发执行模式")
print("📋 特点:同时运行" + str(len(task_ids)) + "个任务,任务完成后立即补充新任务")
print("🔄 策略:保持" + str(len(task_ids)) + "个任务ID始终都有任务在运行防止空闲")
# 并发执行任务
# 同时运行5个任务总共执行50个任务
task_manager.execute_concurrent_tasks(total_tasks=len(task_ids)*10)