VWED_server/services/operate_point_service.py

1434 lines
56 KiB
Python
Raw Permalink Normal View History

2025-07-14 10:29:37 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
动作点管理服务
处理动作点和库位的管理操作
"""
import math
from typing import List, Dict, Any, Optional, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func
from data.models import OperatePoint, OperatePointLayer, StorageArea, StorageAreaType, ExtendedProperty, ExtendedPropertyTypeEnum, StorageLocationLog
from routes.model.operate_point_model import (
OperatePointLayerInfo,
StorageLocationListRequest, StorageLocationListResponse,
StorageLocationInfo, StorageLocationStatistics,
StorageLocationStatusUpdateRequest, StorageLocationStatusUpdateResponse,
BatchStorageLocationStatusUpdateRequest, BatchStorageLocationStatusUpdateResponse,
StorageLocationActionEnum,
ExtendedPropertyCreateRequest, ExtendedPropertyCreateResponse,
ExtendedPropertyListRequest, ExtendedPropertyListResponse,
ExtendedPropertyInfo, ExtendedPropertyDeleteResponse,
StorageLocationDetailResponse, StorageLocationEditRequest, StorageLocationEditResponse,
StorageLocationLogInfo, StorageLocationLogListRequest, StorageLocationLogListResponse,
)
from utils.logger import get_logger
import datetime
logger = get_logger("services.operate_point_service")
class OperatePointService:
"""库位管理服务"""
@staticmethod
def get_storage_location_list(db: Session, request: StorageLocationListRequest) -> StorageLocationListResponse:
    """
    Return one page of storage locations that match the request filters.

    A storage location is an ``OperatePointLayer`` row that is not
    soft-deleted. Scene and storage-area filters are applied through the
    related ``OperatePoint``; all other filters are applied on the layer
    itself.

    Args:
        db: Database session.
        request: List request. Reads the optional filters ``scene_id``,
            ``storage_area_id``, ``station_name``, ``layer_name``,
            ``is_disabled``, ``is_occupied``, ``is_locked`` and
            ``is_empty_tray``, the paging values ``page`` / ``page_size``,
            and the ``include_operate_point_info`` /
            ``include_extended_fields`` flags passed to the converter.

    Returns:
        StorageLocationListResponse: total row count, paging echo,
        total page count and the converted rows of the requested page.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        query = db.query(OperatePointLayer).filter(OperatePointLayer.is_deleted == False)

        # Both of these filters live on OperatePoint. Collect them first and
        # join OperatePoint exactly once: the previous code joined it once per
        # filter, which emits a duplicate JOIN (typically rejected by the
        # database) whenever both filters are supplied together.
        point_filters = []
        if request.scene_id:
            point_filters.append(OperatePoint.scene_id == request.scene_id)
        if request.storage_area_id:
            point_filters.append(OperatePoint.storage_area_id == request.storage_area_id)
        if point_filters:
            query = query.join(OperatePoint).filter(*point_filters)

        # Fuzzy (substring) matches on the layer's own name columns.
        if request.station_name:
            query = query.filter(OperatePointLayer.station_name.like(f"%{request.station_name}%"))
        if request.layer_name:
            query = query.filter(OperatePointLayer.layer_name.like(f"%{request.layer_name}%"))

        # Tri-state flags: None means "do not filter on this flag".
        for flag in ("is_disabled", "is_occupied", "is_locked", "is_empty_tray"):
            wanted = getattr(request, flag)
            if wanted is not None:
                query = query.filter(getattr(OperatePointLayer, flag) == wanted)

        total = query.count()

        offset = (request.page - 1) * request.page_size
        storage_locations = query.offset(offset).limit(request.page_size).all()

        storage_location_infos = [
            OperatePointService._convert_to_storage_location_info(
                sl,
                request.include_operate_point_info,
                request.include_extended_fields
            )
            for sl in storage_locations
        ]

        total_pages = math.ceil(total / request.page_size) if total > 0 else 0

        # NOTE: the response's statistics field is intentionally not populated here.
        return StorageLocationListResponse(
            total=total,
            page=request.page,
            page_size=request.page_size,
            total_pages=total_pages,
            storage_locations=storage_location_infos,
        )
    except Exception as e:
        logger.error(f"获取库位列表失败: {str(e)}")
        raise
@staticmethod
def _convert_to_storage_location_info(layer: OperatePointLayer, include_operate_point_info: bool = True, include_extended_fields: bool = False) -> StorageLocationInfo:
    """
    Build the ``StorageLocationInfo`` response object for one layer row.

    Args:
        layer: The ``OperatePointLayer`` database row to convert.
        include_operate_point_info: When true and the layer has a related
            operate point, that point's fields are flattened onto the result.
        include_extended_fields: When true, ``extended_fields`` is filled
            with a ``{field name: value}`` mapping parsed from the layer's
            ``config_json`` (an empty dict if there is no ``config_json``).

    Returns:
        StorageLocationInfo: The populated response object.
    """
    # Columns copied one-to-one from the layer row.
    layer_attrs = (
        "id", "layer_index", "layer_name",
        "is_occupied", "is_locked", "is_disabled", "is_empty_tray",
        "locked_by",
        "goods_content", "goods_weight", "goods_volume",
        "goods_stored_at", "goods_retrieved_at", "last_access_at",
        "max_weight", "max_volume", "layer_height",
        "tags", "description", "created_at", "updated_at",
    )
    info = StorageLocationInfo(**{attr: getattr(layer, attr) for attr in layer_attrs})

    # The relationship is only touched when the caller asked for it.
    point = layer.operate_point if include_operate_point_info else None
    if point:
        info.operate_point_id = point.id
        info.station_name = point.station_name
        info.scene_id = point.scene_id
        info.storage_area_id = point.storage_area_id
        info.storage_area_type = point.storage_area_type.value if point.storage_area_type else None
        info.area_name = point.area_name
        info.max_layers = point.max_layers
        info.current_layers = point.current_layers
        info.position_x = point.position_x
        info.position_y = point.position_y
        info.position_z = point.position_z
        info.operate_point_description = point.description

    if include_extended_fields:
        if layer.config_json:
            parsed = OperatePointService._parse_extended_fields(layer.config_json)
            # Reduce each stored entry to its bare value so the front end
            # receives a simple key/value mapping.
            info.extended_fields = {
                name: entry["value"]
                for name, entry in parsed.items()
            }
        else:
            info.extended_fields = {}

    return info
