Chapter 16 Alert Center and Notification System
Author
Tan Ce, independent developer | AIOps explorer
- 🌐 Project website: ITOpsAgentinfo
- 📝 Blog: zjzwfw.cloud
- 📧 Email: huawei_network@foxmail.com
- 💬 WeChat official account: IT Online

License
MPL-2.0 © Tan Ce
Chapter Overview
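One of the core responsibilities of IT operations is keeping systems running reliably, and the alerting system is the first line of defense for detecting problems. A good alerting system must detect anomalies promptly. It must also be able to:
- Receive alerts from multiple monitoring sources (Prometheus, Zabbix, Grafana, cloud platforms) through a single entry point
- Reduce noise intelligently, so that alert storms do not numb the operations team
- Push alerts promptly to the right people over multiple channels (WeCom, DingTalk, email, Webhook)
- Match remediation policies automatically, enabling unattended self-healing
The alert center of ITOps Agent Platform is built around these goals. This chapter walks through the complete pipeline: alert ingestion, normalization, noise reduction, notification, and automatic remediation.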
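Learning Objectives
- Understand the design of the alert rule engine (threshold checks, cooldown periods, multi-channel dispatch)
- Master the normalization strategy of the multi-source adapters (Prometheus/Zabbix/Grafana/Aliyun/Tencent)
- Understand the fingerprint algorithm and the automatic suppression mechanism used for noise reduction
- Master the implementation of the multi-channel notification system (WeCom/DingTalk/email/Webhook/WebSocket)
- Understand the end-to-end flow of the auto-remediation engine (policy matching → cooldown/rate limiting → workflow execution → verification → rollback)
- Learn how to design alert statistics and measure the effect of noise reduction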
16.1 Overall Architecture of the Alert System
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ Prometheus │ │   Zabbix   │ │  Grafana   │ │   Aliyun   │ │  Tencent   │
│  Webhook   │ │  Webhook   │ │  Webhook   │ │   Cloud    │ │   Cloud    │
└─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
      └──────────────┴──────────────┼──────────────┴──────────────┘
                                    ▼
                     ┌─────────────────────────────┐
                     │     detectSourceType()      │ ← auto-detects the alert source
                     └──────────────┬──────────────┘
                                    ▼
                     ┌─────────────────────────────┐
                     │ adaptPrometheus/adaptZabbix │
                     │ adaptGrafana/adaptAliyun    │ ← one adapter per source
                     │ adaptTencent                │
                     └──────────────┬──────────────┘
                                    ▼
                     ┌─────────────────────────────┐
                     │ NormalizedAlert (unified)   │
                     │ {source, severity,          │
                     │  title, content, ...}       │
                     └──────────────┬──────────────┘
                                    ▼
                     ┌─────────────────────────────┐
                     │ AlertNoiseReduction         │ ← fingerprinting, dedup, suppression
                     └──────────────┬──────────────┘
                                    ▼
                     ┌─────────────────────────────┐
                     │ AlertService rule engine    │ ← in-memory metric checks
                     └──────────────┬──────────────┘
                                    ▼
                     ┌─────────────────────────────┐
                     │ NotificationService         │ ← multi-channel notification
                     │ WeChat/DingTalk/Email       │
                     │ Webhook/WebSocket           │
                     └──────────────┬──────────────┘
                                    ▼
                     ┌─────────────────────────────┐
                     │ RemediationService          │ ← automatic remediation
                     │ policy match → workflow run │
                     │ verify / rollback / approve │
                     └─────────────────────────────┘
16.1.1 Data Normalization Flow
Every alert from an external source is converted to the unified NormalizedAlert format when it enters the system:
export interface NormalizedAlert {
external_id?: string; // unique ID in the external system
source: string; // source: prometheus/zabbix/grafana/aliyun/tencent
severity: string; // unified severity: critical/high/medium/low/info
title: string; // alert title
content: string; // alert body
metadata: Record<string, unknown>; // raw metadata from the source
status: 'firing' | 'resolved'; // alert state
host?: string; // associated host
labels?: Record<string, string>; // labels
annotations?: Record<string, string>; // annotations
starts_at?: string; // start time
ends_at?: string; // end time
}
16.2 Alert Source Adapters (AlertSourceAdapters)
16.2.1 Severity Normalization
Each monitoring system defines severity differently. The normalizeSeverity function maps all of them onto a standard five-level scale:
function normalizeSeverity(level: string | number | undefined): string {
if (typeof level === 'number') {
if (level >= 5) return 'critical'; // 5+ → disaster level
if (level >= 4) return 'high'; // 4 → high
if (level >= 3) return 'medium'; // 3 → medium
if (level >= 2) return 'low'; // 2 → low
return 'info'; // 1 or below → informational
}
if (level === undefined) return 'medium'; // missing severity (e.g. an incomplete Zabbix payload) → default
const map: Record<string, string> = {
critical: 'critical', critical_severity: 'critical', disaster: 'critical',
high: 'high', error: 'high',
warning: 'medium', warn: 'medium', average: 'medium',
information: 'low', info: 'low', low: 'low',
not_classified: 'info',
};
return map[level.toLowerCase()] || 'medium'; // unknown strings default to medium
}
Severity mapping table:
| Standard level | Prometheus | Zabbix | Grafana | Aliyun | Numeric value |
|---|---|---|---|---|---|
| critical | critical | disaster | critical | critical | ≥5 |
| high | - | high/error | high | high | 4 |
| medium | warning | average/warning | warning | medium | 3 |
| low | info | information | info | low | 2 |
| info | - | not_classified | - | not_classified | ≤1 |
String values are matched case-insensitively, and unrecognized strings fall back to medium.
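To see the mapping in action, here is a standalone copy of the string and numeric rules from normalizeSeverity, followed by a few spot checks. It is a sketch for experimentation, not the platform's module. The `Severity` type alias and the own-property lookup are additions made for this example. The checks also show one behavior worth knowing: a numeric severity that arrives as a string (for example `"4"`) is not parsed as a number. It goes through the string table instead and falls back to `medium`.

```typescript
// Five-level scale used throughout the chapter
type Severity = 'critical' | 'high' | 'medium' | 'low' | 'info';

// Same string table as normalizeSeverity() in 16.2.1 (keys are lowercase)
const SEVERITY_MAP: Record<string, Severity> = {
  critical: 'critical', critical_severity: 'critical', disaster: 'critical',
  high: 'high', error: 'high',
  warning: 'medium', warn: 'medium', average: 'medium',
  information: 'low', info: 'low', low: 'low',
  not_classified: 'info',
};

function normalizeSeverity(level: string | number | undefined): Severity {
  if (typeof level === 'number') {
    if (level >= 5) return 'critical';
    if (level >= 4) return 'high';
    if (level >= 3) return 'medium';
    if (level >= 2) return 'low';
    return 'info';
  }
  // A missing severity falls back to 'medium'
  if (level === undefined) return 'medium';
  const key = level.toLowerCase();
  // Own-property check, so inherited names such as 'constructor' do not match
  return Object.prototype.hasOwnProperty.call(SEVERITY_MAP, key) ? SEVERITY_MAP[key] : 'medium';
}

// Spot checks: case-insensitive strings, numbers, a numeric string, and the fallback
const severitySamples: Array<string | number> = ['Disaster', 'AVERAGE', 'Information', 4, 1, '4', 'page-me'];
for (const sample of severitySamples) {
  console.log(`${JSON.stringify(sample)} -> ${normalizeSeverity(sample)}`);
}
```

If a source sends numeric levels as strings, converting them with `Number(...)` before the call keeps them on the numeric path.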
16.2.2 Prometheus Adapter
The adapter for the Prometheus Alertmanager webhook format:
export function adaptPrometheus(payload: unknown): AlertAdapterResult {
const body = payload as { alerts?: unknown[]; version?: string; groupKey?: string };
const rawAlerts = Array.isArray(body.alerts) ? body.alerts : [];
const alerts: NormalizedAlert[] = [];
const errors: string[] = [];
for (const raw of rawAlerts) {
const alert = raw as Record<string, unknown>;
const labels = (alert.labels || {}) as Record<string, string>;
const annotations = (alert.annotations || {}) as Record<string, string>;
const status = (alert.status as string) || 'firing';
alerts.push({
external_id: `${labels.alertname || 'unknown'}-${labels.instance || ''}-${status}`,
source: 'prometheus',
severity: normalizeSeverity(labels.severity || 'medium'),
title: annotations.summary || labels.alertname || 'Prometheus Alert',
content: annotations.description || annotations.message || JSON.stringify(alert),
metadata: {
prometheus_version: body.version,
group_key: body.groupKey,
labels,
annotations,
starts_at: alert.startsAt,
ends_at: alert.endsAt,
generator_url: alert.generatorURL, // generatorURL is a field of each alert, not a label
},
status: status === 'resolved' ? 'resolved' : 'firing',
host: labels.instance || labels.node || labels.host,
labels, annotations
});
}
return { alerts, errors };
}
16.2.3 Zabbix Adapter
Zabbix payloads are built from Zabbix's own macros (such as {TRIGGER.NAME} and {EVENT.VALUE}):
export function adaptZabbix(payload: unknown): AlertAdapterResult {
const body = payload as Record<string, unknown>;
const alerts: NormalizedAlert[] = [];
const errors: string[] = [];
// Nested objects mirror Zabbix macros such as {TRIGGER.NAME}, {HOST.NAME}, {EVENT.ID}, {ITEM.VALUE}
const triggerObj = body.TRIGGER as Record<string, unknown> | undefined;
const hostObj = body.HOST as Record<string, unknown> | undefined;
const eventObj = (body.EVENT || body.event) as Record<string, unknown> | undefined;
const itemObj = body.ITEM as Record<string, unknown> | undefined;
const trigger = (triggerObj?.NAME as string) || (body.trigger as string) || 'Unknown trigger';
const host = (hostObj?.NAME as string) || (body.host as string) || 'Unknown';
const item = itemObj?.NAME as string | undefined;
const itemValue = itemObj?.VALUE as string | undefined;
const eventId = (eventObj?.ID as string | undefined) || (body.eventid as string | undefined);
const rawSeverity = (triggerObj?.SEVERITY as string) || (triggerObj?.PRIORITY as string | number);
const severity = normalizeSeverity(rawSeverity);
// Zabbix marks recovery through the event value ({EVENT.VALUE}: 1 = problem, 0 = recovered);
// compare as a string so that both 0 and '0' match
const isResolved = String(eventObj?.VALUE) === '0';
const content = [
`Host: ${host}`,
`Trigger: ${trigger}`,
item ? `Item: ${item}` : '',
itemValue ? `Value: ${itemValue}` : '',
`Severity: ${severity}`,
].filter(Boolean).join('\n');
alerts.push({
external_id: eventId ? `zabbix-${eventId}` : undefined,
source: 'zabbix',
severity, title: `[${severity.toUpperCase()}] ${trigger}`,
content, metadata: { event_id: eventId, item, item_value: itemValue },
status: isResolved ? 'resolved' : 'firing', host
});
return { alerts, errors };
}
16.2.4 Grafana Adapter
export function adaptGrafana(payload: unknown): AlertAdapterResult {
const body = payload as Record<string, unknown>;
const alerts: NormalizedAlert[] = [];
const errors: string[] = [];
// Grafana may send an array of alerts or a single alert
const rawAlerts = Array.isArray(body.alerts) ? body.alerts : [body];
for (const raw of rawAlerts) {
const alert = raw as Record<string, unknown>;
const status = (alert.state || alert.status) as string;
const isResolved = status === 'Normal' || status === 'OK' || status === 'Resolved';
const labels = (alert.labels || {}) as Record<string, string>;
const annotations = (alert.annotations || {}) as Record<string, string>;
alerts.push({
external_id: alert.ruleUID ? `grafana-${alert.ruleUID}` : undefined,
source: 'grafana',
severity: normalizeSeverity(labels.severity || 'medium'),
title: (alert.ruleName || annotations.title || 'Grafana Alert') as string,
content: (alert.message || annotations.description || JSON.stringify(alert)) as string,
metadata: {
grafana_rule_uid: alert.ruleUID,
grafana_folder: alert.folder,
eval_matches: alert.evalMatches,
image_url: alert.imageUrl,
},
status: isResolved ? 'resolved' : 'firing',
host: labels.instance || labels.host || labels.server,
});
}
return { alerts, errors };
}
16.2.5 Automatic Source Detection
When a webhook request arrives, the system identifies the alert source from characteristic fields in the payload:
export function detectSourceType(payload: unknown): string {
const body = payload as Record<string, unknown>;
// Prometheus: alerts array whose items carry labels/annotations/startsAt
// Note: this test runs before the Grafana one, so Grafana payloads whose alerts also carry
// labels/annotations/startsAt are classified as prometheus
if (body.alerts && Array.isArray(body.alerts)) {
const firstAlert = body.alerts[0] as Record<string, unknown>;
if (firstAlert?.labels || firstAlert?.annotations || firstAlert?.startsAt) return 'prometheus';
if (firstAlert?.state || firstAlert?.ruleName || firstAlert?.ruleUID) return 'grafana';
}
// Zabbix: TRIGGER/HOST/eventid/triggerid
if (body.TRIGGER || body.HOST || body.eventid || body.triggerid) return 'zabbix';
// Aliyun: product/productName/alertLevel/dimensions
if (body.product || body.productName || body.alertLevel || body.dimensions) return 'aliyun';
// Tencent: alarmName/alarmType/policyName
if (body.alarmName || body.alarmType || body.policyName) return 'tencent';
return 'generic';
}
16.2.6 Source Detection Decision Tree
payload
│
├─ alerts array present?
│ ├─ has labels/annotations/startsAt ──► prometheus
│ └─ has state/ruleName/ruleUID ──────► grafana
│
├─ TRIGGER/HOST/eventid/triggerid present? ──────────────► zabbix
├─ product/productName/alertLevel/dimensions present? ───► aliyun
├─ alarmName/alarmType/policyName present? ──────────────► tencent
└─ none of the above ────────────────────────────────────► generic
16.3 Alert Rule Engine (AlertService)
16.3.1 Built-in Rule Definitions
AlertService predefines five system-level alert rules:
const DEFAULT_ALERT_RULES: AlertRule[] = [
{
id: 'high-memory-usage',
name: 'High Memory Usage',
severity: 'critical',
condition: 'memory_percent',
threshold: 90,
channels: ['log', 'webhook'],
cooldownMs: 300000, // 5-minute cooldown
enabled: true // checkAlerts() skips any rule whose enabled flag is not true
},
{
id: 'high-cpu-usage',
name: 'High CPU Usage',
severity: 'warning',
condition: 'cpu_percent',
threshold: 85,
channels: ['log'],
cooldownMs: 300000,
enabled: true
},
{
id: 'database-slow',
name: 'Slow Database Response',
severity: 'critical',
condition: 'db_latency',
threshold: 1000, // 1 second (in milliseconds)
channels: ['log', 'webhook'],
cooldownMs: 60000, // 1-minute cooldown (database alerts are more frequent)
enabled: true
},
{
id: 'high-error-rate',
name: 'High Error Rate',
severity: 'critical',
condition: 'error_rate',
threshold: 10,
channels: ['log', 'webhook'],
cooldownMs: 300000,
enabled: true
},
{
id: 'disk-space-low',
name: 'Low Disk Space',
severity: 'warning',
condition: 'disk_percent',
threshold: 90,
channels: ['log'],
cooldownMs: 600000, // 10-minute cooldown (disk usage changes slowly)
enabled: true
}
];
16.3.2 Rule Persistence
💡 Implementation note: at runtime, alert rules are held in an in-memory Map<string, AlertRule> so that metric checks stay fast. The rules are persisted as JSON in the SQLite settings table.
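You can check the Map ⇄ JSON round trip in isolation. The sketch below mimics what saveRules() and loadRules() do, without SQLite: the Map's values are written out as a JSON array, and the Map is rebuilt keyed by `id`. The `AlertRule` shape here is inferred from the rule definitions in 16.3.1 and is an assumption. Because `lastTriggered` is stored inside each rule object, it is persisted along with the rule, so cooldown state survives a restart.

```typescript
// Rule shape inferred from DEFAULT_ALERT_RULES (assumption, not the platform's type)
interface AlertRule {
  id: string;
  name: string;
  severity: 'critical' | 'warning' | 'info';
  condition: string;
  threshold: number;
  channels: string[];
  cooldownMs: number;
  enabled: boolean;
  lastTriggered?: number;
}

// Memory -> JSON, as saveRules() does: the Map's values become a JSON array
function serializeRules(rules: Map<string, AlertRule>): string {
  return JSON.stringify(Array.from(rules.values()));
}

// JSON -> memory, as loadRules() does: rebuild the Map keyed by rule id
function deserializeRules(json: string): Map<string, AlertRule> {
  const list = JSON.parse(json) as AlertRule[];
  return new Map(list.map((rule): [string, AlertRule] => [rule.id, rule]));
}

const inMemory = new Map<string, AlertRule>();
inMemory.set('disk-space-low', {
  id: 'disk-space-low',
  name: 'Low Disk Space',
  severity: 'warning',
  condition: 'disk_percent',
  threshold: 90,
  channels: ['log'],
  cooldownMs: 600000,
  enabled: true,
  lastTriggered: 1700000000000, // a cooldown window that was in progress
});

const storedJson = serializeRules(inMemory);
const reloaded = deserializeRules(storedJson);
console.log(reloaded.get('disk-space-low')?.lastTriggered);
```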
Rule data flow:
┌─────────────────┐   loaded at startup   ┌─────────────────┐
│ settings table  │ ────────────────────► │ in-memory Map   │
│ (JSON persisted)│                       │ rules           │
│                 │ ◄──────────────────── │                 │
└─────────────────┘    saved on change    └─────────────────┘
// Runtime storage: in-memory Map (fast checks)
private rules: Map<string, AlertRule> = new Map();
// Load rules: database → memory
private loadRules(): void {
const saved = db.prepare('SELECT value FROM settings WHERE key = ?').get('alert_rules') as { value: string } | undefined;
if (saved) {
const rules = JSON.parse(saved.value) as AlertRule[];
rules.forEach(rule => this.rules.set(rule.id, rule));
}
// If the database holds no rules yet, load the defaults and persist them
if (this.rules.size === 0) {
DEFAULT_ALERT_RULES.forEach(rule => this.rules.set(rule.id, rule));
this.saveRules();
}
}
// Save rules: memory → database
private saveRules(): void {
const rules = Array.from(this.rules.values());
const json = JSON.stringify(rules);
db.prepare(`
INSERT OR REPLACE INTO settings (key, value, updated_at)
VALUES ('alert_rules', ?, CURRENT_TIMESTAMP)
`).run(json);
}
16.3.3 Metric Checks and Alert Triggering
async checkAlerts(metrics: {
// Keys must match rule.condition (e.g. 'memory_percent'), because the lookup is metrics[rule.condition]
memory_percent?: number;
cpu_percent?: number;
db_latency?: number;
error_rate?: number;
disk_percent?: number;
[key: string]: number | undefined;
}): Promise<AlertNotification[]> {
const triggeredAlerts: AlertNotification[] = [];
for (const rule of this.rules.values()) {
if (!rule.enabled) continue;
// Look up the metric this rule watches
const value = metrics[rule.condition];
if (value === undefined) continue;
// Fire once the value reaches the threshold
if (value >= rule.threshold) {
const now = Date.now();
// Cooldown: the same rule does not fire again within its cooldown window
if (rule.lastTriggered && (now - rule.lastTriggered) < rule.cooldownMs) {
continue;
}
const alert = await this.triggerAlert(rule, value, metrics);
triggeredAlerts.push(alert);
rule.lastTriggered = now;
this.rules.set(rule.id, rule);
}
}
// Persist lastTriggered only when something fired, to avoid a database write on every check
if (triggeredAlerts.length > 0) this.saveRules();
return triggeredAlerts;
}
16.3.4 Cooldown Mechanism
The cooldown period is the key mechanism for preventing alert storms:
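Timeline:
──┬──────────────┬──────────────┬──────────────┬────►
  │              │              │              │
 t=0s           t=30s          t=120s         t=310s
 alert #1       still over     still over     alert #2
 fires          threshold      threshold      fires
                inside the     inside the     300s cooldown
                cooldown       cooldown       has elapsed
                [skipped]      [skipped]      [fired]
Cooldown periods for the different rules: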
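| Rule | Cooldown | Rationale |
|---|---|---|
| Database latency | 1 minute | Database performance fluctuates often, and a short cooldown keeps the team aware of it promptly |
| CPU/memory | 5 minutes | Routine metrics, so a medium cooldown avoids over-alerting |
| Disk space | 10 minutes | Disk usage changes slowly, so a long cooldown reduces noise |
| Error rate | 5 minutes | Error rates can change quickly, so a medium cooldown is used |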
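You can replay the timeline above in a few lines of code. This sketch isolates the cooldown test from checkAlerts(): the value has reached the threshold and the rule is outside its cooldown window. It feeds the test a memory reading that stays at 95% for 310 seconds, using the memory rule's 300-second cooldown. `CooldownRule` and `shouldFire` are names made up for this illustration and are not part of the platform. One small difference from the original: the sketch tests `lastTriggered !== undefined` instead of relying on truthiness. This only matters when a timestamp of 0 is possible, as it is in this replay.

```typescript
// Minimal slice of a rule: only the fields the cooldown check needs (illustrative)
interface CooldownRule {
  id: string;
  threshold: number;
  cooldownMs: number;
  lastTriggered?: number;
}

// Fire when the value reaches the threshold and the previous firing is at least cooldownMs ago
function shouldFire(rule: CooldownRule, value: number, now: number): boolean {
  if (value < rule.threshold) return false;
  if (rule.lastTriggered !== undefined && now - rule.lastTriggered < rule.cooldownMs) {
    return false; // still inside the cooldown window
  }
  rule.lastTriggered = now; // this firing opens a new cooldown window
  return true;
}

// Replay: memory stays at 95% (threshold 90) at t = 0s, 30s, 120s and 310s
const memoryRule: CooldownRule = { id: 'high-memory-usage', threshold: 90, cooldownMs: 300000 };
const replay = [0, 30, 120, 310].map(sec => ({ sec, fired: shouldFire(memoryRule, 95, sec * 1000) }));
for (const step of replay) {
  console.log(`t=${step.sec}s ${step.fired ? 'fired' : 'skipped'}`);
}
```

Only the first and last readings fire, matching the timeline. Shortening `cooldownMs` to 60000 (the database rule's value) would also let the t=120s reading through.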
16.3.5 Alert Notification Dispatch
When an alert fires, notifications are sent asynchronously over multiple channels:
private async sendNotification(alert: AlertNotification): Promise<void> {
const promises: Promise<void>[] = [];
if (alert.channels.includes('log')) {
promises.push(this.sendToLog(alert)); // write to the log
}
if (alert.channels.includes('webhook') && this.webhookUrl) {
promises.push(this.sendToWebhook(alert)); // HTTP webhook
}
if (alert.channels.includes('email') && this.emailConfig) {
promises.push(this.sendToEmail(alert)); // SMTP email
}
await Promise.allSettled(promises); // channels run in parallel and do not affect each other
}
Promise.allSettled is used instead of Promise.all so that a failure in one channel does not affect the others. Promise.all would reject on the first failure, whereas Promise.allSettled waits for every channel to finish.
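You can demonstrate the difference with two fake channels. One succeeds after a short delay and the other fails immediately. Both functions are stand-ins, not the platform's senders. With Promise.all, the call would reject as soon as the webhook failed, and the caller would not learn whether the log write succeeded. Promise.allSettled waits for both channels and reports each outcome. Because allSettled itself never rejects, a caller that needs to record failures has to inspect the returned results.

```typescript
const delivered: string[] = [];

// Stand-in for a channel that succeeds after a short delay
async function fakeLogChannel(message: string): Promise<void> {
  await new Promise<void>(resolve => setTimeout(resolve, 10));
  delivered.push(`log:${message}`);
}

// Stand-in for a channel whose HTTP call fails
async function fakeBrokenWebhook(_message: string): Promise<void> {
  throw new Error('webhook returned 502');
}

// Send on both channels and report every outcome, successful or not
async function dispatchAll(message: string): Promise<PromiseSettledResult<void>[]> {
  const results = await Promise.allSettled([fakeLogChannel(message), fakeBrokenWebhook(message)]);
  results.forEach((r, i) => {
    const detail = r.status === 'rejected' ? ` (${String(r.reason)})` : '';
    console.log(`channel ${i}: ${r.status}${detail}`);
  });
  return results;
}

const dispatchDone = dispatchAll('High Memory Usage');
```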
16.3.6 Webhook Notification Implementation
private async sendToWebhook(alert: AlertNotification): Promise<void> {
const response = await fetch(this.webhookUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
alert_id: alert.id,
rule_name: alert.ruleName,
severity: alert.severity,
message: alert.message,
timestamp: alert.timestamp,
metadata: alert.metadata
})
});
if (!response.ok) {
logger.warn(`Webhook notification failed: ${response.status}`);
}
}
16.3.7 Email Notification Implementation
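⚠️ Note: in the current version, email support only records the configuration. Actual SMTP delivery is not fully implemented yet. The code below shows the implementation skeleton, which can be enabled once SMTP is configured.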
private async sendToEmail(alert: AlertNotification): Promise<void> {
// Current implementation: log only (SMTP not configured)
logger.info('Email notification prepared (SMTP not configured)', {
recipients: this.emailConfig?.to, // assumed field, populated from ALERT_EMAIL_TO
subject: `[${alert.severity.toUpperCase()}] ${alert.ruleName}`,
message: alert.message
});
// TODO: for full SMTP support, add the nodemailer dependency and enable the code below:
// const nodemailer = await import('nodemailer');
// const transporter = nodemailer.createTransport({
// host: this.emailConfig.host,
// port: this.emailConfig.port,
// secure: this.emailConfig.port === 465,
// auth: { user: this.emailConfig.user, pass: this.emailConfig.pass }
// });
// await transporter.sendMail({ ... });
}
Extension guide: to enable full email delivery, follow these steps:
- Install the dependency: npm install nodemailer
- Configure the environment variables: ALERT_EMAIL_HOST, ALERT_EMAIL_PORT, ALERT_EMAIL_USER, ALERT_EMAIL_PASS, ALERT_EMAIL_TO
- Uncomment the TODO code above to enable it
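For the second step, the variables have to end up in the email configuration object. The sketch below shows one way to do this. The `EmailConfig` shape, the comma-separated format of ALERT_EMAIL_TO, and the default port 587 are assumptions. The rule `secure: port === 465` mirrors the commented-out transporter code. Returning `null` when a required value is missing lets the service keep its current log-only behavior. In the service, it would be called as `loadEmailConfig(process.env)`.

```typescript
// Hypothetical shape of the SMTP settings (field names are assumptions)
interface EmailConfig {
  host: string;
  port: number;
  secure: boolean; // implicit TLS, as in the commented-out transporter (port 465)
  user: string;
  pass: string;
  to: string[];
}

type EnvVars = Record<string, string | undefined>;

// Build the config from the ALERT_EMAIL_* variables. Return null if anything required is
// missing, so the service can keep its current log-only behavior.
function loadEmailConfig(env: EnvVars): EmailConfig | null {
  const host = env.ALERT_EMAIL_HOST;
  const user = env.ALERT_EMAIL_USER;
  const pass = env.ALERT_EMAIL_PASS;
  // Assumption: multiple recipients are separated by commas
  const to = (env.ALERT_EMAIL_TO ?? '')
    .split(',')
    .map(address => address.trim())
    .filter(address => address.length > 0);
  // Assumption: default to the SMTP submission port when ALERT_EMAIL_PORT is unset
  const port = Number(env.ALERT_EMAIL_PORT ?? '587');
  if (!host || !user || !pass || to.length === 0 || !Number.isInteger(port) || port <= 0) {
    return null;
  }
  return { host, port, secure: port === 465, user, pass, to };
}

const emailCfg = loadEmailConfig({
  ALERT_EMAIL_HOST: 'smtp.example.com',
  ALERT_EMAIL_PORT: '465',
  ALERT_EMAIL_USER: 'alerts@example.com',
  ALERT_EMAIL_PASS: 'example-password',
  ALERT_EMAIL_TO: 'ops@example.com, oncall@example.com',
});
console.log(emailCfg?.to, emailCfg?.secure);
```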
16.3.8 Alert Statistics
getStats(): {
totalAlerts: number;
bySeverity: Record<AlertSeverity, number>;
last24Hours: number;
topRules: { ruleId: string; ruleName: string; count: number }[];
} {
const oneDayAgo = Date.now() - 86400000;
const bySeverity: Record<AlertSeverity, number> = { critical: 0, warning: 0, info: 0 };
const ruleCounts = new Map<string, { name: string; count: number }>();
let last24Hours = 0;
this.alertHistory.forEach(alert => {
bySeverity[alert.severity]++;
if (new Date(alert.timestamp).getTime() > oneDayAgo) last24Hours++;
const existing = ruleCounts.get(alert.ruleId);
if (existing) existing.count++;
else ruleCounts.set(alert.ruleId, { name: alert.ruleName, count: 1 });
});
const topRules = Array.from(ruleCounts.entries())
.map(([ruleId, data]) => ({ ruleId, ruleName: data.name, count: data.count }))
.sort((a, b) => b.count - a.count)
.slice(0, 10);
return { totalAlerts: this.alertHistory.length, bySeverity, last24Hours, topRules };
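}
16.4 Alert Noise Reduction System
16.4.1 Noise Reduction Overview
Alert noise reduction addresses one core problem: a single fault produces a flood of duplicate alerts in a short time, which wears out the operations team. The noise reduction system handles this as follows: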
Alert arrives (source, title)
│
▼
Compute fingerprint (MD5 hash)
│
▼
Look up existing record by fingerprint
│
├─ Not found: new fingerprint, first occurrence ──────► shouldNotify = true
│
└─ Found: occurrence count + 1, check whether to suppress
   ├─ occurrences < 5, or severity critical/high ─────► shouldNotify = true
   └─ occurrences ≥ 5 and severity below high ────────► auto-suppress for 30 minutes, shouldNotify = false
16.4.2 Fingerprint Generation
generateFingerprint(source: string, title: string, _content?: string): string {
// Normalize the title: lowercase, then collapse each run of digits/whitespace/underscores/hyphens into a single space
const normalizedTitle = title.toLowerCase().replace(/[\d\s_-]+/g, ' ').trim();
const normalizedSource = source.toLowerCase();
const fingerprint = `${normalizedSource}:${normalizedTitle}`;
return createHash('md5').update(fingerprint).digest('hex');
}
Fingerprint normalization examples:
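| Original title | Normalized | Notes |
|---|---|---|
| CPU usage on server-01 is 95% | cpu usage on server is % | Digit and hyphen runs collapsed into single spaces |
| CPU usage on server-02 is 88% | cpu usage on server is % | Same fingerprint as the previous row |
| Disk usage on host_10 at 90% | disk usage on host at % | Digit and underscore runs collapsed into single spaces |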
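You can reproduce the table directly. This sketch repeats the normalization and hashing steps of generateFingerprint() using Node's built-in `crypto` module. `normalizeTitle` and `fingerprint` are standalone helper names made up for the example. The sketch also makes a trade-off visible. Because digits are discarded, the same alert on server-01 and server-02 shares one fingerprint, and therefore one occurrence counter and one suppression state. That is exactly what collapses an alert storm, but it also means per-host detail has to come from the alert content rather than from the fingerprint.

```typescript
import { createHash } from 'node:crypto';

// Lowercase, then collapse every run of digits, whitespace, '_' or '-' into one space
function normalizeTitle(title: string): string {
  return title.toLowerCase().replace(/[\d\s_-]+/g, ' ').trim();
}

// MD5 of "source:normalizedTitle", as in generateFingerprint()
function fingerprint(source: string, title: string): string {
  return createHash('md5').update(`${source.toLowerCase()}:${normalizeTitle(title)}`).digest('hex');
}

const fpServer01 = fingerprint('prometheus', 'CPU usage on server-01 is 95%');
const fpServer02 = fingerprint('prometheus', 'CPU usage on server-02 is 88%');
const fpOtherSource = fingerprint('zabbix', 'CPU usage on server-01 is 95%');

console.log(normalizeTitle('CPU usage on server-01 is 95%')); // cpu usage on server is %
console.log(fpServer01 === fpServer02, fpServer01 === fpOtherSource); // true false
```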
16.4.3 Suppression Policy
private shouldSuppressAlert(record: AlertNoiseRecord, severity?: string): boolean {
// critical/high alerts are never suppressed
if (severity === 'critical' || severity === 'high') {
return false;
}
// Lower-severity alerts seen 5 or more times are suppressed automatically
return record.occurrence_count >= 5;
}
Handling after suppression:
if (shouldSuppress && !isSuppressed) {
db.prepare(
`UPDATE alert_noise_reduction
SET is_suppressed = 1, suppression_reason = ?, suppression_until = ?
WHERE alert_fingerprint = ?`
).run(
'Auto-suppressed: frequent alert',
new Date(now.getTime() + 30 * 60 * 1000).toISOString(), // suppress for 30 minutes
fingerprint
);
}
16.4.4 Noise Reduction Statistics
getNoiseReductionStats(): {
totalAlerts: number;
suppressedAlerts: number;
duplicateCount: number;
noiseReductionRate: number;
} {
const stats = db.prepare(`
SELECT COUNT(*) as total,
SUM(CASE WHEN is_suppressed = 1 THEN 1 ELSE 0 END) as suppressed,
SUM(occurrence_count - 1) as duplicates
FROM alert_noise_reduction
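`).get() as { total: number; suppressed: number | null; duplicates: number | null } | undefined;
const total = stats?.total || 0;
const suppressed = stats?.suppressed || 0;
const duplicates = stats?.duplicates || 0;
// Noise reduction rate = (suppressed + duplicates) / (total + duplicates),
// where total counts distinct fingerprints and total + duplicates is the number of raw alerts received
const noiseReductionRate = total > 0
? Math.round(((suppressed + duplicates) / (total + duplicates)) * 100)
: 0;
return { totalAlerts: total, suppressedAlerts: suppressed, duplicateCount: duplicates, noiseReductionRate };
}
16.4.5 Manual Intervention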
Operators can manually lift a suppression or manually suppress an alert:
// Lift the suppression for a fingerprint
unsuppressAlert(fingerprint: string): boolean {
const result = db.prepare(
`UPDATE alert_noise_reduction
SET is_suppressed = 0, suppression_reason = NULL, suppression_until = NULL
WHERE alert_fingerprint = ?`
).run(fingerprint);
return result.changes > 0; // true if a record with this fingerprint was updated
}
// Manual suppression (with a duration and a reason)
manuallySuppressAlert(fingerprint: string, reason: string, durationMinutes: number = 60): boolean {
const suppressionUntil = new Date(Date.now() + durationMinutes * 60 * 1000);
const result = db.prepare(
`UPDATE alert_noise_reduction
SET is_suppressed = 1, suppression_reason = ?, suppression_until = ?
WHERE alert_fingerprint = ?`
).run(reason, suppressionUntil.toISOString(), fingerprint);
return result.changes > 0;
}
16.5 Notification Service (NotificationService)
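16.5.1 Notification Channel Comparison
| Channel | Speed | Rich text | Use case | Setup complexity |
|---|---|---|---|---|
| WebSocket | Real-time (milliseconds) | JSON | Live display in the web UI | Low |
| WeCom | Fast (seconds) | Markdown | Instant notification of the operations team | Medium |
| DingTalk | Fast (seconds) | Markdown | Instant notification of the operations team | Medium |
| Email | Slow (minutes) | HTML | Formal records, non-urgent notices | High |
| Webhook | Real-time | JSON | Integration with third-party systems | Low |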
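WeCom and DingTalk both accept Markdown, but their group-robot webhooks expect slightly different JSON envelopes. WeCom takes `markdown.content`, while DingTalk takes `markdown.title` (used as the preview text) plus `markdown.text`. The sketch below builds both payloads from one shared body, mirroring sendWeChat() and sendDingTalk() in this section. Two choices are assumptions made for the example. First, the timestamp is passed in, because the platform code calls `toLocaleString()`, whose output depends on the server's locale. Second, the helpers return plain objects instead of posting them.

```typescript
interface Notice {
  title: string;
  content: string;
}

// Shared Markdown body, same layout as sendWeChat()/sendDingTalk()
function markdownBody(notice: Notice, sentAt: Date): string {
  return `## ${notice.title}\n\n${notice.content}\n\n> Source: ITOps Agent Platform\n> Time: ${sentAt.toISOString()}`;
}

// WeCom group robot: the whole message goes into markdown.content
function weComPayload(notice: Notice, sentAt: Date) {
  return { msgtype: 'markdown', markdown: { content: markdownBody(notice, sentAt) } };
}

// DingTalk group robot: markdown.title (preview text) plus markdown.text (body)
function dingTalkPayload(notice: Notice, sentAt: Date) {
  return { msgtype: 'markdown', markdown: { title: notice.title, text: markdownBody(notice, sentAt) } };
}

const sampleNotice: Notice = { title: 'High Memory Usage', content: 'memory_percent = 95' };
const fixedTime = new Date(Date.UTC(2024, 0, 1, 8, 0, 0));
console.log(JSON.stringify(weComPayload(sampleNotice, fixedTime)));
console.log(JSON.stringify(dingTalkPayload(sampleNotice, fixedTime)));
```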
16.5.2 Concurrent Multi-Channel Delivery
private async send(notification: { type: string; title: string; content: string }) {
const promises: Promise<void>[] = [];
if (this.config?.wechat_enabled) {
promises.push(this.sendWeChat(notification));
}
if (this.config?.dingtalk_enabled) {
promises.push(this.sendDingTalk(notification));
}
if (this.config?.email_enabled) {
promises.push(this.sendEmail(notification));
}
if (this.config?.webhook_enabled !== false) {
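promises.push(this.sendWebhook(notification)); // enabled by default
}
const results = await Promise.allSettled(promises);
// allSettled never rejects, so report failed channels explicitly; otherwise sendNotification()
// would mark every notification as 'sent'. A retry then re-sends on every enabled channel.
const failures = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected');
if (failures.length > 0) {
throw new Error(failures.map(f => String(f.reason)).join('; '));
}
}
16.5.3 WeCom Notification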
private async sendWeChat(notification: { type: string; title: string; content: string }) {
const message = {
msgtype: 'markdown',
markdown: {
content: `## ${notification.title}\n\n${notification.content}\n\n> Source: ITOps Agent Platform\n> Time: ${new Date().toLocaleString()}`
}
};
await axios.post(this.config.wechat_config.webhook_url, message, {
headers: { 'Content-Type': 'application/json' }
});
}
16.5.4 DingTalk Notification
private async sendDingTalk(notification: { type: string; title: string; content: string }) {
const message = {
msgtype: 'markdown',
markdown: {
title: notification.title,
text: `## ${notification.title}\n\n${notification.content}\n\n> Source: ITOps Agent Platform\n> Time: ${new Date().toLocaleString()}`
}
};
await axios.post(this.config.dingtalk_config.webhook_url, message, {
headers: { 'Content-Type': 'application/json' }
});
}
16.5.5 WebSocket Real-Time Push
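// Despite its name, this method pushes the notification to connected clients over WebSocket,
// via the server instance returned by getIOInstance(); send() gates it with the webhook_enabled flag
private async sendWebhook(notification: { type: string; title: string; content: string }) {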
const io = getIOInstance();
if (io) {
io.emit('notification', {
id: randomUUID(),
type: notification.type,
title: notification.title,
content: notification.content,
timestamp: new Date().toISOString()
});
}
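}
16.5.6 Notification Persistence
Every notification is saved to the database, which makes history queries and retries of failed deliveries possible: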
async sendNotification(notification: {
type: string; title: string; content: string;
recipient?: string; related_alert_id?: string; related_task_id?: string;
}) {
const id = randomUUID();
const now = new Date().toISOString();
// Save to the database
db.prepare(`
INSERT INTO notifications (id, type, title, content, recipient, status, related_alert_id, related_task_id, created_at)
VALUES (?, ?, ?, ?, ?, 'pending', ?, ?, ?)
`).run(id, notification.type, notification.title, notification.content,
notification.recipient || 'default', notification.related_alert_id || null,
notification.related_task_id || null, now);
// Attempt delivery
try {
await this.send(notification);
db.prepare('UPDATE notifications SET status = ?, sent_at = ? WHERE id = ?').run('sent', new Date().toISOString(), id);
return { success: true, id };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
db.prepare('UPDATE notifications SET status = ?, error_message = ? WHERE id = ?')
.run('failed', errorMessage, id);
return { success: false, error: errorMessage, id };
}
}
16.5.7 Retrying Failed Notifications
async retryFailedNotifications() {
const failed = db.prepare(`
SELECT * FROM notifications WHERE status = 'failed' ORDER BY created_at DESC
`).all() as NotificationDB[];
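const results: { success: boolean; id: string; error?: string }[] = [];
for (const notification of failed) {
// Re-send and update the existing row. Calling sendNotification() here would insert a new row
// and leave the old one marked 'failed', so it would be retried again on every run.
try {
await this.send({ type: notification.type, title: notification.title, content: notification.content });
db.prepare('UPDATE notifications SET status = ?, sent_at = ?, error_message = NULL WHERE id = ?')
.run('sent', new Date().toISOString(), notification.id);
results.push({ success: true, id: notification.id });
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
db.prepare('UPDATE notifications SET error_message = ? WHERE id = ?').run(errorMessage, notification.id);
results.push({ success: false, error: errorMessage, id: notification.id });
}
}
return results;
}
16.5.8 Convenience Notification Methods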
// Alert notification (with a severity emoji)
async sendAlertNotification(alert: AlertRecord) {
const severityEmoji: Record<string, string> = { critical: '🔴', high: '🟠', medium: '🟡', low: '🟢' };
return this.sendNotification({
type: 'alert',
title: `${severityEmoji[alert.severity ?? ''] || '⚪'} [${alert.severity?.toUpperCase()}] New alert: ${alert.title}`,
content: `**Source**: ${alert.source}\n**Severity**: ${alert.severity}\n**Description**: ${alert.content}`,
related_alert_id: alert.id
});
}
// Task status notification (with a status emoji)
async sendTaskNotification(task: TaskRecord, status: string) {
const statusEmoji: Record<string, string> = { completed: '✅', failed: '❌', running: '▶️', pending: '⏳' };
return this.sendNotification({
type: 'task',
title: `${statusEmoji[status] || '⚪'} Task status changed: ${task.name}`,
content: `**Task**: ${task.name}\n**Status**: ${status}`,
related_task_id: task.id
});
}
16.6 Auto-Remediation Engine (RemediationService)
16.6.1 Remediation Policy Definition
A remediation policy is the bridge between alerts and remediation workflows:
┌────────────────────────────────────────────────────────────┐
│ RemediationPolicy                                          │
├────────────────────────────────────────────────────────────┤
│ Match conditions:                                          │
│ - alert_source: alert source (prometheus/zabbix/...)       │
│ - alert_severity: severity filter                          │
│ - alert_keywords: keyword match                            │
│ - alert_tags: tag match                                    │
│                                                            │
│ Execution mode:                                            │
│ - auto: run automatically                                  │
│ - approval: requires manual approval                       │
│ - suggestion: only send a suggestion                       │
│                                                            │
│ Execution settings:                                        │
│ - workflow_id: remediation workflow                        │
│ - workflow_params: workflow params ({{alert.xxx}} allowed) │
│ - max_executions_per_hour: max executions per hour         │
│ - cooldown_seconds: cooldown period                        │
│                                                            │
│ Safety controls:                                           │
│ - enable_verification: verify after execution              │
│ - verification_workflow_id: verification workflow          │
│ - enable_rollback: roll back on failure                    │
│ - rollback_workflow_id: rollback workflow                  │
└────────────────────────────────────────────────────────────┘
16.6.2 Policy Creation
createPolicy(policy: Omit<RemediationPolicy, 'id' | 'created_at' | 'updated_at'>): RemediationPolicy {
const id = uuidv4();
const now = new Date().toISOString();
db.prepare(`
INSERT INTO remediation_policies (
id, name, description, alert_source, alert_severity,
alert_keywords, alert_tags, execution_mode, workflow_id,
workflow_params, max_executions_per_hour, cooldown_seconds,
require_confirmation, enable_verification, verification_workflow_id,
verification_params, verification_timeout_seconds, enable_rollback,
rollback_workflow_id, rollback_on_failure, enabled, created_by,
created_at, updated_at
) VALUES (@id, @name, ..., @updated_at)
`).run({ id, ...policy, created_at: now, updated_at: now });
return this.getPolicy(id);
}
16.6.3 Matching Alerts to Policies
When an alert arrives, the system looks up every remediation policy that matches it:
async matchAlertToPolicies(alert: { id: string; source: string; severity?: string; title?: string; content?: string; tags?: string[] }): Promise<RemediationPolicy[]> {
// Query enabled policies for the alert's source, most severe first
// (the CASE uses the normalized levels from 16.2.1, which is what alert.severity holds)
const policies = db.prepare(`
SELECT * FROM remediation_policies
WHERE enabled = 1 AND alert_source = ?
ORDER BY
CASE alert_severity
WHEN 'critical' THEN 1
WHEN 'high' THEN 2
WHEN 'medium' THEN 3
WHEN 'low' THEN 4
ELSE 5
END
`).all(alert.source) as RemediationPolicy[];
// Filter further by severity, keywords, and tags
return policies.filter(policy => {
// Severity match
if (policy.alert_severity && policy.alert_severity !== alert.severity) {
return false;
}
// Keyword match (OR logic: any keyword present is a match)
if (policy.alert_keywords) {
const keywords = JSON.parse(policy.alert_keywords) as string[];
const alertText = `${alert.title || ''} ${alert.content || ''}`.toLowerCase();
if (!keywords.some(kw => alertText.includes(kw.toLowerCase()))) {
return false;
}
}
// Tag match (OR logic: any matching tag passes)
if (policy.alert_tags) {
const tags = JSON.parse(policy.alert_tags) as string[];
const alertTags = alert.tags || [];
if (!tags.some(t => alertTags.includes(t))) {
return false;
}
}
return true;
});
}
16.6.4 Triggering Remediation
async triggerRemediation(policy: RemediationPolicy, alert: { id: string; source: string; ... }): Promise<RemediationExecution> {
// 1. Cooldown check
if (this.isInCooldown(policy, alert)) {
return this.createSkippedExecution(policy, alert, 'cooldown');
}
// 2. Rate-limit check
if (this.isRateLimited(policy)) {
return this.createSkippedExecution(policy, alert, 'rate_limited');
}
// 3. Create the execution record
const id = uuidv4();
const now = new Date().toISOString();
db.prepare(`
INSERT INTO remediation_executions (id, policy_id, alert_id, alert_snapshot, status, approval_required, created_at)
VALUES (?, ?, ?, ?, 'pending', ?, ?)
`).run(id, policy.id, alert.id, JSON.stringify(alert), policy.execution_mode === 'approval' ? 1 : 0, now);
const execution = this.getExecution(id);
// 4. Dispatch according to the execution mode
switch (policy.execution_mode) {
case 'auto':
this.executeWorkflowAsync(id); // run the workflow in the background (not awaited)
break;
case 'approval':
await this.requestApproval(execution); // send an approval request
break;
case 'suggestion':
await this.sendSuggestion(id); // send a remediation suggestion
break;
}
return this.getExecution(id); // re-read, since the status may have changed above
}
16.6.5 Cooldown and Rate Limiting
// Cooldown check: the same policy + alert is not executed again within the cooldown window
private isInCooldown(policy: RemediationPolicy, alert: { id: string }): boolean {
const result = db.prepare(`
SELECT cooldown_until FROM remediation_cooldowns
WHERE policy_id = ? AND alert_id = ?
`).get(policy.id, alert.id) as { cooldown_until: string } | undefined;
if (!result) return false;
// ISO-8601 timestamps compare correctly as strings, provided cooldown_until is also written with toISOString()
return new Date().toISOString() < result.cooldown_until;
}
// Rate limit: maximum number of executions of the same policy per hour
private isRateLimited(policy: RemediationPolicy): boolean {
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString();
const result = db.prepare(`
SELECT COUNT(*) as count FROM remediation_executions
WHERE policy_id = ? AND created_at > ?
`).get(policy.id, oneHourAgo) as { count: number };
return result.count >= policy.max_executions_per_hour;
}
16.6.6 Workflow Execution
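async executeWorkflow(executionId: string): Promise<void> {
const execution = this.getExecution(executionId);
const policy = this.getPolicy(execution.policy_id);
const alert = JSON.parse(execution.alert_snapshot);
if (!policy.workflow_id) {
this.updateExecutionStatus(executionId, 'failed', 'No workflow configured');
return;
}
this.updateExecution(executionId, { status: 'running', started_at: new Date().toISOString() });
const startTime = Date.now();
// Load the workflow definition
const workflow = db.prepare('SELECT * FROM workflows WHERE id = ?').get(policy.workflow_id) as { id: string; name: string } | undefined;
if (!workflow) {
this.updateExecutionStatus(executionId, 'failed', `Workflow ${policy.workflow_id} not found`);
return;
}
// parsedWorkflow: the parsed definition of `workflow` (the parsing step is omitted in this excerpt)
// Resolve the workflow parameters ({{alert.xxx}} placeholders are substituted)
const params = this.resolveParams(policy.workflow_params, alert);
// Create the task record
const taskId = uuidv4();
db.prepare(`
INSERT INTO tasks (id, workflow_id, name, status, context, created_at)
VALUES (?, ?, ?, 'pending', ?, CURRENT_TIMESTAMP)
`).run(taskId, workflow.id, `Auto-remediation: ${workflow.name}`, JSON.stringify(params));
try {
// Run the workflow
await executeWorkflow(taskId, parsedWorkflow, undefined, params);
} catch (error) {
// The workflow failed: record the failure and roll back if the policy enables it
const message = error instanceof Error ? error.message : String(error);
this.updateExecutionStatus(executionId, 'failed', message);
if (policy.enable_rollback) await this.rollbackExecution(executionId);
return;
}
// Record the execution result
this.updateExecution(executionId, {
workflow_execution_id: taskId,
completed_at: new Date().toISOString(),
execution_duration_ms: Date.now() - startTime
});
// Verify the result (if verification is configured)
if (policy.enable_verification && policy.verification_workflow_id) {
await this.verifyResult(executionId);
} else {
this.updateExecutionStatus(executionId, 'success');
this.resolveAlert(execution.alert_id);
this.updateCooldown(policy, alert);
}
}
16.6.7 Parameter Template Substitution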
Workflow parameters support the {{alert.xxx}} template syntax, which injects alert data into the workflow parameters:
private resolveParams(paramsJson: string | undefined, alert: Record<string, unknown>): Record<string, unknown> {
if (!paramsJson) return {};
const params = JSON.parse(paramsJson);
const resolved: Record<string, unknown> = {};
for (const [key, value] of Object.entries(params)) {
if (typeof value === 'string') {
resolved[key] = value.replace(/\{\{alert\.(\w+)\}\}/g, (_match, prop) => {
const val = alert[prop];
return val !== undefined && val !== null ? String(val) : '';
});
} else {
resolved[key] = value;
}
}
return resolved;
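}
Only top-level string values are substituted. Numbers and nested objects are copied unchanged, and a missing alert field becomes an empty string. Example: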
{
"server_id": "{{alert.host}}",
"command": "systemctl restart nginx",
"max_retries": 3
}
When the alert's host field is "192.168.1.100", the resolved parameters are:
{
"server_id": "192.168.1.100",
"command": "systemctl restart nginx",
"max_retries": 3
}
16.6.8 Verification
After a remediation has run, a separate workflow can verify that the fix actually worked:
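async verifyResult(executionId: string): Promise<{ success: boolean; result?: unknown; error?: string }> {
const execution = this.getExecution(executionId);
const policy = this.getPolicy(execution.policy_id);
const alert = JSON.parse(execution.alert_snapshot);
// Load the verification workflow
const workflow = db.prepare('SELECT * FROM workflows WHERE id = ?')
.get(policy.verification_workflow_id);
// parsedWorkflow: the parsed definition of `workflow` (the parsing step is omitted in this excerpt)
const params = this.resolveParams(policy.verification_params, alert);
const timeout = policy.verification_timeout_seconds * 1000;
const taskId = uuidv4();
let timer: ReturnType<typeof setTimeout> | undefined;
try {
// Race the verification run against a timeout (the timeout does not cancel the workflow itself)
const result = await Promise.race([
executeWorkflow(taskId, parsedWorkflow, undefined, params),
new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error('Verification timeout')), timeout);
})
]);
this.updateExecution(executionId, {
verification_status: 'success',
status: 'success'
});
// Verification passed → resolve the alert and update the cooldown
this.resolveAlert(execution.alert_id);
this.updateCooldown(policy, alert);
return { success: true, result };
} catch (error) {
// Verification failed or timed out → record the failure and roll back if enabled
const message = error instanceof Error ? error.message : String(error);
this.updateExecution(executionId, { verification_status: 'failed' });
this.updateExecutionStatus(executionId, 'failed', message);
if (policy.enable_rollback) await this.rollbackExecution(executionId);
return { success: false, error: message };
} finally {
clearTimeout(timer); // do not leave the timer pending once the race has settled
}
}
16.6.9 Rollback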
If a remediation fails and the policy enables rollback, a rollback workflow runs:
async rollbackExecution(executionId: string): Promise<void> {
const execution = this.getExecution(executionId);
const policy = this.getPolicy(execution.policy_id);
if (!policy.rollback_workflow_id) {
logger.warn(`No rollback workflow configured for policy ${policy.id}`);
return;
}
const workflow = db.prepare('SELECT * FROM workflows WHERE id = ?')
.get(policy.rollback_workflow_id) as { id: string; name: string } | undefined;
if (!workflow) {
logger.warn(`Rollback workflow ${policy.rollback_workflow_id} not found`);
return;
}
// parsedWorkflow: the parsed definition of `workflow` (the parsing step is omitted in this excerpt)
const taskId = uuidv4();
db.prepare(`
INSERT INTO tasks (id, workflow_id, name, status, context, created_at)
VALUES (?, ?, ?, 'pending', ?, CURRENT_TIMESTAMP)
`).run(taskId, workflow.id, `Rollback: ${workflow.name}`, JSON.stringify({ execution_id: executionId }));
const result = await executeWorkflow(taskId, parsedWorkflow);
this.updateExecution(executionId, {
rollback_triggered: 1,
rollback_execution_id: taskId,
rollback_result: JSON.stringify(result),
status: 'rolled_back'
});
}
16.6.10 Approval Mode
When the execution mode is approval, the remediation runs only after a person approves it:
// Request approval
private async requestApproval(execution: RemediationExecution): Promise<void> {
this.updateExecution(execution.id, { status: 'waiting_approval' });
const policy = this.getPolicy(execution.policy_id);
const alert = JSON.parse(execution.alert_snapshot);
await notificationService.sendNotification({
type: 'remediation_approval',
title: 'Remediation approval request',
content: `Policy: ${policy.name}\nAlert: ${alert.title || 'Unknown'}\nPlease review and approve the execution`,
related_alert_id: execution.alert_id
});
}
// Handle the approval decision
async approveExecution(executionId: string, action: 'approve' | 'reject', userId: string, comment?: string): Promise<void> {
const execution = this.getExecution(executionId);
if (execution.status !== 'waiting_approval') {
throw new Error('Execution is not waiting for approval');
}
if (action === 'approve') {
this.updateExecution(executionId, {
status: 'approved',
approved_by: userId,
approved_at: new Date().toISOString(),
approval_comment: comment
});
this.executeWorkflowAsync(executionId); // run the workflow once approved
} else {
this.updateExecution(executionId, {
status: 'rejected',
approved_by: userId,
approved_at: new Date().toISOString(),
approval_comment: comment,
completed_at: new Date().toISOString()
});
}
}16.6.11 完整修复流程
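The decisions in the diagram below can be condensed into two pure functions, which makes each branch easy to unit-test in isolation. This is a minimal sketch that mirrors the diagram, not the service's actual code: the names `PreCheck`, `planRemediation`, `RunResult`, and `decideOutcome` are hypothetical, and the verification-failure branch assumes that "may trigger rollback" means "roll back when enable_rollback is set".

```typescript
type ExecutionMode = "auto" | "approval" | "suggestion";

// Hypothetical inputs gathered before an execution record is created.
interface PreCheck {
  inCooldown: boolean;
  rateLimited: boolean;
  mode: ExecutionMode;
}

type Plan =
  | { action: "skip"; reason: "cooldown" | "rate_limited" }
  | { action: "run_workflow" }
  | { action: "wait_for_approval" }
  | { action: "send_suggestion" };

// Cooldown is checked before the rate limit, as in the diagram.
function planRemediation(check: PreCheck): Plan {
  if (check.inCooldown) return { action: "skip", reason: "cooldown" };
  if (check.rateLimited) return { action: "skip", reason: "rate_limited" };
  if (check.mode === "auto") return { action: "run_workflow" };
  if (check.mode === "approval") return { action: "wait_for_approval" };
  return { action: "send_suggestion" };
}

// Hypothetical facts known after the remediation workflow has run.
interface RunResult {
  succeeded: boolean;
  verificationEnabled: boolean;
  verificationPassed?: boolean; // only meaningful when verificationEnabled is true
  enableRollback: boolean;
}

type Outcome = "resolve_alert" | "rollback" | "record_failed";

function decideOutcome(r: RunResult): Outcome {
  // Remediation failed: roll back if the policy allows it, otherwise record the failure.
  if (!r.succeeded) return r.enableRollback ? "rollback" : "record_failed";
  // Succeeded and no verification configured: resolve the alert (and update the cooldown).
  if (!r.verificationEnabled) return "resolve_alert";
  if (r.verificationPassed) return "resolve_alert";
  // Verification failed: assumed to follow the same rollback rule as a failed run.
  return r.enableRollback ? "rollback" : "record_failed";
}
```

Keeping the decisions pure leaves the side effects (database writes, notifications, workflow execution) in the service methods, which then only have to act on the returned value.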
Alert arrives
      │
      ▼
┌──────────────────────┐
│ matchAlertToPolicies │  find matching policies (source + severity + keywords + tags)
└──────────┬───────────┘
           │ policy matched
           ▼
┌──────────────────────┐
│ isInCooldown?        │  cooldown check
└──────────┬───────────┘
           ├── yes ──► record skip (status=skipped, reason=cooldown)
           │ no
           ▼
┌──────────────────────┐
│ isRateLimited?       │  rate-limit check
└──────────┬───────────┘
           ├── yes ──► record skip (status=skipped, reason=rate_limited)
           │ no
           ▼
┌──────────────────────┐
│ create execution     │  status=pending
│ record               │
└──────────┬───────────┘
           ▼
     execution_mode
  ┌────────┼─────────────────┐
  │        │                 │
 auto   approval         suggestion
  │        │                 │
  │        ▼                 ▼
  │   wait for approval   send suggestion notification
  │        │
  │    approved? ── no ──► record rejected
  │        │ yes
  ▼        ▼
run remediation workflow
           │
     succeeded? ── no ──► enable_rollback? ── yes ──► run rollback workflow
           │ yes                 │ no
           │                     └──► record failed
           ▼
verification configured? ── no ──► resolve alert, update cooldown
           │ yes
           ▼
run verification workflow
           │
       passed? ── yes ──► resolve alert, update cooldown
           │ no
           ▼
record failed (may trigger rollback)

16.6.12 Policy Statistics
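The queries below filter on `created_at > ?`. Assuming rows rely on the column default from 16.7, created_at holds SQLite `datetime('now')` text such as `2024-05-01 10:00:00` (UTC), while JavaScript's `toISOString()` yields `2024-05-01T09:00:00.000Z`. These are compared as strings, and a space sorts before `T`, so every row from the cutoff date compares as older than the cutoff, including rows created after the cutoff time. A minimal sketch of a helper that formats the cutoff the way SQLite does, together with the two-decimal success-rate rounding used by getPolicyStats; the helper names are hypothetical:

```typescript
// Format a Date the way SQLite's datetime('now') does: 'YYYY-MM-DD HH:MM:SS' in UTC.
function toSqliteDatetime(date: Date): string {
  return date.toISOString().slice(0, 19).replace("T", " ");
}

// Cutoff for "the last N days", in the same text format as created_at.
function sinceDays(days: number, now: Date = new Date()): string {
  return toSqliteDatetime(new Date(now.getTime() - days * 24 * 60 * 60 * 1000));
}

// Percentage with two decimals, the same rounding as getPolicyStats.
function successRate(successCount: number, total: number): number {
  return total > 0 ? Math.round((successCount / total) * 10000) / 100 : 0;
}

// The pitfall: a later row on the cutoff date compares as older than an ISO cutoff.
const row = "2024-05-01 10:00:00";
console.log(row > "2024-05-01T09:00:00.000Z"); // false: ' ' sorts before 'T'
console.log(row > toSqliteDatetime(new Date("2024-05-01T09:00:00Z"))); // true
console.log(successRate(3, 7)); // 42.86
```

An alternative is to let SQLite compute the cutoff itself, for example `created_at > datetime('now', ?)` bound to a modifier string such as `'-7 days'`.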
async getPolicyStats(policyId: string, days: number): Promise<PolicyStats> {
  // created_at defaults to SQLite's datetime('now'), i.e. 'YYYY-MM-DD HH:MM:SS' in UTC.
  // Format the cutoff the same way: a raw toISOString() value ('...T...Z') would make
  // the string comparison drop rows created later on the cutoff date.
  const sinceDate = new Date(Date.now() - days * 24 * 60 * 60 * 1000)
    .toISOString().slice(0, 19).replace('T', ' ');
  // Total triggers (every status, including skipped and pending records)
  const total = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND created_at > ?
  `).get(policyId, sinceDate).count;
  // Successful executions
  const successCount = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND status = 'success' AND created_at > ?
  `).get(policyId, sinceDate).count;
  // Failed executions
  const failedCount = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND status = 'failed' AND created_at > ?
  `).get(policyId, sinceDate).count;
  // Rolled-back executions
  const rolledBackCount = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND status = 'rolled_back' AND created_at > ?
  `).get(policyId, sinceDate).count;
  // Average execution duration in ms (NULL when no execution has a duration yet)
  const avgDuration = db.prepare(`
    SELECT AVG(execution_duration_ms) as avg_duration FROM remediation_executions
    WHERE policy_id = ? AND execution_duration_ms IS NOT NULL AND created_at > ?
  `).get(policyId, sinceDate).avg_duration;
  // Per-day breakdown
  const dailyStats = db.prepare(`
    SELECT DATE(created_at) as date,
      COUNT(*) as triggers,
      SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
      SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
    FROM remediation_executions
    WHERE policy_id = ? AND created_at > ?
    GROUP BY DATE(created_at) ORDER BY date
  `).all(policyId, sinceDate);
  return {
    total_triggers: total,
    success_count: successCount,
    failed_count: failedCount,
    rolled_back_count: rolledBackCount,
    // Percentage rounded to two decimals, e.g. 3 of 7 -> 42.86
    success_rate: total > 0 ? Math.round((successCount / total) * 10000) / 100 : 0,
    avg_duration_ms: avgDuration ? Math.round(avgDuration) : 0,
    daily_stats: dailyStats
  };
}

16.7 Database Schema
16.7.1 Alerts Table
CREATE TABLE alerts (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
severity TEXT NOT NULL,
title TEXT NOT NULL,
content TEXT,
status TEXT NOT NULL DEFAULT 'firing',
external_id TEXT,
host TEXT,
metadata TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

16.7.2 Alert Noise Reduction Table
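The table below persists the state behind the noise-reduction rules listed in step 4 of 16.8: a fingerprint per alert, an occurrence count, and a 30-minute suppression for non-critical alerts from the fifth occurrence on. A minimal in-memory sketch of those rules, with a Map standing in for the table. The title normalization (lowercase, keep only letters and whitespace) is an assumption, chosen because it reproduces the `prometheus:cpu usage on server is ` fingerprint input shown in 16.8; the class name `NoiseReducer` is hypothetical.

```typescript
import { createHash } from "node:crypto";

const SUPPRESS_THRESHOLD = 5;       // occurrences before auto-suppression
const SUPPRESS_MS = 30 * 60 * 1000; // 30-minute suppression window

// Assumed normalization: lowercase, keep only letters (any script) and whitespace,
// so "CPU usage on server-01 is 95%" -> "cpu usage on server is ".
function normalizeTitle(title: string): string {
  return title.toLowerCase().replace(/[^\p{L}\s]/gu, "");
}

function fingerprint(source: string, title: string): string {
  return createHash("md5").update(`${source}:${normalizeTitle(title)}`).digest("hex");
}

interface NoiseRecord {
  occurrenceCount: number;
  firstOccurrence: number;   // epoch ms
  lastOccurrence: number;    // epoch ms
  suppressionUntil?: number; // epoch ms, set once suppressed
}

class NoiseReducer {
  // Keyed by fingerprint, standing in for alert_noise_reduction rows.
  private records = new Map<string, NoiseRecord>();

  process(source: string, title: string, severity: string, now: number): { shouldNotify: boolean } {
    const fp = fingerprint(source, title);
    const rec = this.records.get(fp);
    if (!rec) {
      this.records.set(fp, { occurrenceCount: 1, firstOccurrence: now, lastOccurrence: now });
      return { shouldNotify: true };
    }
    rec.occurrenceCount += 1;
    rec.lastOccurrence = now;
    // Critical alerts are exempt from suppression.
    if (severity === "critical") return { shouldNotify: true };
    // Inside an active suppression window: record only.
    if (rec.suppressionUntil !== undefined && now < rec.suppressionUntil) {
      return { shouldNotify: false };
    }
    // Fifth and later occurrences start (or restart) a 30-minute suppression.
    if (rec.occurrenceCount >= SUPPRESS_THRESHOLD) {
      rec.suppressionUntil = now + SUPPRESS_MS;
      return { shouldNotify: false };
    }
    return { shouldNotify: true };
  }
}
```

In this sketch a recurring non-critical alert stays suppressed because the count is never reset; whether the real service resets occurrence_count once suppression_until has passed is not shown in this chapter.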
CREATE TABLE alert_noise_reduction (
id TEXT PRIMARY KEY,
alert_fingerprint TEXT NOT NULL UNIQUE,
alert_source TEXT NOT NULL,
alert_title TEXT NOT NULL,
occurrence_count INTEGER NOT NULL DEFAULT 1,
first_occurrence TEXT NOT NULL,
last_occurrence TEXT NOT NULL,
is_suppressed INTEGER NOT NULL DEFAULT 0,
suppression_reason TEXT,
suppression_until TEXT
);

16.7.3 Notifications Table
CREATE TABLE notifications (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
title TEXT NOT NULL,
content TEXT,
recipient TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
related_alert_id TEXT,
related_task_id TEXT,
error_message TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
sent_at TEXT
);

16.7.4 Remediation Policies Table
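The alert_source, alert_severity, alert_keywords, and alert_tags columns below drive matchAlertToPolicies. The chapter does not show how keywords and tags are encoded, so this minimal sketch assumes alert_keywords is a JSON array of strings (any one of them appearing in the title or content, case-insensitively, is a match) and alert_tags is a JSON object whose key/value pairs must all be present on the alert. `PolicyRow`, `MatchableAlert`, and `policyMatches` are hypothetical names.

```typescript
// Hypothetical subset of a remediation_policies row.
interface PolicyRow {
  alert_source: string;
  alert_severity: string | null; // null = any severity
  alert_keywords: string | null; // assumed: JSON array of strings
  alert_tags: string | null;     // assumed: JSON object of required tag values
  enabled: number;               // SQLite boolean, 0 or 1
}

// Hypothetical alert shape used for matching.
interface MatchableAlert {
  source: string;
  severity: string;
  title: string;
  content?: string;
  tags?: Record<string, string>;
}

function policyMatches(policy: PolicyRow, alert: MatchableAlert): boolean {
  if (policy.enabled !== 1) return false;
  if (policy.alert_source !== alert.source) return false;
  if (policy.alert_severity && policy.alert_severity !== alert.severity) return false;

  // Keywords: at least one must occur in title or content (case-insensitive).
  if (policy.alert_keywords) {
    const keywords: string[] = JSON.parse(policy.alert_keywords);
    const text = `${alert.title} ${alert.content ?? ""}`.toLowerCase();
    if (keywords.length > 0 && !keywords.some((k) => text.includes(k.toLowerCase()))) return false;
  }

  // Tags: every required key/value pair must be present on the alert.
  if (policy.alert_tags) {
    const required: Record<string, string> = JSON.parse(policy.alert_tags);
    const tags = alert.tags ?? {};
    if (!Object.entries(required).every(([key, value]) => tags[key] === value)) return false;
  }
  return true;
}
```

Whether keywords should use "any" or "all" semantics is a policy design choice; "any" keeps a single policy usable for several phrasings of the same fault.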
CREATE TABLE remediation_policies (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
alert_source TEXT NOT NULL,
alert_severity TEXT,
alert_keywords TEXT,
alert_tags TEXT,
execution_mode TEXT NOT NULL,
workflow_id TEXT,
workflow_params TEXT,
max_executions_per_hour INTEGER NOT NULL,
cooldown_seconds INTEGER NOT NULL,
require_confirmation INTEGER,
enable_verification INTEGER NOT NULL DEFAULT 0,
verification_workflow_id TEXT,
verification_params TEXT,
verification_timeout_seconds INTEGER,
enable_rollback INTEGER NOT NULL DEFAULT 0,
rollback_workflow_id TEXT,
rollback_on_failure INTEGER,
enabled INTEGER NOT NULL DEFAULT 1,
created_by TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

16.7.5 Remediation Executions Table
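The status, approved_by, approved_at, approval_comment, and completed_at columns below are written by approveExecution in 16.6.10. Pulled out as a pure function, that transition looks roughly like this. It is a minimal sketch that mirrors the service code (one difference: a single timestamp is used for approved_at and completed_at), and `ApprovalUpdate` and `decideApproval` are hypothetical names.

```typescript
type ApprovalAction = "approve" | "reject";

// Columns written to remediation_executions, plus whether to start the workflow.
interface ApprovalUpdate {
  status: "approved" | "rejected";
  approved_by: string;
  approved_at: string;
  approval_comment?: string;
  completed_at?: string; // only a rejection ends the execution at this point
  startWorkflow: boolean;
}

function decideApproval(
  currentStatus: string,
  action: ApprovalAction,
  userId: string,
  now: Date,
  comment?: string,
): ApprovalUpdate {
  // Only executions waiting for approval may be approved or rejected.
  if (currentStatus !== "waiting_approval") {
    throw new Error("Execution is not waiting for approval");
  }
  const at = now.toISOString();
  if (action === "approve") {
    return { status: "approved", approved_by: userId, approved_at: at, approval_comment: comment, startWorkflow: true };
  }
  return {
    status: "rejected",
    approved_by: userId,
    approved_at: at,
    approval_comment: comment,
    completed_at: at,
    startWorkflow: false,
  };
}
```

The service would then pass the column fields to updateExecution and call executeWorkflowAsync only when startWorkflow is true.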
CREATE TABLE remediation_executions (
id TEXT PRIMARY KEY,
policy_id TEXT NOT NULL,
alert_id TEXT NOT NULL,
alert_snapshot TEXT NOT NULL,
status TEXT NOT NULL,
status_reason TEXT,
approval_required INTEGER DEFAULT 0,
approved_by TEXT,
approved_at TEXT,
approval_comment TEXT,
workflow_execution_id TEXT,
started_at TEXT,
completed_at TEXT,
execution_duration_ms INTEGER,
execution_result TEXT,
verification_status TEXT,
verification_result TEXT,
verification_completed_at TEXT,
rollback_triggered INTEGER DEFAULT 0,
rollback_execution_id TEXT,
rollback_completed_at TEXT,
rollback_result TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

16.7.6 Remediation Cooldowns Table
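The table below stores one cooldown deadline per (policy_id, alert_id) pair, while the hourly cap comes from max_executions_per_hour on the policy. A minimal in-memory sketch of both checks from the flow in 16.6.11, assuming the hourly limit is a sliding 60-minute window over past executions of the same policy (the chapter does not say whether the window slides or resets each clock hour); `RemediationGuard` is a hypothetical name.

```typescript
class RemediationGuard {
  // Mirrors remediation_cooldowns: key `${policyId}:${alertId}` -> cooldown_until (epoch ms).
  private cooldowns = new Map<string, number>();
  // Execution timestamps per policy, used for the hourly rate limit.
  private executions = new Map<string, number[]>();

  isInCooldown(policyId: string, alertId: string, now: number): boolean {
    const until = this.cooldowns.get(`${policyId}:${alertId}`);
    return until !== undefined && now < until;
  }

  // Overwrites any existing deadline, like an upsert on the composite primary key.
  startCooldown(policyId: string, alertId: string, cooldownSeconds: number, now: number): void {
    this.cooldowns.set(`${policyId}:${alertId}`, now + cooldownSeconds * 1000);
  }

  isRateLimited(policyId: string, maxExecutionsPerHour: number, now: number): boolean {
    const windowStart = now - 60 * 60 * 1000;
    const recent = (this.executions.get(policyId) ?? []).filter((t) => t > windowStart);
    this.executions.set(policyId, recent); // drop timestamps that left the window
    return recent.length >= maxExecutionsPerHour;
  }

  recordExecution(policyId: string, now: number): void {
    const list = this.executions.get(policyId) ?? [];
    list.push(now);
    this.executions.set(policyId, list);
  }
}
```

Keying cooldowns by alert as well as policy means a policy can still act on a different alert while one alert is cooling down; the hourly limit is what caps the policy as a whole.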
CREATE TABLE remediation_cooldowns (
policy_id TEXT NOT NULL,
alert_id TEXT NOT NULL,
cooldown_until TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (policy_id, alert_id)
);

16.8 End-to-End Alert Processing
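Taking an incoming Prometheus alert as an example:
1. Prometheus Alertmanager sends a POST webhook to the backend.
2. The backend route receives the request and calls detectSourceType(payload) → 'prometheus'.
3. It calls adaptPrometheus(payload) → NormalizedAlert[].
4. For each NormalizedAlert:
├── alertNoiseReductionService.processAlert(source, title, content, severity)
│   ├── Generate the fingerprint: md5("prometheus:cpu usage on server is ")
│   ├── Query the database: does this fingerprint already exist?
│   │   ├── No → create a new record, shouldNotify=true
│   │   └── Yes → increment the occurrence count
│   │       ├── < 5 occurrences → shouldNotify=true
│   │       ├── ≥ 5 occurrences, severity critical → shouldNotify=true (critical is exempt)
│   │       └── ≥ 5 occurrences, severity not critical → suppressed automatically for 30 minutes
├── If shouldNotify=true:
│   ├── Save the alert to the alerts table
│   ├── notificationService.sendAlertNotification(alert)
│   │   ├── Send to WeCom (Markdown)
│   │   ├── Send to DingTalk (Markdown)
│   │   ├── Send email (HTML)
│   │   └── Push over WebSocket (real-time display in the frontend)
│   └── remediationService.matchAlertToPolicies(alert)
│       ├── Find matching policies
│       └── For each matching policy:
│           ├── isInCooldown? → skip if yes
│           ├── isRateLimited? → skip if yes
│           └── triggerRemediation(policy, alert)
│               ├── execution_mode=auto → executeWorkflowAsync
│               │   ├── executeWorkflow() → run the remediation workflow
│               │   ├── success → verify → resolve the alert → update the cooldown
│               │   └── failure → roll back (if configured)
│               ├── execution_mode=approval → wait for human approval
│               └── execution_mode=suggestion → send a suggestion notification
└── If shouldNotify=false (suppressed):
    └── Only recorded in the database; no notification is sent

Chapter Summary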
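This chapter walked through the complete implementation of the alert center and notification system in ITOps Agent Platform:
- Alert source adapters: five adapters (Prometheus/Zabbix/Grafana/Aliyun/Tencent) normalize alerts in different formats into NormalizedAlert, including severity mapping and automatic source detection
- Alert rule engine: rules stored in an in-memory Map, threshold checks, cooldowns to prevent flapping, and multi-channel notification dispatch
- Alert noise reduction: MD5 fingerprints over normalized titles, duplicate detection persisted in SQLite, and automatic suppression (threshold of 5 occurrences, critical alerts exempt, 30-minute TTL)
- Notification service: concurrent multi-channel delivery (WeCom/DingTalk/email/Webhook/WebSocket), persisted records, retry on failure, and emoji-enhanced messages
- Automatic remediation: policy matching (source/severity/keywords/tags), cooldown plus rate limiting, three execution modes (auto/approval/suggestion), workflow-driven remediation, verification workflows, and rollback workflows
- Database design: six core tables (alerts, alert_noise_reduction, notifications, remediation_policies, remediation_executions, remediation_cooldowns)
Together, these components form a closed loop from alert intake to automatic remediation.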
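Exercises
Basic Exercises
1. New alert source adapter: write an adaptCloudWatch adapter for AWS CloudWatch. The JSON of a CloudWatch SNS notification contains fields such as AlarmName, NewStateValue, NewStateReason, and AWSAccountId. Normalize it into a NormalizedAlert.
2. Custom cooldown rules: in alertService.ts, let each rule use a dynamic cooldown that depends on severity (shorter for critical, longer for warning). Modify the AlertRule interface and the triggerAlert logic.
3. Paginated notification history: add pagination to the getNotificationHistory method in notificationService.ts, with the parameters page, limit, and status (filter by sent or failed), returning {notifications, total, page, totalPages}.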
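As a starting point for the first exercise, a minimal skeleton. The NormalizedAlert shape here is an assumption inferred from the columns of the alerts table in 16.7.1, not the project's actual interface. CloudWatch delivers an alarm to an SNS HTTP(S) subscriber as a notification whose `Message` field is a JSON string, so the adapter unwraps it first and also accepts an already-unwrapped alarm. The severity mapping (ALARM → critical, anything else → info) and the status mapping (OK → resolved) are illustrative choices, not rules from the chapter.

```typescript
// Assumed NormalizedAlert shape, mirroring the columns of the alerts table.
interface NormalizedAlert {
  source: string;
  severity: "critical" | "warning" | "info";
  title: string;
  content: string;
  status: "firing" | "resolved";
  external_id?: string;
  host?: string;
  metadata?: Record<string, unknown>;
}

// Fields of the CloudWatch alarm message used by this sketch.
interface CloudWatchAlarm {
  AlarmName: string;
  NewStateValue: string; // "ALARM" | "OK" | "INSUFFICIENT_DATA"
  NewStateReason: string;
  AWSAccountId: string;
  Region?: string;
  StateChangeTime?: string;
}

function adaptCloudWatch(payload: Record<string, unknown>): NormalizedAlert {
  // An SNS envelope carries the alarm as a JSON string in Message.
  const alarm = (typeof payload.Message === "string" ? JSON.parse(payload.Message) : payload) as CloudWatchAlarm;

  return {
    source: "cloudwatch",
    // Illustrative mapping: CloudWatch alarms carry no severity of their own.
    severity: alarm.NewStateValue === "ALARM" ? "critical" : "info",
    title: alarm.AlarmName,
    content: alarm.NewStateReason,
    status: alarm.NewStateValue === "OK" ? "resolved" : "firing",
    // Account plus alarm name identifies the alarm within a region.
    external_id: `${alarm.AWSAccountId}:${alarm.AlarmName}`,
    metadata: {
      accountId: alarm.AWSAccountId,
      region: alarm.Region,
      stateChangeTime: alarm.StateChangeTime,
    },
  };
}
```

The remaining work in the exercise is wiring it into detectSourceType (for example by checking for an SNS `Type` field or an `AlarmName` key) and choosing a severity convention, such as a tag or a naming prefix on the alarm.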
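Advanced Exercises
1. Alert correlation: design a correlation engine that recognizes similar alerts across multiple servers within the same time window (for example, "CPU usage high" appearing on 10 servers within 5 minutes), aggregates them into a single "incident", and produces a root-cause analysis report.
2. Remediation policy versioning: add version control to the remediation_policies table. On every policy update, save the previous version to a remediation_policy_versions table. Implement rolling a policy back to a historical version, as well as diffing two versions.
3. Notification channel fallback: when a notification channel fails 3 times in a row, disable it automatically and switch to a backup channel. Build a channel health dashboard that shows each channel's delivery success rate, average latency, and most recent failure reason.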
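For the channel fallback exercise, the core is a per-channel counter of consecutive failures. A minimal sketch, assuming channels are identified by name and each has at most one backup; `ChannelHealth` and the backup map are hypothetical, latency tracking is omitted, and re-enabling a disabled channel (for example after a successful health probe) is left out.

```typescript
const MAX_CONSECUTIVE_FAILURES = 3;

class ChannelHealth {
  private readonly backups: Record<string, string>;
  private consecutiveFailures = new Map<string, number>();
  private disabled = new Set<string>();
  private counts = new Map<string, { ok: number; failed: number }>();

  constructor(backups: Record<string, string>) {
    this.backups = backups;
  }

  recordResult(channel: string, ok: boolean): void {
    const stats = this.counts.get(channel) ?? { ok: 0, failed: 0 };
    if (ok) {
      stats.ok += 1;
      this.consecutiveFailures.set(channel, 0); // any success resets the streak
    } else {
      stats.failed += 1;
      const streak = (this.consecutiveFailures.get(channel) ?? 0) + 1;
      this.consecutiveFailures.set(channel, streak);
      if (streak >= MAX_CONSECUTIVE_FAILURES) this.disabled.add(channel);
    }
    this.counts.set(channel, stats);
  }

  // Follow the backup chain until an enabled channel is found (undefined if none).
  pickChannel(channel: string): string | undefined {
    const seen = new Set<string>();
    let current: string | undefined = channel;
    while (current !== undefined && this.disabled.has(current)) {
      if (seen.has(current)) return undefined; // guard against backup cycles
      seen.add(current);
      current = this.backups[current];
    }
    return current;
  }

  // Share of successful deliveries, for the health dashboard.
  successRate(channel: string): number {
    const s = this.counts.get(channel);
    if (!s || s.ok + s.failed === 0) return 0;
    return s.ok / (s.ok + s.failed);
  }
}
```

Counting consecutive failures rather than a failure ratio makes the switch react quickly to an outage (for example a revoked webhook token) while ignoring isolated transient errors.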
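Discussion Questions
1. Designing for an alert storm: suppose a core switch failure causes 500 servers to raise alerts at the same time (network unreachable). Analyze how the current noise-reduction system behaves in this scenario, discuss whether additional strategies are needed (such as topology-aware aggregation, time-window aggregation, or dependency inference), and propose a design.
2. Safety boundaries for automatic remediation: the remediation system can run any operation defined in a workflow, including dangerous server commands. Discuss how to establish safety boundaries in such a system: tiered permissions for remediation actions, backups taken before remediation, real-time auditing during remediation, and fast rollback on failure. How do you balance automation efficiency against safety?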
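Further Reading
- Alertmanager configuration documentation: https://prometheus.io/docs/alerting/latest/configuration/ (Prometheus alerting configuration, routing rules, and inhibition rules)
- Google SRE book, "Monitoring Distributed Systems": https://sre.google/sre-book/monitoring-distributed-systems/ (best practices for alert design from Google's Site Reliability Engineering)
- Alert fatigue research: "Alert Fatigue: A Systematic Literature Review" (an academic paper on the causes, effects, and mitigation strategies of alert fatigue)
- Incident response automation: PagerDuty's best practices for incident response automation
- Nodemailer documentation: https://nodemailer.com/ (email sending library for Node.js)
- WeCom and DingTalk webhook APIs: the WeCom group robot developer documentation and the DingTalk custom robot developer documentation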
