#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Task run record service module.

Provides service methods for querying and controlling task run records.
"""

import json
from typing import Dict, List, Any, Optional
from sqlalchemy import select, and_
import datetime
from data.models.blockrecord import VWEDBlockRecord
from data.models.taskrecord import VWEDTaskRecord
from data.models.tasklog import VWEDTaskLog
from data.session import get_async_session
from utils.logger import get_logger
from data.enum.task_block_record_enum import TaskBlockRecordStatus

# Module-level logger
logger = get_logger("service.task_record_service")


def _load_json(raw: Optional[str]) -> Any:
    """Decode a JSON column value; empty/None values become None."""
    return json.loads(raw) if raw else None


def _iso(moment: Any) -> Optional[str]:
    """Return ``moment.isoformat()`` or None when the value is unset."""
    return moment.isoformat() if moment else None


def _text(value: Any) -> str:
    """Coerce a possibly-None column value to a string safe to concatenate."""
    return "" if value is None else str(value)


def _describe_log(raw_message: Any) -> str:
    """
    Build the human-readable text for one task log message.

    The message is expected to be a JSON object with optional ``message``
    and ``output`` entries. Anything that is not decodable JSON, or not a
    JSON object, is returned as plain text instead of raising, so a single
    malformed log line cannot fail the whole result listing.
    """
    try:
        payload = json.loads(raw_message)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers None.
        return _text(raw_message)
    if not isinstance(payload, dict):
        return _text(payload)

    message = _text(payload.get("message", ""))
    output = payload.get("output", "")
    if not output:
        return message

    # Only a dict output can carry a nested "message"; any other truthy
    # output (string, list, number) is rendered as a whole.
    nested = str(output.get("message", "")) if isinstance(output, dict) else ""
    if nested:
        return message + "@" + nested
    return message + "@" + str(output)


class TaskRecordService:
    """
    Task run record service.

    Groups static async methods that read and update task run records and
    their block records. Every method returns a dict with ``success`` and
    ``message`` keys and never propagates exceptions to the caller.
    """

    @staticmethod
    async def get_task_blocks(task_record_id: str) -> Dict[str, Any]:
        """
        Get the run state of every block under a task record.

        Args:
            task_record_id: Task record ID.

        Returns:
            Dict: ``data`` holds a list of block summaries ordered by
            ``started_on`` (empty when none are found or on error).
        """
        try:
            async with get_async_session() as session:
                query = select(VWEDBlockRecord).where(
                    VWEDBlockRecord.task_record_id == task_record_id
                ).order_by(VWEDBlockRecord.started_on)

                result = await session.execute(query)
                blocks = result.scalars().all()

                if not blocks:
                    return {
                        "success": True,
                        "message": f"未找到任务记录 {task_record_id} 的块运行情况",
                        "data": []
                    }

                block_list = [
                    {
                        "id": block.id,
                        "block_name": block.block_name,
                        "block_id": block.block_id,
                        "status": block.status,
                        "started_on": _iso(block.started_on),
                        "ended_on": _iso(block.ended_on),
                        "ended_reason": block.ended_reason,
                        "block_execute_name": block.block_execute_name,
                        "block_input_params_value": _load_json(block.block_input_params_value),
                        "block_out_params_value": _load_json(block.block_out_params_value),
                        "remark": block.remark
                    }
                    for block in blocks
                ]

                return {
                    "success": True,
                    "message": f"成功获取任务记录 {task_record_id} 的块运行情况",
                    "data": block_list
                }
        except Exception as e:
            logger.error(f"获取任务块运行情况失败: {str(e)}")
            return {
                "success": False,
                "message": f"获取任务块运行情况失败: {str(e)}",
                "data": []
            }

    @staticmethod
    async def get_block_detail(block_record_id: str) -> Dict[str, Any]:
        """
        Get the full detail of one block record.

        Args:
            block_record_id: Block record ID.

        Returns:
            Dict: ``data`` holds the block detail dict, or None when the
            record does not exist or an error occurred.
        """
        try:
            async with get_async_session() as session:
                query = select(VWEDBlockRecord).where(
                    VWEDBlockRecord.id == block_record_id
                )

                result = await session.execute(query)
                block = result.scalars().first()

                if not block:
                    return {
                        "success": False,
                        "message": f"未找到ID为 {block_record_id} 的块记录",
                        "data": None
                    }

                block_dict = {
                    "id": block.id,
                    "block_name": block.block_name,
                    "block_id": block.block_id,
                    "block_config_id": block.block_config_id,
                    "block_input_params": _load_json(block.block_input_params),
                    "block_input_params_value": _load_json(block.block_input_params_value),
                    "block_out_params_value": _load_json(block.block_out_params_value),
                    "block_internal_variables": _load_json(block.block_internal_variables),
                    "block_execute_name": block.block_execute_name,
                    "task_id": block.task_id,
                    "task_record_id": block.task_record_id,
                    "started_on": _iso(block.started_on),
                    "ended_on": _iso(block.ended_on),
                    "ended_reason": block.ended_reason,
                    "status": block.status,
                    "ctrl_status": block.ctrl_status,
                    "input_params": _load_json(block.input_params),
                    "internal_variables": _load_json(block.internal_variables),
                    "output_params": _load_json(block.output_params),
                    "version": block.version,
                    "remark": block.remark
                }

                return {
                    "success": True,
                    "message": "成功获取块记录详情",
                    "data": block_dict
                }
        except Exception as e:
            logger.error(f"获取块记录详情失败: {str(e)}")
            return {
                "success": False,
                "message": f"获取块记录详情失败: {str(e)}",
                "data": None
            }

    @staticmethod
    async def stop_task_record(task_record_id: str) -> Dict[str, Any]:
        """
        Stop the running task instance behind a task record.

        The record is only touched when it is currently in the RUNNING
        status; it is marked CANCELED after the scheduler confirms the
        cancellation.

        Args:
            task_record_id: Task record ID.

        Returns:
            Dict: Result of the stop request. On success ``data`` contains
            the new status, ``ended_on`` (a datetime object) and reason.
        """
        # Imported lazily inside the function, as in the original code
        # (presumably to avoid an import cycle with the scheduler — confirm).
        from services.enhanced_scheduler import scheduler
        from data.enum.task_record_enum import TaskStatus
        from datetime import datetime

        try:
            async with get_async_session() as session:
                # Look up the record only if it is still running.
                lookup = await session.execute(
                    select(VWEDTaskRecord)
                    .where(
                        VWEDTaskRecord.id == task_record_id,
                        VWEDTaskRecord.status == TaskStatus.RUNNING
                    )
                )
                running_task = lookup.scalars().first()

                if not running_task:
                    return {
                        "success": True,
                        "message": "任务记录中没有运行中的任务"
                    }

                cancel_result = await scheduler.cancel_task(task_record_id)

                if not cancel_result.get("success", False):
                    return {
                        "success": False,
                        "message": "任务终止失败",
                        "data": cancel_result
                    }

                running_task.status = TaskStatus.CANCELED
                running_task.ended_on = datetime.now()
                running_task.ended_reason = "任务终止"

                await session.commit()
                return {
                    "success": True,
                    "message": "任务终止成功",
                    "data": {
                        "task_record_id": task_record_id,
                        "status": running_task.status,
                        "ended_on": running_task.ended_on,
                        "ended_reason": running_task.ended_reason
                    }
                }
        except Exception as e:
            logger.error(f"任务记录终止失败: {str(e)}")
            return {
                "success": False,
                "message": f"任务记录终止失败: {str(e)}"
            }

    @staticmethod
    async def set_task_error(task_record_id: str, error_reason: str) -> Dict[str, Any]:
        """
        Mark a task record and its unfinished blocks as failed.

        A running task is cancelled through the scheduler first; a failed
        cancellation is logged but does not stop the status update.

        Args:
            task_record_id: Task record ID.
            error_reason: Reason stored as ``ended_reason``.

        Returns:
            Dict: Result of the update, including the blocks that were changed.
        """
        from services.enhanced_scheduler import scheduler
        from data.enum.task_record_enum import TaskStatus
        from datetime import datetime

        try:
            async with get_async_session() as session:
                task_query = await session.execute(
                    select(VWEDTaskRecord)
                    .where(VWEDTaskRecord.id == task_record_id)
                )
                task_record = task_query.scalars().first()

                if not task_record:
                    return {
                        "success": False,
                        "message": f"未找到任务记录 {task_record_id}"
                    }

                # Cancel first if the task is still running.
                if task_record.status == TaskStatus.RUNNING:
                    cancel_result = await scheduler.cancel_task(task_record_id)
                    if not cancel_result.get("success", False):
                        logger.warning(f"取消任务 {task_record_id} 失败: {cancel_result}")

                # One timestamp for the task and all of its blocks, so the
                # records of a single failure agree with each other.
                failed_at = datetime.now()

                task_record.status = TaskStatus.FAILED
                task_record.ended_on = failed_at
                task_record.ended_reason = error_reason

                blocks_query = await session.execute(
                    select(VWEDBlockRecord)
                    .where(VWEDBlockRecord.task_record_id == task_record_id)
                )
                block_records = blocks_query.scalars().all()

                finished_states = (
                    TaskBlockRecordStatus.SUCCESS,
                    TaskBlockRecordStatus.FAILED,
                )
                updated_blocks = []
                for block in block_records:
                    # Blocks that already finished keep their outcome.
                    if block.status in finished_states:
                        continue
                    block.status = TaskBlockRecordStatus.FAILED
                    block.ended_on = failed_at
                    block.ended_reason = error_reason
                    updated_blocks.append({
                        "block_id": block.id,
                        "block_name": block.block_name,
                        "status": block.status
                    })

                await session.commit()

                return {
                    "success": True,
                    "message": "任务状态设置为错误成功",
                    "data": {
                        "task_record_id": task_record_id,
                        "task_status": task_record.status,
                        "error_reason": error_reason,
                        "ended_on": task_record.ended_on,
                        "updated_blocks_count": len(updated_blocks),
                        "updated_blocks": updated_blocks
                    }
                }
        except Exception as e:
            logger.error(f"设置任务错误状态失败: {str(e)}")
            return {
                "success": False,
                "message": f"设置任务错误状态失败: {str(e)}"
            }

    @staticmethod
    async def get_block_results(task_record_id: str) -> Dict[str, Any]:
        """
        Get the execution results of a task record as a list of log lines.

        Each entry is ``{"created_at", "context", "status"}``. Entries are
        built from the task logs of each block (one block name is expanded
        only once); a block that is not successful additionally reports its
        ``ended_reason``. The list is sorted newest first.

        Args:
            task_record_id: Task record ID.

        Returns:
            Dict: ``data`` holds the result entries (empty when no blocks
            exist or an error occurred).
        """
        try:
            async with get_async_session() as session:
                query = select(VWEDBlockRecord).where(
                    VWEDBlockRecord.task_record_id == task_record_id,
                ).order_by(VWEDBlockRecord.ended_on.desc())
                result = await session.execute(query)
                blocks = result.scalars().all()

                if not blocks:
                    return {
                        "success": True,
                        "message": f"未找到任务记录 {task_record_id} 的块运行情况",
                        "data": []
                    }

                block_results = []
                # Block names whose logs were already expanded (de-duplication).
                processed_block_names = set()

                for block in blocks:
                    prefix = "[" + _text(block.block_execute_name) + "] "

                    # NOTE(review): block_name "-1" looks like the task-level
                    # (root) record — confirm. It only reports the record id.
                    if block.block_name == "-1":
                        block_results.append({
                            "created_at": block.created_at,
                            "context": prefix + _text(block.task_record_id),
                            "status": block.status
                        })
                        continue

                    reason_reported = False
                    if block.status != TaskBlockRecordStatus.SUCCESS:
                        block_results.append({
                            "created_at": block.created_at,
                            "context": prefix + _text(block.ended_reason),
                            "status": block.status
                        })
                        reason_reported = True

                    if block.block_name in processed_block_names:
                        continue
                    processed_block_names.add(block.block_name)

                    task_logs_query = select(VWEDTaskLog).where(
                        VWEDTaskLog.task_record_id == task_record_id,
                        VWEDTaskLog.task_block_id == block.block_name
                    )
                    task_logs = (await session.execute(task_logs_query)).scalars().all()

                    if task_logs:
                        for task_log in task_logs:
                            block_results.append({
                                "created_at": task_log.created_at,
                                "context": prefix + _describe_log(task_log.message),
                                "status": block.status
                            })
                        continue

                    # No logs: fall back to the end reason, unless it was
                    # already reported above for a non-successful block.
                    if not reason_reported:
                        block_results.append({
                            "created_at": block.created_at,
                            "context": prefix + _text(block.ended_reason),
                            "status": block.status
                        })
                    logger.warning(f"任务记录 {task_record_id} 的块 {block.block_name} 没有日志")

                # Newest first; entries without a timestamp sort last instead
                # of raising on a None/datetime comparison.
                block_results.sort(
                    key=lambda entry: (entry["created_at"] is not None, entry["created_at"]),
                    reverse=True
                )

                return {
                    "success": True,
                    "message": "成功获取任务记录执行结果",
                    "data": block_results
                }
        except Exception as e:
            logger.error(f"获取任务记录执行结果失败: {str(e)}")
            return {
                "success": False,
                "message": f"获取任务记录执行结果失败: {str(e)}",
                "data": []
            }

    @staticmethod
    async def get_task_record_detail(task_record_id: str) -> Dict[str, Any]:
        """
        Get the full detail of one task record.

        Args:
            task_record_id: Task record ID.

        Returns:
            Dict: ``data`` holds the task detail dict, or None when the
            record does not exist or an error occurred.
        """
        try:
            async with get_async_session() as session:
                query = select(VWEDTaskRecord).where(
                    VWEDTaskRecord.id == task_record_id
                )

                result = await session.execute(query)
                task_record = result.scalars().first()

                if not task_record:
                    return {
                        "success": False,
                        "message": f"未找到ID为 {task_record_id} 的任务记录",
                        "data": None
                    }

                # Execution time in milliseconds: computed from the start/end
                # timestamps when both exist, otherwise the stored value.
                execution_time = None
                if task_record.ended_on and task_record.first_executor_time:
                    elapsed = task_record.ended_on - task_record.first_executor_time
                    execution_time = int(elapsed.total_seconds() * 1000)
                elif task_record.executor_time:
                    execution_time = task_record.executor_time

                has_child = task_record.if_have_child_task
                task_dict = {
                    "id": task_record.id,
                    "task_id": task_record.def_id,
                    "task_name": task_record.def_label,
                    "task_version": task_record.def_version,
                    "status": task_record.status,
                    "input_params": _load_json(task_record.input_params),
                    "started_on": _iso(task_record.first_executor_time),
                    "ended_on": _iso(task_record.ended_on),
                    "ended_reason": task_record.ended_reason,
                    "execution_time": execution_time,
                    "created_at": _iso(task_record.created_at),
                    "updated_at": _iso(task_record.updated_at),
                    "agv_id": task_record.agv_id,
                    "parent_task_record_id": task_record.parent_task_record_id,
                    "root_task_record_id": task_record.root_task_record_id,
                    "state_description": task_record.state_description,
                    "if_have_child_task": bool(has_child) if has_child is not None else None,
                    "periodic_task": task_record.periodic_task,
                    "priority": task_record.priority,
                    "work_stations": task_record.work_stations,
                    "work_types": task_record.work_types,
                    "variables": _load_json(task_record.variables),
                    "source_type": task_record.source_type,
                    "source_system": task_record.source_system,
                    "source_user": task_record.source_user,
                    "source_device": task_record.source_device,
                    "source_ip": task_record.source_ip,
                    "source_time": _iso(task_record.source_time),
                    "source_client_info": task_record.source_client_info,
                    "source_remarks": task_record.source_remarks
                }

                return {
                    "success": True,
                    "message": "成功获取任务记录详情",
                    "data": task_dict
                }
        except Exception as e:
            logger.error(f"获取任务记录详情失败: {str(e)}")
            return {
                "success": False,
                "message": f"获取任务记录详情失败: {str(e)}",
                "data": None
            }