@staticmethod
def update_storage_location_status(db: Session, request: StorageLocationStatusUpdateRequest) -> StorageLocationStatusUpdateResponse:
    """
    Apply one status action to the storage location named in the request.

    The location is looked up by ``layer_name`` among non-deleted layers.
    On success the change is committed and an operation log entry is
    written (a logging failure is only logged, it does not fail the call).
    On failure the session is rolled back.

    Args:
        db: Database session.
        request: Status update request (``layer_name``, ``action``,
            ``locked_by``, ``reason``).

    Returns:
        StorageLocationStatusUpdateResponse: ``success`` plus a message;
        ``new_status`` holds the refreshed status on success and is an
        empty dict otherwise. Errors are reported in the response rather
        than raised.
    """
    def _reply(ok: bool, text: str, status: Dict[str, Any]) -> StorageLocationStatusUpdateResponse:
        # All responses echo the same layer name and action.
        return StorageLocationStatusUpdateResponse(
            layer_name=request.layer_name,
            action=request.action,
            success=ok,
            message=text,
            new_status=status
        )

    try:
        target = (
            db.query(OperatePointLayer)
            .filter(
                OperatePointLayer.layer_name == request.layer_name,
                OperatePointLayer.is_deleted == False
            )
            .first()
        )
        if not target:
            return _reply(False, "库位不存在", {})

        ok, text = OperatePointService._execute_status_update(target, request.action, request.locked_by)
        if not ok:
            db.rollback()
            return _reply(False, text, {})

        db.commit()

        # Best-effort audit trail: never let a logging problem undo the update.
        try:
            OperatePointService.create_storage_location_log(
                db=db,
                operator=OperatePointService._get_operator(request.locked_by),
                operation_type=request.action.value,
                affected_storage_locations=[request.layer_name],
                description=request.reason
            )
        except Exception as e:
            logger.error(f"记录操作日志失败: {str(e)}")

        return _reply(True, text, OperatePointService._get_storage_location_status(target))
    except Exception as e:
        db.rollback()
        logger.error(f"更新库位状态失败: {str(e)}")
        return _reply(False, f"更新库位状态失败: {str(e)}", {})
@staticmethod
def batch_update_storage_location_status(db: Session, request: BatchStorageLocationStatusUpdateRequest) -> BatchStorageLocationStatusUpdateResponse:
    """
    Apply the same status action to every storage location in the request.

    Each layer name is processed independently, in request order, through
    ``update_storage_location_status``; one failing location does not stop
    the others.

    Args:
        db: Database session.
        request: Batch request (``layer_names``, ``action``, ``locked_by``,
            ``reason``).

    Returns:
        BatchStorageLocationStatusUpdateResponse: total / success / failed
        counts and the per-location results.

    Raises:
        Exception: Any unexpected failure is logged and re-raised.
    """
    try:
        outcomes = [
            OperatePointService.update_storage_location_status(
                db,
                StorageLocationStatusUpdateRequest(
                    layer_name=name,
                    action=request.action,
                    locked_by=request.locked_by,
                    reason=request.reason
                ),
            )
            for name in request.layer_names
        ]
        succeeded = sum(1 for outcome in outcomes if outcome.success)

        return BatchStorageLocationStatusUpdateResponse(
            total_count=len(request.layer_names),
            success_count=succeeded,
            failed_count=len(outcomes) - succeeded,
            results=outcomes
        )
    except Exception as e:
        logger.error(f"批量更新库位状态失败: {str(e)}")
        raise
@staticmethod
def _execute_status_update(storage_location: OperatePointLayer, action: StorageLocationActionEnum, locked_by: Optional[str] = None) -> Tuple[bool, str]:
    """
    Mutate the in-memory state of one storage location for the given action.

    Nothing is committed here; the caller decides whether to commit or roll
    back. Repeating an action whose effect is already in place is treated
    as a success with a "no change needed" message.

    Args:
        storage_location: The layer row to modify.
        action: The requested action.
        locked_by: Who is locking; required for the LOCK action.

    Returns:
        Tuple[bool, str]: ``(success, message)``. Any exception raised
        while updating is logged and reported as ``(False, message)``.
    """
    try:
        now = datetime.datetime.now()
        loc = storage_location

        def _mark_stored() -> None:
            # Shared by OCCUPY and SET_EMPTY_TRAY: something now sits in the slot.
            loc.is_occupied = True
            loc.last_access_at = now
            loc.goods_stored_at = now
            loc.goods_retrieved_at = None

        def _vacate() -> None:
            # Shared by RELEASE and CLEAR_EMPTY_TRAY: empty the slot and wipe goods data.
            loc.is_occupied = False
            loc.goods_content = ''
            loc.goods_weight = None
            loc.goods_volume = None
            loc.goods_retrieved_at = now
            loc.goods_stored_at = None
            loc.last_access_at = now

        if action == StorageLocationActionEnum.OCCUPY:
            if loc.is_occupied:
                return True, "库位已被占用,无需重复操作"
            if loc.is_disabled:
                return False, "库位已禁用,无法占用"
            if loc.is_locked:
                return False, "库位已锁定,无法占用"
            _mark_stored()
            return True, "库位占用成功"

        if action == StorageLocationActionEnum.RELEASE:
            if not loc.is_occupied:
                return True, "库位未被占用,无需重复操作"
            _vacate()
            return True, "库位释放成功"

        if action == StorageLocationActionEnum.LOCK:
            if not locked_by:
                return False, "锁定操作必须提供锁定者"
            if loc.is_locked:
                # Re-locking by the current holder is idempotent; anyone else is refused.
                if loc.locked_by == locked_by:
                    return True, "库位已被相同锁定者锁定,无需重复操作"
                return False, f"库位已被其他锁定者锁定,当前锁定者: {loc.locked_by}"
            loc.is_locked = True
            loc.locked_by = locked_by
            loc.last_access_at = now
            return True, "库位锁定成功"

        if action == StorageLocationActionEnum.UNLOCK:
            if not loc.is_locked:
                return True, "库位未被锁定,无需重复操作"
            loc.is_locked = False
            loc.locked_by = None
            loc.last_access_at = now
            return True, "库位解锁成功"

        if action == StorageLocationActionEnum.ENABLE:
            if not loc.is_disabled:
                return True, "库位已启用,无需重复操作"
            loc.is_disabled = False
            loc.last_access_at = now
            return True, "库位启用成功"

        if action == StorageLocationActionEnum.DISABLE:
            if loc.is_disabled:
                return True, "库位已禁用,无需重复操作"
            loc.is_disabled = True
            loc.last_access_at = now
            return True, "库位禁用成功"

        if action == StorageLocationActionEnum.SET_EMPTY_TRAY:
            if loc.is_empty_tray:
                return True, "库位已设置为空托盘,无需重复操作"
            loc.is_empty_tray = True
            _mark_stored()
            return True, "库位设置为空托盘成功"

        if action == StorageLocationActionEnum.CLEAR_EMPTY_TRAY:
            if not loc.is_empty_tray:
                return True, "库位已非空托盘状态,无需重复操作"
            loc.is_empty_tray = False
            _vacate()
            return True, "库位清除空托盘状态成功"

        return False, f"不支持的操作类型: {action}"
    except Exception as e:
        logger.error(f"执行状态更新操作失败: {str(e)}")
        return False, f"执行状态更新操作失败: {str(e)}"
