VWED_server/services/operate_point_service.py
2025-07-17 15:47:56 +08:00

1434 lines
56 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
动作点管理服务
处理动作点和库位的管理操作
"""
import math
from typing import List, Dict, Any, Optional, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func
from data.models import OperatePoint, OperatePointLayer, StorageArea, StorageAreaType, ExtendedProperty, ExtendedPropertyTypeEnum, StorageLocationLog
from routes.model.operate_point_model import (
OperatePointLayerInfo,
StorageLocationListRequest, StorageLocationListResponse,
StorageLocationInfo, StorageLocationStatistics,
StorageLocationStatusUpdateRequest, StorageLocationStatusUpdateResponse,
BatchStorageLocationStatusUpdateRequest, BatchStorageLocationStatusUpdateResponse,
StorageLocationActionEnum,
ExtendedPropertyCreateRequest, ExtendedPropertyCreateResponse,
ExtendedPropertyListRequest, ExtendedPropertyListResponse,
ExtendedPropertyInfo, ExtendedPropertyDeleteResponse,
StorageLocationDetailResponse, StorageLocationEditRequest, StorageLocationEditResponse,
StorageLocationLogInfo, StorageLocationLogListRequest, StorageLocationLogListResponse,
)
from utils.logger import get_logger
import datetime
logger = get_logger("services.operate_point_service")
class OperatePointService:
"""库位管理服务"""
@staticmethod
def get_storage_location_list(db: Session, request: StorageLocationListRequest) -> StorageLocationListResponse:
"""
获取库位列表
Args:
db: 数据库会话
request: 库位列表查询请求
Returns:
StorageLocationListResponse: 库位列表响应
"""
try:
# 构建查询条件 - 主要查询 OperatePointLayer 表
query = db.query(OperatePointLayer).filter(OperatePointLayer.is_deleted == False)
# 通过关联的 OperatePoint 进行筛选
if request.scene_id:
query = query.join(OperatePoint).filter(OperatePoint.scene_id == request.scene_id)
if request.storage_area_id:
query = query.join(OperatePoint).filter(OperatePoint.storage_area_id == request.storage_area_id)
# 站点名称模糊搜索
if request.station_name:
query = query.filter(OperatePointLayer.station_name.like(f"%{request.station_name}%"))
# 库位名称模糊搜索
if request.layer_name:
query = query.filter(OperatePointLayer.layer_name.like(f"%{request.layer_name}%"))
# 是否禁用过滤
if request.is_disabled is not None:
query = query.filter(OperatePointLayer.is_disabled == request.is_disabled)
# 是否占用过滤
if request.is_occupied is not None:
query = query.filter(OperatePointLayer.is_occupied == request.is_occupied)
# 是否锁定过滤
if request.is_locked is not None:
query = query.filter(OperatePointLayer.is_locked == request.is_locked)
# 是否空托盘过滤
if request.is_empty_tray is not None:
query = query.filter(OperatePointLayer.is_empty_tray == request.is_empty_tray)
# 获取总数
total = query.count()
# 分页
offset = (request.page - 1) * request.page_size
storage_locations = query.offset(offset).limit(request.page_size).all()
# 转换为响应对象
storage_location_infos = []
for sl in storage_locations:
storage_location_info = OperatePointService._convert_to_storage_location_info(
sl,
request.include_operate_point_info,
request.include_extended_fields
)
storage_location_infos.append(storage_location_info)
# 计算总页数
total_pages = math.ceil(total / request.page_size) if total > 0 else 0
# 获取统计信息
# statistics = OperatePointService._get_storage_location_statistics(db, request)
return StorageLocationListResponse(
total=total,
page=request.page,
page_size=request.page_size,
total_pages=total_pages,
storage_locations=storage_location_infos,
# statistics=statistics
)
except Exception as e:
logger.error(f"获取库位列表失败: {str(e)}")
raise
@staticmethod
def _convert_to_storage_location_info(layer: OperatePointLayer, include_operate_point_info: bool = True, include_extended_fields: bool = False) -> StorageLocationInfo:
"""
将数据库模型转换为库位信息响应对象
Args:
layer: 动作点分层数据库模型
include_operate_point_info: 是否包含动作点信息
Returns:
StorageLocationInfo: 库位信息
"""
# 构建库位信息
storage_location_info = StorageLocationInfo(
id=layer.id,
layer_index=layer.layer_index,
layer_name=layer.layer_name,
is_occupied=layer.is_occupied,
is_locked=layer.is_locked,
is_disabled=layer.is_disabled,
is_empty_tray=layer.is_empty_tray,
locked_by=layer.locked_by,
goods_content=layer.goods_content,
goods_weight=layer.goods_weight,
goods_volume=layer.goods_volume,
goods_stored_at=layer.goods_stored_at,
goods_retrieved_at=layer.goods_retrieved_at,
last_access_at=layer.last_access_at,
max_weight=layer.max_weight,
max_volume=layer.max_volume,
layer_height=layer.layer_height,
tags=layer.tags,
description=layer.description,
created_at=layer.created_at,
updated_at=layer.updated_at
)
# 如果需要包含动作点信息,则直接平铺到库位信息中
if include_operate_point_info and layer.operate_point:
storage_location_info.operate_point_id = layer.operate_point.id
storage_location_info.station_name = layer.operate_point.station_name
storage_location_info.scene_id = layer.operate_point.scene_id
storage_location_info.storage_area_id = layer.operate_point.storage_area_id
storage_location_info.storage_area_type = layer.operate_point.storage_area_type.value if layer.operate_point.storage_area_type else None
storage_location_info.area_name = layer.operate_point.area_name
storage_location_info.max_layers = layer.operate_point.max_layers
storage_location_info.current_layers = layer.operate_point.current_layers
storage_location_info.position_x = layer.operate_point.position_x
storage_location_info.position_y = layer.operate_point.position_y
storage_location_info.position_z = layer.operate_point.position_z
storage_location_info.operate_point_description = layer.operate_point.description
# 如果需要包含扩展字段,则从 config_json 中解析
if include_extended_fields and layer.config_json:
extended_fields = OperatePointService._parse_extended_fields(layer.config_json)
# 将扩展字段转换为简单的键值对格式,便于前端处理
storage_location_info.extended_fields = {
field_name: field_data["value"]
for field_name, field_data in extended_fields.items()
}
elif include_extended_fields:
storage_location_info.extended_fields = {}
return storage_location_info
@staticmethod
def update_storage_location_status(db: Session, request: StorageLocationStatusUpdateRequest) -> StorageLocationStatusUpdateResponse:
"""
更新库位状态
Args:
db: 数据库会话
request: 库位状态更新请求
Returns:
StorageLocationStatusUpdateResponse: 更新响应
"""
try:
# 查询库位使用layer_name进行查询
storage_location = db.query(OperatePointLayer).filter(
OperatePointLayer.layer_name == request.layer_name,
OperatePointLayer.is_deleted == False
).first()
if not storage_location:
return StorageLocationStatusUpdateResponse(
layer_name=request.layer_name,
action=request.action,
success=False,
message="库位不存在",
new_status={}
)
# 执行状态更新
success, message = OperatePointService._execute_status_update(storage_location, request.action, request.locked_by)
if success:
db.commit()
# 记录操作日志
try:
operator = OperatePointService._get_operator(request.locked_by)
OperatePointService.create_storage_location_log(
db=db,
operator=operator,
operation_type=request.action.value,
affected_storage_locations=[request.layer_name],
description=request.reason
)
except Exception as e:
logger.error(f"记录操作日志失败: {str(e)}")
# 获取更新后的状态
new_status = OperatePointService._get_storage_location_status(storage_location)
return StorageLocationStatusUpdateResponse(
layer_name=request.layer_name,
action=request.action,
success=True,
message=message,
new_status=new_status
)
else:
db.rollback()
return StorageLocationStatusUpdateResponse(
layer_name=request.layer_name,
action=request.action,
success=False,
message=message,
new_status={}
)
except Exception as e:
db.rollback()
logger.error(f"更新库位状态失败: {str(e)}")
return StorageLocationStatusUpdateResponse(
layer_name=request.layer_name,
action=request.action,
success=False,
message=f"更新库位状态失败: {str(e)}",
new_status={}
)
@staticmethod
def batch_update_storage_location_status(db: Session, request: BatchStorageLocationStatusUpdateRequest) -> BatchStorageLocationStatusUpdateResponse:
"""
批量更新库位状态
Args:
db: 数据库会话
request: 批量库位状态更新请求
Returns:
BatchStorageLocationStatusUpdateResponse: 批量更新响应
"""
try:
results = []
success_count = 0
failed_count = 0
for layer_name in request.layer_names:
# 创建单个更新请求
single_request = StorageLocationStatusUpdateRequest(
layer_name=layer_name,
action=request.action,
locked_by=request.locked_by,
reason=request.reason
)
# 执行单个更新
result = OperatePointService.update_storage_location_status(db, single_request)
results.append(result)
if result.success:
success_count += 1
else:
failed_count += 1
return BatchStorageLocationStatusUpdateResponse(
total_count=len(request.layer_names),
success_count=success_count,
failed_count=failed_count,
results=results
)
except Exception as e:
logger.error(f"批量更新库位状态失败: {str(e)}")
raise
@staticmethod
def _execute_status_update(storage_location: OperatePointLayer, action: StorageLocationActionEnum, locked_by: Optional[str] = None) -> Tuple[bool, str]:
"""
执行状态更新操作
Args:
storage_location: 库位对象
action: 操作类型(枚举)
locked_by: 锁定者
Returns:
Tuple[bool, str]: (是否成功, 消息)
"""
try:
current_time = datetime.datetime.now()
if action == StorageLocationActionEnum.OCCUPY:
# 占用操作
if storage_location.is_occupied:
return True, "库位已被占用,无需重复操作"
if storage_location.is_disabled:
return False, "库位已禁用,无法占用"
if storage_location.is_locked:
return False, "库位已锁定,无法占用"
storage_location.is_occupied = True
storage_location.last_access_at = current_time
storage_location.goods_stored_at = current_time
storage_location.goods_retrieved_at = None
return True, "库位占用成功"
elif action == StorageLocationActionEnum.RELEASE:
# 释放操作
if not storage_location.is_occupied:
return True, "库位未被占用,无需重复操作"
storage_location.is_occupied = False
storage_location.goods_content = ''
storage_location.goods_weight = None
storage_location.goods_volume = None
storage_location.goods_retrieved_at = current_time
storage_location.goods_stored_at = None
storage_location.last_access_at = current_time
return True, "库位释放成功"
elif action == StorageLocationActionEnum.LOCK:
# 锁定操作
if not locked_by:
return False, "锁定操作必须提供锁定者"
if storage_location.is_locked:
if storage_location.locked_by == locked_by:
return True, "库位已被相同锁定者锁定,无需重复操作"
else:
return False, f"库位已被其他锁定者锁定,当前锁定者: {storage_location.locked_by}"
storage_location.is_locked = True
storage_location.locked_by = locked_by
storage_location.last_access_at = current_time
return True, "库位锁定成功"
elif action == StorageLocationActionEnum.UNLOCK:
# 解锁操作
if not storage_location.is_locked:
return True, "库位未被锁定,无需重复操作"
storage_location.is_locked = False
storage_location.locked_by = None
storage_location.last_access_at = current_time
return True, "库位解锁成功"
elif action == StorageLocationActionEnum.ENABLE:
# 启用操作
if not storage_location.is_disabled:
return True, "库位已启用,无需重复操作"
storage_location.is_disabled = False
storage_location.last_access_at = current_time
return True, "库位启用成功"
elif action == StorageLocationActionEnum.DISABLE:
# 禁用操作
if storage_location.is_disabled:
return True, "库位已禁用,无需重复操作"
storage_location.is_disabled = True
storage_location.last_access_at = current_time
return True, "库位禁用成功"
elif action == StorageLocationActionEnum.SET_EMPTY_TRAY:
# 设置为空托盘
if storage_location.is_empty_tray:
return True, "库位已设置为空托盘,无需重复操作"
storage_location.is_empty_tray = True
storage_location.is_occupied = True
storage_location.last_access_at = current_time
storage_location.goods_stored_at = current_time
storage_location.goods_retrieved_at = None
return True, "库位设置为空托盘成功"
elif action == StorageLocationActionEnum.CLEAR_EMPTY_TRAY:
# 清除空托盘状态
if not storage_location.is_empty_tray:
return True, "库位已非空托盘状态,无需重复操作"
storage_location.is_empty_tray = False
storage_location.last_access_at = current_time
storage_location.is_occupied = False
storage_location.goods_content = ''
storage_location.goods_weight = None
storage_location.goods_volume = None
storage_location.goods_retrieved_at = current_time
storage_location.goods_stored_at = None
return True, "库位清除空托盘状态成功"
else:
return False, f"不支持的操作类型: {action}"
except Exception as e:
logger.error(f"执行状态更新操作失败: {str(e)}")
return False, f"执行状态更新操作失败: {str(e)}"
@staticmethod
def _get_storage_location_status(storage_location: OperatePointLayer) -> Dict[str, Any]:
"""
获取库位状态信息
Args:
storage_location: 库位对象
Returns:
Dict[str, Any]: 状态信息
"""
return {
"id": storage_location.id,
"is_occupied": storage_location.is_occupied,
"is_locked": storage_location.is_locked,
"is_disabled": storage_location.is_disabled,
"is_empty_tray": storage_location.is_empty_tray,
"locked_by": storage_location.locked_by,
"goods_content": storage_location.goods_content,
"last_access_at": storage_location.last_access_at,
"updated_at": storage_location.updated_at
}
@staticmethod
def _get_operator(provided_operator: Optional[str] = None, fallback_operator: str = "系统") -> str:
"""
获取操作人信息
Args:
provided_operator: 提供的操作人(如锁定者)
fallback_operator: 默认操作人
Returns:
str: 操作人名称
"""
# 如果提供了操作人且不为空,使用提供的操作人
if provided_operator and provided_operator.strip():
return provided_operator.strip()
# TODO: 未来可以从当前用户会话中获取操作人信息
# 例如:从 JWT token 或 session 中获取当前登录用户
# current_user = get_current_user()
# if current_user:
# return current_user.username
# 使用默认操作人
return fallback_operator
@staticmethod
def get_operation_result_summary(results: List[StorageLocationStatusUpdateResponse]) -> Dict[str, Any]:
"""
获取操作结果的汇总统计信息
Args:
results: 操作结果列表
Returns:
Dict[str, Any]: 统计信息
"""
if not results:
return {
"total": 0,
"success": 0,
"failed": 0,
"no_change_needed": 0,
"actual_updates": 0
}
total = len(results)
success = sum(1 for r in results if r.success)
failed = total - success
# 统计无需更改的操作(幂等性操作)
no_change_messages = [
"无需重复操作",
"已被占用,无需重复操作",
"未被占用,无需重复操作",
"已被相同锁定者锁定,无需重复操作",
"未被锁定,无需重复操作",
"已启用,无需重复操作",
"已禁用,无需重复操作",
"已设置为空托盘,无需重复操作",
"已非空托盘状态,无需重复操作"
]
no_change_needed = sum(1 for r in results
if r.success and any(msg in r.message for msg in no_change_messages))
actual_updates = success - no_change_needed
return {
"total": total,
"success": success,
"failed": failed,
"no_change_needed": no_change_needed,
"actual_updates": actual_updates
}
@staticmethod
def _parse_extended_fields(config_json: str) -> Dict[str, Any]:
"""
解析扩展字段JSON
Args:
config_json: JSON字符串
Returns:
Dict[str, Any]: 扩展字段字典
"""
try:
if not config_json:
return {}
import json
config = json.loads(config_json)
return config.get("extended_fields", {})
except Exception as e:
logger.error(f"解析扩展字段JSON失败: {str(e)}")
return {}
@staticmethod
def _serialize_extended_fields(extended_fields: Dict[str, Any]) -> str:
"""
序列化扩展字段为JSON
Args:
extended_fields: 扩展字段字典
Returns:
str: JSON字符串
"""
try:
import json
config = {"extended_fields": extended_fields}
return json.dumps(config, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"序列化扩展字段JSON失败: {str(e)}")
return "{}"
@staticmethod
def _infer_field_type(value: Any) -> str:
"""
推断字段类型
Args:
value: 字段值
Returns:
str: 字段类型
"""
if isinstance(value, bool):
return "boolean"
elif isinstance(value, int):
return "integer"
elif isinstance(value, float):
return "number"
elif isinstance(value, str):
# 尝试判断是否为日期
try:
datetime.datetime.fromisoformat(value)
return "date"
except:
return "string"
elif isinstance(value, list):
return "array"
elif isinstance(value, dict):
return "object"
else:
return "string"
# 扩展属性管理相关方法
@staticmethod
def create_extended_property(db: Session, request: ExtendedPropertyCreateRequest) -> ExtendedPropertyCreateResponse:
"""
创建扩展属性
Args:
db: 数据库会话
request: 扩展属性创建请求
Returns:
ExtendedPropertyCreateResponse: 创建响应
"""
try:
# 检查属性名称是否已存在
existing_property = db.query(ExtendedProperty).filter(
ExtendedProperty.property_name == request.property_name,
ExtendedProperty.is_deleted == False
).first()
if existing_property:
raise ValueError(f"属性名称 '{request.property_name}' 已存在")
# 创建扩展属性
extended_property = ExtendedProperty(
property_key=request.property_name, # 使用 property_name 作为 property_key
property_name=request.property_name,
property_type=request.property_type,
is_required=request.is_required,
is_enabled=request.is_enabled,
description=request.description,
placeholder=request.placeholder,
default_value=request.default_value,
options=OperatePointService._serialize_options(request.options),
validation_rules=OperatePointService._serialize_validation_rules(request.validation_rules),
category=request.category,
sort_order=request.sort_order,
display_width=request.display_width,
display_format=request.display_format
)
db.add(extended_property)
db.commit()
db.refresh(extended_property)
# 成功创建扩展属性后,更新所有 OperatePointLayer 的 config_json
try:
OperatePointService._add_extended_property_to_layers(
db=db,
property_name=extended_property.property_name,
property_type=extended_property.property_type,
default_value=extended_property.default_value,
is_required=extended_property.is_required
)
db.commit()
logger.info(f"成功为所有库位层添加扩展属性: {extended_property.property_name}")
except Exception as e:
logger.error(f"添加扩展属性到库位层失败: {str(e)}")
# 这里不抛出异常,因为扩展属性已经创建成功
# 记录操作日志
try:
operator = OperatePointService._get_operator()
OperatePointService.create_storage_location_log(
db=db,
operator=operator,
operation_type="创建扩展属性",
affected_storage_locations=[], # 扩展属性影响所有库位,这里传空列表
description=f"创建扩展属性 '{extended_property.property_name}',类型: {extended_property.property_type.value},已应用到所有库位层"
)
except Exception as e:
logger.error(f"记录操作日志失败: {str(e)}")
return ExtendedPropertyCreateResponse(
id=str(extended_property.id),
property_name=extended_property.property_name,
message="扩展属性创建成功,已应用到所有库位层"
)
except Exception as e:
db.rollback()
logger.error(f"创建扩展属性失败: {str(e)}")
raise
@staticmethod
def get_extended_property_list(db: Session, request: ExtendedPropertyListRequest) -> ExtendedPropertyListResponse:
"""
获取扩展属性列表
Args:
db: 数据库会话
request: 扩展属性列表查询请求
Returns:
ExtendedPropertyListResponse: 属性列表响应
"""
try:
# 构建查询条件
query = db.query(ExtendedProperty).filter(ExtendedProperty.is_deleted == False)
# 属性名称模糊搜索
if request.property_name:
query = query.filter(ExtendedProperty.property_name.like(f"%{request.property_name}%"))
# 属性类型过滤
if request.property_type:
query = query.filter(ExtendedProperty.property_type == request.property_type)
# 分类过滤
if request.category:
query = query.filter(ExtendedProperty.category == request.category)
# 是否启用过滤
if request.is_enabled is not None:
query = query.filter(ExtendedProperty.is_enabled == request.is_enabled)
# 排序
query = query.order_by(ExtendedProperty.sort_order.asc(), ExtendedProperty.created_at.desc())
# 获取总数
total = query.count()
# 分页
offset = (request.page - 1) * request.page_size
properties = query.offset(offset).limit(request.page_size).all()
# 转换为响应对象
property_infos = []
for prop in properties:
property_info = OperatePointService._convert_to_extended_property_info(prop)
property_infos.append(property_info)
# 计算总页数
total_pages = math.ceil(total / request.page_size) if total > 0 else 0
return ExtendedPropertyListResponse(
total=total,
page=request.page,
page_size=request.page_size,
total_pages=total_pages,
properties=property_infos
)
except Exception as e:
logger.error(f"获取扩展属性列表失败: {str(e)}")
raise
@staticmethod
def delete_extended_property(db: Session, property_id: str) -> ExtendedPropertyDeleteResponse:
"""
删除扩展属性
Args:
db: 数据库会话
property_id: 属性ID
Returns:
ExtendedPropertyDeleteResponse: 删除响应
"""
try:
# 查询扩展属性
extended_property = db.query(ExtendedProperty).filter(
ExtendedProperty.id == property_id,
ExtendedProperty.is_deleted == False
).first()
if not extended_property:
raise ValueError("扩展属性不存在")
# 软删除
extended_property.is_deleted = True
extended_property.updated_at = datetime.datetime.now()
# 记录属性名称,用于后续清理
property_name = extended_property.property_name
db.commit()
# 成功删除扩展属性后,从所有 OperatePointLayer 的 config_json 中移除该属性
try:
OperatePointService._remove_extended_property_from_layers(
db=db,
property_name=property_name
)
db.commit()
logger.info(f"成功从所有库位层移除扩展属性: {property_name}")
except Exception as e:
logger.error(f"从库位层移除扩展属性失败: {str(e)}")
# 这里不抛出异常,因为扩展属性已经删除成功
# 记录操作日志
try:
operator = OperatePointService._get_operator()
OperatePointService.create_storage_location_log(
db=db,
operator=operator,
operation_type="删除扩展属性",
affected_storage_locations=[], # 扩展属性影响所有库位,这里传空列表
description=f"删除扩展属性 '{property_name}',已从所有库位层移除"
)
except Exception as e:
logger.error(f"记录操作日志失败: {str(e)}")
return ExtendedPropertyDeleteResponse(
property_id=str(extended_property.id),
property_name=extended_property.property_name,
message="扩展属性删除成功,已从所有库位层移除"
)
except Exception as e:
db.rollback()
logger.error(f"删除扩展属性失败: {str(e)}")
raise
@staticmethod
def _convert_to_extended_property_info(prop: ExtendedProperty) -> ExtendedPropertyInfo:
"""
将数据库模型转换为扩展属性信息响应对象
Args:
prop: 扩展属性数据库模型
Returns:
ExtendedPropertyInfo: 扩展属性信息
"""
return ExtendedPropertyInfo(
id=str(prop.id),
property_name=prop.property_name,
property_type=prop.property_type,
is_required=prop.is_required,
is_enabled=prop.is_enabled,
description=prop.description,
placeholder=prop.placeholder,
default_value=prop.default_value,
options=OperatePointService._parse_options(prop.options),
validation_rules=OperatePointService._parse_validation_rules(prop.validation_rules),
category=prop.category,
sort_order=prop.sort_order,
display_width=prop.display_width,
display_format=prop.display_format,
created_at=prop.created_at,
updated_at=prop.updated_at
)
@staticmethod
def _serialize_options(options: Optional[List[Dict[str, Any]]]) -> Optional[str]:
"""
序列化选项为JSON字符串
Args:
options: 选项列表
Returns:
str: JSON字符串
"""
if not options:
return None
try:
import json
return json.dumps(options, ensure_ascii=False)
except Exception as e:
logger.error(f"序列化选项失败: {str(e)}")
return None
@staticmethod
def _parse_options(options_json: Optional[str]) -> Optional[List[Dict[str, Any]]]:
"""
解析选项JSON字符串
Args:
options_json: JSON字符串
Returns:
List[Dict[str, Any]]: 选项列表
"""
if not options_json:
return None
try:
import json
return json.loads(options_json)
except Exception as e:
logger.error(f"解析选项JSON失败: {str(e)}")
return None
@staticmethod
def _serialize_validation_rules(rules: Optional[Dict[str, Any]]) -> Optional[str]:
"""
序列化验证规则为JSON字符串
Args:
rules: 验证规则字典
Returns:
str: JSON字符串
"""
if not rules:
return None
try:
import json
return json.dumps(rules, ensure_ascii=False)
except Exception as e:
logger.error(f"序列化验证规则失败: {str(e)}")
return None
@staticmethod
def _parse_validation_rules(rules_json: Optional[str]) -> Optional[Dict[str, Any]]:
"""
解析验证规则JSON字符串
Args:
rules_json: JSON字符串
Returns:
Dict[str, Any]: 验证规则字典
"""
if not rules_json:
return None
try:
import json
return json.loads(rules_json)
except Exception as e:
logger.error(f"解析验证规则JSON失败: {str(e)}")
return None
@staticmethod
def _add_extended_property_to_layers(
db: Session,
property_name: str,
property_type: ExtendedPropertyTypeEnum,
default_value: Optional[str] = None,
is_required: bool = False
):
"""
将扩展属性添加到所有库位层的config_json中
Args:
db: 数据库会话
property_name: 属性名称
property_type: 属性类型
default_value: 默认值
is_required: 是否必填
"""
try:
# 获取所有未删除的库位层
layers = db.query(OperatePointLayer).filter(
OperatePointLayer.is_deleted == False
).all()
for layer in layers:
# 解析现有的config_json
config = {}
if layer.config_json:
try:
import json
config = json.loads(layer.config_json)
except Exception as e:
logger.error(f"解析库位层 {layer.id} 的config_json失败: {str(e)}")
config = {}
# 确保extended_fields字段存在
if 'extended_fields' not in config:
config['extended_fields'] = {}
# 添加新的扩展属性
config['extended_fields'][property_name] = {
'value': default_value,
'type': property_type.value,
'is_required': is_required,
'updated_at': datetime.datetime.now().isoformat()
}
# 更新config_json
try:
import json
layer.config_json = json.dumps(config, ensure_ascii=False, indent=2)
logger.debug(f"为库位层 {layer.id} 添加扩展属性: {property_name}")
except Exception as e:
logger.error(f"序列化库位层 {layer.id} 的config_json失败: {str(e)}")
except Exception as e:
logger.error(f"添加扩展属性到库位层失败: {str(e)}")
raise
@staticmethod
def _remove_extended_property_from_layers(db: Session, property_name: str):
"""
从所有库位层的config_json中移除扩展属性
Args:
db: 数据库会话
property_name: 属性名称
"""
try:
# 获取所有未删除的库位层
layers = db.query(OperatePointLayer).filter(
OperatePointLayer.is_deleted == False
).all()
for layer in layers:
# 解析现有的config_json
config = {}
if layer.config_json:
try:
import json
config = json.loads(layer.config_json)
except Exception as e:
logger.error(f"解析库位层 {layer.id} 的config_json失败: {str(e)}")
continue
# 检查是否存在extended_fields
if 'extended_fields' in config and property_name in config['extended_fields']:
# 移除指定的扩展属性
del config['extended_fields'][property_name]
# 更新config_json
try:
import json
layer.config_json = json.dumps(config, ensure_ascii=False, indent=2)
logger.debug(f"从库位层 {layer.id} 移除扩展属性: {property_name}")
except Exception as e:
logger.error(f"序列化库位层 {layer.id} 的config_json失败: {str(e)}")
except Exception as e:
logger.error(f"从库位层移除扩展属性失败: {str(e)}")
raise
@staticmethod
def get_storage_location_detail(db: Session, layer_name: str) -> Dict[str, Any]:
"""
获取库位详情
Args:
db: 数据库会话
layer_name: 库位名称
Returns:
Dict[str, Any]: 库位详情信息
"""
try:
# 查询库位
layer = db.query(OperatePointLayer).filter(
OperatePointLayer.layer_name == layer_name,
OperatePointLayer.is_deleted == False
).first()
if not layer:
raise ValueError(f"库位 {layer_name} 不存在")
# 获取库位详细信息
storage_location_info = OperatePointService._convert_to_storage_location_info(
layer, include_operate_point_info=True, include_extended_fields=True
)
# 获取动作点详细信息
operate_point_info = None
if layer.operate_point:
operate_point_info = {
"id": layer.operate_point.id,
"station_name": layer.operate_point.station_name,
"scene_id": layer.operate_point.scene_id,
"storage_area_id": layer.operate_point.storage_area_id,
"storage_area_type": layer.operate_point.storage_area_type.value if layer.operate_point.storage_area_type else None,
"area_name": layer.operate_point.area_name,
"max_layers": layer.operate_point.max_layers,
"current_layers": layer.operate_point.current_layers,
"position_x": layer.operate_point.position_x,
"position_y": layer.operate_point.position_y,
"position_z": layer.operate_point.position_z,
"description": layer.operate_point.description,
"created_at": layer.operate_point.created_at,
"updated_at": layer.operate_point.updated_at,
"is_deleted": layer.operate_point.is_deleted
}
# 获取扩展字段定义
extended_fields_definitions = []
if layer.config_json:
try:
import json
config = json.loads(layer.config_json)
if 'extended_fields' in config:
# 获取所有扩展属性定义
extended_properties = db.query(ExtendedProperty).filter(
ExtendedProperty.is_deleted == False,
ExtendedProperty.is_enabled == True
).all()
for prop in extended_properties:
extended_fields_definitions.append(
OperatePointService._convert_to_extended_property_info(prop)
)
except Exception as e:
logger.error(f"解析扩展字段定义失败: {str(e)}")
# 构建状态变更历史(暂时为空,后续可以扩展)
status_history = []
return {
"storage_location": storage_location_info,
"operate_point_info": operate_point_info,
"extended_fields_definitions": extended_fields_definitions,
"status_history": status_history
}
except Exception as e:
logger.error(f"获取库位详情失败: {str(e)}")
raise
@staticmethod
def edit_storage_location(db: Session, layer_name: str, request: Any) -> Dict[str, Any]:
"""
编辑库位信息
Args:
db: 数据库会话
layer_name: 库位名称
request: 库位编辑请求
Returns:
Dict[str, Any]: 编辑响应
"""
try:
# 查询库位
layer = db.query(OperatePointLayer).filter(
OperatePointLayer.layer_name == layer_name,
OperatePointLayer.is_deleted == False
).first()
if not layer:
raise ValueError(f"库位 {layer_name} 不存在")
# 跟踪更新的字段
updated_fields = []
# 更新基本字段
if request.goods_content is not None and request.goods_content != layer.goods_content:
layer.goods_content = request.goods_content
updated_fields.append("goods_content")
if request.goods_weight is not None and request.goods_weight != layer.goods_weight:
layer.goods_weight = request.goods_weight
updated_fields.append("goods_weight")
if request.goods_volume is not None and request.goods_volume != layer.goods_volume:
layer.goods_volume = request.goods_volume
updated_fields.append("goods_volume")
if request.max_weight is not None and request.max_weight != layer.max_weight:
layer.max_weight = request.max_weight
updated_fields.append("max_weight")
if request.max_volume is not None and request.max_volume != layer.max_volume:
layer.max_volume = request.max_volume
updated_fields.append("max_volume")
if request.layer_height is not None and request.layer_height != layer.layer_height:
layer.layer_height = request.layer_height
updated_fields.append("layer_height")
# 更新状态字段
if request.is_locked is not None and request.is_locked != layer.is_locked:
layer.is_locked = request.is_locked
updated_fields.append("is_locked")
if request.is_disabled is not None and request.is_disabled != layer.is_disabled:
layer.is_disabled = request.is_disabled
updated_fields.append("is_disabled")
if request.is_empty_tray is not None and request.is_empty_tray != layer.is_empty_tray:
layer.is_empty_tray = request.is_empty_tray
updated_fields.append("is_empty_tray")
if request.tags is not None and request.tags != layer.tags:
layer.tags = request.tags
updated_fields.append("tags")
if request.description is not None and request.description != layer.description:
layer.description = request.description
updated_fields.append("description")
# 更新扩展字段
if request.extended_fields is not None:
# 解析现有的config_json
config = {}
if layer.config_json:
try:
import json
config = json.loads(layer.config_json)
except Exception as e:
logger.error(f"解析库位层 {layer.id} 的config_json失败: {str(e)}")
config = {}
# 确保extended_fields字段存在
if 'extended_fields' not in config:
config['extended_fields'] = {}
# 更新扩展字段
for field_name, field_value in request.extended_fields.items():
if field_name in config['extended_fields']:
# 检查值是否真正发生了变化
current_value = config['extended_fields'][field_name].get('value')
if current_value != field_value:
config['extended_fields'][field_name]['value'] = field_value
config['extended_fields'][field_name]['updated_at'] = datetime.datetime.now().isoformat()
updated_fields.append(f"extended_fields.{field_name}")
else:
# 新增扩展字段(需要验证是否存在定义)
extended_property = db.query(ExtendedProperty).filter(
ExtendedProperty.property_name == field_name,
ExtendedProperty.is_deleted == False,
ExtendedProperty.is_enabled == True
).first()
if extended_property:
config['extended_fields'][field_name] = {
'value': field_value,
'type': extended_property.property_type.value,
'is_required': extended_property.is_required,
'updated_at': datetime.datetime.now().isoformat()
}
updated_fields.append(f"extended_fields.{field_name}")
else:
logger.warning(f"扩展字段 {field_name} 不存在或已禁用,跳过更新")
# 只有当扩展字段确实发生了变化时才更新config_json
if any(field.startswith("extended_fields.") for field in updated_fields):
try:
import json
layer.config_json = json.dumps(config, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"序列化库位层 {layer.id} 的config_json失败: {str(e)}")
raise ValueError(f"更新扩展字段失败: {str(e)}")
# 检查是否有实际的字段变更
if not updated_fields:
return {
"layer_name": layer_name,
"success": True,
"message": "没有字段发生变化,数据保持不变",
"updated_fields": [],
"updated_storage_location": OperatePointService._convert_to_storage_location_info(
layer, include_operate_point_info=True, include_extended_fields=True
)
}
# 更新最后修改时间
layer.updated_at = datetime.datetime.now()
updated_fields.append("updated_at")
# 提交更改
db.commit()
# 记录操作日志
try:
operator = OperatePointService._get_operator()
OperatePointService.create_storage_location_log(
db=db,
operator=operator,
operation_type="编辑库位",
affected_storage_locations=[layer_name],
description=f"编辑库位信息,更新字段: {', '.join(updated_fields)}"
)
except Exception as e:
logger.error(f"记录操作日志失败: {str(e)}")
# 获取更新后的库位信息
updated_storage_location = OperatePointService._convert_to_storage_location_info(
layer, include_operate_point_info=True, include_extended_fields=True
)
return {
"layer_name": layer_name,
"success": True,
"message": f"库位信息更新成功,共更新 {len(updated_fields)} 个字段",
"updated_fields": updated_fields,
"updated_storage_location": updated_storage_location
}
except Exception as e:
logger.error(f"编辑库位信息失败: {str(e)}")
db.rollback()
raise
@staticmethod
def create_storage_location_log(
db: Session,
operator: str,
operation_type: str,
affected_storage_locations: List[str],
description: Optional[str] = None
) -> StorageLocationLog:
"""
创建库位操作记录
Args:
db: 数据库会话
operator: 操作人
operation_type: 操作类型
affected_storage_locations: 影响的库位ID列表
description: 操作描述
Returns:
StorageLocationLog: 创建的操作记录
"""
try:
import json
# 创建操作记录
log = StorageLocationLog(
operator=operator,
operation_type=operation_type,
affected_storage_locations=json.dumps(affected_storage_locations, ensure_ascii=False),
description=description
)
db.add(log)
db.commit()
return log
except Exception as e:
logger.error(f"创建库位操作记录失败: {str(e)}")
db.rollback()
raise
@staticmethod
def get_storage_location_logs(db: Session, request: StorageLocationLogListRequest) -> StorageLocationLogListResponse:
"""
获取库位操作记录列表
Args:
db: 数据库会话
request: 操作记录查询请求
Returns:
StorageLocationLogListResponse: 操作记录列表响应
"""
try:
# 构建查询条件
query = db.query(StorageLocationLog).filter(StorageLocationLog.is_deleted == False)
# 根据库位ID筛选
if request.storage_location_id:
query = query.filter(StorageLocationLog.affected_storage_locations.like(f'%"{request.storage_location_id}"%'))
# 根据操作人筛选(模糊搜索)
if request.operator:
query = query.filter(StorageLocationLog.operator.like(f"%{request.operator}%"))
# 根据操作类型筛选
if request.operation_type:
query = query.filter(StorageLocationLog.operation_type == request.operation_type)
# 根据时间范围筛选
if request.start_time:
query = query.filter(StorageLocationLog.operation_time >= request.start_time)
if request.end_time:
query = query.filter(StorageLocationLog.operation_time <= request.end_time)
# 按操作时间倒序排列
query = query.order_by(StorageLocationLog.operation_time.desc())
# 获取总数
total = query.count()
# 分页
offset = (request.page - 1) * request.page_size
logs = query.offset(offset).limit(request.page_size).all()
# 转换为响应对象
log_infos = []
for log in logs:
log_info = OperatePointService._convert_to_storage_location_log_info(log)
log_infos.append(log_info)
# 计算总页数
total_pages = math.ceil(total / request.page_size) if total > 0 else 0
return StorageLocationLogListResponse(
total=total,
page=request.page,
page_size=request.page_size,
total_pages=total_pages,
logs=log_infos
)
except Exception as e:
logger.error(f"获取库位操作记录列表失败: {str(e)}")
raise
@staticmethod
def _convert_to_storage_location_log_info(log: StorageLocationLog) -> StorageLocationLogInfo:
"""
将数据库模型转换为库位操作记录信息响应对象
Args:
log: 库位操作记录数据库模型
Returns:
StorageLocationLogInfo: 操作记录信息
"""
try:
import json
# 解析影响的库位ID列表
affected_storage_locations = []
if log.affected_storage_locations:
try:
affected_storage_locations = json.loads(log.affected_storage_locations)
except Exception as e:
logger.error(f"解析影响库位列表失败: {str(e)}")
affected_storage_locations = []
return StorageLocationLogInfo(
id=str(log.id),
operation_time=log.operation_time,
operator=log.operator,
operation_type=log.operation_type,
affected_storage_locations=affected_storage_locations,
description=log.description,
created_at=log.created_at
)
except Exception as e:
logger.error(f"转换库位操作记录信息失败: {str(e)}")
raise