Skip to content

第14章 工作流引擎详解

作者

谭策 — 独立开发者 | AIOps 领域探索者

IT Online 微信公众号

许可证

MPL-2.0 © 谭策

本章导读

工作流引擎是 ITOps Agent Platform 实现自动化运维的核心组件。它将多个 Agent 节点编排为可视化的 DAG(有向无环图),按照拓扑顺序自动执行,实现复杂运维场景的流程化、自动化。本章深入剖析 workflowExecutor.ts 的执行引擎、节点类型、JSON 序列化存储、执行状态管理、错误处理、工作流模板以及定时调度等关键技术。

学习目标

  • 掌握 workflowExecutor.ts 的核心执行逻辑
  • 理解节点类型和执行流的拓扑排序
  • 了解工作流 JSON 序列化与 SQLite 存储方案
  • 熟悉工作流的四种执行状态及其转换
  • 掌握错误处理和容错机制
  • 学会创建工作流模板
  • 理解定时工作流的调度机制

核心内容

14.1 核心执行引擎 workflowExecutor.ts

workflowExecutor.ts 是工作流的核心执行引擎,负责从数据库中加载工作流定义、解析节点和边、进行拓扑排序、逐节点执行 Agent、推送实时进度、记录执行日志、自动生成报告和故障案例入库。

执行流程总览:

executeWorkflow(taskId, workflow, initialInput, context)

         ├─ 解析 nodes 和 edges(JSON → 对象)

         ├─ topologicalSort() — 拓扑排序确定执行顺序

         ├─ UPDATE tasks SET status='running'

         ├─ emit('task:started') — 推送开始事件

         ├─ for nodeId in executionOrder:
         │   │
         │   ├─ emit('task:node:started')
         │   │
         │   ├─ getThinkingSteps() — 获取思考步骤
         │   ├─ for step in thinkingSteps:
         │   │   emit('task:node:thinking')
         │   │
         │   ├─ executeAgentNode() — 执行 Agent
         │   │
         │   ├─ emit('task:node:output')
         │   ├─ emit('task:node:completed')
         │   │
         │   └─ addTaskLog() — 记录日志

         ├─ 故障案例自动存入知识库

         ├─ generateWorkflowExecutionReport() — 生成报告

         ├─ UPDATE tasks SET status='completed'

         └─ emit('task:completed')
typescript
export async function executeWorkflow(
  taskId: string,
  workflow: WorkflowParsed,
  initialInput?: string,
  context?: Record<string, unknown>
) {
  const io = getIOInstance();
  const MAX_EXECUTION_DEPTH = 50;
  let executionDepth = 0;
  const nodeResults: Record<string, NodeResult> = {};
  const startTime = new Date().toISOString();

  // 1. 解析工作流数据(支持 JSON 字符串和对象两种格式)
  const nodes = Array.isArray(workflow.nodes)
    ? workflow.nodes
    : JSON.parse(workflow.nodes as string || '[]');

  const edges = Array.isArray(workflow.edges)
    ? workflow.edges
    : JSON.parse(workflow.edges as string || '[]');

  // 2. 拓扑排序
  const executionOrder = topologicalSort(nodes, edges);

  // 3. 更新任务状态为 running
  db.prepare(
    'UPDATE tasks SET status = ?, start_time = CURRENT_TIMESTAMP, execution_order = ? WHERE id = ?'
  ).run('running', JSON.stringify(executionOrder), taskId);

  io?.to(`task:${taskId}`).emit('task:started', { taskId, executionOrder });

  // 4. 逐节点执行
  for (const nodeId of executionOrder) {
    if (executionDepth++ >= MAX_EXECUTION_DEPTH) {
      logger.error(`Workflow exceeded maximum execution depth`);
      break;
    }

    // 支持任务取消
    const task = db.prepare('SELECT status FROM tasks WHERE id = ?').get(taskId);
    if (task?.status === 'cancelled') break;

    const node = nodes.find(n => n.id === nodeId);
    if (!node || node.type !== 'agent') continue;

    try {
      // 收集前面节点的输出作为当前节点的输入
      const previousResults = Object.values(nodeResults)
        .map(r => r.output).filter(Boolean).join('\n\n');

      const input = previousResults || initialInput || '请开始执行任务';

      // 推送思考进度
      const thinkingProcess = getThinkingSteps(node.data.label);
      for (const step of thinkingProcess) {
        await delay(300);
        io?.to(`task:${taskId}`).emit('task:node:thinking', {
          taskId, nodeId, content: step
        });
      }

      // 执行 Agent
      const output = await executeAgentNode(node.data.agentId, input, context);

      nodeResults[nodeId] = {
        status: 'success',
        output,
        metadata: {
          thinkingProcess: thinkingProcess.join('\n'),
          executionTime: Date.now()
        }
      };

      io?.to(`task:${taskId}`).emit('task:node:completed', {
        taskId, nodeId, status: 'success', output
      });

    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      nodeResults[nodeId] = { status: 'failed', error: errorMessage };

      io?.to(`task:${taskId}`).emit('task:node:completed', {
        taskId, nodeId, status: 'failed', error: errorMessage
      });

      // 非容错节点失败时终止工作流
      if (!node.data.allowFailure) {
        throw error;
      }
    }
  }

  // 5. 更新任务状态
  db.prepare(`
    UPDATE tasks SET status = ?, end_time = CURRENT_TIMESTAMP,
        node_results = ?, current_node_id = NULL
    WHERE id = ?
  `).run('completed', JSON.stringify(nodeResults), taskId);

  // 6. 故障案例自动入库
  saveFailedNodesToKnowledgeBase(nodeResults, nodes, workflow.name);

  // 7. 生成执行报告
  await generateWorkflowExecutionReport(taskId, workflow, nodes, nodeResults, executionOrder, 'completed');

  io?.to(`task:${taskId}`).emit('task:completed', {
    taskId, status: 'completed', nodeResults
  });
}

