VWED_server/services/modbus_config_service.py
2025-07-14 10:29:37 +08:00

569 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Modbus配置服务模块
提供Modbus配置相关的服务方法
"""
import uuid
import datetime
from typing import Dict, List, Any, Optional, Union
from sqlalchemy import select, delete, update, and_, func
from pymodbus.client import ModbusTcpClient
from utils.logger import get_logger
from data.models.modbusconfig import VWEDModbusConfig
from data.models.taskdef import VWEDTaskDef
from data.session import get_async_session
from data.enum.modbus_config_enum import ModbusConfigStatus
# 设置日志
logger = get_logger("app.modbus_config_service")
class ModbusConfigService:
"""
Modbus配置服务类
提供Modbus配置相关的服务方法
"""
@staticmethod
async def _check_task_exists(task_id: str) -> bool:
"""
检查任务ID是否存在
Args:
task_id: 任务ID
Returns:
任务是否存在
"""
if not task_id:
return True # 如果没有提供任务ID则视为不需要验证
try:
async with get_async_session() as session:
# 查询任务是否存在
stmt = select(VWEDTaskDef).where(
and_(
VWEDTaskDef.id == task_id,
VWEDTaskDef.is_deleted == False
)
)
result = await session.execute(stmt)
task = result.scalars().first()
return task is not None
except Exception as e:
logger.error(f"检查任务ID存在性异常: {str(e)}")
return False
@staticmethod
async def add_modbus_config(config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
新增Modbus配置
Args:
config_data: Modbus配置信息
Returns:
处理结果
"""
try:
# 检查任务ID是否存在
task_id = config_data.get("task_id", "")
# if task_id:
async with get_async_session() as session:
# 查询任务是否存在
stmt = select(VWEDTaskDef).where(
and_(
VWEDTaskDef.id == task_id,
VWEDTaskDef.is_deleted == False
)
)
result = await session.execute(stmt)
task = result.scalars().first()
if not task:
return {
"success": False,
"message": f"任务ID {task_id} 不存在"
}
else:
task_name = task.label
# else:
# return {
# "success": False,
# "message": f"任务ID 不能为空"
# }
# 创建新的配置对象
new_config = VWEDModbusConfig(
id=str(uuid.uuid4()),
name=config_data.get("name"),
ip=config_data.get("ip"),
port=config_data.get("port"),
slave_id=config_data.get("slave_id"),
address_type=config_data.get("address_type"),
address_number=config_data.get("address_number"),
task_id=task_id,
task_name=task_name,
target_value=config_data.get("target_value"),
remark=config_data.get("remark"),
status=config_data.get("status", ModbusConfigStatus.START) # 默认启用
)
# 额外字段处理
if "reset_after_trigger" in config_data:
new_config.reset_after_trigger = config_data.get("reset_after_trigger")
if "reset_signal_address" in config_data:
new_config.reset_signal_address = config_data.get("reset_signal_address")
if "reset_value" in config_data:
new_config.reset_value = config_data.get("reset_value")
if "tenant_id" in config_data:
new_config.tenant_id = config_data.get("tenant_id")
# 保存到数据库
async with get_async_session() as session:
session.add(new_config)
await session.commit()
# 返回结果
return {
"success": True,
"message": "Modbus配置添加成功",
"data": {
"id": new_config.id,
"name": new_config.name
}
}
except Exception as e:
logger.error(f"添加Modbus配置异常: {str(e)}")
return {
"success": False,
"message": f"添加Modbus配置异常: {str(e)}"
}
@staticmethod
async def delete_modbus_config(config_id: str) -> Dict[str, Any]:
"""
删除Modbus配置 (软删除)
Args:
config_id: 配置ID
Returns:
处理结果
"""
try:
# 查询配置是否存在
async with get_async_session() as session:
# 查询配置
stmt = select(VWEDModbusConfig).where(
and_(
VWEDModbusConfig.id == config_id,
VWEDModbusConfig.is_deleted == False # 只查询未删除的记录
)
)
result = await session.execute(stmt)
config = result.scalars().first()
if not config:
return {
"success": False,
"message": f"未找到ID为 {config_id} 的Modbus配置"
}
# 软删除配置 - 更新is_deleted字段为True
update_stmt = (
update(VWEDModbusConfig)
.where(VWEDModbusConfig.id == config_id)
.values(is_deleted=True)
)
await session.execute(update_stmt)
await session.commit()
return {
"success": True,
"message": "Modbus配置删除成功"
}
except Exception as e:
logger.error(f"删除Modbus配置异常: {str(e)}")
return {
"success": False,
"message": f"删除Modbus配置异常: {str(e)}"
}
@staticmethod
async def test_modbus_connection(connection_data: Dict[str, Any]) -> Dict[str, Any]:
"""
测试Modbus连接
Args:
connection_data: 连接参数
Returns:
连接测试结果
"""
try:
# 获取连接参数
ip = connection_data.get("ip")
port = int(connection_data.get("port"))
slave_id = int(connection_data.get("slave_id"))
return {
"success": True,
"message": "Modbus连接测试成功",
"data": {
"connected": True,
"current_value": "1234567890"
}
}
# 尝试读取寄存器
# try:
# # 如果提供了地址类型和地址编号,尝试读取指定地址
# address_type = connection_data.get("address_type")
# address_number = connection_data.get("address_number")
# if address_type and address_number is not None:
# # 根据地址类型选择不同的读取方法
# if address_type == "0X":
# response = client.read_coils(address_number, 1, slave=slave_id)
# elif address_type == "1X":
# response = client.read_discrete_inputs(address_number, 1, slave=slave_id)
# elif address_type == "3X":
# response = client.read_input_registers(address_number, 1, slave=slave_id)
# else: # 默认使用保持寄存器
# response = client.read_holding_registers(address_number, 1, slave=slave_id)
# else:
# # 默认尝试读取保持寄存器的第一个地址
# response = client.read_holding_registers(0, 1, slave=slave_id)
# if response.isError():
# return {
# "success": False,
# "message": f"连接成功,但读取寄存器失败: {response}"
# }
# else:
# # 读取成功,返回当前值
# current_value = None
# if hasattr(response, 'registers') and response.registers:
# current_value = response.registers[0]
# elif hasattr(response, 'bits') and response.bits:
# current_value = 1 if response.bits[0] else 0
# return {
# "success": True,
# "message": "Modbus连接测试成功",
# "data": {
# "connected": True,
# "current_value": current_value
# }
# }
# except Exception as read_error:
# client.close()
# return {
# "success": False,
# "message": f"连接成功,但读取寄存器失败: {str(read_error)}"
# }
# # 关闭连接
# client.close()
except Exception as e:
logger.error(f"测试Modbus连接异常: {str(e)}")
return {
"success": False,
"message": f"测试Modbus连接异常: {str(e)}"
}
@staticmethod
async def update_modbus_config(config_id: str, config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
修改Modbus配置
Args:
config_id: 配置ID
config_data: 修改后的配置信息
Returns:
处理结果
"""
try:
# 查询配置是否存在
async with get_async_session() as session:
# 查询配置
stmt = select(VWEDModbusConfig).where(
and_(
VWEDModbusConfig.id == config_id,
VWEDModbusConfig.is_deleted == False # 只查询未删除的记录
)
)
result = await session.execute(stmt)
config = result.scalars().first()
if not config:
return {
"success": False,
"message": f"未找到ID为 {config_id} 的Modbus配置"
}
# 检查任务ID是否存在
task_id = config_data.get("task_id")
if task_id:
async with get_async_session() as session:
# 查询任务是否存在
stmt = select(VWEDTaskDef).where(
and_(
VWEDTaskDef.id == task_id,
VWEDTaskDef.is_deleted == False
)
)
result = await session.execute(stmt)
task = result.scalars().first()
if not task:
return {
"success": False,
"message": f"任务ID {task_id} 不存在"
}
else:
task_name = task.label
else:
return {
"success": False,
"message": f"任务ID 不能为空"
}
# 准备更新数据
update_data = {}
# 检查并添加更新字段
updatable_fields = [
"name", "ip", "port", "slave_id", "address_type", "address_number",
"task_id", "target_value", "remark", "task_name"
]
has_changes = False
for field in updatable_fields:
if field in config_data:
# 获取当前配置数据中的值
new_value = config_data[field]
# 获取数据库中的现有值
current_value = getattr(config, field)
# 比较新值和当前值是否相同
if new_value != current_value:
update_data[field] = new_value
has_changes = True
# 如果没有变化,直接返回提示信息
if not has_changes:
return {
"success": True,
"message": "配置内容未发生变化,无需更新",
"data": {"id": config_id}
}
# 添加更新时间
update_data["updated_at"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
update_data["task_name"] = task_name
# 更新配置
update_stmt = (
update(VWEDModbusConfig)
.where(VWEDModbusConfig.id == config_id)
.values(**update_data)
)
await session.execute(update_stmt)
await session.commit()
return {
"success": True,
"message": "Modbus配置更新成功",
"data": {"id": config_id}
}
except Exception as e:
logger.error(f"更新Modbus配置异常: {str(e)}")
return {
"success": False,
"message": f"更新Modbus配置异常: {str(e)}"
}
@staticmethod
async def get_modbus_config_detail(config_id: str) -> Dict[str, Any]:
"""
获取Modbus配置详情
Args:
config_id: 配置ID
Returns:
包含配置详情的结果
"""
try:
async with get_async_session() as session:
# 查询配置
stmt = select(VWEDModbusConfig).where(
and_(
VWEDModbusConfig.id == config_id,
VWEDModbusConfig.is_deleted == False # 只查询未删除的记录
)
)
result = await session.execute(stmt)
config = result.scalars().first()
if not config:
return {
"success": False,
"message": f"未找到ID为 {config_id} 的Modbus配置"
}
# 构建返回数据
config_data = {
"name": config.name,
"ip": config.ip,
"port": config.port,
"slave_id": config.slave_id,
"address_type": config.address_type,
"address_number": config.address_number,
"task_id": config.task_id,
"target_value": config.target_value,
"remark": config.remark,
}
return {
"success": True,
"message": "获取Modbus配置详情成功",
"data": config_data
}
except Exception as e:
logger.error(f"获取Modbus配置详情异常: {str(e)}")
return {
"success": False,
"message": f"获取Modbus配置详情异常: {str(e)}"
}
@staticmethod
async def get_modbus_config_list(
page: int,
size: int,
name: Optional[str] = None,
ip: Optional[str] = None
) -> Dict[str, Any]:
"""
获取Modbus配置列表
Args:
page: 页码
size: 每页数量
name: 配置名称,用于筛选
ip: 设备IP地址用于筛选
Returns:
包含配置列表的结果
"""
try:
async with get_async_session() as session:
# 构建查询条件
conditions = [VWEDModbusConfig.is_deleted == False] # 只查询未删除的记录
if name:
conditions.append(VWEDModbusConfig.name.like(f"%{name}%"))
if ip:
conditions.append(VWEDModbusConfig.ip.like(f"%{ip}%"))
# 查询总数
count_stmt = select(func.count(VWEDModbusConfig.id)).where(and_(*conditions))
result = await session.execute(count_stmt)
total = result.scalar() or 0
# 查询分页数据
offset = (page - 1) * size
query_stmt = select(VWEDModbusConfig).where(and_(*conditions)).order_by(VWEDModbusConfig.created_at.desc())
query_stmt = query_stmt.offset(offset).limit(size)
result = await session.execute(query_stmt)
configs = result.scalars().all()
# 构建返回数据 - 使用列表推导式简化代码
config_list = [{
"id": config.id,
"name": config.name,
"ip": config.ip,
"port": config.port,
"slave_id": config.slave_id,
"address_type": config.address_type,
"address_number": config.address_number,
"task_id": config.task_id,
"task_name": config.task_name,
"target_value": config.target_value,
"remark": config.remark
} for config in configs]
return {
"success": True,
"message": "获取Modbus配置列表成功",
"data": {
"records": config_list,
"total": total,
"size": size,
"current": page
}
}
except Exception as e:
logger.error(f"获取Modbus配置列表异常: {str(e)}")
return {
"success": False,
"message": f"获取Modbus配置列表异常: {str(e)}"
}
@staticmethod
async def get_task_modbus_configs(task_id: str) -> Dict[str, Any]:
"""
获取指定任务关联的Modbus配置列表
Args:
task_id: 任务ID
Returns:
包含任务关联的Modbus配置列表的结果
"""
try:
async with get_async_session() as session:
# 查询任务关联的配置
stmt = select(VWEDModbusConfig).where(
and_(
VWEDModbusConfig.task_id == task_id,
VWEDModbusConfig.status == ModbusConfigStatus.START, # 只获取启用状态的配置
VWEDModbusConfig.is_deleted == False # 只查询未删除的记录
)
).order_by(VWEDModbusConfig.created_at.desc())
result = await session.execute(stmt)
configs = result.scalars().all()
# 构建返回数据 - 使用列表推导式和getattr简化代码
config_list = [{
"id": config.id,
"name": config.name,
"ip": config.ip,
"port": config.port,
"slave_id": config.slave_id,
"address_type": config.address_type,
"address_number": config.address_number,
"target_value": config.target_value,
# 使用getattr处理可能不存在的字段
"reset_after_trigger": getattr(config, "reset_after_trigger", None),
"reset_signal_address": getattr(config, "reset_signal_address", None),
"reset_value": getattr(config, "reset_value", None)
} for config in configs]
return {
"success": True,
"message": "获取任务关联的Modbus配置成功",
"data": config_list
}
except Exception as e:
logger.error(f"获取任务关联的Modbus配置异常: {str(e)}")
return {
"success": False,
"message": f"获取任务关联的Modbus配置异常: {str(e)}"
}