第14章 工作流引擎详解
作者
谭策 — 独立开发者 | AIOps 领域探索者
- 🌐 项目官网:ITOpsAgentinfo
- 📝 博客:zjzwfw.cloud
- 📧 邮箱:huawei_network@foxmail.com
- 💬 微信公众号:IT Online

许可证
MPL-2.0 © 谭策
本章导读
工作流引擎是 ITOps Agent Platform 实现自动化运维的核心组件。它将多个 Agent 节点编排为可视化的 DAG(有向无环图),按照拓扑顺序自动执行,实现复杂运维场景的流程化、自动化。本章深入剖析 workflowExecutor.ts 的执行引擎、节点类型、JSON 序列化存储、执行状态管理、错误处理、工作流模板以及定时调度等关键技术。
学习目标
- 掌握 workflowExecutor.ts 的核心执行逻辑
- 理解节点类型和执行流的拓扑排序
- 了解工作流 JSON 序列化与 SQLite 存储方案
- 熟悉工作流的四种执行状态及其转换
- 掌握错误处理和容错机制
- 学会创建工作流模板
- 理解定时工作流的调度机制
核心内容
14.1 核心执行引擎 workflowExecutor.ts
workflowExecutor.ts 是工作流的核心执行引擎,负责从数据库中加载工作流定义、解析节点和边、进行拓扑排序、逐节点执行 Agent、推送实时进度、记录执行日志、自动生成报告和故障案例入库。
执行流程总览:
executeWorkflow(taskId, workflow, initialInput, context)
│
├─ 解析 nodes 和 edges(JSON → 对象)
│
├─ topologicalSort() — 拓扑排序确定执行顺序
│
├─ UPDATE tasks SET status='running'
│
├─ emit('task:started') — 推送开始事件
│
├─ for nodeId in executionOrder:
│ │
│ ├─ emit('task:node:started')
│ │
│ ├─ getThinkingSteps() — 获取思考步骤
│ ├─ for step in thinkingSteps:
│ │ emit('task:node:thinking')
│ │
│ ├─ executeAgentNode() — 执行 Agent
│ │
│ ├─ emit('task:node:output')
│ ├─ emit('task:node:completed')
│ │
│ └─ addTaskLog() — 记录日志
│
├─ 故障案例自动存入知识库
│
├─ generateWorkflowExecutionReport() — 生成报告
│
├─ UPDATE tasks SET status='completed'
│
└─ emit('task:completed')export async function executeWorkflow(
taskId: string,
workflow: WorkflowParsed,
initialInput?: string,
context?: Record<string, unknown>
) {
const io = getIOInstance();
const MAX_EXECUTION_DEPTH = 50;
let executionDepth = 0;
const nodeResults: Record<string, NodeResult> = {};
const startTime = new Date().toISOString();
// 1. 解析工作流数据(支持 JSON 字符串和对象两种格式)
const nodes = Array.isArray(workflow.nodes)
? workflow.nodes
: JSON.parse(workflow.nodes as string || '[]');
const edges = Array.isArray(workflow.edges)
? workflow.edges
: JSON.parse(workflow.edges as string || '[]');
// 2. 拓扑排序
const executionOrder = topologicalSort(nodes, edges);
// 3. 更新任务状态为 running
db.prepare(
'UPDATE tasks SET status = ?, start_time = CURRENT_TIMESTAMP, execution_order = ? WHERE id = ?'
).run('running', JSON.stringify(executionOrder), taskId);
io?.to(`task:${taskId}`).emit('task:started', { taskId, executionOrder });
// 4. 逐节点执行
for (const nodeId of executionOrder) {
if (executionDepth++ >= MAX_EXECUTION_DEPTH) {
logger.error(`Workflow exceeded maximum execution depth`);
break;
}
// 支持任务取消
const task = db.prepare('SELECT status FROM tasks WHERE id = ?').get(taskId);
if (task?.status === 'cancelled') break;
const node = nodes.find(n => n.id === nodeId);
if (!node || node.type !== 'agent') continue;
try {
// 收集前面节点的输出作为当前节点的输入
const previousResults = Object.values(nodeResults)
.map(r => r.output).filter(Boolean).join('\n\n');
const input = previousResults || initialInput || '请开始执行任务';
// 推送思考进度
const thinkingProcess = getThinkingSteps(node.data.label);
for (const step of thinkingProcess) {
await delay(300);
io?.to(`task:${taskId}`).emit('task:node:thinking', {
taskId, nodeId, content: step
});
}
// 执行 Agent
const output = await executeAgentNode(node.data.agentId, input, context);
nodeResults[nodeId] = {
status: 'success',
output,
metadata: {
thinkingProcess: thinkingProcess.join('\n'),
executionTime: Date.now()
}
};
io?.to(`task:${taskId}`).emit('task:node:completed', {
taskId, nodeId, status: 'success', output
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
nodeResults[nodeId] = { status: 'failed', error: errorMessage };
io?.to(`task:${taskId}`).emit('task:node:completed', {
taskId, nodeId, status: 'failed', error: errorMessage
});
// 非容错节点失败时终止工作流
if (!node.data.allowFailure) {
throw error;
}
}
}
// 5. 更新任务状态
db.prepare(`
UPDATE tasks SET status = ?, end_time = CURRENT_TIMESTAMP,
node_results = ?, current_node_id = NULL
WHERE id = ?
`).run('completed', JSON.stringify(nodeResults), taskId);
// 6. 故障案例自动入库
saveFailedNodesToKnowledgeBase(nodeResults, nodes, workflow.name);
// 7. 生成执行报告
await generateWorkflowExecutionReport(taskId, workflow, nodes, nodeResults, executionOrder, 'completed');
io?.to(`task:${taskId}`).emit('task:completed', {
taskId, status: 'completed', nodeResults
});
}14.2 节点类型与执行流
节点数据结构:
interface WorkflowNode {
id: string;
type: string; // 当前仅支持 'agent'
data: {
label: string; // 节点显示名称
agentId: string; // 绑定的 Agent ID
allowFailure?: boolean; // 是否允许失败后继续
};
position: { x: number; y: number; }; // 画布坐标
}
interface WorkflowEdge {
id: string;
source: string; // 起始节点 ID
target: string; // 目标节点 ID
}节点类型体系:
┌─────────────────────────────────────────────┐
│ 节点类型 │
├────────────┬────────────────────────────────┤
│ agent │ 调用 Agent 执行任务(当前唯一)│
│ condition │ 条件分支(未来扩展) │
│ parallel │ 并行执行(未来扩展) │
│ delay │ 延迟等待(未来扩展) │
│ webhook │ 外部回调(未来扩展) │
└────────────┴────────────────────────────────┘上下文传递机制:
Node A (告警处理 Agent)
│ 输出: "告警摘要: CPU 使用率超过 90%..."
▼
Node B (故障诊断 Agent)
│ 输入: Node A 的输出
│ 输出: "可能原因: 进程泄漏...排查步骤: 检查进程列表"
▼
Node C (变更执行 Agent)
│ 输入: Node A + Node B 的输出
│ 输出: "已执行 kill 操作,CPU 恢复正常"
▼
Node D (文档生成 Agent)
│ 输入: 所有前面节点的输出
└─ 输出: 完整的故障处理报告节点之间的数据通过 previousResults 变量传递:
const previousResults = Object.values(nodeResults)
.map(r => r.output).filter(Boolean).join('\n\n');
const input = previousResults || initialInput || '请开始执行任务';14.3 工作流的 JSON 序列化与 SQLite 存储
工作流在数据库中仅占用 workflows 表的一行记录,其中 nodes 和 edges 以 JSON 字符串形式存储:
CREATE TABLE workflows (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
nodes TEXT, -- JSON 字符串,存储节点数组
edges TEXT, -- JSON 字符串,存储边数组
agent_configs TEXT, -- JSON 字符串,存储 Agent 配置
is_template INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);存储示例:
{
"id": "wf-001",
"name": "故障诊断流程",
"description": "自动化的故障诊断工作流",
"nodes": "[{\"id\":\"n1\",\"type\":\"agent\",\"data\":{\"label\":\"告警处理 Agent\",\"agentId\":\"agent-1\"},\"position\":{\"x\":100,\"y\":100}},{\"id\":\"n2\",\"type\":\"agent\",\"data\":{\"label\":\"故障诊断 Agent\",\"agentId\":\"agent-2\"},\"position\":{\"x\":100,\"y\":300}}]",
"edges": "[{\"id\":\"e1\",\"source\":\"n1\",\"target\":\"n2\"}]",
"agent_configs": "{}",
"is_template": 0
}读取时解析:
const nodes = Array.isArray(workflow.nodes)
? workflow.nodes
: JSON.parse(workflow.nodes as string || '[]');
const edges = Array.isArray(workflow.edges)
? workflow.edges
: JSON.parse(workflow.edges as string || '[]');写入时序列化:
db.prepare(`
INSERT INTO workflows (id, name, description, nodes, edges, agent_configs, is_template)
VALUES (?, ?, ?, ?, ?, ?, ?)
`).run(
id, name, description,
JSON.stringify(nodes),
JSON.stringify(edges),
JSON.stringify(agentConfigs),
isTemplate ? 1 : 0
);14.4 执行状态管理
工作流及其节点有完整的状态机:
┌──────────────────────────────────────────────────┐
│ 任务状态机 │
│ │
│ pending ──► running ──► completed │
│ │ ▲ │
│ │ │ │
│ ▼ │ │
│ failed ───────────┘ (手动重试) │
│ │ │
│ ▼ │
│ cancelled │
└──────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────┐
│ 节点状态 │
│ │
│ pending ──► running ──► success │
│ │ │
│ ▼ │
│ failed (allowFailure? 继续 : 终止) │
└──────────────────────────────────────────────────┘tasks 表关键状态字段:
| 字段 | 类型 | 说明 |
|---|---|---|
| status | TEXT | pending / running / completed / failed / cancelled |
| start_time | TIMESTAMP | 开始时间 |
| end_time | TIMESTAMP | 结束时间 |
| execution_order | TEXT | JSON 数组,节点执行顺序 |
| node_results | TEXT | JSON 对象,每个节点的执行结果 |
| current_node_id | TEXT | 当前正在执行的节点 |
| logs | TEXT | JSON 数组,执行日志 |
| report_id | TEXT | 关联的执行报告 |
// 更新任务状态
db.prepare('UPDATE tasks SET status = ?, start_time = CURRENT_TIMESTAMP WHERE id = ?')
.run('running', taskId);
// 完成时更新
db.prepare(`
UPDATE tasks SET status = ?, end_time = CURRENT_TIMESTAMP,
node_results = ?, current_node_id = NULL
WHERE id = ?
`).run('completed', JSON.stringify(nodeResults), taskId);
// 失败时更新
db.prepare(`
UPDATE tasks SET status = ?, end_time = CURRENT_TIMESTAMP, current_node_id = NULL
WHERE id = ?
`).run('failed', taskId);14.5 错误处理与容错机制
节点级容错: 每个节点可以设置 allowFailure 标志:
try {
const output = await executeAgentNode(node.data.agentId, input, context);
nodeResults[nodeId] = { status: 'success', output };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
nodeResults[nodeId] = { status: 'failed', error: errorMessage };
// allowFailure = true 时,记录错误后继续执行下一个节点
if (!node.data.allowFailure) {
throw error; // 终止工作流
}
}最大执行深度保护:
const MAX_EXECUTION_DEPTH = 50;
for (const nodeId of executionOrder) {
if (executionDepth++ >= MAX_EXECUTION_DEPTH) {
logger.error('Workflow exceeded maximum execution depth');
break;
}
// ...
}任务取消检测:
const task = db.prepare('SELECT status FROM tasks WHERE id = ?').get(taskId);
if (task?.status === 'cancelled') break;故障案例自动入库:
工作流失败时,自动将故障信息存入知识库,形成运维经验积累:
// 检查是否存在重复的故障案例
function isDuplicateKnowledgeBase(content: string, threshold = 0.7): string | null {
const existing = db.prepare(
'SELECT id, content FROM knowledge_base WHERE category = ? ORDER BY created_at DESC LIMIT 50'
).all('故障处理');
for (const entry of existing) {
const similarity = calculateTextSimilarity(content, entry.content);
if (similarity >= threshold) return entry.id;
}
return null;
}
// 基于 Jaccard 相似度
function calculateTextSimilarity(text1: string, text2: string): number {
const set1 = new Set(text1.toLowerCase().replace(/[^\w\s]/g, '').split(/\s+/));
const set2 = new Set(text2.toLowerCase().replace(/[^\w\s]/g, '').split(/\s+/));
const intersection = new Set([...set1].filter(x => set2.has(x)));
const union = new Set([...set1, ...set2]);
return union.size === 0 ? 1 : intersection.size / union.size;
}
// 将故障节点信息存入知识库
failedNodes.forEach(nodeResult => {
const duplicateId = isDuplicateKnowledgeBase(content);
if (duplicateId) {
logger.info('跳过重复的故障案例');
return;
}
db.prepare(`
INSERT INTO knowledge_base (id, title, category, content, created_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
`).run(randomUUID(), title, '故障处理', content);
});14.6 工作流模板与创建
工作流模板 是预定义的可复用工作流,通过 is_template = 1 标识:
-- 查询所有工作流模板
SELECT * FROM workflows WHERE is_template = 1 ORDER BY created_at DESC;创建工作流的 API 流程:
// workflowRoutes.ts
router.post('/', async (req: Request, res: Response) => {
const { name, description, nodes, edges, agent_configs, is_template } = req.body;
const id = randomUUID();
db.prepare(`
INSERT INTO workflows (id, name, description, nodes, edges, agent_configs, is_template, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
`).run(
id, name, description,
JSON.stringify(nodes),
JSON.stringify(edges),
JSON.stringify(agent_configs || {}),
is_template ? 1 : 0
);
res.status(201).json({ success: true, data: { id, name } });
});预设工作流模板 在数据库初始化时创建,包括:
| 模板名称 | 节点 | 用途 |
|---|---|---|
| 故障诊断流程 | 告警处理 → 故障诊断 → 日志分析 → 系统巡检 | 自动化故障定位 |
| 系统健康检查 | 系统巡检 → 合规检查 → 服务器命令执行 | 全面系统检查 |
| 事件响应流程 | 告警处理 → 故障诊断 → 变更执行 → 文档生成 | 标准事件响应 |
基于模板创建工作流:
// 复制模板并实例化
router.post('/from-template/:templateId', async (req: Request, res: Response) => {
const template = db.prepare('SELECT * FROM workflows WHERE id = ?').get(req.params.templateId);
const { name, ...overrides } = req.body;
const id = randomUUID();
db.prepare(`
INSERT INTO workflows (id, name, description, nodes, edges, agent_configs, is_template)
VALUES (?, ?, ?, ?, ?, ?, 0)
`).run(id, name, template.description, template.nodes, template.edges, template.agent_configs);
});14.7 定时工作流执行
schedulerService.ts 基于 node-schedule 库实现定时工作流调度:
┌─────────────────────────────────────────────┐
│ SchedulerService │
│ │
│ scheduled_tasks 表 ──► node-schedule Job │
│ │ │ │
│ │ schedule │ 触发 │
│ │ (cron表达式) │ │
│ │ ▼ │
│ │ executeWorkflow() │
│ │ │ │
│ │ 创建 task 记录 │
│ │ 执行工作流引擎 │
│ │ 更新执行状态 │
│ │ 记录审计日志 │
└───────┴─────────────────────────────────────┘定时任务数据结构:
CREATE TABLE scheduled_tasks (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
schedule TEXT NOT NULL, -- Cron 表达式
workflow_id TEXT REFERENCES workflows(id),
enabled INTEGER DEFAULT 1,
last_run TIMESTAMP,
last_status TEXT,
next_run TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);调度核心逻辑:
import { scheduleJob, Job } from 'node-schedule';
class SchedulerService {
private jobs: Map<string, Job> = new Map();
private runningWorkflows: Set<string> = new Set();
init() {
// 加载所有启用的定时任务
const tasks = db.prepare(
'SELECT * FROM scheduled_tasks WHERE enabled = 1'
).all();
tasks.forEach(task => this.scheduleTask(task));
// 数据库定期维护(每天凌晨 3 点)
this.initDatabaseMaintenance();
}
scheduleTask(task: ScheduledTaskRecord) {
this.cancelTask(task.id);
const job = scheduleJob(task.schedule, async () => {
if (task.workflow_id) {
await this.executeWorkflow(task);
}
// 记录执行结果
db.prepare(`
UPDATE scheduled_tasks
SET last_run = CURRENT_TIMESTAMP, last_status = ?
WHERE id = ?
`).run('success', task.id);
// 审计日志
db.prepare(`
INSERT INTO audit_logs (id, action, resource_type, resource_id, details)
VALUES (?, ?, ?, ?, ?)
`).run(randomUUID(), 'execute_scheduled_task', 'scheduled_task', task.id, details);
});
this.jobs.set(task.id, job);
}
async executeWorkflow(task: ScheduledTaskRecord) {
const workflowId = task.workflow_id;
// 防止并发执行
if (this.runningWorkflows.has(workflowId)) {
throw new Error(`Workflow ${workflowId} is already running`);
}
this.runningWorkflows.add(workflowId);
// 解析工作流
const parsedWorkflow: WorkflowParsed = {
id: workflow.id,
name: workflow.name,
nodes: JSON.parse(workflow.nodes),
edges: JSON.parse(workflow.edges),
// ...
};
// 创建任务记录
const taskId = randomUUID();
db.prepare(`
INSERT INTO tasks (id, workflow_id, name, status, created_at)
VALUES (?, ?, ?, 'pending', CURRENT_TIMESTAMP)
`).run(taskId, workflowId, `定时执行: ${workflow.name}`);
// 执行工作流
await executeWorkflow(taskId, parsedWorkflow);
this.runningWorkflows.delete(workflowId);
}
}Cron 表达式示例:
| 表达式 | 含义 |
|---|---|
0 2 * * * | 每天凌晨 2 点执行 |
*/30 * * * * | 每 30 分钟执行 |
0 9 * * 1-5 | 工作日早上 9 点执行 |
0 0 * * 0 | 每周日凌晨执行 |
数据库维护任务:
private initDatabaseMaintenance() {
const maintenanceJob = scheduleJob('0 3 * * *', async () => {
performMaintenance('analyze'); // 分析统计信息
performMaintenance('integrity_check'); // 检查完整性
// 每周日执行 VACUUM
if (new Date().getDay() === 0) {
performMaintenance('vacuum');
}
});
this.jobs.set('db-maintenance', maintenanceJob);
}本章小结
本章全面介绍了 ITOps Agent Platform 的工作流引擎。workflowExecutor.ts 作为核心执行引擎,通过拓扑排序确定节点执行顺序,逐节点调用 Agent 执行任务,并通过 WebSocket 实时推送进度。工作流的 nodes 和 edges 以 JSON 格式存储在 SQLite 中,实现了灵活的可视化编排。系统提供了四种任务状态和节点级容错机制(allowFailure),并在故障时自动将案例存入知识库。schedulerService.ts 基于 node-schedule 实现定时工作流调度,支持 Cron 表达式和并发控制。
本章练习
基础练习
状态转换描述:描述一个工作流从创建到执行完成的完整状态转换过程。
拓扑排序练习:给定以下节点和边,写出执行顺序:
- 节点:A(0,0), B(100,200), C(200,100)
- 边:A→B, A→C
JSON 解析:写一段代码将工作流 JSON 字符串解析为 WorkflowNode 数组。
进阶练习
容错设计:设计一个包含 5 个节点的工作流,其中第 3 个节点允许失败。描述当第 3 个节点失败时,系统的行为。
定时任务配置:编写一个 Cron 表达式和数据库 INSERT 语句,创建一个"每周一早上 9 点执行系统巡检工作流"的定时任务。
执行报告分析:阅读 generateWorkflowExecutionReport 函数,解释报告中包含哪些信息,以及这些信息如何帮助运维人员理解工作流执行结果。
思考题
拓扑排序的局限性:当前工作流引擎使用线性拓扑排序执行节点。如果需要在 DAG 中实现真正的并行执行(无依赖的节点同时执行),应该如何设计?
工作流 vs 脚本:对于简单的运维任务(如在 10 台服务器上执行一个命令),使用工作流引擎和使用 shell 脚本相比,各有什么优劣?在什么场景下应该选择工作流引擎?
延伸阅读
- workflowExecutor.ts — 工作流执行引擎
- schedulerService.ts — 定时任务调度
- workflowTopologyService.ts — 工作流拓扑排序服务
- 第12章-工作流编辑器开发 — 前端工作流编辑器实现
