VWED_server/services/calldevice_service.py
2025-07-14 10:29:37 +08:00

2206 lines
106 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
呼叫器设备服务模块
提供呼叫器设备相关的服务方法
"""
import uuid
import time
import threading
import asyncio
import aiohttp
import json
import os
import datetime
from typing import Dict, List, Any, Optional, Union
from sqlalchemy import select, func, create_engine
from sqlalchemy.orm import sessionmaker
import requests
from data.models.calldevice import VWEDCallDevice, VWEDCallDeviceButton
from data.models.taskdef import VWEDTaskDef
from data.models.taskrecord import VWEDTaskRecord
from data.session import get_async_session, get_session
from utils.logger import get_logger
from data.enum.call_device_enum import CallDeviceStatus, CallDeviceButtonStatus, CallDeviceButtonType
from data.enum.task_record_enum import TaskStatus
from config.settings import settings
# 设置日志
logger = get_logger("service.calldevice_service")
# 存储设备监控线程的字典
device_monitor_threads = {}
# 存储设备监控状态的字典
device_monitor_status = {}
# 线程锁,用于线程安全操作
thread_lock = threading.Lock()
# 添加按钮级别的任务执行锁定状态字典
device_button_locks = {}
class CallDeviceService:
"""
呼叫器设备服务类
提供与呼叫器设备相关的方法
"""
@staticmethod
async def add_call_device(
protocol: str,
brand: str,
ip: str,
port: int,
device_name: str,
status: int = CallDeviceStatus.START,
slave_id: str = "1",
buttons: List[Dict[str, Any]] = [],
api_token: str = None
) -> Dict[str, Any]:
"""
新增呼叫器设备
Args:
protocol: 协议类型
brand: 品牌
ip: IP地址
port: 端口号
device_name: 设备名称
status: 状态(0:禁用,1:启用)
slave_id: 从机ID
function_code: 功能码
buttons: 按钮配置列表
api_token: 请求头中的API令牌
Returns:
Dict: 包含新增结果的字典
"""
try:
async with get_async_session() as session:
# 检查IP是否已存在
check_ip_query = select(VWEDCallDevice).where(
(VWEDCallDevice.ip == ip) &
(VWEDCallDevice.is_deleted == False)
)
check_ip_result = await session.execute(check_ip_query)
if check_ip_result.scalars().first():
return {
"success": False,
"message": f"IP地址 '{ip}' 已存在"
}
# 验证任务ID是否存在
if buttons:
task_ids_to_check = set()
for button in buttons:
if button.get("vwed_task_id"):
task_ids_to_check.add(button.get("vwed_task_id"))
if button.get("long_vwed_task_id"):
task_ids_to_check.add(button.get("long_vwed_task_id"))
if task_ids_to_check:
# 查询任务定义表检查任务ID是否存在
task_query = select(VWEDTaskDef.id).where(
(VWEDTaskDef.id.in_(list(task_ids_to_check))) &
(VWEDTaskDef.is_deleted == False)
)
task_result = await session.execute(task_query)
found_task_ids = {task_id for task_id, in task_result.all()}
# 找出不存在的任务ID
missing_task_ids = task_ids_to_check - found_task_ids
if missing_task_ids:
return {
"success": False,
"message": f"以下任务ID不存在: {', '.join(missing_task_ids)}"
}
# 创建呼叫器设备实例
device_id = "modbus_device_"+str(uuid.uuid4())
new_device = VWEDCallDevice(
id=device_id,
protocol=protocol,
brand=brand,
ip=ip,
port=port,
device_name=device_name,
status=status,
slave_id=slave_id,
)
# 添加设备
session.add(new_device)
# 添加按钮配置
button_list = []
if buttons:
for button in buttons:
button_id = str(uuid.uuid4())
new_button = VWEDCallDeviceButton(
id=button_id,
device_id=device_id,
signal_name=button.get("signal_name"),
signal_type=button.get("signal_type"),
signal_length=button.get("signal_length"),
register_address=button.get("register_address"),
function_code=button.get("function_code"),
remark=button.get("remark"),
vwed_task_id=button.get("vwed_task_id"),
long_vwed_task_id=button.get("long_vwed_task_id")
)
session.add(new_button)
button_list.append({
"id": button_id,
"signal_name": button.get("signal_name"),
"signal_type": button.get("signal_type"),
"signal_length": button.get("signal_length"),
"register_address": button.get("register_address"),
"function_code": button.get("function_code"),
"remark": button.get("remark"),
"vwed_task_id": button.get("vwed_task_id"),
"long_vwed_task_id": button.get("long_vwed_task_id")
})
# 提交事务
await session.commit()
if status == CallDeviceStatus.START:
# 启动设备监控
await CallDeviceService.start_device_monitor(device_id, api_token)
# 返回结果
return {
"success": True,
"message": "新增呼叫器设备成功",
"data": {
"id": device_id,
"protocol": protocol,
"brand": brand,
"ip": ip,
"port": port,
"device_name": device_name,
"status": status,
"slave_id": slave_id,
"buttons": button_list
}
}
except Exception as e:
logger.error(f"新增呼叫器设备失败: {str(e)}")
return {
"success": False,
"message": f"新增呼叫器设备失败: {str(e)}"
}
@staticmethod
async def update_call_device(
device_id: str,
protocol: str,
brand: str,
ip: str,
port: int,
device_name: str,
status: int = CallDeviceStatus.START,
slave_id: str = "1",
buttons: List[Dict[str, Any]] = []
) -> Dict[str, Any]:
"""
更新呼叫器设备
Args:
device_id: 设备ID
protocol: 协议类型
brand: 品牌
ip: IP地址
port: 端口号
device_name: 设备名称
status: 状态(0:禁用,1:启用)
slave_id: 从机ID
function_code: 功能码
buttons: 按钮配置列表
Returns:
Dict: 包含更新结果的字典
"""
try:
async with get_async_session() as session:
# 检查设备是否存在
device_query = select(VWEDCallDevice).where(
(VWEDCallDevice.id == device_id) &
(VWEDCallDevice.is_deleted == False)
)
device_result = await session.execute(device_query)
device = device_result.scalars().first()
if not device:
return {
"success": False,
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
}
# 检查IP是否已被其他设备使用
if device.ip != ip:
check_ip_query = select(VWEDCallDevice).where(
(VWEDCallDevice.ip == ip) &
(VWEDCallDevice.id != device_id) &
(VWEDCallDevice.is_deleted == False)
)
check_ip_result = await session.execute(check_ip_query)
if check_ip_result.scalars().first():
return {
"success": False,
"message": f"IP地址 '{ip}' 已被其他设备使用"
}
# 验证任务ID是否存在
if buttons:
task_ids_to_check = set()
for button in buttons:
if button.get("vwed_task_id"):
task_ids_to_check.add(button.get("vwed_task_id"))
if button.get("long_vwed_task_id"):
task_ids_to_check.add(button.get("long_vwed_task_id"))
if task_ids_to_check:
# 查询任务定义表检查任务ID是否存在
task_query = select(VWEDTaskDef.id).where(
(VWEDTaskDef.id.in_(list(task_ids_to_check))) &
(VWEDTaskDef.is_deleted == False)
)
task_result = await session.execute(task_query)
found_task_ids = {task_id for task_id, in task_result.all()}
# 找出不存在的任务ID
missing_task_ids = task_ids_to_check - found_task_ids
if missing_task_ids:
return {
"success": False,
"message": f"以下任务ID不存在: {', '.join(missing_task_ids)}"
}
# 更新设备信息
device.protocol = protocol
device.brand = brand
device.ip = ip
device.port = port
device.device_name = device_name
device.status = status
device.slave_id = slave_id
# 删除原有按钮(采用软删除方式)
button_delete_query = select(VWEDCallDeviceButton).where(
(VWEDCallDeviceButton.device_id == device_id) &
(VWEDCallDeviceButton.is_deleted == False)
)
button_delete_result = await session.execute(button_delete_query)
old_buttons = button_delete_result.scalars().all()
for old_button in old_buttons:
old_button.is_deleted = True
# 添加新按钮配置
button_list = []
if buttons:
for button in buttons:
button_id = str(uuid.uuid4())
new_button = VWEDCallDeviceButton(
id=button_id,
device_id=device_id,
signal_name=button.get("signal_name"),
signal_type=button.get("signal_type"),
signal_length=button.get("signal_length"),
register_address=button.get("register_address"),
function_code=button.get("function_code"),
remark=button.get("remark"),
vwed_task_id=button.get("vwed_task_id"),
long_vwed_task_id=button.get("long_vwed_task_id")
)
session.add(new_button)
button_list.append({
"id": button_id,
"signal_name": button.get("signal_name"),
"signal_type": button.get("signal_type"),
"signal_length": button.get("signal_length"),
"register_address": button.get("register_address"),
"function_code": button.get("function_code"),
"remark": button.get("remark"),
"vwed_task_id": button.get("vwed_task_id"),
"long_vwed_task_id": button.get("long_vwed_task_id")
})
# 提交事务
await session.commit()
# 返回结果
return {
"success": True,
"message": "更新呼叫器设备成功",
"data": {
"id": device_id,
"protocol": protocol,
"brand": brand,
"ip": ip,
"port": port,
"device_name": device_name,
"status": status,
"slave_id": slave_id,
"buttons": button_list
}
}
except Exception as e:
logger.error(f"更新呼叫器设备失败: {str(e)}")
return {
"success": False,
"message": f"更新呼叫器设备失败: {str(e)}"
}
@staticmethod
async def delete_call_device(device_ids: Union[str, List[str]]) -> Dict[str, Any]:
"""
删除呼叫器设备(软删除)
Args:
device_ids: 设备ID或设备ID列表
Returns:
Dict: 包含删除结果的字典
"""
try:
# 将单个ID转换为列表
if isinstance(device_ids, str):
device_ids = [device_ids]
if not device_ids:
return {
"success": False,
"message": "未提供设备ID"
}
async with get_async_session() as session:
# 检查设备是否存在
device_query = select(VWEDCallDevice).where(
(VWEDCallDevice.id.in_(device_ids)) &
(VWEDCallDevice.is_deleted == False)
)
device_result = await session.execute(device_query)
devices = device_result.scalars().all()
found_ids = [device.id for device in devices]
not_found_ids = list(set(device_ids) - set(found_ids))
if not_found_ids:
return {
"success": False,
"message": f"未找到以下ID的呼叫器设备: {', '.join(not_found_ids)}"
}
# 软删除设备
for device in devices:
device.is_deleted = True
# 软删除关联的按钮
button_query = select(VWEDCallDeviceButton).where(
(VWEDCallDeviceButton.device_id.in_(device_ids)) &
(VWEDCallDeviceButton.is_deleted == False)
)
button_result = await session.execute(button_query)
buttons = button_result.scalars().all()
for button in buttons:
button.is_deleted = True
# 提交事务
await session.commit()
# 返回结果
return {
"success": True,
"message": f"成功删除 {len(devices)} 个呼叫器设备",
"data": {
"ids": found_ids
}
}
except Exception as e:
logger.error(f"删除呼叫器设备失败: {str(e)}")
return {
"success": False,
"message": f"删除呼叫器设备失败: {str(e)}"
}
@staticmethod
async def get_call_device_list(
page: int = 1,
page_size: int = 10,
device_name: str = None,
ip: str = None,
protocol: str = None,
status: int = None
) -> Dict[str, Any]:
"""
获取呼叫器设备列表
Args:
page: 页码
page_size: 每页数量
device_name: 设备名称搜索
ip: IP地址搜索
protocol: 协议类型搜索
status: 状态过滤(0:禁用,1:启用)
Returns:
Dict: 包含设备列表和分页信息的字典
"""
try:
async with get_async_session() as session:
# 构建查询 - 使用子查询计算按钮数量
button_count_subquery = (
select(
VWEDCallDeviceButton.device_id,
func.count(VWEDCallDeviceButton.id).label("button_count")
)
.where(VWEDCallDeviceButton.is_deleted == False)
.group_by(VWEDCallDeviceButton.device_id)
.subquery()
)
# 主查询
query = select(
VWEDCallDevice,
func.coalesce(button_count_subquery.c.button_count, 0).label("button_count")
).outerjoin(
button_count_subquery,
VWEDCallDevice.id == button_count_subquery.c.device_id
).where(VWEDCallDevice.is_deleted == False)
# 添加过滤条件
if device_name:
query = query.where(VWEDCallDevice.device_name.like(f"%{device_name}%"))
if ip:
query = query.where(VWEDCallDevice.ip.like(f"%{ip}%"))
if protocol:
query = query.where(VWEDCallDevice.protocol == protocol)
if status is not None:
query = query.where(VWEDCallDevice.status == status)
# 计算总数
count_query = select(func.count()).select_from(VWEDCallDevice).where(
VWEDCallDevice.is_deleted == False
)
if device_name:
count_query = count_query.where(VWEDCallDevice.device_name.like(f"%{device_name}%"))
if ip:
count_query = count_query.where(VWEDCallDevice.ip.like(f"%{ip}%"))
if protocol:
count_query = count_query.where(VWEDCallDevice.protocol == protocol)
if status is not None:
count_query = count_query.where(VWEDCallDevice.status == status)
count_result = await session.execute(count_query)
total = count_result.scalar() or 0
# 分页
query = query.order_by(VWEDCallDevice.created_at.desc())
query = query.offset((page - 1) * page_size).limit(page_size)
# 执行查询
result = await session.execute(query)
# 格式化结果
device_list = []
for device, button_count in result:
device_dict = {
"id": device.id,
"protocol": device.protocol,
"brand": device.brand,
"ip": device.ip,
"port": device.port,
"device_name": device.device_name,
"status": device.status,
"slave_id": device.slave_id,
"created_at": device.created_at.strftime("%Y-%m-%d %H:%M:%S") if device.created_at else None,
"button_count": button_count
}
device_list.append(device_dict)
# 返回结果
return {
"success": True,
"message": "获取呼叫器设备列表成功",
"data": {
"list": device_list,
"pagination": {
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size
}
}
}
except Exception as e:
logger.error(f"获取呼叫器设备列表失败: {str(e)}")
return {
"success": False,
"message": f"获取呼叫器设备列表失败: {str(e)}"
}
@staticmethod
async def get_call_device_detail(device_id: str) -> Dict[str, Any]:
"""
获取呼叫器设备详情
Args:
device_id: 设备ID
Returns:
Dict: 包含设备详情的字典
"""
try:
async with get_async_session() as session:
# 查询设备
device_query = select(VWEDCallDevice).where(
(VWEDCallDevice.id == device_id) &
(VWEDCallDevice.is_deleted == False)
)
device_result = await session.execute(device_query)
device = device_result.scalars().first()
if not device:
return {
"success": False,
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
}
# 查询按钮信息
button_query = select(VWEDCallDeviceButton).where(
(VWEDCallDeviceButton.device_id == device_id) &
(VWEDCallDeviceButton.is_deleted == False)
)
button_result = await session.execute(button_query)
buttons = button_result.scalars().all()
# 格式化按钮信息
button_list = []
for button in buttons:
button_dict = {
"id": button.id,
"signal_name": button.signal_name,
"signal_type": button.signal_type,
"signal_length": button.signal_length,
"register_address": button.register_address,
"function_code": button.function_code,
"remark": button.remark,
"vwed_task_id": button.vwed_task_id,
"long_vwed_task_id": button.long_vwed_task_id,
"created_at": button.created_at.strftime("%Y-%m-%d %H:%M:%S") if button.created_at else None
}
button_list.append(button_dict)
# 格式化设备信息
device_detail = {
"id": device.id,
"protocol": device.protocol,
"brand": device.brand,
"ip": device.ip,
"port": device.port,
"device_name": device.device_name,
"status": device.status,
"slave_id": device.slave_id,
"created_at": device.created_at.strftime("%Y-%m-%d %H:%M:%S") if device.created_at else None,
"updated_at": device.updated_at.strftime("%Y-%m-%d %H:%M:%S") if device.updated_at else None,
"buttons": button_list
}
# 返回结果
return {
"success": True,
"message": "获取呼叫器设备详情成功",
"data": device_detail
}
except Exception as e:
logger.error(f"获取呼叫器设备详情失败: {str(e)}")
return {
"success": False,
"message": f"获取呼叫器设备详情失败: {str(e)}"
}
@staticmethod
async def export_call_devices(device_ids: List[str]) -> Dict[str, Any]:
"""
导出呼叫器设备
Args:
device_ids: 要导出的设备ID列表
Returns:
Dict: 包含导出设备信息的字典
"""
try:
async with get_async_session() as session:
# 检查设备是否存在
device_query = select(VWEDCallDevice).where(
(VWEDCallDevice.id.in_(device_ids)) &
(VWEDCallDevice.is_deleted == False)
)
device_result = await session.execute(device_query)
devices = device_result.scalars().all()
found_ids = [device.id for device in devices]
not_found_ids = list(set(device_ids) - set(found_ids))
if not_found_ids:
return {
"success": False,
"message": f"未找到以下ID的呼叫器设备: {', '.join(not_found_ids)}"
}
# 导出设备数据
exported_devices = []
for device in devices:
# 查询关联的按钮
button_query = select(VWEDCallDeviceButton).where(
(VWEDCallDeviceButton.device_id == device.id) &
(VWEDCallDeviceButton.is_deleted == False)
)
button_result = await session.execute(button_query)
buttons = button_result.scalars().all()
# 格式化按钮数据
buttons_data = []
for button in buttons:
button_data = {
"signal_name": button.signal_name,
"signal_type": button.signal_type,
"signal_length": button.signal_length,
"register_address": button.register_address,
"function_code": button.function_code,
"remark": button.remark,
"vwed_task_id": button.vwed_task_id,
"long_vwed_task_id": button.long_vwed_task_id
}
buttons_data.append(button_data)
# 格式化设备数据
device_data = {
"protocol": device.protocol,
"brand": device.brand,
"ip": device.ip,
"port": device.port,
"device_name": device.device_name,
"status": device.status,
"slave_id": device.slave_id,
"buttons": buttons_data
}
exported_devices.append(device_data)
return {
"success": True,
"message": f"成功导出 {len(exported_devices)} 个呼叫器设备",
"data": exported_devices
}
except Exception as e:
logger.error(f"导出呼叫器设备失败: {str(e)}")
return {
"success": False,
"message": f"导出呼叫器设备失败: {str(e)}"
}
@staticmethod
async def import_call_devices(devices_data: List[Dict[str, Any]], rename_suffix: str = None) -> Dict[str, Any]:
"""
导入呼叫器设备
Args:
devices_data: 要导入的设备数据列表
rename_suffix: 重命名后缀,用于避免名称冲突
Returns:
Dict: 包含导入结果的字典
"""
try:
import_results = {
"success_count": 0,
"failed_count": 0,
"failed_devices": [],
"imported_devices": []
}
for index, device_data in enumerate(devices_data):
try:
# 验证必要字段
required_fields = ["protocol", "brand", "ip", "port", "device_name"]
for field in required_fields:
if field not in device_data:
raise ValueError(f"设备数据缺少{field}字段,无法导入")
# 处理设备名称,添加后缀以避免冲突
original_name = device_data["device_name"]
if rename_suffix:
device_data["device_name"] = f"{original_name}{rename_suffix}"
# 处理按钮数据
buttons = device_data.get("buttons", [])
buttons_dict = []
for button in buttons:
if "signal_name" not in button:
continue
button_dict = {
"signal_name": button.get("signal_name"),
"signal_type": button.get("signal_type"),
"signal_length": button.get("signal_length"),
"register_address": button.get("register_address"),
"function_code": button.get("function_code"),
"remark": button.get("remark"),
"vwed_task_id": button.get("vwed_task_id"),
"long_vwed_task_id": button.get("long_vwed_task_id")
}
buttons_dict.append(button_dict)
# 调用添加设备方法
result = await CallDeviceService.add_call_device(
protocol=device_data.get("protocol"),
brand=device_data.get("brand"),
ip=device_data.get("ip"),
port=device_data.get("port"),
device_name=device_data.get("device_name"),
status=device_data.get("status", CallDeviceStatus.START),
slave_id=device_data.get("slave_id", "1"),
buttons=buttons_dict
)
if result.get("success", False):
import_results["success_count"] += 1
import_results["imported_devices"].append({
"id": result.get("data", {}).get("id", ""),
"device_name": device_data.get("device_name"),
"original_name": original_name
})
else:
import_results["failed_count"] += 1
import_results["failed_devices"].append({
"index": index,
"device_name": device_data.get("device_name"),
"reason": result.get("message", "未知原因")
})
except Exception as e:
import_results["failed_count"] += 1
import_results["failed_devices"].append({
"index": index,
"device_name": device_data.get("device_name", "未知设备"),
"reason": str(e)
})
# 构建返回消息
if import_results["success_count"] > 0 and import_results["failed_count"] == 0:
message = f"成功导入 {import_results['success_count']} 个呼叫器设备"
elif import_results["success_count"] > 0 and import_results["failed_count"] > 0:
# 提取错误原因
error_reasons = [device.get("reason", "未知原因") for device in import_results["failed_devices"]]
message = f"部分导入成功: 成功 {import_results['success_count']} 个, 失败 {import_results['failed_count']} 个, 原因: {', '.join(error_reasons)}"
else:
# 提取错误原因
error_reasons = [device.get("reason", "未知原因") for device in import_results["failed_devices"]]
message = f"导入失败: 所有 {import_results['failed_count']} 个呼叫器设备导入失败, 原因: {', '.join(error_reasons)}"
return {
"success": import_results["success_count"] > 0,
"message": message,
"data": import_results
}
except Exception as e:
logger.error(f"导入呼叫器设备失败: {str(e)}")
return {
"success": False,
"message": f"导入呼叫器设备失败: {str(e)}",
"data": {
"success_count": 0,
"failed_count": 0,
"failed_devices": [],
"imported_devices": []
}
}
@staticmethod
async def start_device_monitor(device_id: str, api_token: str) -> Dict[str, Any]:
"""
启动设备监控
Args:
device_id: 设备ID
api_token: 系统任务令牌
Returns:
Dict: 包含启动结果的字典
"""
try:
# 检查设备是否存在
async with get_async_session() as session:
device_query = select(VWEDCallDevice).where(
(VWEDCallDevice.id == device_id) &
(VWEDCallDevice.is_deleted == False)
)
device_result = await session.execute(device_query)
device = device_result.scalars().first()
if not device:
return {
"success": False,
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
}
# 查询设备按钮配置
button_query = select(VWEDCallDeviceButton).where(
(VWEDCallDeviceButton.device_id == device_id) &
(VWEDCallDeviceButton.is_deleted == False)
)
button_result = await session.execute(button_query)
buttons = button_result.scalars().all()
if not buttons:
return {
"success": False,
"message": f"设备 '{device.device_name}' 没有配置按钮,无法启动监控"
}
# 更新设备状态为启用状态
device.status = CallDeviceStatus.START
await session.commit()
# 获取设备和按钮信息,传递给监控线程
device_info = {
"id": device.id,
"protocol": device.protocol,
"brand": device.brand,
"ip": device.ip,
"port": device.port,
"device_name": device.device_name,
"slave_id": device.slave_id,
"is_init": device.is_init,
"buttons": []
}
for button in buttons:
button_info = {
"id": button.id,
"signal_name": button.signal_name,
"signal_type": button.signal_type,
"signal_length": button.signal_length,
"register_address": button.register_address,
"function_code": button.function_code,
"remark": button.remark,
"vwed_task_id": button.vwed_task_id,
"long_vwed_task_id": button.long_vwed_task_id
}
device_info["buttons"].append(button_info)
with thread_lock:
# 检查是否已存在监控线程
if device_id in device_monitor_threads and device_monitor_threads[device_id].is_alive():
return {
"success": False,
"message": f"设备 '{device_info['device_name']}' 已在监控中"
}
# 设置设备监控状态为运行中
device_monitor_status[device_id] = {
"status": "running",
"start_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"device_name": device_info["device_name"]
}
# 创建并启动监控线程
monitor_thread = threading.Thread(
target=CallDeviceService._device_monitor_thread,
args=(device_info, api_token),
daemon=True # 设置为守护线程,这样当主线程退出时,此线程也会退出
)
monitor_thread.start()
# 保存线程引用
device_monitor_threads[device_id] = monitor_thread
logger.info(f"已启动设备 '{device_info['device_name']}' 的监控线程")
return {
"success": True,
"message": f"成功启动设备 '{device_info['device_name']}' 的监控",
"data": {
"device_id": device_id,
"device_name": device_info["device_name"],
"status": "running",
"start_time": device_monitor_status[device_id]["start_time"]
}
}
except Exception as e:
logger.error(f"启动设备监控失败: {str(e)}")
return {
"success": False,
"message": f"启动设备监控失败: {str(e)}"
}
@staticmethod
def _device_monitor_thread(device_info: Dict[str, Any], api_token: str):
"""
设备监控线程函数
Args:
device_info: 设备信息字典
"""
device_id = device_info["id"]
device_name = device_info["device_name"]
device_ip = str(device_info["ip"])
device_port = str(device_info["port"])
device_slave_id = str(device_info["slave_id"])
device_ip_port_slave_id = device_ip+"-"+device_port+"-"+device_slave_id
logger.info(f"设备 '{device_name}' 监控线程已启动")
# 初始化设备按钮锁定状态
with thread_lock:
device_button_locks[device_id] = {}
# 创建一个新的事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 初始化
with get_session() as session:
calldevice = session.query(VWEDCallDevice).filter(VWEDCallDevice.id == device_id).first()
button_infos = {c.register_address: [c.signal_type, c.long_vwed_task_id, c.vwed_task_id] for c in calldevice.buttons}
# print("button_infos::::::::", button_infos, "=====================================")
if not calldevice.is_init:
init_result = loop.run_until_complete(
CallDeviceService._init_device(device_info))
# time.sleep(3)
if init_result.get("success", False):
logger.info(f"设备 '{device_name}' 初始化成功")
# 使用同步方式更新数据库中设备的初始化状态
# 更改状态
result = CallDeviceService._update_device_init_status(device_id, True)
try:
# 持续监控设备状态,直到线程被取消
while device_id in device_monitor_status and device_monitor_status[device_id]["status"] == "running":
try:
# 读取所有按钮状态
# for button_address, button_info in button_states.items():
# 调用新的接口读取按钮状态
current_state = loop.run_until_complete(
CallDeviceService._read_button_state(
device_info, button_infos
)
)
if isinstance(current_state, int) and current_state == 0:
logger.info(f"设备不在线 '{device_name}' 的监控已停止")
return
num = 0
# print("current_state::::::::", current_state, "=====================================")
for button_info in current_state:
num += 1
button_address = button_info["register_address"]
button_values = button_info["register_values"]
bind_value = button_info.get("bind_value", "")
button_values = eval(button_values)
device_ip_port_slave_id_new = device_ip_port_slave_id + "-" + button_address
# print("button_values::::::::", button_info, "=====================================")
with get_session() as session:
task_record_all = session.query(VWEDTaskRecord).filter(VWEDTaskRecord.source_device == device_ip_port_slave_id_new, VWEDTaskRecord.status.in_([TaskStatus.RUNNING, TaskStatus.QUEUED, TaskStatus.PAUSED])).all()
print("task_record_all::::::::", len(task_record_all), num, "=====================================")
print("button_values::::::::", button_values, "=====================================")
if button_values and button_values[0] == CallDeviceButtonStatus.DOWN :
if bind_value:
try:
# task_record_all = session.query(VWEDTaskRecord).filter(VWEDTaskRecord.source_device == device_ip_port_slave_id).all()
if not task_record_all:
# 生成按钮锁定键,每个按钮+任务组合一个锁
button_lock_key = f"{button_address}_{bind_value}"
# 检查该按钮的该任务是否已经在处理中
with thread_lock:
if device_button_locks[device_id].get(button_lock_key, False):
logger.info(f"设备 '{device_name}' 的按钮 {button_address} 任务 {bind_value} 正在处理中,跳过重复触发")
# time.sleep(2)
continue
# 设置按钮任务锁定状态
device_button_locks[device_id][button_lock_key] = True
logger.info(f"设备 '{device_name}' 的按钮 {button_address} 任务 {bind_value} 已锁定,开始处理")
logger.info(f"设备 '{device_name}' 的按钮 {button_address} ({button_info['signal_name']}) 被按下,触发任务 {bind_value}")
task_result = loop.run_until_complete(
CallDeviceService._execute_task(
bind_value,
device_ip_port_slave_id_new,
api_token
)
)
print("task_result::::::::", task_result, "=====================================")
# 如果任务启动成功,启动异步任务状态监控
if task_result.get("success", False):
task_record_id = task_result.get("task_record_id")
if task_record_id:
logger.info(f"任务启动成功,开始监控任务状态: 任务记录ID={task_record_id}")
# 创建异步任务来监控任务状态并复位按钮
loop.create_task(
CallDeviceService._monitor_task_and_reset_button(
task_record_id,
device_info,
button_address,
device_id,
button_lock_key
)
)
else:
logger.warning(f"任务启动成功但未获取到任务记录ID无法监控任务状态")
# 如果任务启动成功但未获取到任务记录ID则将按钮状态设置为初始化状态
result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
if result.get("success", False):
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
else:
logger.error(f"按钮 {button_address} 状态重置失败: {result.get('message', '未知错误')}")
# 释放按钮锁定
with thread_lock:
device_button_locks[device_id][button_lock_key] = False
else:
logger.error(f"任务启动失败: {task_result.get('message', '未知错误')}")
# 如果任务启动失败,则将按钮状态设置为初始化状态
result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
if result.get("success", False):
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
else:
logger.error(f"按钮 {button_address} 状态重置失败: {result.get('message', '未知错误')}")
# 释放按钮锁定
with thread_lock:
device_button_locks[device_id][button_lock_key] = False
# else:
# logger.info(f"设备 '{device_name}' 有任务正在运行,按钮 {button_address} 任务 {bind_value} 跳过执行")
# # 释放按钮锁定
# with thread_lock:
# device_button_locks[device_id][button_lock_key] = False
except Exception as e:
logger.error(f"处理按钮 {button_address} 任务时发生异常: {str(e)}")
# 发生异常时也要释放锁定
with thread_lock:
device_button_locks[device_id][button_lock_key] = False
else:
logger.info(f"设备 '{device_name}' 的按钮 {button_address} ({button_info['signal_name']}) 被按下未配置任务ID跳过任务执行")
# 如果按钮状态为初始化状态,则将按钮状态设置为初始化状态
result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
if result.get("success", False):
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
else:
logger.error(f"按钮 {button_address} 状态重置失败: {result.get('message', '未知错误')}")
elif task_record_all and button_values and button_values[0] == CallDeviceButtonStatus.CANCEL:
# print("button_values[0]::::::::", button_values[0], "=====================================")
if button_infos.get(button_address, None)[1] != button_infos.get(button_address, None)[2]:
logger.warning(f"该设备按钮 {button_address} 触发任务与取消任务不一致,无法取消任务")
continue
# 如果按钮状态为初始化状态,则将按钮状态设置为初始化状态
if bind_value:
# 生成按钮取消锁定键
cancel_lock_key = f"cancel_{button_address}_{bind_value}"
# 检查该按钮的取消操作是否已经在处理中
with thread_lock:
if device_button_locks[device_id].get(cancel_lock_key, False):
logger.info(f"设备 '{device_name}' 的按钮 {button_address} 取消操作正在处理中,跳过重复取消")
continue
# 设置按钮取消锁定状态
device_button_locks[device_id][cancel_lock_key] = True
logger.info(f"设备 '{device_name}' 的按钮 {button_address} 取消操作已锁定,开始处理")
try:
with get_session() as session:
task_record_all = session.query(VWEDTaskRecord).filter(VWEDTaskRecord.source_device == device_ip_port_slave_id_new, VWEDTaskRecord.status.in_([TaskStatus.RUNNING, TaskStatus.QUEUED, TaskStatus.PAUSED])).all()
if task_record_all:
logger.info(f"设备 '{device_name}' 的按钮 {button_address} ({button_info['signal_name']}) 被取消,停止任务")
from services.task_service import TaskService
task_record_id = task_record_all[0].id
# result = TaskService.stop_task_def(task_record_id)
task_result = loop.run_until_complete(
CallDeviceService._cancel_task(task_record_id)
)
if task_result.get("success", False):
logger.info(f"任务 {task_record_id} 已取消")
# 将按钮状态设置为初始化状态
reset_result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
if reset_result.get("success", False):
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
else:
logger.error(f"按钮 {button_address} 状态重置失败: {reset_result.get('message', '未知错误')}")
else:
logger.error(f"任务 {task_record_id} 取消失败: {result.get('message', '未知错误')}")
else:
logger.info(f"设备 '{device_name}' 没有正在运行的任务,直接重置按钮状态")
# 将按钮状态设置为初始化状态
reset_result = CallDeviceService.synchronous_reset_button_state(device_info, button_address)
if reset_result.get("success", False):
logger.info(f"按钮 {button_address} 状态已重置为初始化状态")
else:
logger.error(f"按钮 {button_address} 状态重置失败: {reset_result.get('message', '未知错误')}")
finally:
# 释放取消操作锁定
with thread_lock:
device_button_locks[device_id][cancel_lock_key] = False
logger.info(f"已释放取消操作锁定: 设备ID={device_id}, 取消锁定键={cancel_lock_key}")
else:
logger.info(f"设备 '{device_name}' 的按钮 {button_address} ({button_info['signal_name']}) 取消任务未配置任务ID跳过任务执行")
# 每1秒检查一次
time.sleep(1)
except Exception as e:
logger.error(f"设备 '{device_name}' 监控过程中发生错误: {str(e)}")
# 出错后暂停一段时间再继续
time.sleep(5)
except Exception as e:
logger.error(f"设备 '{device_name}' 监控线程异常: {str(e)}")
finally:
# 关闭事件循环
loop.close()
logger.info(f"设备 '{device_name}' 监控线程已结束")
# 清理设备按钮锁定状态
with thread_lock:
if device_id in device_button_locks:
del device_button_locks[device_id]
# 更新设备状态
with thread_lock:
if device_id in device_monitor_status:
device_monitor_status[device_id]["status"] = "stopped"
device_monitor_status[device_id]["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with get_session() as session:
calldevice = session.query(VWEDCallDevice).filter(VWEDCallDevice.id == device_id).first()
calldevice.status = CallDeviceStatus.END
session.commit()
@staticmethod
async def _read_button_state(device_info: Dict[str, Any], button_infos: Dict[str, Any] = None) -> int:
"""
读取按钮状态
Args:
device_info: 设备信息
button_address: 按钮地址
Returns:
int: 按钮状态(0或1)
"""
try:
# 构建设备ID格式为 agv:ip-port-slave_id
serial_number = f"{device_info.get('ip')}-{device_info.get('port')}-{device_info.get('slave_id')}"
agv_id = f"agvState:{serial_number}"
# 准备请求参数
request_data = {
"agvId": agv_id
}
# API URL
api_url = settings.CALLDEVICE_API_BASE_URL
url = f"{api_url}{settings.CALLDEVICE_API_ENDPOINTS['get_device_state']}"
headers = {
"Content-Type": "application/json",
"User-Agent": "CallDeviceMonitor/1.0"
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, json=request_data, headers=headers) as response:
if response.status == 200:
response_data = await response.json()
if response_data.get("success", False) and response_data.get("data"):
# 解析返回的数据
data_list = response_data.get("data", [])
if data_list is None:
return 0
if data_list and len(data_list) > 0:
# 数据是JSON字符串需要解析
device_state_str = data_list[0]
# 处理双重转义的JSON字符串
try:
# 首先解析外层的JSON字符串
if isinstance(device_state_str, str):
# 如果是字符串,先解析一次
device_state_str = json.loads(device_state_str)
# 如果解析后还是字符串,再解析一次
if isinstance(device_state_str, str):
device_state = json.loads(device_state_str)
else:
device_state = device_state_str
except json.JSONDecodeError as e:
logger.error(f"解析设备状态JSON失败: {str(e)}, 原始数据: {device_state_str}")
return 0
result = []
# 查找对应按钮的状态
if device_state is None:
return 0
information = device_state.get("information", [])
for info in information:
if info.get("infotype") == "CUSTOM_REGISTER":
# 查找对应的寄存器信息
references = info.get("inforeferences", [])
register_name = None
register_address = None
register_values = None
bind_value = None
for ref in references:
# print("ref::::::::", ref, "=====================================")
if ref.get("referencekey") == "name":
register_name = ref.get("referencevalue")
elif ref.get("referencekey") == "address":
register_address = ref.get("referencevalue")
if button_infos and button_infos.get(register_address, None)[0] != CallDeviceButtonType.BUTTON:
continue
elif ref.get("referencekey") == "values":
register_values = ref.get("referencevalue")
elif ref.get("referencekey") == "bind":
bind_value = ref.get("referencevalue")
button_result = {
"register_address": register_address,
"register_values": register_values,
"bind_value": bind_value,
"signal_name": register_name,
# "vwed_task_id": button_config.get("vwed_task_id") if button_config else "",
}
result.append(button_result)
return result
else:
logger.warning(f"获取设备状态失败: {response_data.get('message', '未知错误')}")
return 0
else:
response_text = await response.text()
logger.warning(f"获取设备状态请求失败,状态码: {response.status}, 响应: {response_text}")
return 0
except asyncio.TimeoutError:
logger.error(f"获取设备状态超时: {url}")
return 0
except aiohttp.ClientError as e:
logger.error(f"获取设备状态网络错误: {str(e)}")
return 0
except Exception as e:
logger.error(f"读取按钮状态错误: {str(e)}")
return 0
@staticmethod
async def _execute_task(task_id: str, device_name: str, api_token: str) -> Dict[str, Any]:
"""
执行任务
Args:
task_id: 任务ID
device_name: 设备名称
api_token: 系统任务令牌
Returns:
Dict: 执行结果包含任务记录ID
"""
try:
from data.enum.task_record_enum import SourceType
# 准备请求参数
run_request = {
"taskId": task_id,
"source_system": "calldevice_monitor",
"source_device": f"{device_name}",
"source_type": SourceType.CALL_MACHINE,
"params": [] # 可以根据需要传入参数
}
if not api_token:
return {
"success": False,
"message": "系统任务令牌为空",
"data": {}
}
# 获取本地服务器URL (从当前请求环境中获取基础URL)
base_url = settings.TASK_EXECUTION_API_BASE_URL
# 检查是否有环境变量配置的API URL
import os
api_url = os.environ.get("VWED_API_URL", base_url)
# 调用任务执行API
async with aiohttp.ClientSession() as session:
url = f"{api_url}{settings.TASK_EXECUTION_API_ENDPOINTS['run_task']}"
# 添加必要的请求头
headers = {
"Content-Type": "application/json",
"User-Agent": "CallDeviceMonitor/1.0",
"X-Source-System": "calldevice_monitor",
"X-Source-Device": device_name,
"x-access-token": api_token,
"authorization": api_token
}
logger.info(f"正在调用任务执行API: {url}, 任务ID: {task_id}, 来源设备: {device_name}")
try:
async with session.post(url, json=run_request, headers=headers) as response:
response_text = await response.text()
try:
response_data = json.loads(response_text)
except json.JSONDecodeError:
logger.error(f"解析任务执行API响应失败响应内容不是有效的JSON: {response_text}")
return {
"success": False,
"message": "解析任务执行API响应失败响应内容不是有效的JSON",
"data": {"response_text": response_text}
}
if response.status == 200:
logger.info(f"任务启动成功: {task_id}, 响应: {response_data}")
# 提取任务记录ID
task_record_id = response_data.get("data", {}).get("taskRecordId")
return {
"success": True,
"message": "任务启动成功",
"data": response_data,
"task_record_id": task_record_id # 返回任务记录ID用于后续状态监控
}
else:
logger.warning(f"任务启动失败: {response_data}")
return {
"success": False,
"message": f"任务启动失败: {response_data.get('message', '未知错误')}",
"data": response_data
}
except asyncio.TimeoutError:
logger.error(f"调用任务执行API超时: {url}")
return {
"success": False,
"message": "调用任务执行API超时",
"data": {"url": url}
}
except aiohttp.ClientError as e:
logger.error(f"调用任务执行API出错: {str(e)}")
return {
"success": False,
"message": f"调用任务执行API出错: {str(e)}",
"data": {"url": url}
}
except Exception as e:
logger.error(f"执行任务错误: {str(e)}")
return {
"success": False,
"message": f"执行任务错误: {str(e)}"
}
@staticmethod
async def _cancel_task(task_record_id: str) -> Dict[str, Any]:
"""
取消任务
Args:
task_record_id: 任务记录ID
Returns:
Dict: 执行结果包含任务记录ID
"""
try:
from data.enum.task_record_enum import SourceType
# 准备请求参数
run_request = {
"task_record_id": task_record_id,
}
# 获取本地服务器URL (从当前请求环境中获取基础URL)
base_url = settings.TASK_EXECUTION_API_BASE_URL
# 检查是否有环境变量配置的API URL
import os
api_url = os.environ.get("VWED_API_URL", base_url)
# 调用任务执行API
async with aiohttp.ClientSession() as session:
url = f"{api_url}{settings.TASK_EXECUTION_API_ENDPOINTS['stop_task']}/{task_record_id}"
# 添加必要的请求头
headers = {
"Content-Type": "application/json",
"User-Agent": "CallDeviceMonitor/1.0",
"X-Source-System": "calldevice_monitor",
}
try:
async with session.post(url, json=run_request, headers=headers) as response:
response_text = await response.text()
try:
response_data = json.loads(response_text)
except json.JSONDecodeError:
logger.error(f"解析任务执行API响应失败响应内容不是有效的JSON: {response_text}")
return {
"success": False,
"message": "解析任务执行API响应失败响应内容不是有效的JSON",
"data": {"response_text": response_text}
}
if response.status == 200:
logger.info(f"任务取消成功: {task_record_id}, 响应: {response_data}")
# 提取任务记录ID
task_record_id = response_data.get("data", {}).get("taskRecordId")
return {
"success": True,
"message": "任务取消成功",
"data": response_data,
"task_record_id": task_record_id # 返回任务记录ID用于后续状态监控
}
else:
logger.warning(f"任务取消失败: {response_data}")
return {
"success": False,
"message": f"任务取消失败: {response_data.get('message', '未知错误')}",
"data": response_data
}
except asyncio.TimeoutError:
logger.error(f"调用任务取消API超时: {url}")
return {
"success": False,
"message": "调用任务取消API超时",
"data": {"url": url}
}
except aiohttp.ClientError as e:
logger.error(f"调用任务取消API出错: {str(e)}")
return {
"success": False,
"message": f"调用任务取消API出错: {str(e)}",
"data": {"url": url}
}
except Exception as e:
logger.error(f"执行任务错误: {str(e)}")
return {
"success": False,
"message": f"执行任务错误: {str(e)}"
}
@staticmethod
async def stop_device_monitor(device_id: str) -> Dict[str, Any]:
"""
停止设备监控
Args:
device_id: 设备ID
Returns:
Dict: 包含停止结果的字典
"""
try:
# 检查设备是否存在
async with get_async_session() as session:
device_query = select(VWEDCallDevice).where(
(VWEDCallDevice.id == device_id) &
(VWEDCallDevice.is_deleted == False)
)
device_result = await session.execute(device_query)
device = device_result.scalars().first()
if not device:
return {
"success": False,
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
}
device_name = device.device_name
with thread_lock:
# 检查是否存在监控线程
if device_id not in device_monitor_threads or not device_monitor_threads[device_id].is_alive():
return {
"success": False,
"message": f"设备 '{device_name}' 当前未在监控中"
}
# 更新设备监控状态为停止
if device_id in device_monitor_status:
device_monitor_status[device_id]["status"] = "stopping"
# 线程会自行检测状态变化并退出
# 等待线程结束(但不阻塞太久)
monitor_thread = device_monitor_threads[device_id]
# 设置一个超时时间,避免无限等待
max_wait_time = 5 # 最多等待5秒
wait_count = 0
# 释放锁后等待线程结束
while monitor_thread.is_alive() and wait_count < max_wait_time:
time.sleep(1)
wait_count += 1
# 即使线程未完全结束,也返回成功(线程会自行结束)
logger.info(f"已停止设备 '{device_name}' 的监控线程")
# 更新设备状态为禁用状态
async with get_async_session() as session:
device_query = select(VWEDCallDevice).where(
(VWEDCallDevice.id == device_id) &
(VWEDCallDevice.is_deleted == False)
)
device_result = await session.execute(device_query)
device = device_result.scalars().first()
if device:
device.status = CallDeviceStatus.END
await session.commit()
with thread_lock:
# 清理线程引用
if device_id in device_monitor_threads:
del device_monitor_threads[device_id]
# 更新监控状态
if device_id in device_monitor_status:
device_monitor_status[device_id]["status"] = "stopped"
device_monitor_status[device_id]["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return {
"success": True,
"message": f"成功停止设备 '{device_name}' 的监控",
"data": {
"device_id": device_id,
"device_name": device_name,
"status": "stopped",
"end_time": device_monitor_status[device_id]["end_time"] if device_id in device_monitor_status else None
}
}
except Exception as e:
logger.error(f"停止设备监控失败: {str(e)}")
return {
"success": False,
"message": f"停止设备监控失败: {str(e)}"
}
@staticmethod
async def _init_device(device_info: Dict[str, Any]) -> Dict[str, Any]:
"""
初始化设备
Args:
device_info: 设备信息
Returns:
Dict: 初始化结果
"""
try:
# 准备初始化请求
print("device_info::::::::", device_info, "=====================================")
# serial_number = device_info.get("ip")+"-"+str(device_info.get("port"))+"-"+str(device_info.get("slave_id"))
serial_number = "EXT-0"
target_agv_id = serial_number # 在呼叫器中targetAgvId、serialNumber设置为相同的值
# 生成当前时间戳,格式为 ISO 8601
current_time = datetime.datetime.now().isoformat()
# 构建registers参数基于设备的按钮配置
registers = []
for button in device_info.get("buttons", []):
register_item = {
# "id": button.get("id"),
"fnCode": button.get("function_code"),
"name": button.get("signal_name"),
"regCount": button.get("signal_length"),
"regAddress": button.get("register_address"),
"bind": button.get("vwed_task_id"),
# "long_vwed_task_id": button.get("long_vwed_task_id")
}
registers.append(register_item)
# 构造初始化请求使用新的instantActions格式
init_request = {
"targetAgvId": target_agv_id,
"instantActions": {
"version": "2.0.0",
"headerId": int(time.time()),
"timestamp": current_time,
"manufacturer": "SEER",
"serialNumber": serial_number,
"instantActions": [
{
"actionType": "deviceSetup",
"actionId": device_info.get("id"),
"blockingType": "HARD",
"actionParameters": [
{
"key": "protocolType",
"value": device_info.get("protocol")
},
{
"key": "brandName",
"value": device_info.get("brand")
},
{
"key": "ip",
"value": device_info.get("ip")
},
{
"key": "port",
"value": str(device_info.get("port"))
},
{
"key": "deviceName",
"value": device_info.get("device_name")
},
{
"key": "slaveId",
"value": device_info.get("slave_id")
},
{
"key": "registers",
"value": json.dumps(registers)
}
],
"actionDescription": "action parameters"
}
]
}
}
logger.info(f"开始初始化设备: {device_info['device_name']}, 请求数据: {json.dumps(init_request)}")
# 获取本地服务器URL或环境变量配置的API URL
import socket
import os
hostname = socket.gethostname()
server_ip = socket.gethostbyname(hostname)
api_url = settings.CALLDEVICE_API_INIT_BASE_URL
# 调用初始化API
async with aiohttp.ClientSession() as session:
url = f"{api_url}{settings.CALLDEVICE_API_ENDPOINTS['init_device']}"
print("url::::::::", url, "=====================================")
headers = {
"Content-Type": "application/json",
"User-Agent": "CallDeviceMonitor/1.0"
}
try:
async with session.post(url, json=init_request, headers=headers) as response:
response_text = await response.text()
print("response_text::::::::", response_text, "=====================================")
if response.status == 200:
# 尝试解析响应
try:
response_data = json.loads(response_text) if response_text else {}
logger.info(f"设备初始化成功: {device_info['device_name']}, 响应: {response_data}")
return {
"success": True,
"message": "设备初始化成功",
"data": response_data
}
except json.JSONDecodeError:
# 如果响应不是JSON但状态码是200仍然视为成功
logger.info(f"设备初始化成功: {device_info['device_name']}, 但响应不是JSON格式: {response_text}")
return {
"success": True,
"message": "设备初始化成功响应不是JSON格式",
"data": {"response_text": response_text}
}
else:
try:
response_data = json.loads(response_text) if response_text else {}
error_msg = response_data.get("message", f"初始化失败,状态码: {response.status}")
except json.JSONDecodeError:
error_msg = f"初始化失败,状态码: {response.status}, 响应: {response_text}"
logger.warning(f"设备初始化失败: {device_info['device_name']}, {error_msg}")
return {
"success": False,
"message": error_msg,
"data": {"status_code": response.status, "response_text": response_text}
}
except asyncio.TimeoutError:
logger.error(f"调用初始化API超时: {url}")
return {
"success": False,
"message": "调用初始化API超时",
"data": {"url": url}
}
except aiohttp.ClientError as e:
logger.error(f"调用初始化API出错: {str(e)}")
return {
"success": False,
"message": f"调用初始化API出错: {str(e)}",
"data": {"url": url}
}
except Exception as e:
logger.error(f"设备初始化过程发生错误: {str(e)}")
return {
"success": False,
"message": f"设备初始化过程发生错误: {str(e)}"
}
@staticmethod
def _update_device_init_status(device_id: str, is_init: bool) -> Dict[str, Any]:
"""
更新设备初始化状态
Args:
device_id: 设备ID
is_init: 初始化状态
Returns:
Dict: 更新结果
"""
try:
with get_session() as session:
# 查询设备
device_query = select(VWEDCallDevice).where(
(VWEDCallDevice.id == device_id) &
(VWEDCallDevice.is_deleted == False)
)
device_result = session.execute(device_query)
device = device_result.scalars().first()
if not device:
return {
"success": False,
"message": f"未找到ID为 '{device_id}' 的呼叫器设备"
}
# 更新初始化状态
device.is_init = is_init
session.commit()
return {
"success": True,
"message": f"成功更新设备 '{device.device_name}' 的初始化状态为 {is_init}"
}
except Exception as e:
logger.error(f"更新设备初始化状态失败: {str(e)}")
return {
"success": False,
"message": f"更新设备初始化状态失败: {str(e)}"
}
@staticmethod
def synchronous_reset_button_state(device_info: Dict[str, Any], button_address: str) -> Dict[str, Any]:
"""
复位按钮状态到初始化状态(同步版本)
Args:
device_info: 设备信息
button_address: 按钮寄存器地址
Returns:
Dict: 复位结果
"""
try:
import requests
# 构建设备标识
serial_number = f"{device_info.get('ip')}-{device_info.get('port')}-{device_info.get('slave_id')}"
target_agv_id = serial_number
# 生成当前时间戳,格式为 ISO 8601
current_time = datetime.datetime.now().isoformat()
# 查找对应的按钮配置
button_config = None
print("device_info::::::::", device_info, "=====================================")
for button in device_info.get("buttons", []):
if button.get("register_address") == button_address:
button_config = button
break
if not button_config:
return {
"success": False,
"message": f"未找到寄存器地址为 {button_address} 的按钮配置"
}
# 构建registers参数将按钮状态设置为初始化状态
registers = [{
"fnCode": button_config.get("function_code"),
"name": button_config.get("signal_name"),
"regCount": button_config.get("signal_length"),
"regAddress": button_address,
"regValue": CallDeviceButtonStatus.INIT # 设置为初始化状态
}]
# 构造复位请求
reset_request = {
"targetAgvId": target_agv_id,
"instantActions": {
"version": "2.0.0",
"headerId": int(time.time()),
"timestamp": current_time,
"manufacturer": device_info.get("brand"),
"serialNumber": serial_number,
"instantActions": [
{
"actionType": "deviceWrite",
"actionId": f"reset-{button_config.get('id')}-{int(time.time())}",
"blockingType": "HARD",
"actionParameters": [
{
"key": "protocolType",
"value": device_info.get("protocol")
},
{
"key": "brandName",
"value": device_info.get("brand")
},
{
"key": "ip",
"value": device_info.get("ip")
},
{
"key": "port",
"value": str(device_info.get("port"))
},
{
"key": "deviceName",
"value": device_info.get("device_name")
},
{
"key": "slaveId",
"value": device_info.get("slave_id")
},
{
"key": "registers",
"value": json.dumps(registers)
}
],
"actionDescription": "action write device"
}
]
}
}
logger.info(f"开始复位按钮状态: 设备={device_info['device_name']}, 按钮地址={button_address}, 请求数据: {json.dumps(reset_request)}")
# 获取API URL
api_url = settings.CALLDEVICE_API_BASE_URL
# 调用复位API
url = f"{api_url}{settings.CALLDEVICE_API_ENDPOINTS['set_device_state']}"
headers = {
"Content-Type": "application/json",
"User-Agent": "CallDeviceMonitor/1.0"
}
try:
response = requests.post(url, json=reset_request, headers=headers)
response_text = response.text
logger.info(f"复位按钮状态响应: {response_text}")
if response.status_code == 200:
try:
response_data = json.loads(response_text)
if response_data.get("success", False):
logger.info(f"成功复位按钮状态: 设备={device_info['device_name']}, 按钮地址={button_address}")
return {
"success": True,
"message": "按钮状态复位成功",
"data": response_data
}
else:
logger.warning(f"复位按钮状态失败: {response_data.get('message', '未知错误')}")
return {
"success": False,
"message": f"复位按钮状态失败: {response_data.get('message', '未知错误')}",
"data": response_data
}
except json.JSONDecodeError:
logger.error(f"解析复位响应失败响应内容不是有效的JSON: {response_text}")
return {
"success": False,
"message": "解析复位响应失败响应内容不是有效的JSON",
"data": {"response_text": response_text}
}
else:
logger.error(f"复位按钮状态请求失败,状态码: {response.status_code}, 响应: {response_text}")
return {
"success": False,
"message": f"复位按钮状态请求失败,状态码: {response.status_code}",
"data": {"response_text": response_text}
}
except requests.exceptions.Timeout:
logger.error(f"复位按钮状态超时: {url}")
return {
"success": False,
"message": "复位按钮状态超时",
"data": {"url": url}
}
except requests.exceptions.RequestException as e:
logger.error(f"复位按钮状态网络错误: {str(e)}")
return {
"success": False,
"message": f"复位按钮状态网络错误: {str(e)}",
"data": {"url": url}
}
except Exception as e:
logger.error(f"复位按钮状态错误: {str(e)}")
return {
"success": False,
"message": f"复位按钮状态错误: {str(e)}"
}
@staticmethod
async def _reset_button_state(device_info: Dict[str, Any], button_address: str) -> Dict[str, Any]:
"""
复位按钮状态到初始化状态
Args:
device_info: 设备信息
button_address: 按钮寄存器地址
Returns:
Dict: 复位结果
"""
try:
# 构建设备标识
serial_number = f"{device_info.get('ip')}-{device_info.get('port')}-{device_info.get('slave_id')}"
target_agv_id = serial_number
# 生成当前时间戳,格式为 ISO 8601
current_time = datetime.datetime.now().isoformat()
# 查找对应的按钮配置
button_config = None
for button in device_info.get("buttons", []):
if button.get("register_address") == button_address:
button_config = button
break
if not button_config:
return {
"success": False,
"message": f"未找到寄存器地址为 {button_address} 的按钮配置"
}
# 构建registers参数将按钮状态设置为初始化状态
registers = [{
"fnCode": button_config.get("function_code"),
"name": button_config.get("signal_name"),
"regCount": button_config.get("signal_length"),
"regAddress": button_config.get("register_address"),
"regValue": CallDeviceButtonStatus.INIT # 设置为初始化状态
}]
# 构造复位请求
reset_request = {
"targetAgvId": target_agv_id,
"instantActions": {
"version": "2.0.0",
"headerId": int(time.time()),
"timestamp": current_time,
"manufacturer": device_info.get("brand"),
"serialNumber": serial_number,
"instantActions": [
{
"actionType": "deviceWrite",
"actionId": f"reset-{button_config.get('id')}-{int(time.time())}",
"blockingType": "HARD",
"actionParameters": [
{
"key": "protocolType",
"value": device_info.get("protocol")
},
{
"key": "brandName",
"value": device_info.get("brand")
},
{
"key": "ip",
"value": device_info.get("ip")
},
{
"key": "port",
"value": str(device_info.get("port"))
},
{
"key": "deviceName",
"value": device_info.get("device_name")
},
{
"key": "slaveId",
"value": device_info.get("slave_id")
},
{
"key": "registers",
"value": json.dumps(registers)
}
],
"actionDescription": "action write device"
}
]
}
}
logger.info(f"开始复位按钮状态: 设备={device_info['device_name']}, 按钮地址={button_address}, 请求数据: {json.dumps(reset_request)}")
# 获取API URL
api_url = settings.CALLDEVICE_API_INIT_BASE_URL
# 调用复位API
async with aiohttp.ClientSession() as session:
url = f"{api_url}{settings.CALLDEVICE_API_ENDPOINTS['set_device_state']}"
headers = {
"Content-Type": "application/json",
"User-Agent": "CallDeviceMonitor/1.0"
}
try:
async with session.post(url, json=reset_request, headers=headers) as response:
response_text = await response.text()
logger.info(f"复位按钮状态响应: {response_text}")
if response.status == 200:
try:
response_data = json.loads(response_text)
if response_data.get("success", False):
logger.info(f"成功复位按钮状态: 设备={device_info['device_name']}, 按钮地址={button_address}")
while True:
button_state = await CallDeviceService._read_button_state(device_info)
for button_info in button_state:
if button_info.get("register_address") == button_address:
register_values = button_info.get("register_values")
if register_values and register_values[0] == CallDeviceButtonStatus.INIT:
break
if not button_state:
break
await asyncio.sleep(1)
# await asyncio.sleep(1)
# button_state = [button.get("register_values") await CallDeviceService._read_button_state(device_info) if button.get("register_address") == button_address]
print("button_state::::::::", button_state, "=====================================")
return {
"success": True,
"message": "按钮状态复位成功",
"data": response_data
}
else:
logger.warning(f"复位按钮状态失败: {response_data.get('message', '未知错误')}")
return {
"success": False,
"message": f"复位按钮状态失败: {response_data.get('message', '未知错误')}",
"data": response_data
}
except json.JSONDecodeError:
logger.error(f"解析复位响应失败响应内容不是有效的JSON: {response_text}")
return {
"success": False,
"message": "解析复位响应失败响应内容不是有效的JSON",
"data": {"response_text": response_text}
}
else:
logger.error(f"复位按钮状态请求失败,状态码: {response.status}, 响应: {response_text}")
return {
"success": False,
"message": f"复位按钮状态请求失败,状态码: {response.status}",
"data": {"response_text": response_text}
}
except asyncio.TimeoutError:
logger.error(f"复位按钮状态超时: {url}")
return {
"success": False,
"message": "复位按钮状态超时",
"data": {"url": url}
}
except aiohttp.ClientError as e:
logger.error(f"复位按钮状态网络错误: {str(e)}")
return {
"success": False,
"message": f"复位按钮状态网络错误: {str(e)}",
"data": {"url": url}
}
except Exception as e:
logger.error(f"复位按钮状态错误: {str(e)}")
return {
"success": False,
"message": f"复位按钮状态错误: {str(e)}"
}
@staticmethod
async def _monitor_task_and_reset_button(task_record_id: str, device_info: Dict[str, Any], button_address: str, device_id: str = None, button_lock_key: str = None) -> None:
"""
监控任务状态并在任务完成后复位按钮状态
Args:
task_record_id: 任务记录ID
device_info: 设备信息
button_address: 按钮寄存器地址
device_id: 设备ID
button_lock_key: 按钮锁定键格式button_address_task_id
"""
if not task_record_id:
logger.warning(f"任务记录ID为空无法监控任务状态")
# 释放按钮锁定
if device_id and button_lock_key:
with thread_lock:
if device_id in device_button_locks:
device_button_locks[device_id][button_lock_key] = False
return
logger.info(f"开始监控任务状态: 任务记录ID={task_record_id}, 设备={device_info['device_name']}, 按钮地址={button_address}")
check_interval = 2 # 每2秒检查一次
elapsed_time = 0
# try:
while True:
try:
# 使用同步会话避免事件循环冲突
with get_session() as session:
task_record = session.query(VWEDTaskRecord).filter(
VWEDTaskRecord.id == task_record_id
).first()
if not task_record:
logger.warning(f"未找到任务记录: 任务记录ID={task_record_id}")
return
print("task_record.status::::::::", task_record.status, "=====================================")
task_status = task_record.status
if task_status in [1000, 2000, 2001]:
status_name = {1000: "完成", 2000: "失败", 2001: "取消"}.get(task_status, "未知")
logger.info(f"任务已{status_name}: 任务记录ID={task_record_id}, 开始复位按钮状态")
# 复位按钮状态
reset_result = await CallDeviceService._reset_button_state(device_info, button_address)
if reset_result.get("success", False):
logger.info(f"按钮状态复位成功: 设备={device_info['device_name']}, 按钮地址={button_address}")
# 无论任务是否完成,都要释放按钮锁定
if device_id and button_lock_key:
with thread_lock:
if device_id in device_button_locks:
time.sleep(2)
device_button_locks[device_id][button_lock_key] = False
logger.info(f"已释放按钮锁定: 设备ID={device_id}, 按钮锁定键={button_lock_key}")
else:
logger.error(f"按钮状态复位失败: {reset_result.get('message', '未知错误')}")
return # 任务已完成,退出监控
else:
logger.info(f"任务未完成: 任务记录ID={task_record_id}, 状态={task_status}")
except Exception as e:
logger.error(f"查询任务状态异常: {str(e)}")
# 等待一段时间后再次检查
await asyncio.sleep(check_interval)
elapsed_time += check_interval
# finally: