VWED_server/services/modbus_config_service.py

569 lines
21 KiB
Python
Raw Permalink Normal View History

2025-07-14 10:29:37 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Modbus配置服务模块
提供Modbus配置相关的服务方法
"""
import uuid
import datetime
from typing import Dict, List, Any, Optional, Union
from sqlalchemy import select, delete, update, and_, func
from pymodbus.client import ModbusTcpClient
from utils.logger import get_logger
from data.models.modbusconfig import VWEDModbusConfig
from data.models.taskdef import VWEDTaskDef
from data.session import get_async_session
from data.enum.modbus_config_enum import ModbusConfigStatus
# 设置日志
logger = get_logger("app.modbus_config_service")
class ModbusConfigService:
"""
Modbus配置服务类
提供Modbus配置相关的服务方法
"""
@staticmethod
async def _check_task_exists(task_id: str) -> bool:
"""
检查任务ID是否存在
Args:
task_id: 任务ID
Returns:
任务是否存在
"""
if not task_id:
return True # 如果没有提供任务ID则视为不需要验证
try:
async with get_async_session() as session:
# 查询任务是否存在
stmt = select(VWEDTaskDef).where(
and_(
VWEDTaskDef.id == task_id,
VWEDTaskDef.is_deleted == False
)
)
result = await session.execute(stmt)
task = result.scalars().first()
return task is not None
except Exception as e:
logger.error(f"检查任务ID存在性异常: {str(e)}")
return False
@staticmethod
async def add_modbus_config(config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
新增Modbus配置
Args:
config_data: Modbus配置信息
Returns:
处理结果
"""
try:
# 检查任务ID是否存在
task_id = config_data.get("task_id", "")
# if task_id:
async with get_async_session() as session:
# 查询任务是否存在
stmt = select(VWEDTaskDef).where(
and_(
VWEDTaskDef.id == task_id,
VWEDTaskDef.is_deleted == False
)
)
result = await session.execute(stmt)
task = result.scalars().first()
if not task:
return {
"success": False,
"message": f"任务ID {task_id} 不存在"
}
else:
task_name = task.label
# else:
# return {
# "success": False,
# "message": f"任务ID 不能为空"
# }
# 创建新的配置对象
new_config = VWEDModbusConfig(
id=str(uuid.uuid4()),
name=config_data.get("name"),
ip=config_data.get("ip"),
port=config_data.get("port"),
slave_id=config_data.get("slave_id"),
address_type=config_data.get("address_type"),
address_number=config_data.get("address_number"),
task_id=task_id,
task_name=task_name,
target_value=config_data.get("target_value"),
remark=config_data.get("remark"),
status=config_data.get("status", ModbusConfigStatus.START) # 默认启用
)
# 额外字段处理
if "reset_after_trigger" in config_data:
new_config.reset_after_trigger = config_data.get("reset_after_trigger")
if "reset_signal_address" in config_data:
new_config.reset_signal_address = config_data.get("reset_signal_address")
if "reset_value" in config_data:
new_config.reset_value = config_data.get("reset_value")
if "tenant_id" in config_data:
new_config.tenant_id = config_data.get("tenant_id")
# 保存到数据库
async with get_async_session() as session:
session.add(new_config)
await session.commit()
# 返回结果
return {
"success": True,
"message": "Modbus配置添加成功",
"data": {
"id": new_config.id,
"name": new_config.name
}
}
except Exception as e:
logger.error(f"添加Modbus配置异常: {str(e)}")
return {
"success": False,
"message": f"添加Modbus配置异常: {str(e)}"
}
@staticmethod
async def delete_modbus_config(config_id: str) -> Dict[str, Any]:
"""
删除Modbus配置 (软删除)
Args:
config_id: 配置ID
Returns:
处理结果
"""
try:
# 查询配置是否存在
async with get_async_session() as session:
# 查询配置
stmt = select(VWEDModbusConfig).where(
and_(
VWEDModbusConfig.id == config_id,
VWEDModbusConfig.is_deleted == False # 只查询未删除的记录
)
)
result = await session.execute(stmt)
config = result.scalars().first()
if not config:
return {
"success": False,
"message": f"未找到ID为 {config_id} 的Modbus配置"
}
# 软删除配置 - 更新is_deleted字段为True
update_stmt = (
update(VWEDModbusConfig)
.where(VWEDModbusConfig.id == config_id)
.values(is_deleted=True)
)
await session.execute(update_stmt)
await session.commit()
return {
"success": True,
"message": "Modbus配置删除成功"
}
except Exception as e:
logger.error(f"删除Modbus配置异常: {str(e)}")
return {
"success": False,
"message": f"删除Modbus配置异常: {str(e)}"
}
@staticmethod
async def test_modbus_connection(connection_data: Dict[str, Any]) -> Dict[str, Any]:
"""
测试Modbus连接
Args:
connection_data: 连接参数
Returns:
连接测试结果
"""
try:
# 获取连接参数
ip = connection_data.get("ip")
port = int(connection_data.get("port"))
slave_id = int(connection_data.get("slave_id"))
return {
"success": True,
"message": "Modbus连接测试成功",
"data": {
"connected": True,
"current_value": "1234567890"
}
}
# 尝试读取寄存器
# try:
# # 如果提供了地址类型和地址编号,尝试读取指定地址
# address_type = connection_data.get("address_type")
# address_number = connection_data.get("address_number")
# if address_type and address_number is not None:
# # 根据地址类型选择不同的读取方法
# if address_type == "0X":
# response = client.read_coils(address_number, 1, slave=slave_id)
# elif address_type == "1X":
# response = client.read_discrete_inputs(address_number, 1, slave=slave_id)
# elif address_type == "3X":
# response = client.read_input_registers(address_number, 1, slave=slave_id)
# else: # 默认使用保持寄存器
# response = client.read_holding_registers(address_number, 1, slave=slave_id)
# else:
# # 默认尝试读取保持寄存器的第一个地址
# response = client.read_holding_registers(0, 1, slave=slave_id)
# if response.isError():
# return {
# "success": False,
# "message": f"连接成功,但读取寄存器失败: {response}"
# }
# else:
# # 读取成功,返回当前值
# current_value = None
# if hasattr(response, 'registers') and response.registers:
# current_value = response.registers[0]
# elif hasattr(response, 'bits') and response.bits:
# current_value = 1 if response.bits[0] else 0
# return {
# "success": True,
# "message": "Modbus连接测试成功",
# "data": {
# "connected": True,
# "current_value": current_value
# }
# }
# except Exception as read_error:
# client.close()
# return {
# "success": False,
# "message": f"连接成功,但读取寄存器失败: {str(read_error)}"
# }
# # 关闭连接
# client.close()
except Exception as e:
logger.error(f"测试Modbus连接异常: {str(e)}")
return {
"success": False,
"message": f"测试Modbus连接异常: {str(e)}"
}
@staticmethod
async def update_modbus_config(config_id: str, config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
修改Modbus配置
Args:
config_id: 配置ID
config_data: 修改后的配置信息
Returns:
处理结果
"""
try:
# 查询配置是否存在
async with get_async_session() as session:
# 查询配置
stmt = select(VWEDModbusConfig).where(
and_(
VWEDModbusConfig.id == config_id,
VWEDModbusConfig.is_deleted == False # 只查询未删除的记录
)
)
result = await session.execute(stmt)
config = result.scalars().first()
if not config:
return {
"success": False,
"message": f"未找到ID为 {config_id} 的Modbus配置"
}
# 检查任务ID是否存在
task_id = config_data.get("task_id")
if task_id:
async with get_async_session() as session:
# 查询任务是否存在
stmt = select(VWEDTaskDef).where(
and_(
VWEDTaskDef.id == task_id,
VWEDTaskDef.is_deleted == False
)
)
result = await session.execute(stmt)
task = result.scalars().first()
if not task:
return {
"success": False,
"message": f"任务ID {task_id} 不存在"
}
else:
task_name = task.label
else:
return {
"success": False,
"message": f"任务ID 不能为空"
}
# 准备更新数据
update_data = {}
# 检查并添加更新字段
updatable_fields = [
"name", "ip", "port", "slave_id", "address_type", "address_number",
"task_id", "target_value", "remark", "task_name"
]
has_changes = False
for field in updatable_fields:
if field in config_data:
# 获取当前配置数据中的值
new_value = config_data[field]
# 获取数据库中的现有值
current_value = getattr(config, field)
# 比较新值和当前值是否相同
if new_value != current_value:
update_data[field] = new_value
has_changes = True
# 如果没有变化,直接返回提示信息
if not has_changes:
return {
"success": True,
"message": "配置内容未发生变化,无需更新",
"data": {"id": config_id}
}
# 添加更新时间
update_data["updated_at"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
update_data["task_name"] = task_name
# 更新配置
update_stmt = (
update(VWEDModbusConfig)
.where(VWEDModbusConfig.id == config_id)
.values(**update_data)
)
await session.execute(update_stmt)
await session.commit()
return {
"success": True,
"message": "Modbus配置更新成功",
"data": {"id": config_id}
}
except Exception as e:
logger.error(f"更新Modbus配置异常: {str(e)}")
return {
"success": False,
"message": f"更新Modbus配置异常: {str(e)}"
}
@staticmethod
async def get_modbus_config_detail(config_id: str) -> Dict[str, Any]:
"""
获取Modbus配置详情
Args:
config_id: 配置ID
Returns:
包含配置详情的结果
"""
try:
async with get_async_session() as session:
# 查询配置
stmt = select(VWEDModbusConfig).where(
and_(
VWEDModbusConfig.id == config_id,
VWEDModbusConfig.is_deleted == False # 只查询未删除的记录
)
)
result = await session.execute(stmt)
config = result.scalars().first()
if not config:
return {
"success": False,
"message": f"未找到ID为 {config_id} 的Modbus配置"
}
# 构建返回数据
config_data = {
"name": config.name,
"ip": config.ip,
"port": config.port,
"slave_id": config.slave_id,
"address_type": config.address_type,
"address_number": config.address_number,
"task_id": config.task_id,
"target_value": config.target_value,
"remark": config.remark,
}
return {
"success": True,
"message": "获取Modbus配置详情成功",
"data": config_data
}
except Exception as e:
logger.error(f"获取Modbus配置详情异常: {str(e)}")
return {
"success": False,
"message": f"获取Modbus配置详情异常: {str(e)}"
}
@staticmethod
async def get_modbus_config_list(
page: int,
size: int,
name: Optional[str] = None,
ip: Optional[str] = None
) -> Dict[str, Any]:
"""
获取Modbus配置列表
Args:
page: 页码
size: 每页数量
name: 配置名称用于筛选
ip: 设备IP地址用于筛选
Returns:
包含配置列表的结果
"""
try:
async with get_async_session() as session:
# 构建查询条件
conditions = [VWEDModbusConfig.is_deleted == False] # 只查询未删除的记录
if name:
conditions.append(VWEDModbusConfig.name.like(f"%{name}%"))
if ip:
conditions.append(VWEDModbusConfig.ip.like(f"%{ip}%"))
# 查询总数
count_stmt = select(func.count(VWEDModbusConfig.id)).where(and_(*conditions))
result = await session.execute(count_stmt)
total = result.scalar() or 0
# 查询分页数据
offset = (page - 1) * size
query_stmt = select(VWEDModbusConfig).where(and_(*conditions)).order_by(VWEDModbusConfig.created_at.desc())
query_stmt = query_stmt.offset(offset).limit(size)
result = await session.execute(query_stmt)
configs = result.scalars().all()
# 构建返回数据 - 使用列表推导式简化代码
config_list = [{
"id": config.id,
"name": config.name,
"ip": config.ip,
"port": config.port,
"slave_id": config.slave_id,
"address_type": config.address_type,
"address_number": config.address_number,
"task_id": config.task_id,
"task_name": config.task_name,
"target_value": config.target_value,
"remark": config.remark
} for config in configs]
return {
"success": True,
"message": "获取Modbus配置列表成功",
"data": {
"records": config_list,
"total": total,
"size": size,
"current": page
}
}
except Exception as e:
logger.error(f"获取Modbus配置列表异常: {str(e)}")
return {
"success": False,
"message": f"获取Modbus配置列表异常: {str(e)}"
}
@staticmethod
async def get_task_modbus_configs(task_id: str) -> Dict[str, Any]:
"""
获取指定任务关联的Modbus配置列表
Args:
task_id: 任务ID
Returns:
包含任务关联的Modbus配置列表的结果
"""
try:
async with get_async_session() as session:
# 查询任务关联的配置
stmt = select(VWEDModbusConfig).where(
and_(
VWEDModbusConfig.task_id == task_id,
VWEDModbusConfig.status == ModbusConfigStatus.START, # 只获取启用状态的配置
VWEDModbusConfig.is_deleted == False # 只查询未删除的记录
)
).order_by(VWEDModbusConfig.created_at.desc())
result = await session.execute(stmt)
configs = result.scalars().all()
# 构建返回数据 - 使用列表推导式和getattr简化代码
config_list = [{
"id": config.id,
"name": config.name,
"ip": config.ip,
"port": config.port,
"slave_id": config.slave_id,
"address_type": config.address_type,
"address_number": config.address_number,
"target_value": config.target_value,
# 使用getattr处理可能不存在的字段
"reset_after_trigger": getattr(config, "reset_after_trigger", None),
"reset_signal_address": getattr(config, "reset_signal_address", None),
"reset_value": getattr(config, "reset_value", None)
} for config in configs]
return {
"success": True,
"message": "获取任务关联的Modbus配置成功",
"data": config_list
}
except Exception as e:
logger.error(f"获取任务关联的Modbus配置异常: {str(e)}")
return {
"success": False,
"message": f"获取任务关联的Modbus配置异常: {str(e)}"
}