14.2 节点类型与执行流

节点数据结构:

typescript
interface WorkflowNode {
  id: string;
  type: string;          // 当前仅支持 'agent'
  data: {
    label: string;       // 节点显示名称
    agentId: string;     // 绑定的 Agent ID
    allowFailure?: boolean; // 是否允许失败后继续
  };
  position: { x: number; y: number; }; // 画布坐标
}

interface WorkflowEdge {
  id: string;
  source: string;        // 起始节点 ID
  target: string;        // 目标节点 ID
}

节点类型体系:

┌─────────────────────────────────────────────┐
│              节点类型                        │
├────────────┬────────────────────────────────┤
│ agent      │ 调用 Agent 执行任务(当前唯一)│
│ condition  │ 条件分支(未来扩展)           │
│ parallel   │ 并行执行(未来扩展)           │
│ delay      │ 延迟等待(未来扩展)           │
│ webhook    │ 外部回调(未来扩展)           │
└────────────┴────────────────────────────────┘

上下文传递机制:

Node A (告警处理 Agent)
    │ 输出: "告警摘要: CPU 使用率超过 90%..."

Node B (故障诊断 Agent)
    │ 输入: Node A 的输出
    │ 输出: "可能原因: 进程泄漏...排查步骤: 检查进程列表"

Node C (变更执行 Agent)
    │ 输入: Node A + Node B 的输出
    │ 输出: "已执行 kill 操作,CPU 恢复正常"

Node D (文档生成 Agent)
    │ 输入: 所有前面节点的输出
    └─ 输出: 完整的故障处理报告

节点之间的数据通过 previousResults 变量传递:

typescript
const previousResults = Object.values(nodeResults)
  .map(r => r.output).filter(Boolean).join('\n\n');

const input = previousResults || initialInput || '请开始执行任务';

14.3 工作流的 JSON 序列化与 SQLite 存储

工作流在数据库中仅占用 workflows 表的一行记录,其中 nodesedges 以 JSON 字符串形式存储:

sql
CREATE TABLE workflows (
  id TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  description TEXT,
  nodes TEXT,              -- JSON 字符串,存储节点数组
  edges TEXT,              -- JSON 字符串,存储边数组
  agent_configs TEXT,      -- JSON 字符串,存储 Agent 配置
  is_template INTEGER DEFAULT 0,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

存储示例:

json
{
  "id": "wf-001",
  "name": "故障诊断流程",
  "description": "自动化的故障诊断工作流",
  "nodes": "[{\"id\":\"n1\",\"type\":\"agent\",\"data\":{\"label\":\"告警处理 Agent\",\"agentId\":\"agent-1\"},\"position\":{\"x\":100,\"y\":100}},{\"id\":\"n2\",\"type\":\"agent\",\"data\":{\"label\":\"故障诊断 Agent\",\"agentId\":\"agent-2\"},\"position\":{\"x\":100,\"y\":300}}]",
  "edges": "[{\"id\":\"e1\",\"source\":\"n1\",\"target\":\"n2\"}]",
  "agent_configs": "{}",
  "is_template": 0
}

读取时解析:

typescript
const nodes = Array.isArray(workflow.nodes)
  ? workflow.nodes
  : JSON.parse(workflow.nodes as string || '[]');

const edges = Array.isArray(workflow.edges)
  ? workflow.edges
  : JSON.parse(workflow.edges as string || '[]');

写入时序列化:

typescript
db.prepare(`
  INSERT INTO workflows (id, name, description, nodes, edges, agent_configs, is_template)
  VALUES (?, ?, ?, ?, ?, ?, ?)
`).run(
  id, name, description,
  JSON.stringify(nodes),
  JSON.stringify(edges),
  JSON.stringify(agentConfigs),
  isTemplate ? 1 : 0
);

14.4 执行状态管理

工作流及其节点有完整的状态机:

┌──────────────────────────────────────────────────┐
│               任务状态机                          │
│                                                  │
│   pending ──► running ──► completed              │
│                 │              ▲                 │
│                 │              │                 │
│                 ▼              │                 │
│              failed ───────────┘ (手动重试)      │
│                 │                                │
│                 ▼                                │
│             cancelled                            │
└──────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────┐
│               节点状态                            │
│                                                  │
│   pending ──► running ──► success                │
│                 │                                │
│                 ▼                                │
│              failed (allowFailure? 继续 : 终止)   │
└──────────────────────────────────────────────────┘

tasks 表关键状态字段:

字段类型说明
statusTEXTpending / running / completed / failed / cancelled
start_timeTIMESTAMP开始时间
end_timeTIMESTAMP结束时间
execution_orderTEXTJSON 数组,节点执行顺序
node_resultsTEXTJSON 对象,每个节点的执行结果
current_node_idTEXT当前正在执行的节点
logsTEXTJSON 数组,执行日志
report_idTEXT关联的执行报告
typescript
// 更新任务状态
db.prepare('UPDATE tasks SET status = ?, start_time = CURRENT_TIMESTAMP WHERE id = ?')
  .run('running', taskId);

// 完成时更新
db.prepare(`
  UPDATE tasks SET status = ?, end_time = CURRENT_TIMESTAMP,
      node_results = ?, current_node_id = NULL
  WHERE id = ?
`).run('completed', JSON.stringify(nodeResults), taskId);

// 失败时更新
db.prepare(`
  UPDATE tasks SET status = ?, end_time = CURRENT_TIMESTAMP, current_node_id = NULL
  WHERE id = ?
`).run('failed', taskId);

14.5 错误处理与容错机制

节点级容错: 每个节点可以设置 allowFailure 标志:

typescript
try {
  const output = await executeAgentNode(node.data.agentId, input, context);
  nodeResults[nodeId] = { status: 'success', output };
} catch (error) {
  const errorMessage = error instanceof Error ? error.message : String(error);
  nodeResults[nodeId] = { status: 'failed', error: errorMessage };

  // allowFailure = true 时,记录错误后继续执行下一个节点
  if (!node.data.allowFailure) {
    throw error; // 终止工作流
  }
}

最大执行深度保护:

typescript
const MAX_EXECUTION_DEPTH = 50;
for (const nodeId of executionOrder) {
  if (executionDepth++ >= MAX_EXECUTION_DEPTH) {
    logger.error('Workflow exceeded maximum execution depth');
    break;
  }
  // ...
}

任务取消检测:

typescript
const task = db.prepare('SELECT status FROM tasks WHERE id = ?').get(taskId);
if (task?.status === 'cancelled') break;

故障案例自动入库:

工作流失败时,自动将故障信息存入知识库,形成运维经验积累:

typescript
// 检查是否存在重复的故障案例
function isDuplicateKnowledgeBase(content: string, threshold = 0.7): string | null {
  const existing = db.prepare(
    'SELECT id, content FROM knowledge_base WHERE category = ? ORDER BY created_at DESC LIMIT 50'
  ).all('故障处理');

  for (const entry of existing) {
    const similarity = calculateTextSimilarity(content, entry.content);
    if (similarity >= threshold) return entry.id;
  }
  return null;
}

// 基于 Jaccard 相似度
function calculateTextSimilarity(text1: string, text2: string): number {
  const set1 = new Set(text1.toLowerCase().replace(/[^\w\s]/g, '').split(/\s+/));
  const set2 = new Set(text2.toLowerCase().replace(/[^\w\s]/g, '').split(/\s+/));
  const intersection = new Set([...set1].filter(x => set2.has(x)));
  const union = new Set([...set1, ...set2]);
  return union.size === 0 ? 1 : intersection.size / union.size;
}

// 将故障节点信息存入知识库
failedNodes.forEach(nodeResult => {
  const duplicateId = isDuplicateKnowledgeBase(content);
  if (duplicateId) {
    logger.info('跳过重复的故障案例');
    return;
  }
  db.prepare(`
    INSERT INTO knowledge_base (id, title, category, content, created_at)
    VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
  `).run(randomUUID(), title, '故障处理', content);
});

14.6 工作流模板与创建

工作流模板 是预定义的可复用工作流,通过 is_template = 1 标识:

sql
-- 查询所有工作流模板
SELECT * FROM workflows WHERE is_template = 1 ORDER BY created_at DESC;

创建工作流的 API 流程:

typescript
// workflowRoutes.ts
router.post('/', async (req: Request, res: Response) => {
  const { name, description, nodes, edges, agent_configs, is_template } = req.body;

  const id = randomUUID();

  db.prepare(`
    INSERT INTO workflows (id, name, description, nodes, edges, agent_configs, is_template, created_at, updated_at)
    VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
  `).run(
    id, name, description,
    JSON.stringify(nodes),
    JSON.stringify(edges),
    JSON.stringify(agent_configs || {}),
    is_template ? 1 : 0
  );

  res.status(201).json({ success: true, data: { id, name } });
});

预设工作流模板 在数据库初始化时创建,包括:

模板名称节点用途
故障诊断流程告警处理 → 故障诊断 → 日志分析 → 系统巡检自动化故障定位
系统健康检查系统巡检 → 合规检查 → 服务器命令执行全面系统检查
事件响应流程告警处理 → 故障诊断 → 变更执行 → 文档生成标准事件响应

基于模板创建工作流:

typescript
// 复制模板并实例化
router.post('/from-template/:templateId', async (req: Request, res: Response) => {
  const template = db.prepare('SELECT * FROM workflows WHERE id = ?').get(req.params.templateId);
  const { name, ...overrides } = req.body;

  const id = randomUUID();
  db.prepare(`
    INSERT INTO workflows (id, name, description, nodes, edges, agent_configs, is_template)
    VALUES (?, ?, ?, ?, ?, ?, 0)
  `).run(id, name, template.description, template.nodes, template.edges, template.agent_configs);
});

14.7 定时工作流执行

schedulerService.ts 基于 node-schedule 库实现定时工作流调度:

┌─────────────────────────────────────────────┐
│             SchedulerService                │
│                                             │
│  scheduled_tasks 表 ──► node-schedule Job   │
│       │                       │             │
│       │ schedule              │ 触发        │
│       │ (cron表达式)          │             │
│       │                       ▼             │
│       │               executeWorkflow()     │
│       │                       │             │
│       │              创建 task 记录          │
│       │              执行工作流引擎          │
│       │              更新执行状态            │
│       │              记录审计日志            │
└───────┴─────────────────────────────────────┘

定时任务数据结构:

sql
CREATE TABLE scheduled_tasks (
  id TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  description TEXT,
  schedule TEXT NOT NULL,        -- Cron 表达式
  workflow_id TEXT REFERENCES workflows(id),
  enabled INTEGER DEFAULT 1,
  last_run TIMESTAMP,
  last_status TEXT,
  next_run TIMESTAMP,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

调度核心逻辑:

typescript
import { scheduleJob, Job } from 'node-schedule';

class SchedulerService {
  private jobs: Map<string, Job> = new Map();
  private runningWorkflows: Set<string> = new Set();

  init() {
    // 加载所有启用的定时任务
    const tasks = db.prepare(
      'SELECT * FROM scheduled_tasks WHERE enabled = 1'
    ).all();

    tasks.forEach(task => this.scheduleTask(task));

    // 数据库定期维护(每天凌晨 3 点)
    this.initDatabaseMaintenance();
  }

  scheduleTask(task: ScheduledTaskRecord) {
    this.cancelTask(task.id);

    const job = scheduleJob(task.schedule, async () => {
      if (task.workflow_id) {
        await this.executeWorkflow(task);
      }

      // 记录执行结果
      db.prepare(`
        UPDATE scheduled_tasks
        SET last_run = CURRENT_TIMESTAMP, last_status = ?
        WHERE id = ?
      `).run('success', task.id);

      // 审计日志
      db.prepare(`
        INSERT INTO audit_logs (id, action, resource_type, resource_id, details)
        VALUES (?, ?, ?, ?, ?)
      `).run(randomUUID(), 'execute_scheduled_task', 'scheduled_task', task.id, details);
    });

    this.jobs.set(task.id, job);
  }

  async executeWorkflow(task: ScheduledTaskRecord) {
    const workflowId = task.workflow_id;

    // 防止并发执行
    if (this.runningWorkflows.has(workflowId)) {
      throw new Error(`Workflow ${workflowId} is already running`);
    }

    this.runningWorkflows.add(workflowId);

    // 解析工作流
    const parsedWorkflow: WorkflowParsed = {
      id: workflow.id,
      name: workflow.name,
      nodes: JSON.parse(workflow.nodes),
      edges: JSON.parse(workflow.edges),
      // ...
    };

    // 创建任务记录
    const taskId = randomUUID();
    db.prepare(`
      INSERT INTO tasks (id, workflow_id, name, status, created_at)
      VALUES (?, ?, ?, 'pending', CURRENT_TIMESTAMP)
    `).run(taskId, workflowId, `定时执行: ${workflow.name}`);

    // 执行工作流
    await executeWorkflow(taskId, parsedWorkflow);

    this.runningWorkflows.delete(workflowId);
  }
}

Cron 表达式示例:

表达式含义
0 2 * * *每天凌晨 2 点执行
*/30 * * * *每 30 分钟执行
0 9 * * 1-5工作日早上 9 点执行
0 0 * * 0每周日凌晨执行

数据库维护任务:

typescript
private initDatabaseMaintenance() {
  const maintenanceJob = scheduleJob('0 3 * * *', async () => {
    performMaintenance('analyze');         // 分析统计信息
    performMaintenance('integrity_check'); // 检查完整性

    // 每周日执行 VACUUM
    if (new Date().getDay() === 0) {
      performMaintenance('vacuum');
    }
  });
  this.jobs.set('db-maintenance', maintenanceJob);
}

本章小结

本章全面介绍了 ITOps Agent Platform 的工作流引擎。workflowExecutor.ts 作为核心执行引擎,通过拓扑排序确定节点执行顺序,逐节点调用 Agent 执行任务,并通过 WebSocket 实时推送进度。工作流的 nodes 和 edges 以 JSON 格式存储在 SQLite 中,实现了灵活的可视化编排。系统提供了四种任务状态和节点级容错机制(allowFailure),并在故障时自动将案例存入知识库。schedulerService.ts 基于 node-schedule 实现定时工作流调度,支持 Cron 表达式和并发控制。

本章练习

基础练习

  1. 状态转换描述:描述一个工作流从创建到执行完成的完整状态转换过程。

  2. 拓扑排序练习:给定以下节点和边,写出执行顺序:

    • 节点:A(0,0), B(100,200), C(200,100)
    • 边:A→B, A→C
  3. JSON 解析:写一段代码将工作流 JSON 字符串解析为 WorkflowNode 数组。

进阶练习

  1. 容错设计:设计一个包含 5 个节点的工作流,其中第 3 个节点允许失败。描述当第 3 个节点失败时,系统的行为。

  2. 定时任务配置:编写一个 Cron 表达式和数据库 INSERT 语句,创建一个"每周一早上 9 点执行系统巡检工作流"的定时任务。

  3. 执行报告分析:阅读 generateWorkflowExecutionReport 函数,解释报告中包含哪些信息,以及这些信息如何帮助运维人员理解工作流执行结果。

思考题

  1. 拓扑排序的局限性:当前工作流引擎使用线性拓扑排序执行节点。如果需要在 DAG 中实现真正的并行执行(无依赖的节点同时执行),应该如何设计?

  2. 工作流 vs 脚本:对于简单的运维任务(如在 10 台服务器上执行一个命令),使用工作流引擎和使用 shell 脚本相比,各有什么优劣?在什么场景下应该选择工作流引擎?

延伸阅读

基于 MPL-2.0 许可证发布