2206 lines
106 KiB
Python
2206 lines
106 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
呼叫器设备服务模块
|
||
提供呼叫器设备相关的服务方法
|
||
"""
|
||
|
||
import uuid
|
||
import time
|
||
import threading
|
||
import asyncio
|
||
import aiohttp
|
||
import json
|
||
import os
|
||
import datetime
|
||
from typing import Dict, List, Any, Optional, Union
|
||
from sqlalchemy import select, func, create_engine
|
||
from sqlalchemy.orm import sessionmaker
|
||
import requests
|
||
|
||
from data.models.calldevice import VWEDCallDevice, VWEDCallDeviceButton
|
||
from data.models.taskdef import VWEDTaskDef
|
||
from data.models.taskrecord import VWEDTaskRecord
|
||
from data.session import get_async_session, get_session
|
||
from utils.logger import get_logger
|
||
from data.enum.call_device_enum import CallDeviceStatus, CallDeviceButtonStatus, CallDeviceButtonType
|
||
from data.enum.task_record_enum import TaskStatus
|
||
from config.settings import settings
|
||
# 设置日志
|
||
logger = get_logger("service.calldevice_service")
|
||
|
||
# 存储设备监控线程的字典
|
||
device_monitor_threads = {}
|
||
# 存储设备监控状态的字典
|
||
device_monitor_status = {}
|
||
# 线程锁,用于线程安全操作
|
||
thread_lock = threading.Lock()
|
||
# 添加按钮级别的任务执行锁定状态字典
|
||
device_button_locks = {}
|
||
|
||
class CallDeviceService:
|
||
"""
|
||
呼叫器设备服务类
|
||
提供与呼叫器设备相关的方法
|
||
"""
|
||
|
||
@staticmethod
|
||
async def add_call_device(
|
||
protocol: str,
|
||
brand: str,
|
||
ip: str,
|
||
port: int,
|
||
device_name: str,
|
||
status: int = CallDeviceStatus.START,
|
||
slave_id: str = "1",
|
||
buttons: List[Dict[str, Any]] = [],
|
||
api_token: str = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
新增呼叫器设备
|
||
|
||
Args:
|
||
protocol: 协议类型
|
||
brand: 品牌
|
||
ip: IP地址
|
||
port: 端口号
|
||
device_name: 设备名称
|
||
status: 状态(0:禁用,1:启用)
|
||
slave_id: 从机ID
|
||
function_code: 功能码
|
||
buttons: 按钮配置列表
|
||
api_token: 请求头中的API令牌
|
||
Returns:
|
||
Dict: 包含新增结果的字典
|
||
"""
|
||
try:
|
||
async with get_async_session() as session:
|
||
# 检查IP是否已存在
|
||
check_ip_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.ip == ip) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
check_ip_result = await session.execute(check_ip_query)
|
||
if check_ip_result.scalars().first():
|
||
return {
|
||
"success": False,
|
||
"message": f"IP地址 '{ip}' 已存在"
|
||
}
|
||
|
||
# 验证任务ID是否存在
|
||
if buttons:
|
||
task_ids_to_check = set()
|
||
for button in buttons:
|
||
if button.get("vwed_task_id"):
|
||
task_ids_to_check.add(button.get("vwed_task_id"))
|
||
if button.get("long_vwed_task_id"):
|
||
task_ids_to_check.add(button.get("long_vwed_task_id"))
|
||
|
||
if task_ids_to_check:
|
||
# 查询任务定义表,检查任务ID是否存在
|
||
task_query = select(VWEDTaskDef.id).where(
|
||
(VWEDTaskDef.id.in_(list(task_ids_to_check))) &
|
||
(VWEDTaskDef.is_deleted == False)
|
||
)
|
||
task_result = await session.execute(task_query)
|
||
found_task_ids = {task_id for task_id, in task_result.all()}
|
||
|
||
# 找出不存在的任务ID
|
||
missing_task_ids = task_ids_to_check - found_task_ids
|
||
if missing_task_ids:
|
||
return {
|
||
"success": False,
|
||
"message": f"以下任务ID不存在: {', '.join(missing_task_ids)}"
|
||
}
|
||
|
||
# 创建呼叫器设备实例
|
||
device_id = "modbus_device_"+str(uuid.uuid4())
|
||
new_device = VWEDCallDevice(
|
||
id=device_id,
|
||
protocol=protocol,
|
||
brand=brand,
|
||
ip=ip,
|
||
port=port,
|
||
device_name=device_name,
|
||
status=status,
|
||
slave_id=slave_id,
|
||
)
|
||
|
||
# 添加设备
|
||
session.add(new_device)
|
||
|
||
# 添加按钮配置
|
||
button_list = []
|
||
if buttons:
|
||
for button in buttons:
|
||
button_id = str(uuid.uuid4())
|
||
new_button = VWEDCallDeviceButton(
|
||
id=button_id,
|
||
device_id=device_id,
|
||
signal_name=button.get("signal_name"),
|
||
signal_type=button.get("signal_type"),
|
||
signal_length=button.get("signal_length"),
|
||
register_address=button.get("register_address"),
|
||
function_code=button.get("function_code"),
|
||
remark=button.get("remark"),
|
||
vwed_task_id=button.get("vwed_task_id"),
|
||
long_vwed_task_id=button.get("long_vwed_task_id")
|
||
)
|
||
session.add(new_button)
|
||
button_list.append({
|
||
"id": button_id,
|
||
"signal_name": button.get("signal_name"),
|
||
"signal_type": button.get("signal_type"),
|
||
"signal_length": button.get("signal_length"),
|
||
"register_address": button.get("register_address"),
|
||
"function_code": button.get("function_code"),
|
||
"remark": button.get("remark"),
|
||
"vwed_task_id": button.get("vwed_task_id"),
|
||
"long_vwed_task_id": button.get("long_vwed_task_id")
|
||
})
|
||
|
||
# 提交事务
|
||
await session.commit()
|
||
if status == CallDeviceStatus.START:
|
||
# 启动设备监控
|
||
await CallDeviceService.start_device_monitor(device_id, api_token)
|
||
# 返回结果
|
||
return {
|
||
"success": True,
|
||
"message": "新增呼叫器设备成功",
|
||
"data": {
|
||
"id": device_id,
|
||
"protocol": protocol,
|
||
"brand": brand,
|
||
"ip": ip,
|
||
"port": port,
|
||
"device_name": device_name,
|
||
"status": status,
|
||
"slave_id": slave_id,
|
||
"buttons": button_list
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"新增呼叫器设备失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"新增呼叫器设备失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def update_call_device(
|
||
device_id: str,
|
||
protocol: str,
|
||
brand: str,
|
||
ip: str,
|
||
port: int,
|
||
device_name: str,
|
||
status: int = CallDeviceStatus.START,
|
||
slave_id: str = "1",
|
||
buttons: List[Dict[str, Any]] = []
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
更新呼叫器设备
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
protocol: 协议类型
|
||
brand: 品牌
|
||
ip: IP地址
|
||
port: 端口号
|
||
device_name: 设备名称
|
||
status: 状态(0:禁用,1:启用)
|
||
slave_id: 从机ID
|
||
function_code: 功能码
|
||
buttons: 按钮配置列表
|
||
|
||
Returns:
|
||
Dict: 包含更新结果的字典
|
||
"""
|
||
try:
|
||
async with get_async_session() as session:
|
||
# 检查设备是否存在
|
||
device_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.id == device_id) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
device_result = await session.execute(device_query)
|
||
device = device_result.scalars().first()
|
||
|
||
if not device:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
|
||
}
|
||
|
||
# 检查IP是否已被其他设备使用
|
||
if device.ip != ip:
|
||
check_ip_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.ip == ip) &
|
||
(VWEDCallDevice.id != device_id) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
check_ip_result = await session.execute(check_ip_query)
|
||
if check_ip_result.scalars().first():
|
||
return {
|
||
"success": False,
|
||
"message": f"IP地址 '{ip}' 已被其他设备使用"
|
||
}
|
||
|
||
# 验证任务ID是否存在
|
||
if buttons:
|
||
task_ids_to_check = set()
|
||
for button in buttons:
|
||
if button.get("vwed_task_id"):
|
||
task_ids_to_check.add(button.get("vwed_task_id"))
|
||
if button.get("long_vwed_task_id"):
|
||
task_ids_to_check.add(button.get("long_vwed_task_id"))
|
||
|
||
if task_ids_to_check:
|
||
# 查询任务定义表,检查任务ID是否存在
|
||
task_query = select(VWEDTaskDef.id).where(
|
||
(VWEDTaskDef.id.in_(list(task_ids_to_check))) &
|
||
(VWEDTaskDef.is_deleted == False)
|
||
)
|
||
task_result = await session.execute(task_query)
|
||
found_task_ids = {task_id for task_id, in task_result.all()}
|
||
|
||
# 找出不存在的任务ID
|
||
missing_task_ids = task_ids_to_check - found_task_ids
|
||
if missing_task_ids:
|
||
return {
|
||
"success": False,
|
||
"message": f"以下任务ID不存在: {', '.join(missing_task_ids)}"
|
||
}
|
||
|
||
# 更新设备信息
|
||
device.protocol = protocol
|
||
device.brand = brand
|
||
device.ip = ip
|
||
device.port = port
|
||
device.device_name = device_name
|
||
device.status = status
|
||
device.slave_id = slave_id
|
||
|
||
# 删除原有按钮(采用软删除方式)
|
||
button_delete_query = select(VWEDCallDeviceButton).where(
|
||
(VWEDCallDeviceButton.device_id == device_id) &
|
||
(VWEDCallDeviceButton.is_deleted == False)
|
||
)
|
||
button_delete_result = await session.execute(button_delete_query)
|
||
old_buttons = button_delete_result.scalars().all()
|
||
for old_button in old_buttons:
|
||
old_button.is_deleted = True
|
||
|
||
# 添加新按钮配置
|
||
button_list = []
|
||
if buttons:
|
||
for button in buttons:
|
||
button_id = str(uuid.uuid4())
|
||
new_button = VWEDCallDeviceButton(
|
||
id=button_id,
|
||
device_id=device_id,
|
||
signal_name=button.get("signal_name"),
|
||
signal_type=button.get("signal_type"),
|
||
signal_length=button.get("signal_length"),
|
||
register_address=button.get("register_address"),
|
||
function_code=button.get("function_code"),
|
||
remark=button.get("remark"),
|
||
vwed_task_id=button.get("vwed_task_id"),
|
||
long_vwed_task_id=button.get("long_vwed_task_id")
|
||
)
|
||
session.add(new_button)
|
||
button_list.append({
|
||
"id": button_id,
|
||
"signal_name": button.get("signal_name"),
|
||
"signal_type": button.get("signal_type"),
|
||
"signal_length": button.get("signal_length"),
|
||
"register_address": button.get("register_address"),
|
||
"function_code": button.get("function_code"),
|
||
"remark": button.get("remark"),
|
||
"vwed_task_id": button.get("vwed_task_id"),
|
||
"long_vwed_task_id": button.get("long_vwed_task_id")
|
||
})
|
||
|
||
# 提交事务
|
||
await session.commit()
|
||
|
||
# 返回结果
|
||
return {
|
||
"success": True,
|
||
"message": "更新呼叫器设备成功",
|
||
"data": {
|
||
"id": device_id,
|
||
"protocol": protocol,
|
||
"brand": brand,
|
||
"ip": ip,
|
||
"port": port,
|
||
"device_name": device_name,
|
||
"status": status,
|
||
"slave_id": slave_id,
|
||
"buttons": button_list
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新呼叫器设备失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"更新呼叫器设备失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def delete_call_device(device_ids: Union[str, List[str]]) -> Dict[str, Any]:
|
||
"""
|
||
删除呼叫器设备(软删除)
|
||
|
||
Args:
|
||
device_ids: 设备ID或设备ID列表
|
||
|
||
Returns:
|
||
Dict: 包含删除结果的字典
|
||
"""
|
||
try:
|
||
# 将单个ID转换为列表
|
||
if isinstance(device_ids, str):
|
||
device_ids = [device_ids]
|
||
|
||
if not device_ids:
|
||
return {
|
||
"success": False,
|
||
"message": "未提供设备ID"
|
||
}
|
||
|
||
async with get_async_session() as session:
|
||
# 检查设备是否存在
|
||
device_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.id.in_(device_ids)) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
device_result = await session.execute(device_query)
|
||
devices = device_result.scalars().all()
|
||
|
||
found_ids = [device.id for device in devices]
|
||
not_found_ids = list(set(device_ids) - set(found_ids))
|
||
|
||
if not_found_ids:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到以下ID的呼叫器设备: {', '.join(not_found_ids)}"
|
||
}
|
||
|
||
# 软删除设备
|
||
for device in devices:
|
||
device.is_deleted = True
|
||
|
||
# 软删除关联的按钮
|
||
button_query = select(VWEDCallDeviceButton).where(
|
||
(VWEDCallDeviceButton.device_id.in_(device_ids)) &
|
||
(VWEDCallDeviceButton.is_deleted == False)
|
||
)
|
||
button_result = await session.execute(button_query)
|
||
buttons = button_result.scalars().all()
|
||
|
||
for button in buttons:
|
||
button.is_deleted = True
|
||
|
||
# 提交事务
|
||
await session.commit()
|
||
|
||
# 返回结果
|
||
return {
|
||
"success": True,
|
||
"message": f"成功删除 {len(devices)} 个呼叫器设备",
|
||
"data": {
|
||
"ids": found_ids
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除呼叫器设备失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"删除呼叫器设备失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def get_call_device_list(
|
||
page: int = 1,
|
||
page_size: int = 10,
|
||
device_name: str = None,
|
||
ip: str = None,
|
||
protocol: str = None,
|
||
status: int = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
获取呼叫器设备列表
|
||
|
||
Args:
|
||
page: 页码
|
||
page_size: 每页数量
|
||
device_name: 设备名称搜索
|
||
ip: IP地址搜索
|
||
protocol: 协议类型搜索
|
||
status: 状态过滤(0:禁用,1:启用)
|
||
|
||
Returns:
|
||
Dict: 包含设备列表和分页信息的字典
|
||
"""
|
||
try:
|
||
async with get_async_session() as session:
|
||
# 构建查询 - 使用子查询计算按钮数量
|
||
button_count_subquery = (
|
||
select(
|
||
VWEDCallDeviceButton.device_id,
|
||
func.count(VWEDCallDeviceButton.id).label("button_count")
|
||
)
|
||
.where(VWEDCallDeviceButton.is_deleted == False)
|
||
.group_by(VWEDCallDeviceButton.device_id)
|
||
.subquery()
|
||
)
|
||
|
||
# 主查询
|
||
query = select(
|
||
VWEDCallDevice,
|
||
func.coalesce(button_count_subquery.c.button_count, 0).label("button_count")
|
||
).outerjoin(
|
||
button_count_subquery,
|
||
VWEDCallDevice.id == button_count_subquery.c.device_id
|
||
).where(VWEDCallDevice.is_deleted == False)
|
||
|
||
# 添加过滤条件
|
||
if device_name:
|
||
query = query.where(VWEDCallDevice.device_name.like(f"%{device_name}%"))
|
||
|
||
if ip:
|
||
query = query.where(VWEDCallDevice.ip.like(f"%{ip}%"))
|
||
|
||
if protocol:
|
||
query = query.where(VWEDCallDevice.protocol == protocol)
|
||
|
||
if status is not None:
|
||
query = query.where(VWEDCallDevice.status == status)
|
||
|
||
# 计算总数
|
||
count_query = select(func.count()).select_from(VWEDCallDevice).where(
|
||
VWEDCallDevice.is_deleted == False
|
||
)
|
||
if device_name:
|
||
count_query = count_query.where(VWEDCallDevice.device_name.like(f"%{device_name}%"))
|
||
|
||
if ip:
|
||
count_query = count_query.where(VWEDCallDevice.ip.like(f"%{ip}%"))
|
||
|
||
if protocol:
|
||
count_query = count_query.where(VWEDCallDevice.protocol == protocol)
|
||
|
||
if status is not None:
|
||
count_query = count_query.where(VWEDCallDevice.status == status)
|
||
|
||
count_result = await session.execute(count_query)
|
||
total = count_result.scalar() or 0
|
||
|
||
# 分页
|
||
query = query.order_by(VWEDCallDevice.created_at.desc())
|
||
query = query.offset((page - 1) * page_size).limit(page_size)
|
||
|
||
# 执行查询
|
||
result = await session.execute(query)
|
||
|
||
# 格式化结果
|
||
device_list = []
|
||
for device, button_count in result:
|
||
device_dict = {
|
||
"id": device.id,
|
||
"protocol": device.protocol,
|
||
"brand": device.brand,
|
||
"ip": device.ip,
|
||
"port": device.port,
|
||
"device_name": device.device_name,
|
||
"status": device.status,
|
||
"slave_id": device.slave_id,
|
||
"created_at": device.created_at.strftime("%Y-%m-%d %H:%M:%S") if device.created_at else None,
|
||
"button_count": button_count
|
||
}
|
||
device_list.append(device_dict)
|
||
|
||
# 返回结果
|
||
return {
|
||
"success": True,
|
||
"message": "获取呼叫器设备列表成功",
|
||
"data": {
|
||
"list": device_list,
|
||
"pagination": {
|
||
"total": total,
|
||
"page": page,
|
||
"page_size": page_size,
|
||
"total_pages": (total + page_size - 1) // page_size
|
||
}
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取呼叫器设备列表失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"获取呼叫器设备列表失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def get_call_device_detail(device_id: str) -> Dict[str, Any]:
|
||
"""
|
||
获取呼叫器设备详情
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
|
||
Returns:
|
||
Dict: 包含设备详情的字典
|
||
"""
|
||
try:
|
||
async with get_async_session() as session:
|
||
# 查询设备
|
||
device_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.id == device_id) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
device_result = await session.execute(device_query)
|
||
device = device_result.scalars().first()
|
||
|
||
if not device:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
|
||
}
|
||
|
||
# 查询按钮信息
|
||
button_query = select(VWEDCallDeviceButton).where(
|
||
(VWEDCallDeviceButton.device_id == device_id) &
|
||
(VWEDCallDeviceButton.is_deleted == False)
|
||
)
|
||
button_result = await session.execute(button_query)
|
||
buttons = button_result.scalars().all()
|
||
|
||
# 格式化按钮信息
|
||
button_list = []
|
||
for button in buttons:
|
||
button_dict = {
|
||
"id": button.id,
|
||
"signal_name": button.signal_name,
|
||
"signal_type": button.signal_type,
|
||
"signal_length": button.signal_length,
|
||
"register_address": button.register_address,
|
||
"function_code": button.function_code,
|
||
"remark": button.remark,
|
||
"vwed_task_id": button.vwed_task_id,
|
||
"long_vwed_task_id": button.long_vwed_task_id,
|
||
"created_at": button.created_at.strftime("%Y-%m-%d %H:%M:%S") if button.created_at else None
|
||
}
|
||
button_list.append(button_dict)
|
||
|
||
# 格式化设备信息
|
||
device_detail = {
|
||
"id": device.id,
|
||
"protocol": device.protocol,
|
||
"brand": device.brand,
|
||
"ip": device.ip,
|
||
"port": device.port,
|
||
"device_name": device.device_name,
|
||
"status": device.status,
|
||
"slave_id": device.slave_id,
|
||
"created_at": device.created_at.strftime("%Y-%m-%d %H:%M:%S") if device.created_at else None,
|
||
"updated_at": device.updated_at.strftime("%Y-%m-%d %H:%M:%S") if device.updated_at else None,
|
||
"buttons": button_list
|
||
}
|
||
|
||
# 返回结果
|
||
return {
|
||
"success": True,
|
||
"message": "获取呼叫器设备详情成功",
|
||
"data": device_detail
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取呼叫器设备详情失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"获取呼叫器设备详情失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def export_call_devices(device_ids: List[str]) -> Dict[str, Any]:
|
||
"""
|
||
导出呼叫器设备
|
||
|
||
Args:
|
||
device_ids: 要导出的设备ID列表
|
||
|
||
Returns:
|
||
Dict: 包含导出设备信息的字典
|
||
"""
|
||
try:
|
||
async with get_async_session() as session:
|
||
# 检查设备是否存在
|
||
device_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.id.in_(device_ids)) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
device_result = await session.execute(device_query)
|
||
devices = device_result.scalars().all()
|
||
|
||
found_ids = [device.id for device in devices]
|
||
not_found_ids = list(set(device_ids) - set(found_ids))
|
||
|
||
if not_found_ids:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到以下ID的呼叫器设备: {', '.join(not_found_ids)}"
|
||
}
|
||
|
||
# 导出设备数据
|
||
exported_devices = []
|
||
for device in devices:
|
||
# 查询关联的按钮
|
||
button_query = select(VWEDCallDeviceButton).where(
|
||
(VWEDCallDeviceButton.device_id == device.id) &
|
||
(VWEDCallDeviceButton.is_deleted == False)
|
||
)
|
||
button_result = await session.execute(button_query)
|
||
buttons = button_result.scalars().all()
|
||
|
||
# 格式化按钮数据
|
||
buttons_data = []
|
||
for button in buttons:
|
||
button_data = {
|
||
"signal_name": button.signal_name,
|
||
"signal_type": button.signal_type,
|
||
"signal_length": button.signal_length,
|
||
"register_address": button.register_address,
|
||
"function_code": button.function_code,
|
||
"remark": button.remark,
|
||
"vwed_task_id": button.vwed_task_id,
|
||
"long_vwed_task_id": button.long_vwed_task_id
|
||
}
|
||
buttons_data.append(button_data)
|
||
|
||
# 格式化设备数据
|
||
device_data = {
|
||
"protocol": device.protocol,
|
||
"brand": device.brand,
|
||
"ip": device.ip,
|
||
"port": device.port,
|
||
"device_name": device.device_name,
|
||
"status": device.status,
|
||
"slave_id": device.slave_id,
|
||
"buttons": buttons_data
|
||
}
|
||
exported_devices.append(device_data)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": f"成功导出 {len(exported_devices)} 个呼叫器设备",
|
||
"data": exported_devices
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"导出呼叫器设备失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"导出呼叫器设备失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def import_call_devices(devices_data: List[Dict[str, Any]], rename_suffix: str = None) -> Dict[str, Any]:
|
||
"""
|
||
导入呼叫器设备
|
||
|
||
Args:
|
||
devices_data: 要导入的设备数据列表
|
||
rename_suffix: 重命名后缀,用于避免名称冲突
|
||
|
||
Returns:
|
||
Dict: 包含导入结果的字典
|
||
"""
|
||
try:
|
||
import_results = {
|
||
"success_count": 0,
|
||
"failed_count": 0,
|
||
"failed_devices": [],
|
||
"imported_devices": []
|
||
}
|
||
|
||
for index, device_data in enumerate(devices_data):
|
||
try:
|
||
# 验证必要字段
|
||
required_fields = ["protocol", "brand", "ip", "port", "device_name"]
|
||
for field in required_fields:
|
||
if field not in device_data:
|
||
raise ValueError(f"设备数据缺少{field}字段,无法导入")
|
||
|
||
# 处理设备名称,添加后缀以避免冲突
|
||
original_name = device_data["device_name"]
|
||
if rename_suffix:
|
||
device_data["device_name"] = f"{original_name}{rename_suffix}"
|
||
|
||
# 处理按钮数据
|
||
buttons = device_data.get("buttons", [])
|
||
buttons_dict = []
|
||
for button in buttons:
|
||
if "signal_name" not in button:
|
||
continue
|
||
button_dict = {
|
||
"signal_name": button.get("signal_name"),
|
||
"signal_type": button.get("signal_type"),
|
||
"signal_length": button.get("signal_length"),
|
||
"register_address": button.get("register_address"),
|
||
"function_code": button.get("function_code"),
|
||
"remark": button.get("remark"),
|
||
"vwed_task_id": button.get("vwed_task_id"),
|
||
"long_vwed_task_id": button.get("long_vwed_task_id")
|
||
}
|
||
buttons_dict.append(button_dict)
|
||
|
||
# 调用添加设备方法
|
||
result = await CallDeviceService.add_call_device(
|
||
protocol=device_data.get("protocol"),
|
||
brand=device_data.get("brand"),
|
||
ip=device_data.get("ip"),
|
||
port=device_data.get("port"),
|
||
device_name=device_data.get("device_name"),
|
||
status=device_data.get("status", CallDeviceStatus.START),
|
||
slave_id=device_data.get("slave_id", "1"),
|
||
buttons=buttons_dict
|
||
)
|
||
|
||
if result.get("success", False):
|
||
import_results["success_count"] += 1
|
||
import_results["imported_devices"].append({
|
||
"id": result.get("data", {}).get("id", ""),
|
||
"device_name": device_data.get("device_name"),
|
||
"original_name": original_name
|
||
})
|
||
else:
|
||
import_results["failed_count"] += 1
|
||
import_results["failed_devices"].append({
|
||
"index": index,
|
||
"device_name": device_data.get("device_name"),
|
||
"reason": result.get("message", "未知原因")
|
||
})
|
||
|
||
except Exception as e:
|
||
import_results["failed_count"] += 1
|
||
import_results["failed_devices"].append({
|
||
"index": index,
|
||
"device_name": device_data.get("device_name", "未知设备"),
|
||
"reason": str(e)
|
||
})
|
||
|
||
# 构建返回消息
|
||
if import_results["success_count"] > 0 and import_results["failed_count"] == 0:
|
||
message = f"成功导入 {import_results['success_count']} 个呼叫器设备"
|
||
elif import_results["success_count"] > 0 and import_results["failed_count"] > 0:
|
||
# 提取错误原因
|
||
error_reasons = [device.get("reason", "未知原因") for device in import_results["failed_devices"]]
|
||
message = f"部分导入成功: 成功 {import_results['success_count']} 个, 失败 {import_results['failed_count']} 个, 原因: {', '.join(error_reasons)}"
|
||
else:
|
||
# 提取错误原因
|
||
error_reasons = [device.get("reason", "未知原因") for device in import_results["failed_devices"]]
|
||
message = f"导入失败: 所有 {import_results['failed_count']} 个呼叫器设备导入失败, 原因: {', '.join(error_reasons)}"
|
||
|
||
return {
|
||
"success": import_results["success_count"] > 0,
|
||
"message": message,
|
||
"data": import_results
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"导入呼叫器设备失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"导入呼叫器设备失败: {str(e)}",
|
||
"data": {
|
||
"success_count": 0,
|
||
"failed_count": 0,
|
||
"failed_devices": [],
|
||
"imported_devices": []
|
||
}
|
||
}
|
||
|
||
@staticmethod
|
||
async def start_device_monitor(device_id: str, api_token: str) -> Dict[str, Any]:
|
||
"""
|
||
启动设备监控
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
api_token: 系统任务令牌
|
||
Returns:
|
||
Dict: 包含启动结果的字典
|
||
"""
|
||
try:
|
||
# 检查设备是否存在
|
||
async with get_async_session() as session:
|
||
device_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.id == device_id) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
device_result = await session.execute(device_query)
|
||
device = device_result.scalars().first()
|
||
|
||
if not device:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
|
||
}
|
||
|
||
# 查询设备按钮配置
|
||
button_query = select(VWEDCallDeviceButton).where(
|
||
(VWEDCallDeviceButton.device_id == device_id) &
|
||
(VWEDCallDeviceButton.is_deleted == False)
|
||
)
|
||
button_result = await session.execute(button_query)
|
||
buttons = button_result.scalars().all()
|
||
|
||
if not buttons:
|
||
return {
|
||
"success": False,
|
||
"message": f"设备 '{device.device_name}' 没有配置按钮,无法启动监控"
|
||
}
|
||
|
||
# 更新设备状态为启用状态
|
||
device.status = CallDeviceStatus.START
|
||
await session.commit()
|
||
|
||
# 获取设备和按钮信息,传递给监控线程
|
||
device_info = {
|
||
"id": device.id,
|
||
"protocol": device.protocol,
|
||
"brand": device.brand,
|
||
"ip": device.ip,
|
||
"port": device.port,
|
||
"device_name": device.device_name,
|
||
"slave_id": device.slave_id,
|
||
"is_init": device.is_init,
|
||
"buttons": []
|
||
}
|
||
|
||
for button in buttons:
|
||
button_info = {
|
||
"id": button.id,
|
||
"signal_name": button.signal_name,
|
||
"signal_type": button.signal_type,
|
||
"signal_length": button.signal_length,
|
||
"register_address": button.register_address,
|
||
"function_code": button.function_code,
|
||
"remark": button.remark,
|
||
"vwed_task_id": button.vwed_task_id,
|
||
"long_vwed_task_id": button.long_vwed_task_id
|
||
}
|
||
device_info["buttons"].append(button_info)
|
||
|
||
with thread_lock:
|
||
# 检查是否已存在监控线程
|
||
if device_id in device_monitor_threads and device_monitor_threads[device_id].is_alive():
|
||
return {
|
||
"success": False,
|
||
"message": f"设备 '{device_info['device_name']}' 已在监控中"
|
||
}
|
||
|
||
# 设置设备监控状态为运行中
|
||
device_monitor_status[device_id] = {
|
||
"status": "running",
|
||
"start_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
"device_name": device_info["device_name"]
|
||
}
|
||
|
||
# 创建并启动监控线程
|
||
monitor_thread = threading.Thread(
|
||
target=CallDeviceService._device_monitor_thread,
|
||
args=(device_info, api_token),
|
||
daemon=True # 设置为守护线程,这样当主线程退出时,此线程也会退出
|
||
)
|
||
monitor_thread.start()
|
||
|
||
# 保存线程引用
|
||
device_monitor_threads[device_id] = monitor_thread
|
||
|
||
logger.info(f"已启动设备 '{device_info['device_name']}' 的监控线程")
|
||
|
||
return {
|
||
"success": True,
|
||
"message": f"成功启动设备 '{device_info['device_name']}' 的监控",
|
||
"data": {
|
||
"device_id": device_id,
|
||
"device_name": device_info["device_name"],
|
||
"status": "running",
|
||
"start_time": device_monitor_status[device_id]["start_time"]
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"启动设备监控失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"启动设备监控失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
def _device_monitor_thread(device_info: Dict[str, Any], api_token: str):
|
||
"""
|
||
设备监控线程函数
|
||
|
||
Args:
|
||
device_info: 设备信息字典
|
||
"""
|
||
device_id = device_info["id"]
|
||
device_name = device_info["device_name"]
|
||
device_ip = str(device_info["ip"])
|
||
device_port = str(device_info["port"])
|
||
device_slave_id = str(device_info["slave_id"])
|
||
device_ip_port_slave_id = device_ip+"-"+device_port+"-"+device_slave_id
|
||
logger.info(f"设备 '{device_name}' 监控线程已启动")
|
||
|
||
# 初始化设备按钮锁定状态
|
||
with thread_lock:
|
||
device_button_locks[device_id] = {}
|
||
|
||
# 创建一个新的事件循环
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
# 初始化
|
||
with get_session() as session:
|
||
calldevice = session.query(VWEDCallDevice).filter(VWEDCallDevice.id == device_id).first()
|
||
button_infos = {c.register_address: [c.signal_type, c.long_vwed_task_id, c.vwed_task_id] for c in calldevice.buttons}
|
||
# print("button_infos::::::::", button_infos, "=====================================")
|
||
if not calldevice.is_init:
|
||
init_result = loop.run_until_complete(
|
||
CallDeviceService._init_device(device_info))
|
||
# time.sleep(3)
|
||
if init_result.get("success", False):
|
||
logger.info(f"设备 '{device_name}' 初始化成功")
|
||
# 使用同步方式更新数据库中设备的初始化状态
|
||
# 更改状态
|
||
result = CallDeviceService._update_device_init_status(device_id, True)
|
||
try:
|
||
# 持续监控设备状态,直到线程被取消
|
||
while device_id in device_monitor_status and device_monitor_status[device_id]["status"] == "running":
|
||
|
||
try:
|
||
# 读取所有按钮状态
|
||
# for button_address, button_info in button_states.items():
|
||
# 调用新的接口读取按钮状态
|
||
current_state = loop.run_until_complete(
|
||
CallDeviceService._read_button_state(
|
||
device_info, button_infos
|
||
)
|
||
)
|
||
if isinstance(current_state, int) and current_state == 0:
|
||
logger.info(f"设备不在线 '{device_name}' 的监控已停止")
|
||
return
|
||
num = 0
|
||
# print("current_state::::::::", current_state, "=====================================")
|
||
for button_info in current_state:
|
||
num += 1
|
||
button_address = button_info["register_address"]
|
||
button_values = button_info["register_values"]
|
||
bind_value = button_info.get("bind_value", "")
|
||
button_values = eval(button_values)
|
||
device_ip_port_slave_id_new = device_ip_port_slave_id + "-" + button_address
|
||
# print("button_values::::::::", button_info, "=====================================")
|
||
with get_session() as session:
|
||
task_record_all = session.query(VWEDTaskRecord).filter(VWEDTaskRecord.source_device == device_ip_port_slave_id_new, VWEDTaskRecord.status.in_([TaskStatus.RUNNING, TaskStatus.QUEUED, TaskStatus.PAUSED])).all()
|
||
print("task_record_all::::::::", len(task_record_all), num, "=====================================")
|
||
print("button_values::::::::", button_values, "=====================================")
|
||
if button_values and button_values[0] == CallDeviceButtonStatus.DOWN :
|
||
if bind_value:
|
||
try:
|
||
# task_record_all = session.query(VWEDTaskRecord).filter(VWEDTaskRecord.source_device == device_ip_port_slave_id).all()
|
||
if not task_record_all:
|
||
# 生成按钮锁定键,每个按钮+任务组合一个锁
|
||
button_lock_key = f"{button_address}_{bind_value}"
|
||
# 检查该按钮的该任务是否已经在处理中
|
||
with thread_lock:
|
||
if device_button_locks[device_id].get(button_lock_key, False):
|
||
logger.info(f"设备 '{device_name}' 的按钮 {button_address} 任务 {bind_value} 正在处理中,跳过重复触发")
|
||
# time.sleep(2)
|
||
continue
|
||
|
||
# 设置按钮任务锁定状态
|
||
device_button_locks[device_id][button_lock_key] = True
|
||
logger.info(f"设备 '{device_name}' 的按钮 {button_address} 任务 {bind_value} 已锁定,开始处理")
|
||
logger.info(f"设备 '{device_name}' 的按钮 {button_address} ({button_info['signal_name']}) 被按下,触发任务 {bind_value}")
|
||
task_result = loop.run_until_complete(
|
||
CallDeviceService._execute_task(
|
||
bind_value,
|
||
device_ip_port_slave_id_new,
|
||
api_token
|
||
)
|
||
)
|
||
print("task_result::::::::", task_result, "=====================================")
|
||
# 如果任务启动成功,启动异步任务状态监控
|
||
if task_result.get("success", False):
|
||
task_record_id = task_result.get("task_record_id")
|
||
if task_record_id:
|
||
logger.info(f"任务启动成功,开始监控任务状态: 任务记录ID={task_record_id}")
|
||
# 创建异步任务来监控任务状态并复位按钮
|
||
loop.create_task(
|
||
CallDeviceService._monitor_task_and_reset_button(
|
||
task_record_id,
|
||
device_info,
|
||
button_address,
|
||
device_id,
|
||
button_lock_key
|
||
)
|
||
)
|
||
else:
|
||
logger.warning(f"任务启动成功但未获取到任务记录ID,无法监控任务状态")
|
||
# 如果任务启动成功但未获取到任务记录ID,则将按钮状态设置为初始化状态
|
||
result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
|
||
if result.get("success", False):
|
||
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
|
||
else:
|
||
logger.error(f"按钮 {button_address} 状态重置失败: {result.get('message', '未知错误')}")
|
||
# 释放按钮锁定
|
||
with thread_lock:
|
||
device_button_locks[device_id][button_lock_key] = False
|
||
else:
|
||
logger.error(f"任务启动失败: {task_result.get('message', '未知错误')}")
|
||
# 如果任务启动失败,则将按钮状态设置为初始化状态
|
||
result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
|
||
if result.get("success", False):
|
||
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
|
||
else:
|
||
logger.error(f"按钮 {button_address} 状态重置失败: {result.get('message', '未知错误')}")
|
||
# 释放按钮锁定
|
||
with thread_lock:
|
||
device_button_locks[device_id][button_lock_key] = False
|
||
# else:
|
||
# logger.info(f"设备 '{device_name}' 有任务正在运行,按钮 {button_address} 任务 {bind_value} 跳过执行")
|
||
# # 释放按钮锁定
|
||
# with thread_lock:
|
||
# device_button_locks[device_id][button_lock_key] = False
|
||
except Exception as e:
|
||
logger.error(f"处理按钮 {button_address} 任务时发生异常: {str(e)}")
|
||
# 发生异常时也要释放锁定
|
||
with thread_lock:
|
||
device_button_locks[device_id][button_lock_key] = False
|
||
else:
|
||
|
||
logger.info(f"设备 '{device_name}' 的按钮 {button_address} ({button_info['signal_name']}) 被按下,未配置任务ID,跳过任务执行")
|
||
# 如果按钮状态为初始化状态,则将按钮状态设置为初始化状态
|
||
result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
|
||
if result.get("success", False):
|
||
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
|
||
else:
|
||
logger.error(f"按钮 {button_address} 状态重置失败: {result.get('message', '未知错误')}")
|
||
elif task_record_all and button_values and button_values[0] == CallDeviceButtonStatus.CANCEL:
|
||
# print("button_values[0]::::::::", button_values[0], "=====================================")
|
||
if button_infos.get(button_address, None)[1] != button_infos.get(button_address, None)[2]:
|
||
logger.warning(f"该设备按钮 {button_address} 触发任务与取消任务不一致,无法取消任务")
|
||
continue
|
||
# 如果按钮状态为初始化状态,则将按钮状态设置为初始化状态
|
||
if bind_value:
|
||
# 生成按钮取消锁定键
|
||
cancel_lock_key = f"cancel_{button_address}_{bind_value}"
|
||
|
||
# 检查该按钮的取消操作是否已经在处理中
|
||
with thread_lock:
|
||
if device_button_locks[device_id].get(cancel_lock_key, False):
|
||
logger.info(f"设备 '{device_name}' 的按钮 {button_address} 取消操作正在处理中,跳过重复取消")
|
||
continue
|
||
|
||
# 设置按钮取消锁定状态
|
||
device_button_locks[device_id][cancel_lock_key] = True
|
||
logger.info(f"设备 '{device_name}' 的按钮 {button_address} 取消操作已锁定,开始处理")
|
||
|
||
try:
|
||
with get_session() as session:
|
||
task_record_all = session.query(VWEDTaskRecord).filter(VWEDTaskRecord.source_device == device_ip_port_slave_id_new, VWEDTaskRecord.status.in_([TaskStatus.RUNNING, TaskStatus.QUEUED, TaskStatus.PAUSED])).all()
|
||
if task_record_all:
|
||
logger.info(f"设备 '{device_name}' 的按钮 {button_address} ({button_info['signal_name']}) 被取消,停止任务")
|
||
from services.task_service import TaskService
|
||
task_record_id = task_record_all[0].id
|
||
# result = TaskService.stop_task_def(task_record_id)
|
||
task_result = loop.run_until_complete(
|
||
CallDeviceService._cancel_task(task_record_id)
|
||
)
|
||
if task_result.get("success", False):
|
||
logger.info(f"任务 {task_record_id} 已取消")
|
||
# 将按钮状态设置为初始化状态
|
||
reset_result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
|
||
if reset_result.get("success", False):
|
||
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
|
||
else:
|
||
logger.error(f"按钮 {button_address} 状态重置失败: {reset_result.get('message', '未知错误')}")
|
||
else:
|
||
logger.error(f"任务 {task_record_id} 取消失败: {result.get('message', '未知错误')}")
|
||
else:
|
||
logger.info(f"设备 '{device_name}' 没有正在运行的任务,直接重置按钮状态")
|
||
# 将按钮状态设置为初始化状态
|
||
reset_result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
|
||
if reset_result.get("success", False):
|
||
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
|
||
else:
|
||
logger.error(f"按钮 {button_address} 状态重置失败: {reset_result.get('message', '未知错误')}")
|
||
finally:
|
||
# 释放取消操作锁定
|
||
with thread_lock:
|
||
device_button_locks[device_id][cancel_lock_key] = False
|
||
logger.info(f"已释放取消操作锁定: 设备ID={device_id}, 取消锁定键={cancel_lock_key}")
|
||
else:
|
||
logger.info(f"设备 '{device_name}' 的按钮 {button_address} ({button_info['signal_name']}) 取消任务,未配置任务ID,跳过任务执行")
|
||
# 每1秒检查一次
|
||
time.sleep(1)
|
||
|
||
except Exception as e:
|
||
logger.error(f"设备 '{device_name}' 监控过程中发生错误: {str(e)}")
|
||
# 出错后暂停一段时间再继续
|
||
time.sleep(5)
|
||
|
||
except Exception as e:
|
||
logger.error(f"设备 '{device_name}' 监控线程异常: {str(e)}")
|
||
finally:
|
||
# 关闭事件循环
|
||
loop.close()
|
||
logger.info(f"设备 '{device_name}' 监控线程已结束")
|
||
|
||
# 清理设备按钮锁定状态
|
||
with thread_lock:
|
||
if device_id in device_button_locks:
|
||
del device_button_locks[device_id]
|
||
|
||
# 更新设备状态
|
||
with thread_lock:
|
||
if device_id in device_monitor_status:
|
||
device_monitor_status[device_id]["status"] = "stopped"
|
||
device_monitor_status[device_id]["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
with get_session() as session:
|
||
calldevice = session.query(VWEDCallDevice).filter(VWEDCallDevice.id == device_id).first()
|
||
calldevice.status = CallDeviceStatus.END
|
||
session.commit()
|
||
|
||
@staticmethod
|
||
async def _read_button_state(device_info: Dict[str, Any], button_infos: Dict[str, Any] = None) -> int:
|
||
"""
|
||
读取按钮状态
|
||
|
||
Args:
|
||
device_info: 设备信息
|
||
button_address: 按钮地址
|
||
|
||
Returns:
|
||
int: 按钮状态(0或1)
|
||
"""
|
||
try:
|
||
# 构建设备ID,格式为 agv:ip-port-slave_id
|
||
serial_number = f"{device_info.get('ip')}-{device_info.get('port')}-{device_info.get('slave_id')}"
|
||
agv_id = f"agvState:{serial_number}"
|
||
# 准备请求参数
|
||
request_data = {
|
||
"agvId": agv_id
|
||
}
|
||
# API URL
|
||
api_url = settings.CALLDEVICE_API_BASE_URL
|
||
url = f"{api_url}{settings.CALLDEVICE_API_ENDPOINTS['get_device_state']}"
|
||
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "CallDeviceMonitor/1.0"
|
||
}
|
||
async with aiohttp.ClientSession() as session:
|
||
try:
|
||
async with session.post(url, json=request_data, headers=headers) as response:
|
||
if response.status == 200:
|
||
response_data = await response.json()
|
||
if response_data.get("success", False) and response_data.get("data"):
|
||
# 解析返回的数据
|
||
data_list = response_data.get("data", [])
|
||
if data_list is None:
|
||
return 0
|
||
if data_list and len(data_list) > 0:
|
||
# 数据是JSON字符串,需要解析
|
||
device_state_str = data_list[0]
|
||
# 处理双重转义的JSON字符串
|
||
try:
|
||
# 首先解析外层的JSON字符串
|
||
if isinstance(device_state_str, str):
|
||
# 如果是字符串,先解析一次
|
||
device_state_str = json.loads(device_state_str)
|
||
|
||
# 如果解析后还是字符串,再解析一次
|
||
if isinstance(device_state_str, str):
|
||
device_state = json.loads(device_state_str)
|
||
else:
|
||
device_state = device_state_str
|
||
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"解析设备状态JSON失败: {str(e)}, 原始数据: {device_state_str}")
|
||
return 0
|
||
result = []
|
||
# 查找对应按钮的状态
|
||
if device_state is None:
|
||
return 0
|
||
information = device_state.get("information", [])
|
||
for info in information:
|
||
if info.get("infotype") == "CUSTOM_REGISTER":
|
||
# 查找对应的寄存器信息
|
||
references = info.get("inforeferences", [])
|
||
register_name = None
|
||
register_address = None
|
||
register_values = None
|
||
bind_value = None
|
||
for ref in references:
|
||
# print("ref::::::::", ref, "=====================================")
|
||
if ref.get("referencekey") == "name":
|
||
register_name = ref.get("referencevalue")
|
||
elif ref.get("referencekey") == "address":
|
||
register_address = ref.get("referencevalue")
|
||
if button_infos and button_infos.get(register_address, None)[0] != CallDeviceButtonType.BUTTON:
|
||
continue
|
||
elif ref.get("referencekey") == "values":
|
||
register_values = ref.get("referencevalue")
|
||
elif ref.get("referencekey") == "bind":
|
||
bind_value = ref.get("referencevalue")
|
||
|
||
button_result = {
|
||
"register_address": register_address,
|
||
"register_values": register_values,
|
||
"bind_value": bind_value,
|
||
"signal_name": register_name,
|
||
# "vwed_task_id": button_config.get("vwed_task_id") if button_config else "",
|
||
}
|
||
result.append(button_result)
|
||
return result
|
||
else:
|
||
logger.warning(f"获取设备状态失败: {response_data.get('message', '未知错误')}")
|
||
return 0
|
||
else:
|
||
response_text = await response.text()
|
||
logger.warning(f"获取设备状态请求失败,状态码: {response.status}, 响应: {response_text}")
|
||
return 0
|
||
|
||
except asyncio.TimeoutError:
|
||
logger.error(f"获取设备状态超时: {url}")
|
||
return 0
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"获取设备状态网络错误: {str(e)}")
|
||
return 0
|
||
|
||
except Exception as e:
|
||
logger.error(f"读取按钮状态错误: {str(e)}")
|
||
return 0
|
||
|
||
@staticmethod
|
||
async def _execute_task(task_id: str, device_name: str, api_token: str) -> Dict[str, Any]:
|
||
"""
|
||
执行任务
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
device_name: 设备名称
|
||
api_token: 系统任务令牌
|
||
Returns:
|
||
Dict: 执行结果,包含任务记录ID
|
||
"""
|
||
try:
|
||
from data.enum.task_record_enum import SourceType
|
||
|
||
# 准备请求参数
|
||
run_request = {
|
||
"taskId": task_id,
|
||
"source_system": "calldevice_monitor",
|
||
"source_device": f"{device_name}",
|
||
"source_type": SourceType.CALL_MACHINE,
|
||
"params": [] # 可以根据需要传入参数
|
||
}
|
||
if not api_token:
|
||
return {
|
||
"success": False,
|
||
"message": "系统任务令牌为空",
|
||
"data": {}
|
||
}
|
||
# 获取本地服务器URL (从当前请求环境中获取基础URL)
|
||
base_url = settings.TASK_EXECUTION_API_BASE_URL
|
||
|
||
# 检查是否有环境变量配置的API URL
|
||
import os
|
||
api_url = os.environ.get("VWED_API_URL", base_url)
|
||
|
||
# 调用任务执行API
|
||
async with aiohttp.ClientSession() as session:
|
||
url = f"{api_url}{settings.TASK_EXECUTION_API_ENDPOINTS['run_task']}"
|
||
# 添加必要的请求头
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "CallDeviceMonitor/1.0",
|
||
"X-Source-System": "calldevice_monitor",
|
||
"X-Source-Device": device_name,
|
||
"x-access-token": api_token,
|
||
"authorization": api_token
|
||
}
|
||
|
||
logger.info(f"正在调用任务执行API: {url}, 任务ID: {task_id}, 来源设备: {device_name}")
|
||
|
||
try:
|
||
async with session.post(url, json=run_request, headers=headers) as response:
|
||
response_text = await response.text()
|
||
try:
|
||
response_data = json.loads(response_text)
|
||
except json.JSONDecodeError:
|
||
logger.error(f"解析任务执行API响应失败,响应内容不是有效的JSON: {response_text}")
|
||
return {
|
||
"success": False,
|
||
"message": "解析任务执行API响应失败,响应内容不是有效的JSON",
|
||
"data": {"response_text": response_text}
|
||
}
|
||
|
||
if response.status == 200:
|
||
logger.info(f"任务启动成功: {task_id}, 响应: {response_data}")
|
||
# 提取任务记录ID
|
||
task_record_id = response_data.get("data", {}).get("taskRecordId")
|
||
return {
|
||
"success": True,
|
||
"message": "任务启动成功",
|
||
"data": response_data,
|
||
"task_record_id": task_record_id # 返回任务记录ID用于后续状态监控
|
||
}
|
||
else:
|
||
logger.warning(f"任务启动失败: {response_data}")
|
||
return {
|
||
"success": False,
|
||
"message": f"任务启动失败: {response_data.get('message', '未知错误')}",
|
||
"data": response_data
|
||
}
|
||
except asyncio.TimeoutError:
|
||
logger.error(f"调用任务执行API超时: {url}")
|
||
return {
|
||
"success": False,
|
||
"message": "调用任务执行API超时",
|
||
"data": {"url": url}
|
||
}
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"调用任务执行API出错: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"调用任务执行API出错: {str(e)}",
|
||
"data": {"url": url}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行任务错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"执行任务错误: {str(e)}"
|
||
}
|
||
|
||
|
||
@staticmethod
|
||
async def _cancel_task(task_record_id: str) -> Dict[str, Any]:
|
||
"""
|
||
取消任务
|
||
|
||
Args:
|
||
task_record_id: 任务记录ID
|
||
Returns:
|
||
Dict: 执行结果,包含任务记录ID
|
||
"""
|
||
try:
|
||
from data.enum.task_record_enum import SourceType
|
||
|
||
# 准备请求参数
|
||
run_request = {
|
||
"task_record_id": task_record_id,
|
||
}
|
||
|
||
# 获取本地服务器URL (从当前请求环境中获取基础URL)
|
||
base_url = settings.TASK_EXECUTION_API_BASE_URL
|
||
|
||
# 检查是否有环境变量配置的API URL
|
||
import os
|
||
api_url = os.environ.get("VWED_API_URL", base_url)
|
||
|
||
# 调用任务执行API
|
||
async with aiohttp.ClientSession() as session:
|
||
url = f"{api_url}{settings.TASK_EXECUTION_API_ENDPOINTS['stop_task']}/{task_record_id}"
|
||
# 添加必要的请求头
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "CallDeviceMonitor/1.0",
|
||
"X-Source-System": "calldevice_monitor",
|
||
}
|
||
|
||
|
||
try:
|
||
async with session.post(url, json=run_request, headers=headers) as response:
|
||
response_text = await response.text()
|
||
try:
|
||
response_data = json.loads(response_text)
|
||
except json.JSONDecodeError:
|
||
logger.error(f"解析任务执行API响应失败,响应内容不是有效的JSON: {response_text}")
|
||
return {
|
||
"success": False,
|
||
"message": "解析任务执行API响应失败,响应内容不是有效的JSON",
|
||
"data": {"response_text": response_text}
|
||
}
|
||
|
||
if response.status == 200:
|
||
logger.info(f"任务取消成功: {task_record_id}, 响应: {response_data}")
|
||
# 提取任务记录ID
|
||
task_record_id = response_data.get("data", {}).get("taskRecordId")
|
||
return {
|
||
"success": True,
|
||
"message": "任务取消成功",
|
||
"data": response_data,
|
||
"task_record_id": task_record_id # 返回任务记录ID用于后续状态监控
|
||
}
|
||
else:
|
||
logger.warning(f"任务取消失败: {response_data}")
|
||
return {
|
||
"success": False,
|
||
"message": f"任务取消失败: {response_data.get('message', '未知错误')}",
|
||
"data": response_data
|
||
}
|
||
except asyncio.TimeoutError:
|
||
logger.error(f"调用任务取消API超时: {url}")
|
||
return {
|
||
"success": False,
|
||
"message": "调用任务取消API超时",
|
||
"data": {"url": url}
|
||
}
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"调用任务取消API出错: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"调用任务取消API出错: {str(e)}",
|
||
"data": {"url": url}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行任务错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"执行任务错误: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def stop_device_monitor(device_id: str) -> Dict[str, Any]:
|
||
"""
|
||
停止设备监控
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
|
||
Returns:
|
||
Dict: 包含停止结果的字典
|
||
"""
|
||
try:
|
||
# 检查设备是否存在
|
||
async with get_async_session() as session:
|
||
device_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.id == device_id) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
device_result = await session.execute(device_query)
|
||
device = device_result.scalars().first()
|
||
|
||
if not device:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
|
||
}
|
||
|
||
device_name = device.device_name
|
||
|
||
with thread_lock:
|
||
# 检查是否存在监控线程
|
||
if device_id not in device_monitor_threads or not device_monitor_threads[device_id].is_alive():
|
||
return {
|
||
"success": False,
|
||
"message": f"设备 '{device_name}' 当前未在监控中"
|
||
}
|
||
|
||
# 更新设备监控状态为停止
|
||
if device_id in device_monitor_status:
|
||
device_monitor_status[device_id]["status"] = "stopping"
|
||
|
||
# 线程会自行检测状态变化并退出
|
||
# 等待线程结束(但不阻塞太久)
|
||
monitor_thread = device_monitor_threads[device_id]
|
||
# 设置一个超时时间,避免无限等待
|
||
max_wait_time = 5 # 最多等待5秒
|
||
wait_count = 0
|
||
|
||
# 释放锁后等待线程结束
|
||
while monitor_thread.is_alive() and wait_count < max_wait_time:
|
||
time.sleep(1)
|
||
wait_count += 1
|
||
|
||
# 即使线程未完全结束,也返回成功(线程会自行结束)
|
||
logger.info(f"已停止设备 '{device_name}' 的监控线程")
|
||
|
||
# 更新设备状态为禁用状态
|
||
async with get_async_session() as session:
|
||
device_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.id == device_id) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
device_result = await session.execute(device_query)
|
||
device = device_result.scalars().first()
|
||
|
||
if device:
|
||
device.status = CallDeviceStatus.END
|
||
await session.commit()
|
||
|
||
with thread_lock:
|
||
# 清理线程引用
|
||
if device_id in device_monitor_threads:
|
||
del device_monitor_threads[device_id]
|
||
|
||
# 更新监控状态
|
||
if device_id in device_monitor_status:
|
||
device_monitor_status[device_id]["status"] = "stopped"
|
||
device_monitor_status[device_id]["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
return {
|
||
"success": True,
|
||
"message": f"成功停止设备 '{device_name}' 的监控",
|
||
"data": {
|
||
"device_id": device_id,
|
||
"device_name": device_name,
|
||
"status": "stopped",
|
||
"end_time": device_monitor_status[device_id]["end_time"] if device_id in device_monitor_status else None
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"停止设备监控失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"停止设备监控失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def _init_device(device_info: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""
|
||
初始化设备
|
||
|
||
Args:
|
||
device_info: 设备信息
|
||
|
||
Returns:
|
||
Dict: 初始化结果
|
||
"""
|
||
try:
|
||
# 准备初始化请求
|
||
print("device_info::::::::", device_info, "=====================================")
|
||
# serial_number = device_info.get("ip")+"-"+str(device_info.get("port"))+"-"+str(device_info.get("slave_id"))
|
||
serial_number = "EXT-0"
|
||
target_agv_id = serial_number # 在呼叫器中targetAgvId、serialNumber设置为相同的值
|
||
|
||
# 生成当前时间戳,格式为 ISO 8601
|
||
current_time = datetime.datetime.now().isoformat()
|
||
# 构建registers参数,基于设备的按钮配置
|
||
registers = []
|
||
for button in device_info.get("buttons", []):
|
||
register_item = {
|
||
# "id": button.get("id"),
|
||
"fnCode": button.get("function_code"),
|
||
"name": button.get("signal_name"),
|
||
"regCount": button.get("signal_length"),
|
||
"regAddress": button.get("register_address"),
|
||
"bind": button.get("vwed_task_id"),
|
||
# "long_vwed_task_id": button.get("long_vwed_task_id")
|
||
}
|
||
registers.append(register_item)
|
||
|
||
# 构造初始化请求,使用新的instantActions格式
|
||
init_request = {
|
||
"targetAgvId": target_agv_id,
|
||
"instantActions": {
|
||
"version": "2.0.0",
|
||
"headerId": int(time.time()),
|
||
"timestamp": current_time,
|
||
"manufacturer": "SEER",
|
||
"serialNumber": serial_number,
|
||
"instantActions": [
|
||
{
|
||
"actionType": "deviceSetup",
|
||
"actionId": device_info.get("id"),
|
||
"blockingType": "HARD",
|
||
"actionParameters": [
|
||
{
|
||
"key": "protocolType",
|
||
"value": device_info.get("protocol")
|
||
},
|
||
{
|
||
"key": "brandName",
|
||
"value": device_info.get("brand")
|
||
},
|
||
{
|
||
"key": "ip",
|
||
"value": device_info.get("ip")
|
||
},
|
||
{
|
||
"key": "port",
|
||
"value": str(device_info.get("port"))
|
||
},
|
||
{
|
||
"key": "deviceName",
|
||
"value": device_info.get("device_name")
|
||
},
|
||
{
|
||
"key": "slaveId",
|
||
"value": device_info.get("slave_id")
|
||
},
|
||
{
|
||
"key": "registers",
|
||
"value": json.dumps(registers)
|
||
}
|
||
],
|
||
"actionDescription": "action parameters"
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
logger.info(f"开始初始化设备: {device_info['device_name']}, 请求数据: {json.dumps(init_request)}")
|
||
|
||
# 获取本地服务器URL或环境变量配置的API URL
|
||
import socket
|
||
import os
|
||
hostname = socket.gethostname()
|
||
server_ip = socket.gethostbyname(hostname)
|
||
api_url = settings.CALLDEVICE_API_INIT_BASE_URL
|
||
|
||
# 调用初始化API
|
||
async with aiohttp.ClientSession() as session:
|
||
url = f"{api_url}{settings.CALLDEVICE_API_ENDPOINTS['init_device']}"
|
||
print("url::::::::", url, "=====================================")
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "CallDeviceMonitor/1.0"
|
||
}
|
||
|
||
try:
|
||
async with session.post(url, json=init_request, headers=headers) as response:
|
||
response_text = await response.text()
|
||
print("response_text::::::::", response_text, "=====================================")
|
||
if response.status == 200:
|
||
# 尝试解析响应
|
||
try:
|
||
response_data = json.loads(response_text) if response_text else {}
|
||
logger.info(f"设备初始化成功: {device_info['device_name']}, 响应: {response_data}")
|
||
return {
|
||
"success": True,
|
||
"message": "设备初始化成功",
|
||
"data": response_data
|
||
}
|
||
except json.JSONDecodeError:
|
||
# 如果响应不是JSON,但状态码是200,仍然视为成功
|
||
logger.info(f"设备初始化成功: {device_info['device_name']}, 但响应不是JSON格式: {response_text}")
|
||
return {
|
||
"success": True,
|
||
"message": "设备初始化成功,响应不是JSON格式",
|
||
"data": {"response_text": response_text}
|
||
}
|
||
else:
|
||
try:
|
||
response_data = json.loads(response_text) if response_text else {}
|
||
error_msg = response_data.get("message", f"初始化失败,状态码: {response.status}")
|
||
except json.JSONDecodeError:
|
||
error_msg = f"初始化失败,状态码: {response.status}, 响应: {response_text}"
|
||
|
||
logger.warning(f"设备初始化失败: {device_info['device_name']}, {error_msg}")
|
||
return {
|
||
"success": False,
|
||
"message": error_msg,
|
||
"data": {"status_code": response.status, "response_text": response_text}
|
||
}
|
||
except asyncio.TimeoutError:
|
||
logger.error(f"调用初始化API超时: {url}")
|
||
return {
|
||
"success": False,
|
||
"message": "调用初始化API超时",
|
||
"data": {"url": url}
|
||
}
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"调用初始化API出错: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"调用初始化API出错: {str(e)}",
|
||
"data": {"url": url}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"设备初始化过程发生错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"设备初始化过程发生错误: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
def _update_device_init_status(device_id: str, is_init: bool) -> Dict[str, Any]:
|
||
"""
|
||
更新设备初始化状态
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
is_init: 初始化状态
|
||
|
||
Returns:
|
||
Dict: 更新结果
|
||
"""
|
||
try:
|
||
with get_session() as session:
|
||
# 查询设备
|
||
device_query = select(VWEDCallDevice).where(
|
||
(VWEDCallDevice.id == device_id) &
|
||
(VWEDCallDevice.is_deleted == False)
|
||
)
|
||
device_result = session.execute(device_query)
|
||
device = device_result.scalars().first()
|
||
|
||
if not device:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
|
||
}
|
||
|
||
# 更新初始化状态
|
||
device.is_init = is_init
|
||
session.commit()
|
||
|
||
return {
|
||
"success": True,
|
||
"message": f"成功更新设备 '{device.device_name}' 的初始化状态为 {is_init}"
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新设备初始化状态失败: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"更新设备初始化状态失败: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
def synchronous_reset_button_state(device_info: Dict[str, Any], button_address: str) -> Dict[str, Any]:
|
||
"""
|
||
复位按钮状态到初始化状态(同步版本)
|
||
|
||
Args:
|
||
device_info: 设备信息
|
||
button_address: 按钮寄存器地址
|
||
|
||
Returns:
|
||
Dict: 复位结果
|
||
"""
|
||
try:
|
||
import requests
|
||
|
||
# 构建设备标识
|
||
serial_number = f"{device_info.get('ip')}-{device_info.get('port')}-{device_info.get('slave_id')}"
|
||
target_agv_id = serial_number
|
||
|
||
# 生成当前时间戳,格式为 ISO 8601
|
||
current_time = datetime.datetime.now().isoformat()
|
||
|
||
# 查找对应的按钮配置
|
||
button_config = None
|
||
print("device_info::::::::", device_info, "=====================================")
|
||
for button in device_info.get("buttons", []):
|
||
if button.get("register_address") == button_address:
|
||
button_config = button
|
||
break
|
||
|
||
if not button_config:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到寄存器地址为 {button_address} 的按钮配置"
|
||
}
|
||
|
||
# 构建registers参数,将按钮状态设置为初始化状态
|
||
registers = [{
|
||
"fnCode": button_config.get("function_code"),
|
||
"name": button_config.get("signal_name"),
|
||
"regCount": button_config.get("signal_length"),
|
||
"regAddress": button_address,
|
||
"regValue": CallDeviceButtonStatus.INIT # 设置为初始化状态
|
||
}]
|
||
|
||
# 构造复位请求
|
||
reset_request = {
|
||
"targetAgvId": target_agv_id,
|
||
"instantActions": {
|
||
"version": "2.0.0",
|
||
"headerId": int(time.time()),
|
||
"timestamp": current_time,
|
||
"manufacturer": device_info.get("brand"),
|
||
"serialNumber": serial_number,
|
||
"instantActions": [
|
||
{
|
||
"actionType": "deviceWrite",
|
||
"actionId": f"reset-{button_config.get('id')}-{int(time.time())}",
|
||
"blockingType": "HARD",
|
||
"actionParameters": [
|
||
{
|
||
"key": "protocolType",
|
||
"value": device_info.get("protocol")
|
||
},
|
||
{
|
||
"key": "brandName",
|
||
"value": device_info.get("brand")
|
||
},
|
||
{
|
||
"key": "ip",
|
||
"value": device_info.get("ip")
|
||
},
|
||
{
|
||
"key": "port",
|
||
"value": str(device_info.get("port"))
|
||
},
|
||
{
|
||
"key": "deviceName",
|
||
"value": device_info.get("device_name")
|
||
},
|
||
{
|
||
"key": "slaveId",
|
||
"value": device_info.get("slave_id")
|
||
},
|
||
{
|
||
"key": "registers",
|
||
"value": json.dumps(registers)
|
||
}
|
||
],
|
||
"actionDescription": "action write device"
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
logger.info(f"开始复位按钮状态: 设备={device_info['device_name']}, 按钮地址={button_address}, 请求数据: {json.dumps(reset_request)}")
|
||
|
||
# 获取API URL
|
||
api_url = settings.CALLDEVICE_API_BASE_URL
|
||
|
||
# 调用复位API
|
||
url = f"{api_url}{settings.CALLDEVICE_API_ENDPOINTS['set_device_state']}"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "CallDeviceMonitor/1.0"
|
||
}
|
||
|
||
try:
|
||
response = requests.post(url, json=reset_request, headers=headers)
|
||
response_text = response.text
|
||
logger.info(f"复位按钮状态响应: {response_text}")
|
||
|
||
if response.status_code == 200:
|
||
try:
|
||
response_data = json.loads(response_text)
|
||
if response_data.get("success", False):
|
||
logger.info(f"成功复位按钮状态: 设备={device_info['device_name']}, 按钮地址={button_address}")
|
||
return {
|
||
"success": True,
|
||
"message": "按钮状态复位成功",
|
||
"data": response_data
|
||
}
|
||
else:
|
||
logger.warning(f"复位按钮状态失败: {response_data.get('message', '未知错误')}")
|
||
return {
|
||
"success": False,
|
||
"message": f"复位按钮状态失败: {response_data.get('message', '未知错误')}",
|
||
"data": response_data
|
||
}
|
||
except json.JSONDecodeError:
|
||
logger.error(f"解析复位响应失败,响应内容不是有效的JSON: {response_text}")
|
||
return {
|
||
"success": False,
|
||
"message": "解析复位响应失败,响应内容不是有效的JSON",
|
||
"data": {"response_text": response_text}
|
||
}
|
||
else:
|
||
logger.error(f"复位按钮状态请求失败,状态码: {response.status_code}, 响应: {response_text}")
|
||
return {
|
||
"success": False,
|
||
"message": f"复位按钮状态请求失败,状态码: {response.status_code}",
|
||
"data": {"response_text": response_text}
|
||
}
|
||
|
||
except requests.exceptions.Timeout:
|
||
logger.error(f"复位按钮状态超时: {url}")
|
||
return {
|
||
"success": False,
|
||
"message": "复位按钮状态超时",
|
||
"data": {"url": url}
|
||
}
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"复位按钮状态网络错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"复位按钮状态网络错误: {str(e)}",
|
||
"data": {"url": url}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"复位按钮状态错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"复位按钮状态错误: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def _reset_button_state(device_info: Dict[str, Any], button_address: str) -> Dict[str, Any]:
|
||
"""
|
||
复位按钮状态到初始化状态
|
||
|
||
Args:
|
||
device_info: 设备信息
|
||
button_address: 按钮寄存器地址
|
||
|
||
Returns:
|
||
Dict: 复位结果
|
||
"""
|
||
try:
|
||
# 构建设备标识
|
||
serial_number = f"{device_info.get('ip')}-{device_info.get('port')}-{device_info.get('slave_id')}"
|
||
target_agv_id = serial_number
|
||
|
||
# 生成当前时间戳,格式为 ISO 8601
|
||
current_time = datetime.datetime.now().isoformat()
|
||
|
||
# 查找对应的按钮配置
|
||
button_config = None
|
||
for button in device_info.get("buttons", []):
|
||
if button.get("register_address") == button_address:
|
||
button_config = button
|
||
break
|
||
|
||
if not button_config:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到寄存器地址为 {button_address} 的按钮配置"
|
||
}
|
||
|
||
# 构建registers参数,将按钮状态设置为初始化状态
|
||
registers = [{
|
||
"fnCode": button_config.get("function_code"),
|
||
"name": button_config.get("signal_name"),
|
||
"regCount": button_config.get("signal_length"),
|
||
"regAddress": button_config.get("register_address"),
|
||
"regValue": CallDeviceButtonStatus.INIT # 设置为初始化状态
|
||
}]
|
||
|
||
# 构造复位请求
|
||
reset_request = {
|
||
"targetAgvId": target_agv_id,
|
||
"instantActions": {
|
||
"version": "2.0.0",
|
||
"headerId": int(time.time()),
|
||
"timestamp": current_time,
|
||
"manufacturer": device_info.get("brand"),
|
||
"serialNumber": serial_number,
|
||
"instantActions": [
|
||
{
|
||
"actionType": "deviceWrite",
|
||
"actionId": f"reset-{button_config.get('id')}-{int(time.time())}",
|
||
"blockingType": "HARD",
|
||
"actionParameters": [
|
||
{
|
||
"key": "protocolType",
|
||
"value": device_info.get("protocol")
|
||
},
|
||
{
|
||
"key": "brandName",
|
||
"value": device_info.get("brand")
|
||
},
|
||
{
|
||
"key": "ip",
|
||
"value": device_info.get("ip")
|
||
},
|
||
{
|
||
"key": "port",
|
||
"value": str(device_info.get("port"))
|
||
},
|
||
{
|
||
"key": "deviceName",
|
||
"value": device_info.get("device_name")
|
||
},
|
||
{
|
||
"key": "slaveId",
|
||
"value": device_info.get("slave_id")
|
||
},
|
||
{
|
||
"key": "registers",
|
||
"value": json.dumps(registers)
|
||
}
|
||
],
|
||
"actionDescription": "action write device"
|
||
}
|
||
]
|
||
}
|
||
}
|
||
|
||
logger.info(f"开始复位按钮状态: 设备={device_info['device_name']}, 按钮地址={button_address}, 请求数据: {json.dumps(reset_request)}")
|
||
|
||
# 获取API URL
|
||
api_url = settings.CALLDEVICE_API_INIT_BASE_URL
|
||
|
||
# 调用复位API
|
||
async with aiohttp.ClientSession() as session:
|
||
url = f"{api_url}{settings.CALLDEVICE_API_ENDPOINTS['set_device_state']}"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "CallDeviceMonitor/1.0"
|
||
}
|
||
|
||
try:
|
||
async with session.post(url, json=reset_request, headers=headers) as response:
|
||
response_text = await response.text()
|
||
logger.info(f"复位按钮状态响应: {response_text}")
|
||
|
||
if response.status == 200:
|
||
try:
|
||
response_data = json.loads(response_text)
|
||
if response_data.get("success", False):
|
||
logger.info(f"成功复位按钮状态: 设备={device_info['device_name']}, 按钮地址={button_address}")
|
||
while True:
|
||
button_state = await CallDeviceService._read_button_state(device_info)
|
||
for button_info in button_state:
|
||
if button_info.get("register_address") == button_address:
|
||
register_values = button_info.get("register_values")
|
||
if register_values and register_values[0] == CallDeviceButtonStatus.INIT:
|
||
break
|
||
if not button_state:
|
||
break
|
||
await asyncio.sleep(1)
|
||
# await asyncio.sleep(1)
|
||
# button_state = [button.get("register_values") await CallDeviceService._read_button_state(device_info) if button.get("register_address") == button_address]
|
||
|
||
print("button_state::::::::", button_state, "=====================================")
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "按钮状态复位成功",
|
||
"data": response_data
|
||
}
|
||
else:
|
||
logger.warning(f"复位按钮状态失败: {response_data.get('message', '未知错误')}")
|
||
return {
|
||
"success": False,
|
||
"message": f"复位按钮状态失败: {response_data.get('message', '未知错误')}",
|
||
"data": response_data
|
||
}
|
||
except json.JSONDecodeError:
|
||
logger.error(f"解析复位响应失败,响应内容不是有效的JSON: {response_text}")
|
||
return {
|
||
"success": False,
|
||
"message": "解析复位响应失败,响应内容不是有效的JSON",
|
||
"data": {"response_text": response_text}
|
||
}
|
||
else:
|
||
logger.error(f"复位按钮状态请求失败,状态码: {response.status}, 响应: {response_text}")
|
||
return {
|
||
"success": False,
|
||
"message": f"复位按钮状态请求失败,状态码: {response.status}",
|
||
"data": {"response_text": response_text}
|
||
}
|
||
|
||
except asyncio.TimeoutError:
|
||
logger.error(f"复位按钮状态超时: {url}")
|
||
return {
|
||
"success": False,
|
||
"message": "复位按钮状态超时",
|
||
"data": {"url": url}
|
||
}
|
||
except aiohttp.ClientError as e:
|
||
logger.error(f"复位按钮状态网络错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"复位按钮状态网络错误: {str(e)}",
|
||
"data": {"url": url}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"复位按钮状态错误: {str(e)}")
|
||
return {
|
||
"success": False,
|
||
"message": f"复位按钮状态错误: {str(e)}"
|
||
}
|
||
|
||
@staticmethod
|
||
async def _monitor_task_and_reset_button(task_record_id: str, device_info: Dict[str, Any], button_address: str, device_id: str = None, button_lock_key: str = None) -> None:
|
||
"""
|
||
监控任务状态并在任务完成后复位按钮状态
|
||
|
||
Args:
|
||
task_record_id: 任务记录ID
|
||
device_info: 设备信息
|
||
button_address: 按钮寄存器地址
|
||
device_id: 设备ID
|
||
button_lock_key: 按钮锁定键(格式:button_address_task_id)
|
||
"""
|
||
if not task_record_id:
|
||
logger.warning(f"任务记录ID为空,无法监控任务状态")
|
||
# 释放按钮锁定
|
||
if device_id and button_lock_key:
|
||
with thread_lock:
|
||
if device_id in device_button_locks:
|
||
device_button_locks[device_id][button_lock_key] = False
|
||
return
|
||
|
||
logger.info(f"开始监控任务状态: 任务记录ID={task_record_id}, 设备={device_info['device_name']}, 按钮地址={button_address}")
|
||
|
||
check_interval = 2 # 每2秒检查一次
|
||
elapsed_time = 0
|
||
# try:
|
||
while True:
|
||
try:
|
||
# 使用同步会话避免事件循环冲突
|
||
with get_session() as session:
|
||
task_record = session.query(VWEDTaskRecord).filter(
|
||
VWEDTaskRecord.id == task_record_id
|
||
).first()
|
||
|
||
if not task_record:
|
||
logger.warning(f"未找到任务记录: 任务记录ID={task_record_id}")
|
||
return
|
||
print("task_record.status::::::::", task_record.status, "=====================================")
|
||
|
||
task_status = task_record.status
|
||
if task_status in [1000, 2000, 2001]:
|
||
status_name = {1000: "完成", 2000: "失败", 2001: "取消"}.get(task_status, "未知")
|
||
logger.info(f"任务已{status_name}: 任务记录ID={task_record_id}, 开始复位按钮状态")
|
||
|
||
# 复位按钮状态
|
||
reset_result = await CallDeviceService._reset_button_state(device_info, button_address)
|
||
if reset_result.get("success", False):
|
||
logger.info(f"按钮状态复位成功: 设备={device_info['device_name']}, 按钮地址={button_address}")
|
||
# 无论任务是否完成,都要释放按钮锁定
|
||
if device_id and button_lock_key:
|
||
with thread_lock:
|
||
if device_id in device_button_locks:
|
||
time.sleep(2)
|
||
device_button_locks[device_id][button_lock_key] = False
|
||
logger.info(f"已释放按钮锁定: 设备ID={device_id}, 按钮锁定键={button_lock_key}")
|
||
else:
|
||
logger.error(f"按钮状态复位失败: {reset_result.get('message', '未知错误')}")
|
||
|
||
return # 任务已完成,退出监控
|
||
else:
|
||
logger.info(f"任务未完成: 任务记录ID={task_record_id}, 状态={task_status}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"查询任务状态异常: {str(e)}")
|
||
|
||
# 等待一段时间后再次检查
|
||
await asyncio.sleep(check_interval)
|
||
elapsed_time += check_interval
|
||
# finally:
|
||
|
||
|
||
|
||
|
||
|