2025-04-30 16:57:46 +08:00
|
|
|
|
#!/usr/bin/env python
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
天风任务服务层
|
|
|
|
|
处理天风任务相关的业务逻辑
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import uuid
|
|
|
|
|
import datetime
|
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
|
from sqlalchemy import and_, or_, desc, asc
|
|
|
|
|
|
|
|
|
|
from data.models.taskdef import VWEDTaskDef
|
|
|
|
|
from data.models.taskrecord import VWEDTaskRecord
|
|
|
|
|
from data.models.tasktemplate import VWEDTaskTemplate
|
|
|
|
|
from data.enum.task_def_enum import TaskTypeEnum, TaskStatusEnum, EnableStatus, PeriodicTaskStatus
|
|
|
|
|
from utils.logger import get_logger
|
2025-05-12 15:43:21 +08:00
|
|
|
|
from data.session import get_async_session
|
|
|
|
|
from sqlalchemy import select, update
|
|
|
|
|
from data.enum.task_record_enum import TaskStatus
|
2025-04-30 16:57:46 +08:00
|
|
|
|
# 设置日志
|
|
|
|
|
logger = get_logger("services.task_service")
|
|
|
|
|
|
|
|
|
|
# 自定义异常类
|
|
|
|
|
class TaskNameExistsError(Exception):
|
|
|
|
|
"""任务名称已存在异常"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
class TaskService:
|
|
|
|
|
"""
|
|
|
|
|
天风任务服务类
|
|
|
|
|
提供天风任务的相关业务操作
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def get_task_list(
|
|
|
|
|
db: Session,
|
|
|
|
|
page_num: int = 1,
|
|
|
|
|
page_size: int = 10,
|
|
|
|
|
keyword: Optional[str] = None,
|
|
|
|
|
status: Optional[int] = None,
|
|
|
|
|
sort_field: Optional[str] = None,
|
|
|
|
|
sort_order: Optional[str] = "desc"
|
|
|
|
|
) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
获取任务列表
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
db: 数据库会话
|
|
|
|
|
page_num: 页码,从1开始
|
|
|
|
|
page_size: 每页记录数
|
|
|
|
|
keyword: 关键字(任务名称),模糊匹配
|
|
|
|
|
status: 状态过滤
|
|
|
|
|
sort_field: 排序字段
|
|
|
|
|
sort_order: 排序方式:asc(升序)、desc(降序)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Dict[str, Any]: 包含任务列表和分页信息
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# 构建查询条件
|
|
|
|
|
query = db.query(VWEDTaskDef)
|
|
|
|
|
|
|
|
|
|
# 添加筛选条件
|
|
|
|
|
if keyword:
|
|
|
|
|
query = query.filter(VWEDTaskDef.label.like(f'%{keyword}%'))
|
|
|
|
|
|
|
|
|
|
if status is not None:
|
|
|
|
|
query = query.filter(VWEDTaskDef.status == status)
|
|
|
|
|
|
|
|
|
|
# 计算总数
|
|
|
|
|
total = query.count()
|
|
|
|
|
|
|
|
|
|
# 添加排序
|
|
|
|
|
if sort_field and sort_field in VWEDTaskDef.__table__.columns:
|
|
|
|
|
direction = desc if sort_order == "desc" else asc
|
|
|
|
|
query = query.order_by(direction(getattr(VWEDTaskDef, sort_field)))
|
|
|
|
|
else:
|
|
|
|
|
# 默认按创建时间倒序
|
|
|
|
|
query = query.order_by(desc(VWEDTaskDef.created_at))
|
|
|
|
|
|
|
|
|
|
# 分页处理
|
|
|
|
|
offset = (page_num - 1) * page_size
|
|
|
|
|
tasks = query.offset(offset).limit(page_size).all()
|
|
|
|
|
|
|
|
|
|
# 格式化返回结果
|
|
|
|
|
task_list = []
|
|
|
|
|
for task in tasks:
|
|
|
|
|
task_list.append({
|
|
|
|
|
"id": task.id,
|
|
|
|
|
"label": task.label,
|
|
|
|
|
"version": task.version,
|
|
|
|
|
"detail": task.detail,
|
|
|
|
|
"templateName": task.template_name,
|
|
|
|
|
"periodicTask": bool(task.periodic_task),
|
|
|
|
|
"ifEnable": task.if_enable,
|
|
|
|
|
"status": task.status,
|
|
|
|
|
"createDate": task.created_at,
|
|
|
|
|
"remark": task.remark,
|
|
|
|
|
"tenant_id": task.tenant_id,
|
|
|
|
|
"updated_at": task.updated_at
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"total": total,
|
|
|
|
|
"list": task_list,
|
|
|
|
|
"pageNum": page_num,
|
|
|
|
|
"pageSize": page_size,
|
|
|
|
|
"totalPages": (total + page_size - 1) // page_size
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"获取任务列表失败: {str(e)}")
|
|
|
|
|
raise Exception(f"获取任务列表失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def create_task(
|
|
|
|
|
db: Session,
|
|
|
|
|
label: str,
|
|
|
|
|
task_type: int,
|
|
|
|
|
remark: Optional[str] = None,
|
|
|
|
|
period: Optional[int] = None,
|
|
|
|
|
delay: Optional[int] = 3000,
|
|
|
|
|
release_sites: Optional[bool] = True,
|
|
|
|
|
token: Optional[str] = None,
|
|
|
|
|
tenant_id: str = "default",
|
|
|
|
|
map_id: Optional[str] = None
|
|
|
|
|
) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
创建任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
db: 数据库会话
|
|
|
|
|
label: 任务名称
|
|
|
|
|
task_type: 任务类型:1-普通任务,2-定时任务
|
|
|
|
|
remark: 任务备注
|
|
|
|
|
period: 周期时间(毫秒),定时任务必填
|
|
|
|
|
delay: 延迟时间(毫秒),默认3000
|
|
|
|
|
release_sites: 是否释放站点,默认true
|
|
|
|
|
token: 用户token值,用于认证
|
|
|
|
|
tenant_id: 租户ID,用于多租户隔离,默认为"default"
|
|
|
|
|
map_id: 相关地图ID
|
|
|
|
|
Returns:
|
|
|
|
|
Dict[str, Any]: 新建任务信息
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
ValueError: 参数验证失败时
|
|
|
|
|
TaskNameExistsError: 任务名称已存在时
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# 验证任务名称
|
|
|
|
|
if not label or label.strip() == "":
|
|
|
|
|
raise ValueError("任务名称不能为空")
|
|
|
|
|
|
|
|
|
|
# 验证任务名称长度
|
|
|
|
|
if len(label.strip()) < 2:
|
|
|
|
|
raise ValueError("任务名称至少需要2个字符")
|
|
|
|
|
|
|
|
|
|
# 检查任务名称是否已存在
|
|
|
|
|
existing_task = db.query(VWEDTaskDef).filter(VWEDTaskDef.label == label.strip()).first()
|
|
|
|
|
if existing_task:
|
|
|
|
|
raise TaskNameExistsError(f"任务名称 '{label}' 已存在,请使用其他名称")
|
|
|
|
|
|
|
|
|
|
# 验证任务类型
|
|
|
|
|
if task_type not in [TaskTypeEnum.NORMAL, TaskTypeEnum.PERIODIC]:
|
|
|
|
|
raise ValueError("任务类型无效,应为1(普通任务)或2(定时任务)")
|
|
|
|
|
|
|
|
|
|
# 验证定时任务参数
|
|
|
|
|
if task_type == TaskTypeEnum.PERIODIC and not period:
|
|
|
|
|
raise ValueError("定时任务必须指定周期时间")
|
|
|
|
|
|
2025-07-14 10:29:37 +08:00
|
|
|
|
# # 获取当前启用的模板
|
|
|
|
|
# template = db.query(VWEDTaskTemplate).filter(
|
|
|
|
|
# VWEDTaskTemplate.template_if_enable == 1
|
|
|
|
|
# ).first()
|
2025-04-30 16:57:46 +08:00
|
|
|
|
|
2025-07-14 10:29:37 +08:00
|
|
|
|
# if not template:
|
|
|
|
|
# raise ValueError("没有可用的任务模板,请先启用一个模板")
|
2025-04-30 16:57:46 +08:00
|
|
|
|
|
|
|
|
|
# 创建任务定义
|
|
|
|
|
task_id = str(uuid.uuid4())
|
|
|
|
|
new_task = VWEDTaskDef(
|
|
|
|
|
id=task_id,
|
|
|
|
|
label=label.strip(), # 去除首尾空白字符
|
|
|
|
|
version=1,
|
|
|
|
|
detail=json.dumps({
|
|
|
|
|
"inputParams": [],
|
|
|
|
|
"outputParams": [],
|
|
|
|
|
"rootBlock": None
|
|
|
|
|
}, ensure_ascii=False),
|
2025-07-14 10:29:37 +08:00
|
|
|
|
template_name="用户自有模板",
|
2025-04-30 16:57:46 +08:00
|
|
|
|
period=period if period else 3000,
|
|
|
|
|
periodic_task=PeriodicTaskStatus.PERIODIC if task_type == TaskTypeEnum.PERIODIC else PeriodicTaskStatus.NON_PERIODIC,
|
|
|
|
|
status=TaskStatusEnum.PENDING,
|
|
|
|
|
if_enable=EnableStatus.DISABLED,
|
|
|
|
|
delay=delay,
|
|
|
|
|
release_sites=release_sites,
|
|
|
|
|
remark=remark,
|
|
|
|
|
tenant_id=tenant_id, # 使用传入的租户ID
|
|
|
|
|
user_token=token, # 保存用户token
|
|
|
|
|
map_id=map_id # 保存地图ID
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
db.add(new_task)
|
|
|
|
|
db.commit()
|
|
|
|
|
db.refresh(new_task)
|
|
|
|
|
|
|
|
|
|
# 返回任务信息
|
|
|
|
|
return {
|
|
|
|
|
"id": new_task.id,
|
|
|
|
|
"label": new_task.label,
|
|
|
|
|
"version": new_task.version,
|
|
|
|
|
"detail": new_task.detail,
|
|
|
|
|
"templateName": new_task.template_name,
|
|
|
|
|
"periodicTask": bool(new_task.periodic_task),
|
|
|
|
|
"status": new_task.status,
|
|
|
|
|
"createDate": new_task.created_at,
|
|
|
|
|
"mapId": new_task.map_id # 返回地图ID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
except TaskNameExistsError as e:
|
|
|
|
|
db.rollback()
|
|
|
|
|
logger.warning(f"创建任务失败: {str(e)}")
|
|
|
|
|
raise # 重新抛出异常,不包装
|
|
|
|
|
except Exception as e:
|
|
|
|
|
db.rollback()
|
|
|
|
|
logger.error(f"创建任务失败: {str(e)}")
|
|
|
|
|
raise Exception(f"创建任务失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def delete_tasks(db: Session, ids: List[str]) -> bool:
|
|
|
|
|
"""
|
|
|
|
|
删除任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
db: 数据库会话
|
|
|
|
|
ids: 要删除的任务ID列表
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 是否删除成功
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# 检查任务是否存在
|
|
|
|
|
tasks = db.query(VWEDTaskDef).filter(VWEDTaskDef.id.in_(ids)).all()
|
|
|
|
|
if len(tasks) != len(ids):
|
|
|
|
|
missing_ids = set(ids) - {task.id for task in tasks}
|
|
|
|
|
raise ValueError(f"以下任务不存在: {', '.join(missing_ids)}")
|
|
|
|
|
|
|
|
|
|
# 删除任务
|
|
|
|
|
for task in tasks:
|
|
|
|
|
db.delete(task)
|
|
|
|
|
|
|
|
|
|
db.commit()
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
db.rollback()
|
|
|
|
|
logger.error(f"删除任务失败: {str(e)}")
|
|
|
|
|
raise Exception(f"删除任务失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def get_task_by_id(db: Session, task_id: str) -> Optional[Dict[str, Any]]:
|
|
|
|
|
"""
|
|
|
|
|
根据ID获取任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
db: 数据库会话
|
|
|
|
|
task_id: 任务ID
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Optional[Dict[str, Any]]: 任务信息,如果不存在则返回None
|
|
|
|
|
"""
|
|
|
|
|
try:
|
2025-05-12 15:43:21 +08:00
|
|
|
|
# 显式启动事务
|
2025-04-30 16:57:46 +08:00
|
|
|
|
task = db.query(VWEDTaskDef).filter(VWEDTaskDef.id == task_id).first()
|
|
|
|
|
if not task:
|
|
|
|
|
return None
|
2025-05-12 15:43:21 +08:00
|
|
|
|
|
2025-04-30 16:57:46 +08:00
|
|
|
|
# 格式化任务信息
|
|
|
|
|
task_info = {
|
|
|
|
|
"id": task.id,
|
|
|
|
|
"label": task.label,
|
|
|
|
|
"version": task.version,
|
|
|
|
|
"detail": json.loads(task.detail) if task.detail else {},
|
|
|
|
|
"templateName": task.template_name,
|
|
|
|
|
"period": task.period,
|
|
|
|
|
"periodicTask": bool(task.periodic_task),
|
|
|
|
|
"status": task.status,
|
|
|
|
|
"ifEnable": task.if_enable,
|
|
|
|
|
"delay": task.delay,
|
|
|
|
|
"releaseSites": bool(task.release_sites) if task.release_sites is not None else None,
|
|
|
|
|
"createDate": task.created_at,
|
|
|
|
|
"remark": task.remark,
|
|
|
|
|
}
|
|
|
|
|
|
2025-05-12 15:43:21 +08:00
|
|
|
|
# 提交事务以释放数据库锁
|
|
|
|
|
db.commit()
|
2025-04-30 16:57:46 +08:00
|
|
|
|
return task_info
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
2025-05-12 15:43:21 +08:00
|
|
|
|
# 发生异常时回滚事务
|
|
|
|
|
db.rollback()
|
2025-04-30 16:57:46 +08:00
|
|
|
|
logger.error(f"获取任务失败: {str(e)}")
|
|
|
|
|
raise Exception(f"获取任务失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def export_tasks(db: Session, ids: List[str]) -> List[Dict[str, Any]]:
|
|
|
|
|
"""
|
|
|
|
|
导出任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
db: 数据库会话
|
|
|
|
|
ids: 任务ID列表
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
List[Dict[str, Any]]: 任务配置列表
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
tasks = db.query(VWEDTaskDef).filter(VWEDTaskDef.id.in_(ids)).all()
|
|
|
|
|
if not tasks:
|
|
|
|
|
raise ValueError(f"未找到指定的任务")
|
|
|
|
|
|
|
|
|
|
# 格式化任务信息
|
|
|
|
|
tasks_export = []
|
|
|
|
|
for task in tasks:
|
|
|
|
|
task_export = {
|
|
|
|
|
"id": task.id,
|
|
|
|
|
"label": task.label,
|
|
|
|
|
"version": task.version,
|
|
|
|
|
"detail": task.detail,
|
|
|
|
|
"templateName": task.template_name,
|
|
|
|
|
"templateDescription": db.query(VWEDTaskTemplate.template_description)
|
|
|
|
|
.filter(VWEDTaskTemplate.template_name == task.template_name)
|
|
|
|
|
.scalar() or "",
|
|
|
|
|
"period": task.period,
|
|
|
|
|
"periodicTask": task.periodic_task,
|
|
|
|
|
"status": task.status,
|
|
|
|
|
"ifEnable": task.if_enable,
|
|
|
|
|
"delay": task.delay,
|
|
|
|
|
"remark": task.remark,
|
2025-07-14 10:29:37 +08:00
|
|
|
|
"map_id": task.map_id,
|
2025-04-30 16:57:46 +08:00
|
|
|
|
"windcategoryId": 0 # 默认分类ID
|
|
|
|
|
}
|
|
|
|
|
tasks_export.append(task_export)
|
|
|
|
|
|
|
|
|
|
return tasks_export
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"导出任务失败: {str(e)}")
|
|
|
|
|
raise Exception(f"导出任务失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def import_task(
|
|
|
|
|
db: Session,
|
|
|
|
|
task_data: Dict[str, Any],
|
|
|
|
|
task_name: Optional[str] = None,
|
|
|
|
|
token: Optional[str] = None,
|
|
|
|
|
tenant_id: str = "default"
|
|
|
|
|
) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
导入任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
db: 数据库会话
|
|
|
|
|
task_data: 任务配置数据
|
|
|
|
|
task_name: 导入后的任务名称,不填则使用原名称
|
|
|
|
|
token: 用户token,用于认证
|
|
|
|
|
tenant_id: 租户ID,默认为"default"
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Dict[str, Any]: 导入的任务信息
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
ValueError: 参数验证失败时
|
|
|
|
|
TaskNameExistsError: 任务名称已存在时
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# 创建新ID避免冲突
|
|
|
|
|
new_id = str(uuid.uuid4())
|
|
|
|
|
|
|
|
|
|
# 设置任务名称
|
|
|
|
|
label = task_name or task_data.get("label", "导入的任务")
|
|
|
|
|
# 获取detail字段,这是唯一必需的字段
|
|
|
|
|
detail = task_data.get("detail")
|
|
|
|
|
if not detail:
|
|
|
|
|
raise ValueError("任务数据缺少detail字段,无法导入")
|
|
|
|
|
|
|
|
|
|
# 检查任务名称是否已存在
|
|
|
|
|
existing_task = db.query(VWEDTaskDef).filter(
|
|
|
|
|
VWEDTaskDef.label == label.strip(),
|
|
|
|
|
VWEDTaskDef.tenant_id == tenant_id
|
|
|
|
|
).first()
|
|
|
|
|
if existing_task:
|
|
|
|
|
raise TaskNameExistsError(f"任务名称 '{label}' 已存在,请使用其他名称")
|
|
|
|
|
|
|
|
|
|
# 获取当前启用的模板
|
|
|
|
|
template = db.query(VWEDTaskTemplate).filter(
|
|
|
|
|
VWEDTaskTemplate.template_if_enable == 1
|
|
|
|
|
).first()
|
|
|
|
|
|
|
|
|
|
if not template:
|
|
|
|
|
template_name = task_data.get("templateName", "默认模板")
|
|
|
|
|
else:
|
|
|
|
|
template_name = template.template_name
|
|
|
|
|
|
|
|
|
|
# 创建新任务
|
|
|
|
|
new_task = VWEDTaskDef(
|
|
|
|
|
id=new_id,
|
|
|
|
|
label=label,
|
|
|
|
|
version=1, # 新任务从版本1开始
|
|
|
|
|
detail=detail,
|
|
|
|
|
template_name=template_name,
|
|
|
|
|
period=task_data.get("period", 3000),
|
|
|
|
|
periodic_task=task_data.get("periodicTask", PeriodicTaskStatus.NON_PERIODIC),
|
|
|
|
|
status=TaskStatusEnum.PENDING, # 初始状态为0
|
|
|
|
|
if_enable=EnableStatus.DISABLED, # 初始未启用
|
|
|
|
|
delay=task_data.get("delay", 3000),
|
|
|
|
|
release_sites=task_data.get("releaseSites", True), # 默认释放站点
|
|
|
|
|
remark=task_data.get("remark", ""),
|
2025-07-14 10:29:37 +08:00
|
|
|
|
map_id=task_data.get("map_id"),
|
2025-04-30 16:57:46 +08:00
|
|
|
|
tenant_id=tenant_id,
|
|
|
|
|
user_token=token
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
db.add(new_task)
|
|
|
|
|
db.commit()
|
|
|
|
|
db.refresh(new_task)
|
|
|
|
|
|
|
|
|
|
# 返回任务信息
|
|
|
|
|
return {
|
|
|
|
|
"id": new_task.id,
|
|
|
|
|
"label": new_task.label,
|
|
|
|
|
"version": new_task.version,
|
|
|
|
|
"status": new_task.status,
|
|
|
|
|
"createDate": new_task.created_at
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
except TaskNameExistsError as e:
|
|
|
|
|
db.rollback()
|
|
|
|
|
logger.warning(f"导入任务失败: {str(e)}")
|
|
|
|
|
raise # 重新抛出异常,不包装
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
db.rollback()
|
|
|
|
|
logger.error(f"导入任务失败: {str(e)}")
|
2025-05-12 15:43:21 +08:00
|
|
|
|
raise Exception(f"导入任务失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
async def stop_task_def(task_def_id: str) -> Dict[str, Any]:
|
|
|
|
|
"""
|
|
|
|
|
停止指定任务定义下的所有运行任务实例,同时禁用定时任务
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
task_def_id: 任务定义ID
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
字典包含停止结果信息:
|
|
|
|
|
- success: 是否操作成功
|
|
|
|
|
- message: 操作结果消息
|
|
|
|
|
- is_periodic: 是否为定时任务
|
|
|
|
|
- total_running: 运行中的任务总数
|
|
|
|
|
- stopped_count: 成功停止的任务数量
|
|
|
|
|
- failed_count: 停止失败的任务数量
|
|
|
|
|
- failed_tasks: 停止失败的任务记录ID列表
|
|
|
|
|
"""
|
|
|
|
|
# 导入增强版调度器
|
|
|
|
|
from services.enhanced_scheduler import scheduler
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
async with get_async_session() as session:
|
|
|
|
|
# 查询任务定义
|
|
|
|
|
result = await session.execute(
|
|
|
|
|
select(VWEDTaskDef).where(VWEDTaskDef.id == task_def_id)
|
|
|
|
|
)
|
|
|
|
|
task_def = result.scalars().first()
|
|
|
|
|
|
|
|
|
|
if not task_def:
|
|
|
|
|
return {
|
|
|
|
|
"success": False,
|
|
|
|
|
"message": f"未找到任务定义: {task_def_id}"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
is_periodic = task_def.periodic_task == PeriodicTaskStatus.PERIODIC
|
|
|
|
|
|
|
|
|
|
# 初始化计数器
|
|
|
|
|
total_running = 0
|
|
|
|
|
stopped_count = 0
|
|
|
|
|
failed_count = 0
|
|
|
|
|
failed_tasks = []
|
|
|
|
|
|
|
|
|
|
# 如果是定时任务则禁用
|
|
|
|
|
if is_periodic and task_def.if_enable == EnableStatus.ENABLED:
|
|
|
|
|
# 更新任务定义状态为禁用
|
|
|
|
|
await session.execute(
|
|
|
|
|
update(VWEDTaskDef)
|
|
|
|
|
.where(VWEDTaskDef.id == task_def_id)
|
|
|
|
|
.values(if_enable=EnableStatus.DISABLED)
|
|
|
|
|
)
|
|
|
|
|
await session.commit()
|
|
|
|
|
# 通知调度器
|
|
|
|
|
update_result = await scheduler.update_periodic_task(task_def_id, enable=False)
|
|
|
|
|
|
|
|
|
|
# 将定时任务的禁用也计入停止任务的数量
|
|
|
|
|
total_running += 1
|
|
|
|
|
if update_result.get("success", True): # 假设通知调度器成功,除非明确返回失败
|
|
|
|
|
stopped_count += 1
|
|
|
|
|
else:
|
|
|
|
|
failed_count += 1
|
|
|
|
|
failed_tasks.append({
|
|
|
|
|
"taskRecordId": "periodic_" + task_def_id,
|
|
|
|
|
"reason": update_result.get("message", "禁用定时任务失败")
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
# 查找所有正在运行的任务记录
|
|
|
|
|
running_tasks_query = await session.execute(
|
|
|
|
|
select(VWEDTaskRecord)
|
|
|
|
|
.where(
|
|
|
|
|
VWEDTaskRecord.def_id == task_def_id,
|
|
|
|
|
VWEDTaskRecord.status == TaskStatus.RUNNING # 执行中状态码
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
running_tasks = running_tasks_query.scalars().all()
|
|
|
|
|
|
|
|
|
|
# 更新总计数
|
|
|
|
|
total_running += len(running_tasks)
|
|
|
|
|
|
|
|
|
|
# 取消所有运行中的任务
|
|
|
|
|
for task_record in running_tasks:
|
|
|
|
|
cancel_result = await scheduler.cancel_task(task_record.id)
|
|
|
|
|
|
|
|
|
|
if cancel_result.get("success", False):
|
|
|
|
|
stopped_count += 1
|
|
|
|
|
else:
|
|
|
|
|
failed_count += 1
|
|
|
|
|
failed_tasks.append({
|
|
|
|
|
"taskRecordId": task_record.id,
|
|
|
|
|
"reason": cancel_result.get("message", "未知原因")
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
# 更新任务定义状态为已结束(0)
|
|
|
|
|
await session.execute(
|
|
|
|
|
update(VWEDTaskDef)
|
|
|
|
|
.where(VWEDTaskDef.id == task_def_id)
|
|
|
|
|
.values(status=TaskStatusEnum.PENDING)
|
|
|
|
|
)
|
|
|
|
|
await session.commit()
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"success": True,
|
|
|
|
|
"message": "操作完成",
|
|
|
|
|
"is_periodic": is_periodic,
|
|
|
|
|
"total_running": total_running,
|
|
|
|
|
"stopped_count": stopped_count,
|
|
|
|
|
"failed_count": failed_count,
|
|
|
|
|
"failed_tasks": failed_tasks
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return {
|
|
|
|
|
"success": False,
|
|
|
|
|
"message": f"停止任务失败: {str(e)}"
|
|
|
|
|
}
|