@staticmethod
def _get_storage_location_status(storage_location: OperatePointLayer) -> Dict[str, Any]:
    """
    Snapshot the status-related attributes of a storage location.

    Args:
        storage_location: The layer row to read.

    Returns:
        Dict[str, Any]: The row's id, the four state flags, ``locked_by``,
        ``goods_content`` and the ``last_access_at`` / ``updated_at``
        timestamps, keyed by attribute name.
    """
    reported_attrs = (
        "id",
        "is_occupied",
        "is_locked",
        "is_disabled",
        "is_empty_tray",
        "locked_by",
        "goods_content",
        "last_access_at",
        "updated_at",
    )
    return {attr: getattr(storage_location, attr) for attr in reported_attrs}
@staticmethod
def _get_operator(provided_operator: Optional[str] = None, fallback_operator: str = "系统") -> str:
    """
    Decide which operator name to record for an operation.

    Args:
        provided_operator: Operator supplied by the caller (for example
            the locker); surrounding whitespace is ignored.
        fallback_operator: Name used when no usable operator was supplied.

    Returns:
        str: The stripped ``provided_operator`` if it is non-blank,
        otherwise ``fallback_operator``.
    """
    cleaned = provided_operator.strip() if provided_operator else ""

    # TODO: in the future the operator could be taken from the current user
    # session (e.g. a JWT token or login session) before falling back:
    # current_user = get_current_user()
    # if current_user:
    #     return current_user.username
    return cleaned or fallback_operator
@staticmethod
def get_operation_result_summary(results: List[StorageLocationStatusUpdateResponse]) -> Dict[str, Any]:
    """
    Summarise a list of status-update results.

    A successful result counts as "no change needed" when its message
    contains the idempotency marker used by the status-update messages in
    this service; the remaining successes are counted as actual updates.

    Args:
        results: Per-location results; each item is read for ``success``
            and ``message``. May be empty or ``None``.

    Returns:
        Dict[str, Any]: ``total``, ``success``, ``failed``,
        ``no_change_needed`` and ``actual_updates`` counts (all zero for
        empty input).
    """
    if not results:
        return {
            "total": 0,
            "success": 0,
            "failed": 0,
            "no_change_needed": 0,
            "actual_updates": 0
        }

    # Every "already in that state" message emitted by the status update
    # contains this phrase. The former list of longer variants was redundant
    # because each of them includes this marker as a substring.
    no_change_marker = "无需重复操作"

    total = len(results)
    succeeded = [r for r in results if r.success]
    success = len(succeeded)
    # `or ""` keeps a result without a message from raising TypeError.
    no_change_needed = sum(1 for r in succeeded if no_change_marker in (r.message or ""))

    return {
        "total": total,
        "success": success,
        "failed": total - success,
        "no_change_needed": no_change_needed,
        "actual_updates": success - no_change_needed
    }
@staticmethod
def _parse_extended_fields(config_json: str) -> Dict[str, Any]:
    """
    Extract the ``extended_fields`` mapping from a layer's config JSON.

    Args:
        config_json: JSON text expected to hold an object with an
            ``extended_fields`` key.

    Returns:
        Dict[str, Any]: The ``extended_fields`` mapping. An empty dict is
        returned when the input is empty, cannot be parsed, is not a JSON
        object, or when ``extended_fields`` is missing or not an object,
        so callers can always iterate the result with ``.items()``.
    """
    if not config_json:
        return {}

    import json
    try:
        config = json.loads(config_json)
    except Exception as e:
        logger.error(f"解析扩展字段JSON失败: {str(e)}")
        return {}

    if not isinstance(config, dict):
        logger.error(f"解析扩展字段JSON失败: expected a JSON object, got {type(config).__name__}")
        return {}

    # A stored null / list / scalar under "extended_fields" previously leaked
    # through as a non-dict return value; normalise it to an empty mapping.
    fields = config.get("extended_fields")
    return fields if isinstance(fields, dict) else {}
@staticmethod
def _serialize_extended_fields(extended_fields: Dict[str, Any]) -> str:
    """
    Wrap an extended-fields mapping in the config envelope and dump it as JSON.

    Args:
        extended_fields: Mapping to store under the ``extended_fields`` key.

    Returns:
        str: Indented JSON text (non-ASCII characters kept as-is), or
        ``"{}"`` if serialisation fails (the failure is logged).
    """
    try:
        import json
        envelope = {"extended_fields": extended_fields}
        text = json.dumps(envelope, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"序列化扩展字段JSON失败: {str(e)}")
        text = "{}"
    return text
@staticmethod
def _infer_field_type(value: Any) -> str:
    """
    Infer the extended-field type name for a Python value.

    Args:
        value: The value to classify.

    Returns:
        str: ``"boolean"``, ``"integer"``, ``"number"``, ``"array"`` or
        ``"object"`` for bool / int / float / list / dict values;
        ``"date"`` for a string accepted by
        ``datetime.datetime.fromisoformat``; ``"string"`` for any other
        string and for every other kind of value.
    """
    # bool is tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    if isinstance(value, str):
        try:
            datetime.datetime.fromisoformat(value)
        except ValueError:
            # Not an ISO date/time. Only ValueError is caught (instead of a
            # bare except) so KeyboardInterrupt / SystemExit still propagate.
            return "string"
        return "date"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    return "string"
# 扩展属性管理相关方法
@staticmethod
def create_extended_property(db: Session, request: ExtendedPropertyCreateRequest) -> ExtendedPropertyCreateResponse:
    """
    Create an extended property definition and seed it into every layer.

    The definition is committed first. Adding the property to each layer's
    ``config_json`` and writing the operation log are both best-effort
    follow-ups: their failures are logged but do not fail the call.

    Args:
        db: Database session.
        request: Creation request describing the property.

    Returns:
        ExtendedPropertyCreateResponse: id and name of the new property
        plus a message.

    Raises:
        ValueError: A non-deleted property with the same name already exists.
        Exception: Any other failure while creating the definition; the
            session is rolled back and the error re-raised.
    """
    try:
        # Names must be unique among non-deleted properties.
        existing_property = db.query(ExtendedProperty).filter(
            ExtendedProperty.property_name == request.property_name,
            ExtendedProperty.is_deleted == False
        ).first()
        if existing_property:
            raise ValueError(f"属性名称 '{request.property_name}' 已存在")

        extended_property = ExtendedProperty(
            property_key=request.property_name,  # property_name doubles as property_key
            property_name=request.property_name,
            property_type=request.property_type,
            is_required=request.is_required,
            is_enabled=request.is_enabled,
            description=request.description,
            placeholder=request.placeholder,
            default_value=request.default_value,
            options=OperatePointService._serialize_options(request.options),
            validation_rules=OperatePointService._serialize_validation_rules(request.validation_rules),
            category=request.category,
            sort_order=request.sort_order,
            display_width=request.display_width,
            display_format=request.display_format
        )
        db.add(extended_property)
        db.commit()
        db.refresh(extended_property)

        # Snapshot what the follow-up steps need, so they do not depend on
        # the ORM object's state after a later commit or rollback.
        property_id = str(extended_property.id)
        property_name = extended_property.property_name
        property_type = extended_property.property_type
        default_value = extended_property.default_value
        is_required = extended_property.is_required

        # Best-effort: push the new property into every layer's config_json.
        try:
            OperatePointService._add_extended_property_to_layers(
                db=db,
                property_name=property_name,
                property_type=property_type,
                default_value=default_value,
                is_required=is_required
            )
            db.commit()
            logger.info(f"成功为所有库位层添加扩展属性: {property_name}")
        except Exception as e:
            # Discard half-applied layer edits. Without this they stay pending
            # on the session and could be persisted by whatever commits next
            # (the operation log below may commit on this session).
            db.rollback()
            logger.error(f"添加扩展属性到库位层失败: {str(e)}")
            # Not re-raised: the property definition itself was created.

        # Best-effort operation log.
        try:
            operator = OperatePointService._get_operator()
            OperatePointService.create_storage_location_log(
                db=db,
                operator=operator,
                operation_type="创建扩展属性",
                affected_storage_locations=[],  # affects every location, so no explicit list
                description=f"创建扩展属性 '{property_name}',类型: {property_type.value},已应用到所有库位层"
            )
        except Exception as e:
            logger.error(f"记录操作日志失败: {str(e)}")

        # NOTE(review): the message below is returned even when the layer
        # update above failed — confirm whether callers need to tell the two apart.
        return ExtendedPropertyCreateResponse(
            id=property_id,
            property_name=property_name,
            message="扩展属性创建成功,已应用到所有库位层"
        )
    except Exception as e:
        db.rollback()
        logger.error(f"创建扩展属性失败: {str(e)}")
        raise
@staticmethod
def get_extended_property_list(db: Session, request: ExtendedPropertyListRequest) -> ExtendedPropertyListResponse:
    """
    Return one page of non-deleted extended property definitions.

    Results are ordered by ``sort_order`` ascending, then ``created_at``
    descending.

    Args:
        db: Database session.
        request: List request with optional ``property_name`` (substring
            match), ``property_type``, ``category`` and ``is_enabled``
            filters plus ``page`` / ``page_size``.

    Returns:
        ExtendedPropertyListResponse: total count, paging echo, total page
        count and the converted properties of the requested page.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        criteria = [ExtendedProperty.is_deleted == False]
        if request.property_name:
            criteria.append(ExtendedProperty.property_name.like(f"%{request.property_name}%"))
        if request.property_type:
            criteria.append(ExtendedProperty.property_type == request.property_type)
        if request.category:
            criteria.append(ExtendedProperty.category == request.category)
        # is_enabled is tri-state: None means "do not filter".
        if request.is_enabled is not None:
            criteria.append(ExtendedProperty.is_enabled == request.is_enabled)

        query = (
            db.query(ExtendedProperty)
            .filter(*criteria)
            .order_by(ExtendedProperty.sort_order.asc(), ExtendedProperty.created_at.desc())
        )

        total = query.count()
        skip = (request.page - 1) * request.page_size
        page_rows = query.offset(skip).limit(request.page_size).all()

        return ExtendedPropertyListResponse(
            total=total,
            page=request.page,
            page_size=request.page_size,
            total_pages=math.ceil(total / request.page_size) if total > 0 else 0,
            properties=[
                OperatePointService._convert_to_extended_property_info(row)
                for row in page_rows
            ]
        )
    except Exception as e:
        logger.error(f"获取扩展属性列表失败: {str(e)}")
        raise
@staticmethod
def delete_extended_property(db: Session, property_id: str) -> ExtendedPropertyDeleteResponse:
    """
    Soft-delete an extended property and strip it from every layer.

    The soft delete is committed first. Removing the property from each
    layer's ``config_json`` and writing the operation log are best-effort
    follow-ups: their failures are logged but do not fail the call.

    Args:
        db: Database session.
        property_id: Id of the property to delete.

    Returns:
        ExtendedPropertyDeleteResponse: id and name of the deleted
        property plus a message.

    Raises:
        ValueError: No non-deleted property with this id exists.
        Exception: Any other failure while deleting; the session is rolled
            back and the error re-raised.
    """
    try:
        extended_property = db.query(ExtendedProperty).filter(
            ExtendedProperty.id == property_id,
            ExtendedProperty.is_deleted == False
        ).first()
        if not extended_property:
            raise ValueError("扩展属性不存在")

        # Soft delete: flag the row instead of removing it.
        extended_property.is_deleted = True
        extended_property.updated_at = datetime.datetime.now()

        # Snapshot what the follow-up steps and the response need, so they do
        # not depend on the ORM object's state after a later commit or rollback.
        deleted_id = str(extended_property.id)
        property_name = extended_property.property_name
        db.commit()

        # Best-effort: remove the property from every layer's config_json.
        try:
            OperatePointService._remove_extended_property_from_layers(
                db=db,
                property_name=property_name
            )
            db.commit()
            logger.info(f"成功从所有库位层移除扩展属性: {property_name}")
        except Exception as e:
            # Discard half-applied layer edits. Without this they stay pending
            # on the session and could be persisted by whatever commits next
            # (the operation log below may commit on this session).
            db.rollback()
            logger.error(f"从库位层移除扩展属性失败: {str(e)}")
            # Not re-raised: the property itself was deleted.

        # Best-effort operation log.
        try:
            operator = OperatePointService._get_operator()
            OperatePointService.create_storage_location_log(
                db=db,
                operator=operator,
                operation_type="删除扩展属性",
                affected_storage_locations=[],  # affects every location, so no explicit list
                description=f"删除扩展属性 '{property_name}',已从所有库位层移除"
            )
        except Exception as e:
            logger.error(f"记录操作日志失败: {str(e)}")

        return ExtendedPropertyDeleteResponse(
            property_id=deleted_id,
            property_name=property_name,
            message="扩展属性删除成功,已从所有库位层移除"
        )
    except Exception as e:
        db.rollback()
        logger.error(f"删除扩展属性失败: {str(e)}")
        raise
@staticmethod
def _convert_to_extended_property_info(prop: ExtendedProperty) -> ExtendedPropertyInfo:
    """
    Build the ``ExtendedPropertyInfo`` response object for one property row.

    Args:
        prop: The ``ExtendedProperty`` database row.

    Returns:
        ExtendedPropertyInfo: The row's fields, with ``id`` converted to a
        string and the stored ``options`` / ``validation_rules`` JSON
        decoded.
    """
    # Columns passed through unchanged.
    passthrough = (
        "property_name", "property_type", "is_required", "is_enabled",
        "description", "placeholder", "default_value",
        "category", "sort_order", "display_width", "display_format",
        "created_at", "updated_at",
    )
    payload = {attr: getattr(prop, attr) for attr in passthrough}
    payload["id"] = str(prop.id)
    payload["options"] = OperatePointService._parse_options(prop.options)
    payload["validation_rules"] = OperatePointService._parse_validation_rules(prop.validation_rules)
    return ExtendedPropertyInfo(**payload)
@staticmethod
def _serialize_options(options: Optional[List[Dict[str, Any]]]) -> Optional[str]:
    """
    Encode an options list as JSON text for storage.

    Args:
        options: The options list, or ``None``.

    Returns:
        Optional[str]: JSON text (non-ASCII characters kept as-is), or
        ``None`` when ``options`` is empty/``None`` or encoding fails
        (the failure is logged).
    """
    encoded = None
    if options:
        try:
            import json
            encoded = json.dumps(options, ensure_ascii=False)
        except Exception as e:
            logger.error(f"序列化选项失败: {str(e)}")
    return encoded
@staticmethod
def _parse_options(options_json: Optional[str]) -> Optional[List[Dict[str, Any]]]:
    """
    Decode stored options JSON text.

    Args:
        options_json: JSON text, or ``None``.

    Returns:
        Optional[List[Dict[str, Any]]]: The decoded value, or ``None``
        when the input is empty/``None`` or cannot be decoded (the failure
        is logged).
    """
    decoded = None
    if options_json:
        try:
            import json
            decoded = json.loads(options_json)
        except Exception as e:
            logger.error(f"解析选项JSON失败: {str(e)}")
            decoded = None
    return decoded
@staticmethod
def _serialize_validation_rules(rules: Optional[Dict[str, Any]]) -> Optional[str]:
    """
    Encode a validation-rules mapping as JSON text for storage.

    Args:
        rules: The rules mapping, or ``None``.

    Returns:
        Optional[str]: JSON text (non-ASCII characters kept as-is), or
        ``None`` when ``rules`` is empty/``None`` or encoding fails (the
        failure is logged).
    """
    encoded = None
    if rules:
        try:
            import json
            encoded = json.dumps(rules, ensure_ascii=False)
        except Exception as e:
            logger.error(f"序列化验证规则失败: {str(e)}")
    return encoded
@staticmethod
def _parse_validation_rules(rules_json: Optional[str]) -> Optional[Dict[str, Any]]:
    """
    Decode stored validation-rules JSON text.

    Args:
        rules_json: JSON text, or ``None``.

    Returns:
        Optional[Dict[str, Any]]: The decoded value, or ``None`` when the
        input is empty/``None`` or cannot be decoded (the failure is
        logged).
    """
    decoded = None
    if rules_json:
        try:
            import json
            decoded = json.loads(rules_json)
        except Exception as e:
            logger.error(f"解析验证规则JSON失败: {str(e)}")
            decoded = None
    return decoded
@staticmethod
def _add_extended_property_to_layers(
    db: Session,
    property_name: str,
    property_type: ExtendedPropertyTypeEnum,
    default_value: Optional[str] = None,
    is_required: bool = False
):
    """
    Write an extended property entry into every non-deleted layer's config_json.

    For each layer the existing ``config_json`` is parsed (an unparseable
    value is logged and replaced by an empty config), the entry is stored
    under ``extended_fields[property_name]`` and the config is serialised
    back onto the layer. Nothing is committed here; the caller commits.

    Args:
        db: Database session.
        property_name: Name of the property (used as the entry key).
        property_type: Property type; its ``.value`` is stored.
        default_value: Initial value stored for the property.
        is_required: Whether the property is mandatory.

    Raises:
        Exception: Any failure outside the per-layer parse/serialise steps
            is logged and re-raised.
    """
    try:
        import json

        live_layers = db.query(OperatePointLayer).filter(
            OperatePointLayer.is_deleted == False
        ).all()

        for layer in live_layers:
            config = {}
            if layer.config_json:
                try:
                    config = json.loads(layer.config_json)
                except Exception as e:
                    logger.error(f"解析库位层 {layer.id} 的config_json失败: {str(e)}")
                    config = {}

            if 'extended_fields' not in config:
                config['extended_fields'] = {}

            # Each layer gets its own timestamp, taken at the moment it is processed.
            entry = {
                'value': default_value,
                'type': property_type.value,
                'is_required': is_required,
                'updated_at': datetime.datetime.now().isoformat()
            }
            config['extended_fields'][property_name] = entry

            # A layer whose config cannot be serialised is logged and left untouched.
            try:
                layer.config_json = json.dumps(config, ensure_ascii=False, indent=2)
                logger.debug(f"为库位层 {layer.id} 添加扩展属性: {property_name}")
            except Exception as e:
                logger.error(f"序列化库位层 {layer.id} 的config_json失败: {str(e)}")
    except Exception as e:
        logger.error(f"添加扩展属性到库位层失败: {str(e)}")
        raise
@staticmethod
def _remove_extended_property_from_layers(db: Session, property_name: str):
    """
    Delete an extended property entry from every non-deleted layer's config_json.

    Layers without ``config_json``, with unparseable ``config_json`` (logged)
    or without the property are left untouched. Nothing is committed here;
    the caller commits.

    Args:
        db: Database session.
        property_name: Name of the property to remove.

    Raises:
        Exception: Any failure outside the per-layer parse/serialise steps
            is logged and re-raised.
    """
    try:
        import json

        live_layers = db.query(OperatePointLayer).filter(
            OperatePointLayer.is_deleted == False
        ).all()

        for layer in live_layers:
            # No stored config means there is nothing to remove.
            if not layer.config_json:
                continue

            try:
                config = json.loads(layer.config_json)
            except Exception as e:
                logger.error(f"解析库位层 {layer.id} 的config_json失败: {str(e)}")
                continue

            if 'extended_fields' not in config or property_name not in config['extended_fields']:
                continue

            del config['extended_fields'][property_name]

            # A layer whose config cannot be serialised is logged and left untouched.
            try:
                layer.config_json = json.dumps(config, ensure_ascii=False, indent=2)
                logger.debug(f"从库位层 {layer.id} 移除扩展属性: {property_name}")
            except Exception as e:
                logger.error(f"序列化库位层 {layer.id} 的config_json失败: {str(e)}")
    except Exception as e:
        logger.error(f"从库位层移除扩展属性失败: {str(e)}")
        raise
@staticmethod
2025-07-15 15:05:04 +08:00
def get_storage_location_detail(db: Session, layer_name: str) -> Dict[str, Any]:
    """
    Return the full detail view of one storage location.

    Args:
        db: Database session.
        layer_name: Name of the storage location (layer) to look up.

    Returns:
        Dict[str, Any]: A dict with
        ``storage_location`` (converted info incl. operate point and
        extended fields), ``operate_point_info`` (dict of the related
        operate point's fields, or ``None``),
        ``extended_fields_definitions`` (enabled, non-deleted property
        definitions; only collected when the layer's ``config_json``
        contains ``extended_fields``) and ``status_history`` (currently
        always an empty list).

    Raises:
        ValueError: No non-deleted layer with this name exists.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        layer = (
            db.query(OperatePointLayer)
            .filter(
                OperatePointLayer.layer_name == layer_name,
                OperatePointLayer.is_deleted == False
            )
            .first()
        )
        if not layer:
            raise ValueError(f"库位 {layer_name} 不存在")

        storage_location_info = OperatePointService._convert_to_storage_location_info(
            layer, include_operate_point_info=True, include_extended_fields=True
        )

        # Plain-dict copy of the related operate point, if there is one.
        operate_point_info = None
        point = layer.operate_point
        if point:
            point_attrs = (
                "id", "station_name", "scene_id", "storage_area_id",
                "storage_area_type", "area_name", "max_layers", "current_layers",
                "position_x", "position_y", "position_z", "description",
                "created_at", "updated_at", "is_deleted",
            )
            operate_point_info = {attr: getattr(point, attr) for attr in point_attrs}
            # The area type is reported by its value, or None when unset.
            area_type = operate_point_info["storage_area_type"]
            operate_point_info["storage_area_type"] = area_type.value if area_type else None

        # Property definitions are only looked up for layers that carry extended fields.
        extended_fields_definitions = []
        if layer.config_json:
            try:
                import json
                if 'extended_fields' in json.loads(layer.config_json):
                    active_properties = db.query(ExtendedProperty).filter(
                        ExtendedProperty.is_deleted == False,
                        ExtendedProperty.is_enabled == True
                    ).all()
                    for definition in active_properties:
                        extended_fields_definitions.append(
                            OperatePointService._convert_to_extended_property_info(definition)
                        )
            except Exception as e:
                logger.error(f"解析扩展字段定义失败: {str(e)}")

        return {
            "storage_location": storage_location_info,
            "operate_point_info": operate_point_info,
            "extended_fields_definitions": extended_fields_definitions,
            # Status history is not implemented yet; kept as a placeholder.
            "status_history": []
        }
    except Exception as e:
        logger.error(f"获取库位详情失败: {str(e)}")
        raise
@staticmethod
2025-07-15 15:05:04 +08:00
def edit_storage_location(db: Session, layer_name: str, request: Any) -> Dict[str, Any]:
    """
    Edit the stored information of one storage location.

    Only request fields that are not ``None`` and differ from the stored
    value are applied. Extended fields are updated inside the layer's
    ``config_json``; a name not yet present there is only added when an
    enabled, non-deleted property definition with that name exists. When
    nothing changed, the call returns without committing.

    Args:
        db: Database session.
        layer_name: Name of the storage location (layer) to edit.
        request: Edit request; read for the plain fields listed in the
            body and for ``extended_fields``.

    Returns:
        Dict[str, Any]: ``layer_name``, ``success``, ``message``,
        ``updated_fields`` (names of changed fields; includes
        ``updated_at`` when anything changed) and
        ``updated_storage_location`` (the converted, current info).

    Raises:
        ValueError: The layer does not exist, or the updated extended
            fields cannot be serialised.
        Exception: Any other failure; logged, rolled back and re-raised.
    """
    try:
        layer = (
            db.query(OperatePointLayer)
            .filter(
                OperatePointLayer.layer_name == layer_name,
                OperatePointLayer.is_deleted == False
            )
            .first()
        )
        if not layer:
            raise ValueError(f"库位 {layer_name} 不存在")

        updated_fields = []

        # Plain columns: goods data, capacity limits, state flags, free text.
        # NOTE(review): editing is_locked here does not touch locked_by — confirm
        # that is intended.
        editable_attrs = (
            "goods_content", "goods_weight", "goods_volume",
            "max_weight", "max_volume", "layer_height",
            "is_locked", "is_disabled", "is_empty_tray",
            "tags", "description",
        )
        for attr in editable_attrs:
            incoming = getattr(request, attr)
            if incoming is not None and incoming != getattr(layer, attr):
                setattr(layer, attr, incoming)
                updated_fields.append(attr)

        if request.extended_fields is not None:
            config = {}
            if layer.config_json:
                try:
                    import json
                    config = json.loads(layer.config_json)
                except Exception as e:
                    logger.error(f"解析库位层 {layer.id} 的config_json失败: {str(e)}")
                    config = {}

            if 'extended_fields' not in config:
                config['extended_fields'] = {}
            stored = config['extended_fields']

            extended_changed = False
            for field_name, field_value in request.extended_fields.items():
                if field_name in stored:
                    # Existing entry: only rewrite it when the value really differs.
                    if stored[field_name].get('value') != field_value:
                        stored[field_name]['value'] = field_value
                        stored[field_name]['updated_at'] = datetime.datetime.now().isoformat()
                        updated_fields.append(f"extended_fields.{field_name}")
                        extended_changed = True
                    continue

                # New entry: requires an enabled, non-deleted definition.
                definition = db.query(ExtendedProperty).filter(
                    ExtendedProperty.property_name == field_name,
                    ExtendedProperty.is_deleted == False,
                    ExtendedProperty.is_enabled == True
                ).first()
                if not definition:
                    logger.warning(f"扩展字段 {field_name} 不存在或已禁用,跳过更新")
                    continue

                stored[field_name] = {
                    'value': field_value,
                    'type': definition.property_type.value,
                    'is_required': definition.is_required,
                    'updated_at': datetime.datetime.now().isoformat()
                }
                updated_fields.append(f"extended_fields.{field_name}")
                extended_changed = True

            # config_json is rewritten only when an extended field actually changed.
            if extended_changed:
                try:
                    import json
                    layer.config_json = json.dumps(config, ensure_ascii=False, indent=2)
                except Exception as e:
                    logger.error(f"序列化库位层 {layer.id} 的config_json失败: {str(e)}")
                    raise ValueError(f"更新扩展字段失败: {str(e)}")

        if not updated_fields:
            return {
                "layer_name": layer_name,
                "success": True,
                "message": "没有字段发生变化,数据保持不变",
                "updated_fields": [],
                "updated_storage_location": OperatePointService._convert_to_storage_location_info(
                    layer, include_operate_point_info=True, include_extended_fields=True
                )
            }

        # updated_at is reported as one of the updated fields.
        layer.updated_at = datetime.datetime.now()
        updated_fields.append("updated_at")

        db.commit()

        # Best-effort operation log.
        try:
            OperatePointService.create_storage_location_log(
                db=db,
                operator=OperatePointService._get_operator(),
                operation_type="编辑库位",
                affected_storage_locations=[layer_name],
                description=f"编辑库位信息,更新字段: {', '.join(updated_fields)}"
            )
        except Exception as e:
            logger.error(f"记录操作日志失败: {str(e)}")

        refreshed = OperatePointService._convert_to_storage_location_info(
            layer, include_operate_point_info=True, include_extended_fields=True
        )
        return {
            "layer_name": layer_name,
            "success": True,
            "message": f"库位信息更新成功,共更新 {len(updated_fields)} 个字段",
            "updated_fields": updated_fields,
            "updated_storage_location": refreshed
        }
    except Exception as e:
        logger.error(f"编辑库位信息失败: {str(e)}")
        db.rollback()
        raise
@staticmethod
def create_storage_location_log(
    db: Session,
    operator: str,
    operation_type: str,
    affected_storage_locations: List[str],
    description: Optional[str] = None
) -> StorageLocationLog:
    """
    Persist one storage-location operation record.

    Args:
        db: Database session used to add and commit the record.
        operator: Who performed the operation.
        operation_type: Label describing the kind of operation.
        affected_storage_locations: Identifiers of the affected storage
            locations; stored on the record as a JSON-encoded string.
        description: Optional free-text description of the operation.

    Returns:
        StorageLocationLog: The record that was added and committed.

    Raises:
        Exception: Any failure is logged, the session is rolled back and
            the original exception is re-raised.
    """
    try:
        import json

        # ensure_ascii=False keeps non-ASCII identifiers readable in storage.
        serialized_locations = json.dumps(affected_storage_locations, ensure_ascii=False)
        record = StorageLocationLog(
            operator=operator,
            operation_type=operation_type,
            affected_storage_locations=serialized_locations,
            description=description,
        )
        db.add(record)
        db.commit()
    except Exception as e:
        logger.error(f"创建库位操作记录失败: {str(e)}")
        db.rollback()
        raise
    return record
@staticmethod
def get_storage_location_logs(db: Session, request: StorageLocationLogListRequest) -> StorageLocationLogListResponse:
    """
    Return one page of storage-location operation records.

    Records flagged as deleted are excluded. Every filter on the request is
    optional and only applied when it is truthy. Text filters are matched
    literally: LIKE wildcards (``%`` and ``_``) typed by the caller are
    escaped instead of being interpreted as wildcards.

    Args:
        db: Database session used to run the query.
        request: Query request carrying the optional filters
            (storage_location_id, operator, operation_type, start_time,
            end_time) and the paging fields (page, page_size).

    Returns:
        StorageLocationLogListResponse: total, page, page_size, total_pages
            and the records of the requested page, newest operation first.

    Raises:
        Exception: Any failure is logged and the original exception is
            re-raised.
    """
    like_escape = "/"

    def _contains_pattern(text) -> str:
        # Build a "contains" LIKE pattern in which the escape character and
        # the LIKE wildcards of the searched text are taken literally.
        escaped = (
            str(text)
            .replace(like_escape, like_escape * 2)
            .replace("%", like_escape + "%")
            .replace("_", like_escape + "_")
        )
        return f"%{escaped}%"

    try:
        import json

        # SQLAlchemy needs "==" here to build the SQL comparison.
        query = db.query(StorageLocationLog).filter(StorageLocationLog.is_deleted == False)

        # Filter by storage location: the column holds a JSON-encoded list,
        # so search for the identifier encoded the same way it was written
        # (quoted, non-ASCII characters kept as-is).
        if request.storage_location_id:
            id_token = json.dumps(str(request.storage_location_id), ensure_ascii=False)
            query = query.filter(
                StorageLocationLog.affected_storage_locations.like(
                    _contains_pattern(id_token), escape=like_escape
                )
            )

        # Filter by operator (substring match).
        if request.operator:
            query = query.filter(
                StorageLocationLog.operator.like(
                    _contains_pattern(request.operator), escape=like_escape
                )
            )

        # Filter by exact operation type.
        if request.operation_type:
            query = query.filter(StorageLocationLog.operation_type == request.operation_type)

        # Filter by inclusive time range.
        if request.start_time:
            query = query.filter(StorageLocationLog.operation_time >= request.start_time)
        if request.end_time:
            query = query.filter(StorageLocationLog.operation_time <= request.end_time)

        # Count before ordering: the total does not depend on row order, so
        # the ORDER BY is kept out of the counting query.
        total = query.count()

        # Newest operations first, then cut out the requested page.
        offset = (request.page - 1) * request.page_size
        logs = (
            query.order_by(StorageLocationLog.operation_time.desc())
            .offset(offset)
            .limit(request.page_size)
            .all()
        )

        log_infos = [
            OperatePointService._convert_to_storage_location_log_info(log)
            for log in logs
        ]

        total_pages = math.ceil(total / request.page_size) if total > 0 else 0

        return StorageLocationLogListResponse(
            total=total,
            page=request.page,
            page_size=request.page_size,
            total_pages=total_pages,
            logs=log_infos
        )
    except Exception as e:
        logger.error(f"获取库位操作记录列表失败: {str(e)}")
        raise
@staticmethod
def _convert_to_storage_location_log_info(log: StorageLocationLog) -> StorageLocationLogInfo:
    """
    Build the response object for one storage-location operation record.

    The record's affected_storage_locations text is decoded from JSON. When
    it is empty, or decoding fails (the failure is logged), an empty list is
    used instead.

    Args:
        log: Storage-location operation record loaded from the database.

    Returns:
        StorageLocationLogInfo: Response object filled from the record.

    Raises:
        Exception: Any other failure is logged and re-raised.
    """
    try:
        import json

        location_ids = []
        raw_locations = log.affected_storage_locations
        if raw_locations:
            try:
                location_ids = json.loads(raw_locations)
            except Exception as e:
                # Best effort: an unreadable list must not break the listing.
                logger.error(f"解析影响库位列表失败: {str(e)}")
                location_ids = []

        info_fields = {
            "id": str(log.id),
            "operation_time": log.operation_time,
            "operator": log.operator,
            "operation_type": log.operation_type,
            "affected_storage_locations": location_ids,
            "description": log.description,
            "created_at": log.created_at,
        }
        return StorageLocationLogInfo(**info_fields)
    except Exception as e:
        logger.error(f"转换库位操作记录信息失败: {str(e)}")
        raise