
Chapter 16 Alert Center and Notification System

Author

谭策 (Tan Ce): independent developer | AIOps explorer

IT Online (WeChat Official Account)

License

MPL-2.0 © 谭策

Chapter Overview

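A core responsibility of IT operations is keeping systems running reliably, and the alerting system is the first line of defense for detecting problems. A good alerting system must detect anomalies promptly, and it must also be able to:

  • Receive alerts from multiple monitoring sources (Prometheus, Zabbix, Grafana, cloud platforms) through a single entry point
  • Reduce noise intelligently, so that alert storms do not desensitize the on-call staff
  • Push alerts promptly to the right people over multiple channels (WeCom, DingTalk, email, Webhook)
  • Match remediation policies automatically, enabling unattended self-healing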

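The alert center of ITOps Agent Platform is designed around these goals. This chapter walks through the complete pipeline: alert ingestion, normalization, noise reduction, notification and auto-remediation.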

Learning Objectives

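  • Understand the design of the alert rule engine (threshold checks, cooldown periods, multi-channel dispatch)
  • Master the normalization strategy of the multi-source adapters (Prometheus/Zabbix/Grafana/Aliyun/Tencent)
  • Understand the fingerprint algorithm and the automatic suppression used for noise reduction
  • Master the implementation of the multi-channel notification system (WeCom/DingTalk/Email/Webhook/WebSocket)
  • Understand the full auto-remediation flow (policy matching → cooldown/rate limiting → workflow execution → verification → rollback)
  • Learn to design alert statistics and to analyze the effect of noise reduction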

16.1 Overall Architecture of the Alert System

┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│  Prometheus   │ │    Zabbix     │ │    Grafana    │ │    Aliyun     │ │    Tencent    │
│    Webhook    │ │    Webhook    │ │    Webhook    │ │     Cloud     │ │     Cloud     │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
        │                 │                 │                 │                 │
        └─────────────────┴─────────────────┼─────────────────┴─────────────────┘
                                            │
                                 ┌──────────▼─────────┐
                                 │ detectSourceType() │  ← detects the alert source automatically
                                 └──────────┬─────────┘
                                            │
        ┌─────────────────┬─────────────────┼─────────────────┬─────────────────┐
        │                 │                 │                 │                 │
┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐
│adaptPrometheus│ │  adaptZabbix  │ │ adaptGrafana  │ │  adaptAliyun  │ │ adaptTencent  │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
        │                 │                 │                 │                 │
        └─────────────────┴─────────────────┼─────────────────┴─────────────────┘
                                            │
                              ┌─────────────▼─────────────┐
                              │ NormalizedAlert (unified) │
                              │ {source, severity,        │
                              │  title, content, ...}     │
                              └─────────────┬─────────────┘
                                            │
                              ┌─────────────▼─────────────┐
                              │ AlertNoiseReduction       │  ← fingerprinting, dedup, suppression
                              └─────────────┬─────────────┘
                                            │
                              ┌─────────────▼─────────────┐
                              │ AlertService rule engine  │  ← in-memory metric checks
                              └─────────────┬─────────────┘
                                            │
                              ┌─────────────▼─────────────┐
                              │ NotificationService       │  ← multi-channel notification
                              │ WeChat/DingTalk/Email     │
                              │ Webhook/WebSocket         │
                              └─────────────┬─────────────┘
                                            │
                              ┌─────────────▼─────────────┐
                              │ RemediationService        │  ← auto-remediation
                              │ policy match → workflow   │
                              │ verify/rollback/approval  │
                              └───────────────────────────┘

16.1.1 Data Normalization Flow

Every alert from an external source is converted into the unified NormalizedAlert format when it enters the system:

typescript
export interface NormalizedAlert {
  external_id?: string;                // Unique ID in the external system
  source: string;                      // Source: prometheus/zabbix/grafana/aliyun/tencent
  severity: string;                    // Unified severity: critical/high/medium/low/info
  title: string;                       // Alert title
  content: string;                     // Alert body
  metadata: Record<string, unknown>;   // Original metadata
  status: 'firing' | 'resolved';       // Alert status
  host?: string;                       // Associated host
  labels?: Record<string, string>;     // Labels
  annotations?: Record<string, string>; // Annotations
  starts_at?: string;                  // Start time
  ends_at?: string;                    // End time
}

16.2 Alert Source Adapters (AlertSourceAdapters)

16.2.1 Severity Normalization

Each monitoring system defines severity differently. The normalizeSeverity function maps all of them onto a five-level standard:

typescript
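function normalizeSeverity(level: string | number | undefined): string {
  // A missing severity (e.g. an empty Zabbix macro) falls back to the default
  if (level == null || level === '') return 'medium';
  // Numeric strings such as "4" (webhook parameters may arrive as text) are treated as numbers
  if (typeof level === 'string' && /^\d+$/.test(level.trim())) level = Number(level);
  if (typeof level === 'number') {
    if (level >= 5) return 'critical';   // 5+ → disaster
    if (level >= 4) return 'high';       // 4 → high
    if (level >= 3) return 'medium';     // 3 → medium
    if (level >= 2) return 'low';        // 2 → low
    return 'info';                       // 1 and below → informational
  }
  const map: Record<string, string> = {
    critical: 'critical', critical_severity: 'critical', disaster: 'critical',
    high: 'high', error: 'high',
    warning: 'medium', warn: 'medium', average: 'medium',
    information: 'low', info: 'low', low: 'low',
    not_classified: 'info',
  };
  return map[level.toLowerCase()] || 'medium';  // unknown labels default to medium
}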

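Severity mapping table:

| Standard level | Prometheus | Zabbix | Grafana | Aliyun | Numeric value |
|---|---|---|---|---|---|
| critical | critical | disaster | critical | critical | ≥5 |
| high | - | high/error | high | high | 4 |
| medium | warning | average/warning | warning | medium | 3 |
| low | info | info/low | info | low | 2 |
| info | not_classified | - | - | not_classified | 1 |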
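A quick consistency check is to run the mapping over Zabbix's six trigger priorities, from 0 (Not classified) to 5 (Disaster), passing each level both as a number and as its textual name. The sketch below copies the mapping logic of normalizeSeverity so it runs on its own. ZABBIX_LEVELS is a helper introduced here for illustration.

```typescript
// Copy of the normalizeSeverity mapping logic, so the check runs on its own.
function normalizeSeverity(level: string | number): string {
  if (typeof level === "number") {
    if (level >= 5) return "critical";
    if (level >= 4) return "high";
    if (level >= 3) return "medium";
    if (level >= 2) return "low";
    return "info";
  }
  const map: Record<string, string> = {
    critical: "critical", critical_severity: "critical", disaster: "critical",
    high: "high", error: "high",
    warning: "medium", warn: "medium", average: "medium",
    information: "low", info: "low", low: "low",
    not_classified: "info",
  };
  return map[level.toLowerCase()] || "medium";
}

// Zabbix trigger severities: numeric priority paired with its textual name.
const ZABBIX_LEVELS: Array<[number, string]> = [
  [0, "not_classified"], [1, "information"], [2, "warning"],
  [3, "average"], [4, "high"], [5, "disaster"],
];

for (const [num, label] of ZABBIX_LEVELS) {
  const byNumber = normalizeSeverity(num);
  const byName = normalizeSeverity(label);
  const flag = byNumber === byName ? "" : "  <- mismatch";
  console.log(`${num} ${label.padEnd(15)} number=${byNumber.padEnd(8)} name=${byName}${flag}`);
}
```

With the mapping as written, levels 1 and 2 disagree:

- 2 maps to low, while "warning" maps to medium.
- 1 maps to info, while "information" maps to low.

The chapter does not say which branch is intended. Aligning the two would keep a Zabbix alert's severity stable whether the media type sends the priority number or its name.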

16.2.2 Prometheus Adapter

The adapter for the Prometheus Alertmanager webhook format:

typescript
export function adaptPrometheus(payload: unknown): AlertAdapterResult {
  const body = payload as { alerts?: unknown[]; version?: string; groupKey?: string };
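  const rawAlerts = Array.isArray(body.alerts) ? body.alerts : [];
  const alerts: NormalizedAlert[] = [];
  const errors: string[] = [];   // assumed element type of AlertAdapterResult.errors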

  for (const raw of rawAlerts) {
    const alert = raw as Record<string, unknown>;
    const labels = (alert.labels || {}) as Record<string, string>;
    const annotations = (alert.annotations || {}) as Record<string, string>;
    const status = (alert.status as string) || 'firing';

    alerts.push({
      external_id: `${labels.alertname || 'unknown'}-${labels.instance || ''}-${status}`,
      source: 'prometheus',
      severity: normalizeSeverity(labels.severity || 'medium'),
      title: annotations.summary || labels.alertname || 'Prometheus Alert',
      content: annotations.description || annotations.message || JSON.stringify(alert),
      metadata: {
        prometheus_version: body.version,
        group_key: body.groupKey,
        labels,
        annotations,
        starts_at: alert.startsAt,
        ends_at: alert.endsAt,
        generator_url: alert.generatorURL,   // generatorURL is a field of each alert, not a label
      },
      status: status === 'resolved' ? 'resolved' : 'firing',
      host: labels.instance || labels.node || labels.host,
      labels, annotations
    });
  }

  return { alerts, errors };
}
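To see what the adapter produces, the sketch below feeds a trimmed Alertmanager webhook body to a reduced copy of the adapter. It makes three simplifications:

- AlertAdapterResult is assumed to be { alerts, errors } with errors: string[], as the adapter's return statement suggests.
- Severity is passed through without normalizeSeverity, so the block stays self-contained.
- adaptPrometheusLite and all sample values are illustrative.

```typescript
interface NormalizedAlert {
  external_id?: string;
  source: string;
  severity: string;
  title: string;
  content: string;
  status: "firing" | "resolved";
  host?: string;
}

// Assumed shape, inferred from `return { alerts, errors }` in the adapter.
interface AlertAdapterResult {
  alerts: NormalizedAlert[];
  errors: string[];
}

// Reduced adapter: same field precedence as adaptPrometheus, severity passed through as-is.
function adaptPrometheusLite(payload: unknown): AlertAdapterResult {
  const body = payload as { alerts?: unknown[] };
  const alerts: NormalizedAlert[] = [];
  const errors: string[] = [];
  for (const raw of Array.isArray(body.alerts) ? body.alerts : []) {
    const alert = raw as Record<string, unknown>;
    const labels = (alert.labels || {}) as Record<string, string>;
    const annotations = (alert.annotations || {}) as Record<string, string>;
    const status = (alert.status as string) || "firing";
    alerts.push({
      external_id: `${labels.alertname || "unknown"}-${labels.instance || ""}-${status}`,
      source: "prometheus",
      severity: labels.severity || "medium",
      title: annotations.summary || labels.alertname || "Prometheus Alert",
      content: annotations.description || annotations.message || JSON.stringify(alert),
      status: status === "resolved" ? "resolved" : "firing",
      host: labels.instance || labels.node || labels.host,
    });
  }
  return { alerts, errors };
}

// Trimmed Alertmanager webhook body (values are illustrative).
const samplePayload = {
  version: "4",
  status: "firing",
  alerts: [
    {
      status: "firing",
      labels: { alertname: "HighCPU", instance: "10.0.0.5:9100", severity: "warning" },
      annotations: { summary: "CPU above 85%", description: "5m average CPU is 92%" },
      startsAt: "2024-01-01T00:00:00Z",
      generatorURL: "http://prometheus.example/graph",
    },
  ],
};

const { alerts } = adaptPrometheusLite(samplePayload);
console.log(alerts[0]);
```

Because status is part of external_id, the firing and the resolved notification of the same alert get different IDs. Alertmanager also sends a per-alert fingerprint field in its webhook payload. That field could serve as a status-independent key if firing and resolved events need to be correlated.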

16.2.3 Zabbix Adapter

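Zabbix builds its webhook body from user-configured macros, so field names vary. The adapter accepts both nested objects (TRIGGER/HOST/EVENT/ITEM) and flat fields: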

typescript
export function adaptZabbix(payload: unknown): AlertAdapterResult {
  const body = payload as Record<string, unknown>;
  const alerts: NormalizedAlert[] = [];
  const errors: string[] = [];   // assumed element type of AlertAdapterResult.errors

  const triggerObj = body.TRIGGER as Record<string, unknown> | undefined;
  const hostObj = body.HOST as Record<string, unknown> | undefined;
  const eventObj = (body.EVENT || body.event) as Record<string, unknown> | undefined;
  const itemObj = body.ITEM as Record<string, unknown> | undefined;

  const trigger = (triggerObj?.NAME as string) || (body.trigger as string) || 'Zabbix Alert';
  const host = (hostObj?.NAME as string) || (body.host as string) || 'Unknown';
  const rawSeverity = (triggerObj?.SEVERITY as string) || (triggerObj?.PRIORITY as string | number);
  const severity = normalizeSeverity(rawSeverity ?? 'medium');

  // Item name/value and event ID (the ITEM.NAME/ITEM.VALUE/EVENT.ID field names are assumptions)
  const item = (itemObj?.NAME as string) || (body.item as string);
  const itemValue = (itemObj?.VALUE as string) || (body.item_value as string);
  const eventId = (eventObj?.ID as string) || (body.eventid as string);

  // Zabbix reports recovery through the event value: 0 = resolved, 1 = problem.
  // Compare as strings because the value may arrive as "0" or 0.
  const isResolved = String(eventObj?.VALUE) === '0';

  const content = [
    `Host: ${host}`,
    `Trigger: ${trigger}`,
    item ? `Item: ${item}` : '',
    itemValue ? `Value: ${itemValue}` : '',
    `Severity: ${severity}`,
  ].filter(Boolean).join('\n');

  alerts.push({
    external_id: eventId ? `zabbix-${eventId}` : undefined,
    source: 'zabbix',
    severity,
    title: `[${severity.toUpperCase()}] ${trigger}`,
    content,
    metadata: body,   // raw Zabbix payload
    status: isResolved ? 'resolved' : 'firing',
    host
  });

  return { alerts, errors };
}
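The resolved/firing decision is the part of the Zabbix adapter most sensitive to how the media type is configured. The sketch below isolates it. It assumes the webhook forwards the {EVENT.VALUE} macro (1 = problem, 0 = recovery), either nested under EVENT/event or as a flat event_value field. The flat field name and the zabbixStatus helper are hypothetical.

```typescript
type ZabbixStatus = "firing" | "resolved";

// Derives firing/resolved from the {EVENT.VALUE} macro (1 = problem, 0 = recovery).
// Accepts EVENT.VALUE, event.VALUE, or a flat event_value field (hypothetical name).
function zabbixStatus(body: Record<string, unknown>): ZabbixStatus {
  const eventObj = (body.EVENT ?? body.event) as Record<string, unknown> | undefined;
  const raw = eventObj?.VALUE ?? body.event_value;
  // String() makes "0" and 0 compare equal; undefined becomes "undefined", i.e. firing
  return String(raw) === "0" ? "resolved" : "firing";
}

const examples: Array<Record<string, unknown>> = [
  { EVENT: { VALUE: "1" } },
  { EVENT: { VALUE: "0" } },
  { event: { VALUE: 0 } },
  { event_value: "0" },
  { TRIGGER: { NAME: "CPU high" } },
];

for (const body of examples) {
  console.log(JSON.stringify(body), "->", zabbixStatus(body));
}
```

A payload without any event value falls back to firing. This errs on the side of notifying rather than silently closing an alert.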

16.2.4 Grafana Adapter

typescript
export function adaptGrafana(payload: unknown): AlertAdapterResult {
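  const body = payload as Record<string, unknown>;
  const alerts: NormalizedAlert[] = [];
  const errors: string[] = [];   // assumed element type of AlertAdapterResult.errors
  // Grafana may send an alerts array or a single alert object
  const rawAlerts = Array.isArray(body.alerts) ? body.alerts : [body];

  for (const raw of rawAlerts) {
    const alert = raw as Record<string, unknown>;
    // Compare case-insensitively: Grafana payloads differ in casing (e.g. "ok" vs "OK")
    const status = String(alert.state || alert.status || '').toLowerCase();
    const isResolved = status === 'normal' || status === 'ok' || status === 'resolved';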

    const labels = (alert.labels || {}) as Record<string, string>;
    const annotations = (alert.annotations || {}) as Record<string, string>;

    alerts.push({
      external_id: alert.ruleUID ? `grafana-${alert.ruleUID}` : undefined,
      source: 'grafana',
      severity: normalizeSeverity(labels.severity || 'medium'),
      title: (alert.ruleName || annotations.title || 'Grafana Alert') as string,
      content: (alert.message || annotations.description || JSON.stringify(alert)) as string,
      metadata: {
        grafana_rule_uid: alert.ruleUID,
        grafana_folder: alert.folder,
        eval_matches: alert.evalMatches,
        image_url: alert.imageUrl,
      },
      status: isResolved ? 'resolved' : 'firing',
      host: labels.instance || labels.host || labels.server,
    });
  }

  return { alerts, errors };
}
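Grafana's two alerting generations spell the recovered state differently. The sketch below assumes two payload shapes:

- Legacy alerting sends a state field, for example alerting or ok.
- Unified alerting sends a per-alert status field, firing or resolved.

RESOLVED_STATES and isGrafanaResolved are illustrative names.

```typescript
// Recovered-state spellings, compared in lowercase.
const RESOLVED_STATES = new Set(["normal", "ok", "resolved"]);

// True when a Grafana alert object (legacy `state` or unified `status`) reports recovery.
function isGrafanaResolved(alert: Record<string, unknown>): boolean {
  const raw = alert.state ?? alert.status;
  return typeof raw === "string" && RESOLVED_STATES.has(raw.toLowerCase());
}

const samples: Array<Record<string, unknown>> = [
  { state: "ok", ruleName: "Disk full" },                   // legacy, recovered
  { state: "alerting", ruleName: "Disk full" },             // legacy, firing
  { status: "resolved", labels: { alertname: "HighCPU" } }, // unified, recovered
  { status: "firing", labels: { alertname: "HighCPU" } },   // unified, firing
];

for (const s of samples) {
  console.log(JSON.stringify(s), "->", isGrafanaResolved(s) ? "resolved" : "firing");
}
```

Lowercasing before the lookup lets one set cover both spellings. Anything unrecognized, including a missing field, is treated as firing.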

16.2.5 Automatic Source Detection

When a webhook request arrives, the system identifies its alert source automatically from characteristic fields:

typescript
export function detectSourceType(payload: unknown): string {
  const body = payload as Record<string, unknown>;

  // Prometheus: alerts array whose first item has labels/annotations/startsAt
  if (body.alerts && Array.isArray(body.alerts)) {
    const firstAlert = body.alerts[0] as Record<string, unknown> | undefined;
    if (firstAlert?.labels || firstAlert?.annotations || firstAlert?.startsAt) return 'prometheus';
    // Grafana: alerts array whose first item has state/ruleName/ruleUID
    if (firstAlert?.state || firstAlert?.ruleName || firstAlert?.ruleUID) return 'grafana';
  }

  // Zabbix: TRIGGER/HOST/eventid/triggerid
  if (body.TRIGGER || body.HOST || body.eventid || body.triggerid) return 'zabbix';

  // Aliyun: product/productName/alertLevel/dimensions
  if (body.product || body.productName || body.alertLevel || body.dimensions) return 'aliyun';

  // Tencent: alarmName/alarmType/policyName
  if (body.alarmName || body.alarmType || body.policyName) return 'tencent';

  return 'generic';
}

16.2.6 Source Detection Decision Tree

payload
  │
  ├─ alerts array present?
  │   ├─ has labels/annotations/startsAt ──► prometheus
  │   └─ has state/ruleName/ruleUID ───────► grafana
  │
  ├─ TRIGGER/HOST/eventid/triggerid present? ──► zabbix
  ├─ product/productName/alertLevel present? ──► aliyun
  ├─ alarmName/alarmType/policyName present? ──► tencent
  └─ none of the above ────────────────────────► generic
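The checks run in a fixed order, so the first matching rule wins. The sketch below copies detectSourceType, typing firstAlert as possibly undefined, and classifies a few minimal bodies. The payloads are illustrative and were not captured from real systems.

```typescript
// Copy of detectSourceType from this section, for a self-contained run.
function detectSourceType(payload: unknown): string {
  const body = payload as Record<string, unknown>;
  if (body.alerts && Array.isArray(body.alerts)) {
    const firstAlert = body.alerts[0] as Record<string, unknown> | undefined;
    if (firstAlert?.labels || firstAlert?.annotations || firstAlert?.startsAt) return "prometheus";
    if (firstAlert?.state || firstAlert?.ruleName || firstAlert?.ruleUID) return "grafana";
  }
  if (body.TRIGGER || body.HOST || body.eventid || body.triggerid) return "zabbix";
  if (body.product || body.productName || body.alertLevel || body.dimensions) return "aliyun";
  if (body.alarmName || body.alarmType || body.policyName) return "tencent";
  return "generic";
}

// Minimal bodies that exercise each branch (field values are illustrative).
const cases: Array<[string, unknown]> = [
  ["alertmanager", { alerts: [{ labels: { alertname: "HighCPU" } }] }],
  ["grafana-like", { alerts: [{ ruleName: "Disk full", state: "alerting" }] }],
  ["labels first", { alerts: [{ labels: { team: "ops" }, ruleName: "Disk full" }] }],
  ["zabbix", { TRIGGER: { NAME: "CPU high" }, HOST: { NAME: "web-01" } }],
  ["aliyun", { product: "ECS", alertLevel: "CRITICAL" }],
  ["tencent", { alarmName: "cpu", policyName: "default" }],
  ["unknown", { foo: "bar" }],
];

for (const [label, body] of cases) {
  console.log(label.padEnd(13), "->", detectSourceType(body));
}
```

Two consequences of the ordering show up here:

- Any body whose first alert carries labels is classified as prometheus, even if it also has Grafana-style fields.
- An empty alerts array falls through every check to generic.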

16.3 Alert Rule Engine (AlertService)

16.3.1 Built-in Rule Definitions

AlertService predefines five system-level alert rules:

typescript
// enabled: true is required, because checkAlerts() skips rules whose enabled flag is not set
const DEFAULT_ALERT_RULES: AlertRule[] = [
  {
    id: 'high-memory-usage',
    name: 'High Memory Usage',
    severity: 'critical',
    condition: 'memory_percent',
    threshold: 90,
    channels: ['log', 'webhook'],
    cooldownMs: 300000,         // 5-minute cooldown
    enabled: true
  },
  {
    id: 'high-cpu-usage',
    name: 'High CPU Usage',
    severity: 'warning',
    condition: 'cpu_percent',
    threshold: 85,
    channels: ['log'],
    cooldownMs: 300000,
    enabled: true
  },
  {
    id: 'database-slow',
    name: 'Slow Database Response',
    severity: 'critical',
    condition: 'db_latency',
    threshold: 1000,            // 1000 ms = 1 second
    channels: ['log', 'webhook'],
    cooldownMs: 60000,          // 1-minute cooldown (database alerts occur more frequently)
    enabled: true
  },
  {
    id: 'high-error-rate',
    name: 'High Error Rate',
    severity: 'critical',
    condition: 'error_rate',
    threshold: 10,
    channels: ['log', 'webhook'],
    cooldownMs: 300000,
    enabled: true
  },
  {
    id: 'disk-space-low',
    name: 'Low Disk Space',
    severity: 'warning',
    condition: 'disk_percent',
    threshold: 90,
    channels: ['log'],
    cooldownMs: 600000,         // 10-minute cooldown (disk usage changes slowly)
    enabled: true
  }
];

16.3.2 Rule Persistence

💡 Implementation note: at runtime, alert rules live in an in-memory Map<string, AlertRule>, which keeps metric checks fast. The rules are persisted as JSON in the SQLite settings table.

Rule data flow:
┌─────────────────┐   load on startup   ┌─────────────────┐
│  settings table │ ──────────────────► │  in-memory Map  │
│  (JSON)         │                     │  rules          │
│                 │ ◄────────────────── │                 │
└─────────────────┘    save on change   └─────────────────┘
typescript
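// Runtime storage: in-memory Map (fast checks)
private rules: Map<string, AlertRule> = new Map();

// Load rules: from the database into memory
private loadRules(): void {
  const saved = db.prepare('SELECT value FROM settings WHERE key = ?')
    .get('alert_rules') as { value: string } | undefined;
  if (saved) {
    const rules = JSON.parse(saved.value) as AlertRule[];
    rules.forEach(rule => this.rules.set(rule.id, rule));
  }

  // If the database holds no rules, load the defaults and persist them
  if (this.rules.size === 0) {
    // Copy each default so runtime state (lastTriggered) does not mutate the constants
    DEFAULT_ALERT_RULES.forEach(rule => this.rules.set(rule.id, { ...rule }));
    this.saveRules();
  }
}

// Save rules: persist the in-memory state to the database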
private saveRules(): void {
  const rules = Array.from(this.rules.values());
  const json = JSON.stringify(rules);
  db.prepare(`
    INSERT OR REPLACE INTO settings (key, value, updated_at)
    VALUES ('alert_rules', ?, CURRENT_TIMESTAMP)
  `).run(json);
}

16.3.3 Metric Checks and Alert Triggering

typescript
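// Maps each rule condition to the camelCase key used in the metrics argument
private static readonly CONDITION_TO_METRIC: Record<string, string> = {
  memory_percent: 'memoryPercent',
  cpu_percent: 'cpuPercent',
  db_latency: 'dbLatency',
  error_rate: 'errorRate',
  disk_percent: 'diskPercent',
};

async checkAlerts(metrics: {
  memoryPercent?: number;
  cpuPercent?: number;
  dbLatency?: number;
  errorRate?: number;
  diskPercent?: number;
  [key: string]: number | undefined;
}): Promise<AlertNotification[]> {
  const triggeredAlerts: AlertNotification[] = [];

  for (const rule of this.rules.values()) {
    if (!rule.enabled) continue;

    // Look up the metric for this rule: accept the condition name itself
    // (e.g. memory_percent) or its camelCase key (e.g. memoryPercent)
    const metricKey = AlertService.CONDITION_TO_METRIC[rule.condition];
    const value = metrics[rule.condition] ?? (metricKey ? metrics[metricKey] : undefined);
    if (value === undefined) continue;

    // Threshold reached or exceeded: trigger an alert
    if (value >= rule.threshold) {
      const now = Date.now();

      // Cooldown check: the same rule does not fire again within its cooldown period
      if (rule.lastTriggered && (now - rule.lastTriggered) < rule.cooldownMs) {
        continue;
      }

      const alert = await this.triggerAlert(rule, value, metrics);
      triggeredAlerts.push(alert);

      rule.lastTriggered = now;
      this.rules.set(rule.id, rule);
    }
  }

  // Persist lastTriggered only when at least one rule fired
  if (triggeredAlerts.length > 0) this.saveRules();
  return triggeredAlerts;
}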

16.3.4 Cooldown Mechanism

The cooldown period is the key design element that prevents alert storms:

Timeline:
  │─────┬───────────────┬───────────────┬───────────────┬────►
        │               │               │               │
    alert #1        over limit      over limit      alert #2
    t=0s            t=30s           t=120s          t=310s
                                        │               │
                               inside cooldown  cooldown (300s)
                               no alert         has elapsed
                                        ▼               ▼
                                     [skip]          [fire]

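Cooldown design for each rule:

| Rule | Cooldown | Rationale |
|---|---|---|
| Database latency | 1 minute | Database performance fluctuates often; a short cooldown keeps operators informed promptly |
| CPU/memory | 5 minutes | Routine metrics; a medium cooldown avoids frequent alerts |
| Disk space | 10 minutes | Disk usage changes slowly; a long cooldown reduces noise |
| Error rate | 5 minutes | Error rates can change quickly; a medium cooldown |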
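The timeline above can be replayed with a fake clock. The sketch below reproduces the cooldown gate from checkAlerts for a rule with the 300-second cooldown used by the CPU and memory rules. shouldFire is a hypothetical helper. It tests lastTriggered against undefined rather than by truthiness, so a timestamp of 0 still counts as a previous trigger.

```typescript
interface CooldownRule {
  cooldownMs: number;
  lastTriggered?: number; // epoch ms of the last alert, undefined if it never fired
}

// Mirrors the cooldown check in checkAlerts: fire when the rule has never fired
// or when cooldownMs has elapsed since lastTriggered.
function shouldFire(rule: CooldownRule, now: number): boolean {
  if (rule.lastTriggered !== undefined && now - rule.lastTriggered < rule.cooldownMs) {
    return false; // still inside the cooldown window
  }
  rule.lastTriggered = now;
  return true;
}

// Replay the timeline: the metric is over its threshold at every sample.
const rule: CooldownRule = { cooldownMs: 300_000 };
const fired: number[] = [];
for (const t of [0, 30, 120, 310]) {
  if (shouldFire(rule, t * 1000)) fired.push(t);
}
console.log(fired); // [ 0, 310 ]
```

Only the samples at 0 s and 310 s fire. The samples at 30 s and 120 s fall inside the 300-second window.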

16.3.5 Alert Notification Dispatch

Once an alert fires, notifications are sent asynchronously over multiple channels:

typescript
private async sendNotification(alert: AlertNotification): Promise<void> {
  const promises: Promise<void>[] = [];

  if (alert.channels.includes('log')) {
    promises.push(this.sendToLog(alert));           // write to the log
  }

  if (alert.channels.includes('webhook') && this.webhookUrl) {
    promises.push(this.sendToWebhook(alert));       // HTTP webhook
  }

  if (alert.channels.includes('email') && this.emailConfig) {
    promises.push(this.sendToEmail(alert));         // SMTP email
  }

  await Promise.allSettled(promises);               // channels run in parallel and fail independently
}

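Promise.allSettled is used instead of Promise.all so that a failure in one channel does not affect the others. allSettled waits for every promise and never rejects, whereas Promise.all rejects as soon as any promise rejects.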
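The difference between the two combinators is easy to see with simulated channels. In the sketch below the channel functions are stand-ins, and webhook always fails with a made-up HTTP 502:

- sendAll uses Promise.allSettled and reports which channels failed.
- sendAllStrict uses Promise.all.

```typescript
// Simulated delivery channels: webhook always fails, the others succeed.
const channels: Record<string, () => Promise<void>> = {
  log: async () => {},
  webhook: async () => {
    throw new Error("HTTP 502");
  },
  email: async () => {},
};

// allSettled: waits for every channel and never rejects; failures are read from the results.
async function sendAll(): Promise<string[]> {
  const names = Object.keys(channels);
  const results = await Promise.allSettled(names.map((n) => channels[n]()));
  return results.flatMap((r, i) =>
    r.status === "rejected" ? [`${names[i]}: ${(r.reason as Error).message}`] : [],
  );
}

// all: rejects with the first failure; the caller learns nothing about the other channels.
async function sendAllStrict(): Promise<void> {
  await Promise.all(Object.values(channels).map((send) => send()));
}

sendAll().then((failures) => console.log("allSettled failures:", failures));
sendAllStrict().catch((err: unknown) => console.log("Promise.all rejected:", (err as Error).message));
```

Promise.all does not cancel the channels that are still running. It only stops the caller from seeing their outcomes. Conversely, Promise.allSettled never rejects, so code that needs to record failures has to inspect the settled results itself, as sendAll does.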

16.3.6 Webhook Notification

typescript
private async sendToWebhook(alert: AlertNotification): Promise<void> {
  const response = await fetch(this.webhookUrl, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      alert_id: alert.id,
      rule_name: alert.ruleName,
      severity: alert.severity,
      message: alert.message,
      timestamp: alert.timestamp,
      metadata: alert.metadata
    })
  });

  if (!response.ok) {
    logger.warn(`Webhook notification failed: ${response.status}`);
  }
}

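16.3.7 Email Notification

⚠️ Note: in the current version the email channel only records its configuration; actual SMTP delivery is not fully implemented yet. The code below shows the implementation skeleton, which can be enabled once SMTP is configured.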

typescript
private async sendEmail(recipients: string[], notification: AlertNotification): Promise<void> {
  // Current implementation: log only (SMTP not configured)
  logger.info('Email notification prepared (SMTP not configured)', {
    recipients,
    subject: `[${notification.level.toUpperCase()}] ${notification.title}`,
    message: notification.message
  });
  
  // TODO: the full SMTP implementation needs the nodemailer dependency; then enable the code below:
  // const nodemailer = await import('nodemailer');
  // const transporter = nodemailer.createTransport({
  //   host: this.emailConfig.host,
  //   port: this.emailConfig.port,
  //   secure: this.emailConfig.port === 465,
  //   auth: { user: this.emailConfig.user, pass: this.emailConfig.pass }
  // });
  // await transporter.sendMail({ ... });
}

Extension guide: to enable full email delivery:

  1. Install the dependency: npm install nodemailer
  2. Set the environment variables ALERT_EMAIL_HOST, ALERT_EMAIL_PORT, ALERT_EMAIL_USER, ALERT_EMAIL_PASS and ALERT_EMAIL_TO
  3. Uncomment the TODO code above

16.3.8 Alert Statistics

typescript
getStats(): {
  totalAlerts: number;
  bySeverity: Record<AlertSeverity, number>;
  last24Hours: number;
  topRules: { ruleId: string; ruleName: string; count: number }[];
} {
  const oneDayAgo = Date.now() - 86400000;

  const bySeverity: Record<AlertSeverity, number> = { critical: 0, warning: 0, info: 0 };
  const ruleCounts = new Map<string, { name: string; count: number }>();
  let last24Hours = 0;

  this.alertHistory.forEach(alert => {
    bySeverity[alert.severity]++;
    if (new Date(alert.timestamp).getTime() > oneDayAgo) last24Hours++;

    const existing = ruleCounts.get(alert.ruleId);
    if (existing) existing.count++;
    else ruleCounts.set(alert.ruleId, { name: alert.ruleName, count: 1 });
  });

  const topRules = Array.from(ruleCounts.entries())
    .map(([ruleId, data]) => ({ ruleId, ruleName: data.name, count: data.count }))
    .sort((a, b) => b.count - a.count)
    .slice(0, 10);

  return { totalAlerts: this.alertHistory.length, bySeverity, last24Hours, topRules };
}

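16.4 Alert Noise Reduction

16.4.1 Overview of the Noise Reduction Algorithm

The core problem noise reduction solves is that one failure produces a flood of duplicate alerts within a short time, which wears out the on-call staff. The noise reduction system handles this as follows: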

┌──────────────┐     ┌──────────────┐     ┌──────────────────┐
│ Alert arrives│────►│ Fingerprint  │────►│ Look up existing │
│ (source,     │     │ MD5 hash     │     │ record by        │
│  title)      │     └──────────────┘     │ fingerprint      │
└──────────────┘                          └────────┬─────────┘
                                                   │
                             ┌─────────────────────┴─────────────────────┐
                             │                                           │
                  ┌──────────▼─────────┐                     ┌───────────▼──────────┐
                  │ New fingerprint    │                     │ Existing record      │
                  │ (first seen)       │                     │ occurrence count + 1 │
                  │ shouldNotify=true  │                     │ check suppression    │
                  └────────────────────┘                     └───────────┬──────────┘
                                                                         │
                                                              ┌──────────┴──────────────┐
                                                              │                         │
                                                   ┌──────────▼─────────┐   ┌───────────▼────────────┐
                                                   │ occurrences < 5    │   │ occurrences ≥ 5 and    │
                                                   │ notify             │   │ severity not critical  │
                                                   │ shouldNotify=true  │   │ or high → suppress     │
                                                   └────────────────────┘   │ for 30 minutes         │
                                                                            │ shouldNotify=false     │
                                                                            └────────────────────────┘
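The decision flow can be expressed as a small in-memory class. The sketch makes these assumptions:

- A Map stands in for the alert_noise_reduction table. NoiseReducer, NoiseRecord and the millisecond clock are illustrative.
- Critical and high alerts bypass suppression, matching shouldSuppressAlert in 16.4.3.
- What happens after the 30-minute window expires is not shown in this chapter. Here an expired record is simply re-evaluated by its count, so it is suppressed again.

```typescript
interface NoiseRecord {
  occurrences: number;
  suppressedUntil?: number; // epoch ms; undefined when not suppressed
}

const SUPPRESS_AFTER = 5;           // occurrence count that triggers auto-suppression
const SUPPRESS_MS = 30 * 60 * 1000; // 30-minute suppression window

class NoiseReducer {
  // Stand-in for the alert_noise_reduction table, keyed by fingerprint
  private records = new Map<string, NoiseRecord>();

  /** Returns true when the alert should be delivered (shouldNotify). */
  process(fingerprint: string, severity: string, now: number): boolean {
    const record = this.records.get(fingerprint);
    if (!record) {
      this.records.set(fingerprint, { occurrences: 1 });
      return true; // first sighting always notifies
    }
    record.occurrences += 1;
    if (severity === "critical" || severity === "high") return true; // never suppressed
    if (record.suppressedUntil !== undefined && now < record.suppressedUntil) return false;
    if (record.occurrences >= SUPPRESS_AFTER) {
      record.suppressedUntil = now + SUPPRESS_MS; // (re)start the suppression window
      return false;
    }
    return true;
  }
}

const reducer = new NoiseReducer();
const delivered = [1, 2, 3, 4, 5, 6].map((i) => reducer.process("fp-disk", "medium", i * 1000));
console.log(delivered); // [ true, true, true, true, false, false ]
```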

16.4.2 Fingerprint Generation

typescript
generateFingerprint(source: string, title: string, _content?: string): string {
  // Title normalization: lowercase, then collapse each run of digits/whitespace/underscores/hyphens into one space
  const normalizedTitle = title.toLowerCase().replace(/[\d\s_-]+/g, ' ').trim();
  const normalizedSource = source.toLowerCase();
  const fingerprint = `${normalizedSource}:${normalizedTitle}`;
  return createHash('md5').update(fingerprint).digest('hex');
}

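Fingerprint normalization examples:

| Original title | Normalized | Note |
|---|---|---|
| CPU usage on server-01 is 95% | cpu usage on server is % | digits and the hyphen collapse into one space |
| CPU usage on server-02 is 88% | cpu usage on server is % | same fingerprint as the previous row |
| Disk usage on host_10 at 90% | disk usage on host at % | digits and the underscore collapse into one space |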
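The normalization can be checked directly with Node's crypto module. The sketch below re-implements generateFingerprint as two small functions with illustrative names. It confirms that the two CPU titles from the table share a fingerprint, while the same title from a different source does not.

```typescript
import { createHash } from "node:crypto";

// Same normalization as generateFingerprint: lowercase, then collapse every run
// of digits, whitespace, underscores and hyphens into a single space.
function normalizeTitle(title: string): string {
  return title.toLowerCase().replace(/[\d\s_-]+/g, " ").trim();
}

// MD5 over "source:normalizedTitle", as in generateFingerprint.
function fingerprint(source: string, title: string): string {
  return createHash("md5").update(`${source.toLowerCase()}:${normalizeTitle(title)}`).digest("hex");
}

const a = fingerprint("prometheus", "CPU usage on server-01 is 95%");
const b = fingerprint("prometheus", "CPU usage on server-02 is 88%");
const c = fingerprint("zabbix", "CPU usage on server-01 is 95%");

console.log(normalizeTitle("CPU usage on server-01 is 95%")); // cpu usage on server is %
console.log(a === b); // true: same source, titles differ only in digits
console.log(a === c); // false: the source is part of the fingerprint
```

Because every digit is discarded, titles that differ only in a meaningful number also collapse. For example, "port 80 down" and "port 443 down" both normalize to "port down". If the number identifies the affected resource, that value may need to be kept in the title or added to the fingerprint.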

16.4.3 Suppression Policy

typescript
private shouldSuppressAlert(record: AlertNoiseRecord, severity?: string): boolean {
  // Alerts of critical/high severity are never suppressed
  if (severity === 'critical' || severity === 'high') {
    return false;
  }
  // Lower-severity alerts seen 5 or more times are suppressed automatically
  return record.occurrence_count >= 5;
}

When an alert qualifies for suppression, its record is flagged as suppressed for 30 minutes:

typescript
if (shouldSuppress && !isSuppressed) {
  db.prepare(
    `UPDATE alert_noise_reduction 
     SET is_suppressed = 1, suppression_reason = ?, suppression_until = ? 
     WHERE alert_fingerprint = ?`
  ).run(
    'Auto-suppressed: frequent alert',
    new Date(now.getTime() + 30 * 60 * 1000).toISOString(),  // suppress for 30 minutes
    fingerprint
  );
}

16.4.4 Noise Reduction Statistics

typescript
getNoiseReductionStats(): {
  totalAlerts: number;
  suppressedAlerts: number;
  duplicateCount: number;
  noiseReductionRate: number;
} {
  const stats = db.prepare(`
    SELECT COUNT(*) as total,
           SUM(CASE WHEN is_suppressed = 1 THEN 1 ELSE 0 END) as suppressed,
           SUM(occurrence_count - 1) as duplicates
    FROM alert_noise_reduction
  `).get() as { total: number; suppressed: number | null; duplicates: number | null } | undefined;

  const total = stats?.total || 0;
  const suppressed = stats?.suppressed || 0;
  const duplicates = stats?.duplicates || 0;
  
  // Noise reduction rate = (suppressed + duplicates) / (total + duplicates)
  const noiseReductionRate = total > 0
    ? Math.round(((suppressed + duplicates) / (total + duplicates)) * 100)
    : 0;

  return { totalAlerts: total, suppressedAlerts: suppressed, duplicateCount: duplicates, noiseReductionRate };
}
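A worked example makes the formula concrete. The sketch below computes the same rate from a list of per-fingerprint occurrence counts, which stand in for the occurrence_count column, plus a count of suppressed rows. Take four fingerprints seen 1, 3, 6 and 10 times, two of them suppressed:

- total = 4
- duplicates = 0 + 2 + 5 + 9 = 16
- rate = (2 + 16) / (4 + 16) = 90%

```typescript
// Same formula as getNoiseReductionStats, computed from plain numbers:
//   total      = number of fingerprints (rows)
//   duplicates = sum of (occurrence_count - 1) over all rows
//   rate       = (suppressed + duplicates) / (total + duplicates), as a rounded percentage
function noiseReductionRate(occurrenceCounts: number[], suppressedRows: number): number {
  const total = occurrenceCounts.length;
  const duplicates = occurrenceCounts.reduce((sum, n) => sum + (n - 1), 0);
  return total > 0 ? Math.round(((suppressedRows + duplicates) / (total + duplicates)) * 100) : 0;
}

// Four fingerprints seen 1, 3, 6 and 10 times; two of them are currently suppressed.
console.log(noiseReductionRate([1, 3, 6, 10], 2)); // 90
```

The denominator, total + duplicates, equals the raw number of alerts received (20 here). Read the rate with care, though. Every duplicate counts as reduced noise, yet per the flow in 16.4.1, occurrences 2 to 4 of a fingerprint are still delivered.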

16.4.5 Manual Intervention

Operators can lift a suppression or suppress an alert manually:

typescript
// Lift the suppression for a fingerprint; returns true if a record was updated
unsuppressAlert(fingerprint: string): boolean {
  const result = db.prepare(
    `UPDATE alert_noise_reduction 
     SET is_suppressed = 0, suppression_reason = NULL, suppression_until = NULL 
     WHERE alert_fingerprint = ?`
  ).run(fingerprint);
  return result.changes > 0;
}

// Suppress manually with a duration and a reason; returns true if a record was updated
manuallySuppressAlert(fingerprint: string, reason: string, durationMinutes: number = 60): boolean {
  const suppressionUntil = new Date(Date.now() + durationMinutes * 60 * 1000);
  const result = db.prepare(
    `UPDATE alert_noise_reduction 
     SET is_suppressed = 1, suppression_reason = ?, suppression_until = ? 
     WHERE alert_fingerprint = ?`
  ).run(reason, suppressionUntil.toISOString(), fingerprint);
  return result.changes > 0;
}

16.5 Notification Service (NotificationService)

16.5.1 Notification Channel Comparison

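| Channel | Latency | Rich text | Typical use |
|---|---|---|---|
| WebSocket | real-time (milliseconds) | JSON | live display in the web frontend |
| WeCom | fast (seconds) | Markdown | instant notification to the ops team |
| DingTalk | fast (seconds) | Markdown | instant notification to the ops team |
| Email | slow (minutes) | HTML | formal records, non-urgent notices |
| Webhook | real-time | JSON | integration with third-party systems |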

16.5.2 Concurrent Multi-channel Delivery

typescript
private async send(notification: { type: string; title: string; content: string }) {
  const promises: Promise<void>[] = [];

  if (this.config?.wechat_enabled) {
    promises.push(this.sendWeChat(notification));
  }
  if (this.config?.dingtalk_enabled) {
    promises.push(this.sendDingTalk(notification));
  }
  if (this.config?.email_enabled) {
    promises.push(this.sendEmail(notification));
  }
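  if (this.config?.webhook_enabled !== false) {
    promises.push(this.sendWebhook(notification));  // enabled unless explicitly disabled
  }

  const results = await Promise.allSettled(promises);
  // allSettled never rejects, so surface channel failures explicitly; otherwise the
  // caller would record every notification as sent. Note that a retry then re-sends
  // to all channels, including those that succeeded.
  const failures = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected');
  if (failures.length > 0) {
    throw new Error(failures.map(f => String(f.reason)).join('; '));
  }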
}

16.5.3 WeCom (Enterprise WeChat) Notification

typescript
private async sendWeChat(notification: { type: string; title: string; content: string }) {
  const message = {
    msgtype: 'markdown',
    markdown: {
      content: `## ${notification.title}\n\n${notification.content}\n\n> Source: ITOps Agent Platform\n> Time: ${new Date().toLocaleString()}`
    }
  };

  await axios.post(this.config.wechat_config.webhook_url, message, {
    headers: { 'Content-Type': 'application/json' }
  });
}

16.5.4 DingTalk Notification

typescript
private async sendDingTalk(notification: { type: string; title: string; content: string }) {
  const message = {
    msgtype: 'markdown',
    markdown: {
      title: notification.title,
      text: `## ${notification.title}\n\n${notification.content}\n\n> Source: ITOps Agent Platform\n> Time: ${new Date().toLocaleString()}`
    }
  };

  await axios.post(this.config.dingtalk_config.webhook_url, message, {
    headers: { 'Content-Type': 'application/json' }
  });
}

16.5.5 WebSocket Real-time Push

typescript
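// Despite its name, this channel broadcasts to connected WebSocket clients via io.emit;
// it makes no HTTP webhook call
private async sendWebhook(notification: { type: string; title: string; content: string }) {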
  const io = getIOInstance();
  if (io) {
    io.emit('notification', {
      id: randomUUID(),
      type: notification.type,
      title: notification.title,
      content: notification.content,
      timestamp: new Date().toISOString()
    });
  }
}

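16.5.6 Notification Persistence

Every notification is saved to the database, which enables history queries and retries of failed deliveries: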

typescript
async sendNotification(notification: {
  type: string; title: string; content: string;
  recipient?: string; related_alert_id?: string; related_task_id?: string;
}) {
  const id = randomUUID();
  const now = new Date().toISOString();

  // Save to the database
  db.prepare(`
    INSERT INTO notifications (id, type, title, content, recipient, status, related_alert_id, related_task_id, created_at)
    VALUES (?, ?, ?, ?, ?, 'pending', ?, ?, ?)
  `).run(id, notification.type, notification.title, notification.content,
    notification.recipient || 'default', notification.related_alert_id || null,
    notification.related_task_id || null, now);

  // Attempt delivery
  try {
    await this.send(notification);
    db.prepare('UPDATE notifications SET status = ?, sent_at = ? WHERE id = ?')
      .run('sent', new Date().toISOString(), id);   // actual send time, not creation time
    return { success: true, id };
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    db.prepare('UPDATE notifications SET status = ?, error_message = ? WHERE id = ?')
      .run('failed', errorMessage, id);
    return { success: false, error: errorMessage, id };
  }
}

16.5.7 Retrying Failed Notifications

typescript
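async retryFailedNotifications() {
  const failed = db.prepare(`
    SELECT * FROM notifications WHERE status = 'failed' ORDER BY created_at DESC
  `).all() as NotificationDB[];

  const results: { success: boolean; id: string; error?: string }[] = [];
  for (const notification of failed) {
    // Re-send and update the existing row. Calling sendNotification() here would insert
    // a new row and leave the original one 'failed' (and retried) forever.
    try {
      await this.send({
        type: notification.type,
        title: notification.title,
        content: notification.content
      });
      db.prepare('UPDATE notifications SET status = ?, sent_at = ?, error_message = NULL WHERE id = ?')
        .run('sent', new Date().toISOString(), notification.id);
      results.push({ success: true, id: notification.id });
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      db.prepare('UPDATE notifications SET error_message = ? WHERE id = ?')
        .run(errorMessage, notification.id);
      results.push({ success: false, error: errorMessage, id: notification.id });
    }
  }
  return results;
}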
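Retries are easier to reason about when they update the original record and are bounded. The sketch below works on an in-memory array. The attempts counter and the maxAttempts limit are hypothetical additions, since the notifications table in this chapter has no such column. They stop a permanently broken channel from being retried on every pass.

```typescript
type Status = "pending" | "sent" | "failed";

interface NotificationRow {
  id: string;
  title: string;
  status: Status;
  attempts: number;        // hypothetical column: delivery attempts so far
  error_message?: string;
}

// Retries failed rows in place and gives up after maxAttempts.
// Returns how many rows were delivered on this pass.
async function retryFailed(
  rows: NotificationRow[],
  send: (row: NotificationRow) => Promise<void>,
  maxAttempts = 3,
): Promise<number> {
  let recovered = 0;
  for (const row of rows) {
    if (row.status !== "failed" || row.attempts >= maxAttempts) continue;
    row.attempts += 1;
    try {
      await send(row);
      row.status = "sent";
      row.error_message = undefined;
      recovered += 1;
    } catch (err) {
      row.error_message = err instanceof Error ? err.message : String(err);
    }
  }
  return recovered;
}

// Example: row "d" keeps failing, row "b" has used up its attempts, row "c" was already sent.
const rows: NotificationRow[] = [
  { id: "a", title: "disk", status: "failed", attempts: 1 },
  { id: "b", title: "cpu", status: "failed", attempts: 3 },
  { id: "c", title: "mem", status: "sent", attempts: 1 },
  { id: "d", title: "db", status: "failed", attempts: 0 },
];
const flakySend = async (row: NotificationRow): Promise<void> => {
  if (row.id === "d") throw new Error("webhook timeout");
};

retryFailed(rows, flakySend).then((n) => console.log(`recovered ${n}`, rows));
```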

16.5.8 Convenience Notification Methods

typescript
// Alert notification (with a severity emoji)
async sendAlertNotification(alert: AlertRecord) {
  const severityEmoji: Record<string, string> = { critical: '🔴', high: '🟠', medium: '🟡', low: '🟢' };
  return this.sendNotification({
    type: 'alert',
    title: `${severityEmoji[alert.severity ?? ''] || '⚪'} [${alert.severity?.toUpperCase()}] New alert: ${alert.title}`,
    content: `**Source**: ${alert.source}\n**Severity**: ${alert.severity}\n**Description**: ${alert.content}`,
    related_alert_id: alert.id
  });
}

// Task status notification (with a status emoji)
async sendTaskNotification(task: TaskRecord, status: string) {
  const statusEmoji: Record<string, string> = { completed: '✅', failed: '❌', running: '▶️', pending: '⏳' };
  return this.sendNotification({
    type: 'task',
    title: `${statusEmoji[status] || '⚪'} Task status changed: ${task.name}`,
    content: `**Task name**: ${task.name}\n**Current status**: ${status}`,
    related_task_id: task.id
  });
}

16.6 Auto-remediation Engine (RemediationService)

16.6.1 Remediation Policy Definition

A remediation policy is the bridge between alerts and remediation workflows:

┌────────────────────────────────────────────────────────────┐
│                    RemediationPolicy                       │
├────────────────────────────────────────────────────────────┤
│  Matching conditions:                                      │
│    - alert_source: alert source (prometheus/zabbix/...)    │
│    - alert_severity: severity filter                       │
│    - alert_keywords: keyword match                         │
│    - alert_tags: tag match                                 │
│                                                            │
│  Execution mode:                                           │
│    - auto: execute automatically                           │
│    - approval: requires human approval                     │
│    - suggestion: send a suggestion only                    │
│                                                            │
│  Execution settings:                                       │
│    - workflow_id: remediation workflow                     │
│    - workflow_params: supports {{alert.xxx}} templates     │
│    - max_executions_per_hour: max executions per hour      │
│    - cooldown_seconds: cooldown period                     │
│                                                            │
│  Safeguards:                                               │
│    - enable_verification: verify after execution           │
│    - verification_workflow_id: verification workflow       │
│    - enable_rollback: roll back on failure                 │
│    - rollback_workflow_id: rollback workflow               │
└────────────────────────────────────────────────────────────┘
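In code, the policy maps naturally onto a TypeScript interface. The field names below come from the column list of the INSERT statement in 16.6.2. The types and nullability are assumptions:

- Flags are typed as 0 | 1, because SQLite has no boolean type and the matching query filters on enabled = 1.
- Keywords, tags and parameters are JSON strings, as the matching code parses them with JSON.parse.

The example policy is illustrative.

```typescript
type ExecutionMode = "auto" | "approval" | "suggestion";
type Flag = 0 | 1; // SQLite stores booleans as integers

interface RemediationPolicy {
  id: string;
  name: string;
  description?: string | null;
  // Matching conditions
  alert_source: string;
  alert_severity?: string | null;
  alert_keywords?: string | null;   // JSON array of strings, e.g. '["disk"]'
  alert_tags?: string | null;       // JSON array of strings
  // Execution
  execution_mode: ExecutionMode;
  workflow_id?: string | null;
  workflow_params?: string | null;  // JSON object; values may contain {{alert.xxx}}
  max_executions_per_hour: number;
  cooldown_seconds: number;
  require_confirmation: Flag;
  // Safeguards
  enable_verification: Flag;
  verification_workflow_id?: string | null;
  verification_params?: string | null;
  verification_timeout_seconds?: number | null;
  enable_rollback: Flag;
  rollback_workflow_id?: string | null;
  rollback_on_failure: Flag;
  enabled: Flag;
  created_by?: string | null;
  created_at: string;
  updated_at: string;
}

const diskCleanup: RemediationPolicy = {
  id: "example-1",
  name: "Clean temp files on low disk",
  alert_source: "prometheus",
  alert_severity: "medium",
  alert_keywords: JSON.stringify(["disk"]),
  execution_mode: "approval",
  workflow_id: "wf-disk-cleanup",
  workflow_params: JSON.stringify({ host: "{{alert.host}}" }),
  max_executions_per_hour: 3,
  cooldown_seconds: 600,
  require_confirmation: 1,
  enable_verification: 1,
  verification_workflow_id: "wf-check-disk",
  enable_rollback: 0,
  rollback_on_failure: 0,
  enabled: 1,
  created_at: new Date().toISOString(),
  updated_at: new Date().toISOString(),
};

console.log(diskCleanup.name, JSON.parse(diskCleanup.alert_keywords ?? "[]"));
```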

16.6.2 Creating a Policy

typescript
createPolicy(policy: Omit<RemediationPolicy, 'id' | 'created_at' | 'updated_at'>): RemediationPolicy {
  const id = uuidv4();
  const now = new Date().toISOString();

  db.prepare(`
    INSERT INTO remediation_policies (
      id, name, description, alert_source, alert_severity,
      alert_keywords, alert_tags, execution_mode, workflow_id,
      workflow_params, max_executions_per_hour, cooldown_seconds,
      require_confirmation, enable_verification, verification_workflow_id,
      verification_params, verification_timeout_seconds, enable_rollback,
      rollback_workflow_id, rollback_on_failure, enabled, created_by,
      created_at, updated_at
    ) VALUES (@id, @name, ..., @updated_at)
  `).run({ id, ...policy, created_at: now, updated_at: now });

  return this.getPolicy(id);
}

16.6.3 Matching Alerts to Policies

When an alert arrives, the system finds every remediation policy that matches it:

typescript
async matchAlertToPolicies(alert: { id: string; source: string; severity?: string; title?: string; content?: string; tags?: string[] }): Promise<RemediationPolicy[]> {
  // Query the enabled policies for this alert source, most severe first.
  // The ordering uses the normalized levels (critical/high/medium/low) that
  // alert.severity carries after normalizeSeverity.
  const policies = db.prepare(`
    SELECT * FROM remediation_policies
    WHERE enabled = 1 AND alert_source = ?
    ORDER BY
      CASE alert_severity
        WHEN 'critical' THEN 1
        WHEN 'high' THEN 2
        WHEN 'medium' THEN 3
        WHEN 'low' THEN 4
        ELSE 5
      END
  `).all(alert.source) as RemediationPolicy[];

  // Further filtering: severity, keywords, tags
  return policies.filter(policy => {
    // Severity match
    if (policy.alert_severity && policy.alert_severity !== alert.severity) {
      return false;
    }

    // Keyword match (OR: any keyword present in the title or content is a match)
    if (policy.alert_keywords) {
      const keywords = JSON.parse(policy.alert_keywords) as string[];
      const alertText = `${alert.title || ''} ${alert.content || ''}`.toLowerCase();
      if (!keywords.some(kw => alertText.includes(kw.toLowerCase()))) {
        return false;
      }
    }

    // Tag match (OR: any matching tag passes)
    if (policy.alert_tags) {
      const tags = JSON.parse(policy.alert_tags) as string[];
      const alertTags = alert.tags || [];
      if (!tags.some(t => alertTags.includes(t))) {
        return false;
      }
    }

    return true;
  });
}
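The filter combines its conditions with AND, while each keyword or tag list matches with OR. The sketch below copies the filter logic into a standalone matches function and runs three policies against one alert. PolicyLite and AlertLite are reduced, illustrative types.

```typescript
interface PolicyLite {
  name: string;
  alert_severity?: string | null;
  alert_keywords?: string | null; // JSON array of keywords
  alert_tags?: string | null;     // JSON array of tags
}

interface AlertLite {
  severity?: string;
  title?: string;
  content?: string;
  tags?: string[];
}

// Same rules as the filter in matchAlertToPolicies (the SQL source filter is left out):
// conditions are ANDed, and each keyword/tag list matches if any entry matches.
function matches(policy: PolicyLite, alert: AlertLite): boolean {
  if (policy.alert_severity && policy.alert_severity !== alert.severity) return false;
  if (policy.alert_keywords) {
    const keywords = JSON.parse(policy.alert_keywords) as string[];
    const text = `${alert.title || ""} ${alert.content || ""}`.toLowerCase();
    if (!keywords.some((kw) => text.includes(kw.toLowerCase()))) return false;
  }
  if (policy.alert_tags) {
    const tags = JSON.parse(policy.alert_tags) as string[];
    const alertTags = alert.tags || [];
    if (!tags.some((t) => alertTags.includes(t))) return false;
  }
  return true;
}

const policies: PolicyLite[] = [
  { name: "any-disk", alert_keywords: '["disk", "inode"]' },
  { name: "critical-only", alert_severity: "critical" },
  { name: "db-tagged", alert_tags: '["db"]' },
];

const sampleAlert: AlertLite = { severity: "medium", title: "Disk usage on host_10 at 90%", tags: ["web"] };
console.log(policies.filter((p) => matches(p, sampleAlert)).map((p) => p.name)); // [ 'any-disk' ]
```

Keyword matching is a plain substring test on the lowercased title and content, so the keyword "disk" also matches "diskless". A policy that needs whole-word matching would have to tokenize the text or use a word-boundary regular expression.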

16.6.4 Triggering Remediation

typescript
async triggerRemediation(policy: RemediationPolicy, alert: { id: string; source: string; ... }): Promise<RemediationExecution> {
  // 1. Cooldown check
  if (this.isInCooldown(policy, alert)) {
    return this.createSkippedExecution(policy, alert, 'cooldown');
  }

  // 2. Rate-limit check
  if (this.isRateLimited(policy)) {
    return this.createSkippedExecution(policy, alert, 'rate_limited');
  }

  // 3. Create the execution record
  const id = uuidv4();
  const now = new Date().toISOString();
  db.prepare(`
    INSERT INTO remediation_executions (id, policy_id, alert_id, alert_snapshot, status, approval_required, created_at)
    VALUES (?, ?, ?, ?, 'pending', ?, ?)
  `).run(id, policy.id, alert.id, JSON.stringify(alert), policy.execution_mode === 'approval' ? 1 : 0, now);
  const execution = this.getExecution(id);

  // 4. Dispatch according to the execution mode
  switch (policy.execution_mode) {
    case 'auto':
      this.executeWorkflowAsync(id);           // run the workflow in the background (not awaited)
      break;
    case 'approval':
      await this.requestApproval(id);          // send an approval request
      break;
    case 'suggestion':
      await this.sendSuggestion(id);           // send a remediation suggestion
      break;
  }

  return execution;
}

16.6.5 Cooldown and Rate Limiting

typescript
// 冷却期检查:同一策略+告警在冷却期内不重复执行
private isInCooldown(policy: RemediationPolicy, alert: { id: string }): boolean {
  const result = db.prepare(`
    SELECT cooldown_until FROM remediation_cooldowns
    WHERE policy_id = ? AND alert_id = ?
  `).get(policy.id, alert.id);

  if (!result) return false;
  return new Date().toISOString() < result.cooldown_until;
}

// 频率限制:同一策略每小时最大执行次数
private isRateLimited(policy: RemediationPolicy): boolean {
  const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString();
  const result = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND created_at > ?
  `).get(policy.id, oneHourAgo);

  return result.count >= policy.max_executions_per_hour;
}
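
Both checks query SQLite on every call. To make the two rules easy to reason about and unit-test, here is an in-memory sketch of the same logic. RemediationGate and GatePolicy are hypothetical names, and timestamps are passed in as epoch milliseconds rather than read from the clock. One difference to keep in mind: isRateLimited counts every remediation_executions row for the policy in the past hour, and whether skipped executions are included there depends on createSkippedExecution, which this chapter does not show; the sketch counts only executions explicitly recorded.

```typescript
// Hypothetical policy shape: only the fields the two checks need.
interface GatePolicy {
  id: string;
  cooldown_seconds: number;
  max_executions_per_hour: number;
}

type GateDecision = 'ok' | 'cooldown' | 'rate_limited';

const HOUR_MS = 60 * 60 * 1000;

class RemediationGate {
  // `${policyId}:${alertId}` -> cooldown expiry (epoch ms), like remediation_cooldowns
  private cooldownUntil = new Map<string, number>();
  // policyId -> start times (epoch ms) of recorded executions
  private history = new Map<string, number[]>();

  // Same order as triggerRemediation: cooldown first, then rate limit.
  check(policy: GatePolicy, alertId: string, now: number): GateDecision {
    const until = this.cooldownUntil.get(`${policy.id}:${alertId}`);
    if (until !== undefined && now < until) return 'cooldown';

    // Sliding one-hour window, matching `created_at > oneHourAgo`.
    const recent = (this.history.get(policy.id) ?? []).filter(t => t > now - HOUR_MS);
    this.history.set(policy.id, recent); // drop entries that left the window
    return recent.length >= policy.max_executions_per_hour ? 'rate_limited' : 'ok';
  }

  recordExecution(policyId: string, now: number): void {
    const list = this.history.get(policyId) ?? [];
    list.push(now);
    this.history.set(policyId, list);
  }

  // Counterpart of updateCooldown: start the window after a successful fix.
  startCooldown(policy: GatePolicy, alertId: string, now: number): void {
    this.cooldownUntil.set(`${policy.id}:${alertId}`, now + policy.cooldown_seconds * 1000);
  }
}
```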

16.6.6 Workflow Execution

typescript
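async executeWorkflow(executionId: string): Promise<void> {
  const execution = this.getExecution(executionId);
  const policy = this.getPolicy(execution.policy_id);
  const alert = JSON.parse(execution.alert_snapshot);

  if (!policy.workflow_id) {
    this.updateExecutionStatus(executionId, 'failed', 'No workflow configured');
    return;
  }

  // Fetch the workflow definition
  const workflow = db.prepare('SELECT * FROM workflows WHERE id = ?').get(policy.workflow_id);
  if (!workflow) {
    this.updateExecutionStatus(executionId, 'failed', `Workflow ${policy.workflow_id} not found`);
    return;
  }
  // Assumption: the definition is stored as JSON in a `definition` column
  // (hypothetical name; adapt to the actual workflows schema)
  const parsedWorkflow = JSON.parse(workflow.definition);

  this.updateExecution(executionId, { status: 'running', started_at: new Date().toISOString() });
  const startTime = Date.now();

  // Resolve workflow parameters ({{alert.xxx}} templates are substituted)
  const params = this.resolveParams(policy.workflow_params, alert);

  // Create the task record
  const taskId = uuidv4();
  db.prepare(`
    INSERT INTO tasks (id, workflow_id, name, status, context, created_at)
    VALUES (?, ?, ?, 'pending', ?, CURRENT_TIMESTAMP)
  `).run(taskId, workflow.id, `Auto-remediation: ${workflow.name}`, JSON.stringify(params));

  // Run the workflow (the module-level executeWorkflow, not this method)
  let failureReason: string | null = null;
  try {
    await executeWorkflow(taskId, parsedWorkflow, undefined, params);
  } catch (err) {
    failureReason = err instanceof Error ? err.message : String(err);
  }

  // Record the execution result (on success and on failure)
  this.updateExecution(executionId, {
    workflow_execution_id: taskId,
    completed_at: new Date().toISOString(),
    execution_duration_ms: Date.now() - startTime
  });

  if (failureReason !== null) {
    // Failure path: mark as failed, then roll back if the policy enables it
    this.updateExecutionStatus(executionId, 'failed', failureReason);
    if (policy.enable_rollback) {
      await this.rollbackExecution(executionId);
    }
    return;
  }

  // Verify the result (if verification is configured)
  if (policy.enable_verification && policy.verification_workflow_id) {
    await this.verifyResult(executionId);
  } else {
    this.updateExecutionStatus(executionId, 'success');
    this.resolveAlert(execution.alert_id);
    this.updateCooldown(policy, alert);
  }
}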

16.6.7 Parameter Template Substitution

Workflow parameters support the {{alert.xxx}} template syntax, which injects alert data into the workflow parameters:

typescript
private resolveParams(paramsJson: string | undefined, alert: Record<string, unknown>): Record<string, unknown> {
  if (!paramsJson) return {};
  const params = JSON.parse(paramsJson);
  const resolved: Record<string, unknown> = {};

  for (const [key, value] of Object.entries(params)) {
    if (typeof value === 'string') {
      resolved[key] = value.replace(/\{\{alert\.(\w+)\}\}/g, (_match, prop) => {
        const val = alert[prop];
        return val !== undefined && val !== null ? String(val) : '';
      });
    } else {
      resolved[key] = value;
    }
  }

  return resolved;
}

Example:

json
{
  "server_id": "{{alert.host}}",
  "command": "systemctl restart nginx",
  "max_retries": 3
}

When the alert's host field is "192.168.1.100", the resolved parameters are:

json
{
  "server_id": "192.168.1.100",
  "command": "systemctl restart nginx",
  "max_retries": 3
}
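
resolveParams only rewrites top-level string values; a template inside a nested object or array (for example, a list of command arguments) passes through unchanged. If nested parameters are needed, a recursive variant is one option. The following is a sketch under that assumption: resolveDeep and the Json type are illustrative names, and the substitution rule (missing or null fields become an empty string) is the same as in resolveParams.

```typescript
// JSON value type for workflow parameters (illustrative).
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

const TEMPLATE = /\{\{alert\.(\w+)\}\}/g;

// Recursively substitutes {{alert.xxx}} in every string, at any depth.
function resolveDeep(value: Json, alert: Record<string, unknown>): Json {
  if (typeof value === 'string') {
    return value.replace(TEMPLATE, (_match: string, prop: string) => {
      const v = alert[prop];
      return v !== undefined && v !== null ? String(v) : '';
    });
  }
  if (Array.isArray(value)) {
    return value.map(item => resolveDeep(item, alert));
  }
  if (value !== null && typeof value === 'object') {
    const out: { [key: string]: Json } = {};
    for (const [k, v] of Object.entries(value)) {
      out[k] = resolveDeep(v, alert);
    }
    return out;
  }
  return value; // numbers, booleans and null pass through unchanged
}
```

Applied to the example above, resolveDeep produces the same result as resolveParams; the difference only shows once a template sits below the top level.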

16.6.8 Verification

After the remediation runs, its effect can be checked by a separate verification workflow:

typescript
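async verifyResult(executionId: string): Promise<{ success: boolean; result?: unknown }> {
  const execution = this.getExecution(executionId);
  const policy = this.getPolicy(execution.policy_id);
  const alert = JSON.parse(execution.alert_snapshot);

  // Fetch the verification workflow
  const workflow = db.prepare('SELECT * FROM workflows WHERE id = ?')
    .get(policy.verification_workflow_id);
  // Assumption: definition stored as JSON in a hypothetical `definition` column
  const parsedWorkflow = JSON.parse(workflow.definition);

  const params = this.resolveParams(policy.verification_params, alert);
  const timeout = policy.verification_timeout_seconds * 1000;
  const taskId = uuidv4();

  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    // Execution with a timeout: whichever settles first wins the race
    const result = await Promise.race([
      executeWorkflow(taskId, parsedWorkflow, undefined, params),
      new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error('Verification timeout')), timeout);
      })
    ]);

    this.updateExecution(executionId, {
      verification_status: 'success',
      verification_result: JSON.stringify(result),
      verification_completed_at: new Date().toISOString(),
      status: 'success'
    });

    // Verification passed → resolve the alert and start the cooldown
    this.resolveAlert(execution.alert_id);
    this.updateCooldown(policy, alert);
    return { success: true, result };
  } catch (err) {
    // Verification failed or timed out → mark as failed, optionally roll back
    this.updateExecution(executionId, {
      verification_status: 'failed',
      verification_result: String(err),
      verification_completed_at: new Date().toISOString(),
      status: 'failed'
    });
    if (policy.enable_rollback) {
      await this.rollbackExecution(executionId);
    }
    return { success: false };
  } finally {
    clearTimeout(timer); // do not leave the timer pending once the race settles
  }
}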
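
The timeout in verifyResult is built from Promise.race plus setTimeout. Two properties of this pattern are easy to miss: the timer keeps running after the workflow wins the race unless it is cleared, and losing the race does not cancel the verification workflow, which keeps running in the background. A small reusable helper makes the first point explicit; withTimeout is a hypothetical name, not part of the platform.

```typescript
// Races `work` against a timer and always clears the timer afterwards.
// On timeout the underlying work is NOT cancelled; only the caller stops waiting.
async function withTimeout<T>(
  work: Promise<T>,
  timeoutMs: number,
  message = 'Verification timeout'
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(message)), timeoutMs);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer); // runs whether the work or the timer settled first
  }
}
```

True cancellation would require the workflow engine to accept an abort signal, which this chapter does not describe.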

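16.6.9 Rollback

When a remediation fails, a rollback is triggered automatically if the policy has a rollback workflow configured: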

typescript
async rollbackExecution(executionId: string): Promise<void> {
  const execution = this.getExecution(executionId);
  const policy = this.getPolicy(execution.policy_id);

  if (!policy.rollback_workflow_id) {
    logger.warn(`No rollback workflow configured for policy ${policy.id}`);
    return;
  }

  const workflow = db.prepare('SELECT * FROM workflows WHERE id = ?')
    .get(policy.rollback_workflow_id);
  // Assumption: definition stored as JSON in a hypothetical `definition` column
  const parsedWorkflow = JSON.parse(workflow.definition);

  const taskId = uuidv4();
  db.prepare(`
    INSERT INTO tasks (id, workflow_id, name, status, context, created_at)
    VALUES (?, ?, ?, 'pending', ?, CURRENT_TIMESTAMP)
  `).run(taskId, workflow.id, `Rollback: ${workflow.name}`, JSON.stringify({ execution_id: executionId }));

  const result = await executeWorkflow(taskId, parsedWorkflow);

  this.updateExecution(executionId, {
    rollback_triggered: 1,
    rollback_execution_id: taskId,
    rollback_completed_at: new Date().toISOString(),
    rollback_result: JSON.stringify(result),
    status: 'rolled_back'
  });
}

16.6.10 Approval Mode

When the execution mode is approval, the remediation workflow runs only after a human approves it:

typescript
// Request approval
private async requestApproval(execution: RemediationExecution): Promise<void> {
  this.updateExecution(execution.id, { status: 'waiting_approval' });

  const policy = this.getPolicy(execution.policy_id);
  const alert = JSON.parse(execution.alert_snapshot);

  await notificationService.sendNotification({
    type: 'remediation_approval',
    title: 'Remediation approval request',
    content: `Policy: ${policy.name}\nAlert: ${alert.title || 'Unknown'}\nPlease review and approve`,
    related_alert_id: execution.alert_id
  });
}

// Handle the approval decision
async approveExecution(executionId: string, action: 'approve' | 'reject', userId: string, comment?: string): Promise<void> {
  const execution = this.getExecution(executionId);
  if (execution.status !== 'waiting_approval') {
    throw new Error('Execution is not waiting for approval');
  }

  if (action === 'approve') {
    this.updateExecution(executionId, {
      status: 'approved',
      approved_by: userId,
      approved_at: new Date().toISOString(),
      approval_comment: comment
    });
    this.executeWorkflowAsync(executionId);  // run the workflow once approved
  } else {
    this.updateExecution(executionId, {
      status: 'rejected',
      approved_by: userId,
      approved_at: new Date().toISOString(),
      approval_comment: comment,
      completed_at: new Date().toISOString()
    });
  }
}
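
approveExecution guards a single transition: only an execution in waiting_approval may become approved or rejected. The same guard can be generalized to every status change. The table below is a sketch inferred from the statuses that appear in this section's code; it is not an official definition. It assumes rolled_back follows failed (rollback runs after a failure), and it leaves out suggestion mode because the status set by sendSuggestion is not shown in this chapter. ExecutionStatus, TRANSITIONS and assertTransition are illustrative names.

```typescript
// Statuses that appear in this section's code; illustrative, not an official enum.
type ExecutionStatus =
  | 'pending' | 'waiting_approval' | 'approved' | 'rejected'
  | 'running' | 'success' | 'failed' | 'rolled_back' | 'skipped';

// Allowed next statuses for each status (inferred from 16.6.4-16.6.10).
const TRANSITIONS: Record<ExecutionStatus, readonly ExecutionStatus[]> = {
  pending: ['running', 'waiting_approval', 'failed'], // auto / approval / no workflow configured
  waiting_approval: ['approved', 'rejected'],         // approveExecution
  approved: ['running', 'failed'],                    // executeWorkflow after approval
  running: ['success', 'failed'],                     // workflow or verification outcome
  failed: ['rolled_back'],                            // rollbackExecution
  rejected: [],                                       // terminal
  success: [],                                        // terminal
  rolled_back: [],                                    // terminal
  skipped: [],                                        // cooldown / rate limit, terminal
};

function canTransition(from: ExecutionStatus, to: ExecutionStatus): boolean {
  return TRANSITIONS[from].includes(to);
}

// Could be called inside updateExecution before writing a new status.
function assertTransition(from: ExecutionStatus, to: ExecutionStatus): void {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal status transition: ${from} -> ${to}`);
  }
}
```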

16.6.11 End-to-End Remediation Flow

Alert arrives
      │
      ▼
┌────────────────────────┐
│ matchAlertToPolicies   │  find matching policies (source + severity + keywords + tags)
└───────────┬────────────┘
            │ policy matched
            ▼
┌────────────────────────┐
│ isInCooldown?          │  cooldown check
└───────────┬────────────┘
            ├── yes ──► record skip (status=skipped, reason=cooldown)
            │ no
            ▼
┌────────────────────────┐
│ isRateLimited?         │  rate-limit check
└───────────┬────────────┘
            ├── yes ──► record skip (status=skipped, reason=rate_limited)
            │ no
            ▼
┌────────────────────────┐
│ create execution row   │  status=pending
└───────────┬────────────┘
            ▼
     execution_mode?
   ┌────────┼─────────────┐
   │        │             │
 auto    approval     suggestion
   │        │             │
   │        ▼             ▼
   │   wait for       send suggestion
   │   approval
   │        │
   │   approved? ── no ──► record rejected
   │        │ yes
   ▼        ▼
 run remediation workflow
            │
   succeeded? ── no ──► enable_rollback? ── yes ──► run rollback workflow
            │ yes       │ no
            │           └──► record failed
            ▼
 verification configured? ── no ──► resolve alert, update cooldown
            │ yes
            ▼
 run verification workflow
            │
   passed? ── no ──► record failed (rollback may follow)
            │ yes
            ▼
 resolve alert, update cooldown
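
The two decision points after the workflow runs (the success branch and the verification branch) depend only on the outcome and a few policy columns, so they can be written as pure functions and tested without a database. The sketch below follows the flowchart. It treats a policy as able to roll back when enable_rollback is 1 and rollback_workflow_id is set, mirroring the early return in rollbackExecution. OutcomePolicy, NextStep, afterRemediation and afterVerification are hypothetical names.

```typescript
// Hypothetical subset of remediation_policies (INTEGER flags stored as 0/1).
interface OutcomePolicy {
  enable_verification: 0 | 1;
  verification_workflow_id: string | null;
  enable_rollback: 0 | 1;
  rollback_workflow_id: string | null;
}

type NextStep = 'verify' | 'resolve_and_cooldown' | 'rollback' | 'mark_failed';

function canRollback(p: OutcomePolicy): boolean {
  return p.enable_rollback === 1 && p.rollback_workflow_id !== null;
}

// Branch taken right after the remediation workflow finishes.
function afterRemediation(succeeded: boolean, p: OutcomePolicy): NextStep {
  if (!succeeded) return canRollback(p) ? 'rollback' : 'mark_failed';
  const verify = p.enable_verification === 1 && p.verification_workflow_id !== null;
  return verify ? 'verify' : 'resolve_and_cooldown';
}

// Branch taken after the verification workflow (or its timeout).
function afterVerification(passed: boolean, p: OutcomePolicy): NextStep {
  if (passed) return 'resolve_and_cooldown';
  return canRollback(p) ? 'rollback' : 'mark_failed';
}
```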

16.6.12 Policy Statistics

typescript
async getPolicyStats(policyId: string, days: number): Promise<PolicyStats> {
  const sinceDate = new Date(Date.now() - days * 24 * 60 * 60 * 1000).toISOString();

  // Total number of triggers
  const total = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND created_at > ?
  `).get(policyId, sinceDate).count;

  // Number of successes
  const successCount = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND status = 'success' AND created_at > ?
  `).get(policyId, sinceDate).count;

  // Number of failures
  const failedCount = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND status = 'failed' AND created_at > ?
  `).get(policyId, sinceDate).count;

  // Number of rollbacks
  const rolledBackCount = db.prepare(`
    SELECT COUNT(*) as count FROM remediation_executions
    WHERE policy_id = ? AND status = 'rolled_back' AND created_at > ?
  `).get(policyId, sinceDate).count;

  // Average execution duration
  const avgDuration = db.prepare(`
    SELECT AVG(execution_duration_ms) as avg_duration FROM remediation_executions
    WHERE policy_id = ? AND execution_duration_ms IS NOT NULL AND created_at > ?
  `).get(policyId, sinceDate).avg_duration;

  // Per-day statistics
  const dailyStats = db.prepare(`
    SELECT DATE(created_at) as date,
           COUNT(*) as triggers,
           SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
           SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
    FROM remediation_executions
    WHERE policy_id = ? AND created_at > ?
    GROUP BY DATE(created_at) ORDER BY date
  `).all(policyId, sinceDate);

  return {
    total_triggers: total,
    success_count: successCount,
    failed_count: failedCount,
    rolled_back_count: rolledBackCount,
    success_rate: total > 0 ? Math.round((successCount / total) * 10000) / 100 : 0,
    avg_duration_ms: avgDuration ? Math.round(avgDuration) : 0,
    daily_stats: dailyStats
  };
}
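
getPolicyStats issues six queries over the same rows. The same numbers can be computed in one pass over the execution rows, which is also a convenient way to check the arithmetic. In this sketch (summarize and ExecRow are illustrative names), created_at is assumed to be an ISO 8601 UTC string as written by triggerRemediation, so the first ten characters give the same calendar date that SQLite's DATE() returns for such values. Note that total counts every row in the window, so any skipped or still-pending executions lower the success rate.

```typescript
// Illustrative row shape: the remediation_executions columns the stats use.
interface ExecRow {
  status: string;
  created_at: string;                   // ISO 8601 UTC, e.g. '2024-05-01T08:00:00.000Z'
  execution_duration_ms: number | null;
}

interface DailyStat { date: string; triggers: number; success: number; failed: number }

function summarize(rows: ExecRow[]) {
  const countStatus = (s: string) => rows.filter(r => r.status === s).length;
  const total = rows.length;
  const successCount = countStatus('success');

  // SQL AVG() ignores NULLs; do the same here.
  const durations = rows
    .map(r => r.execution_duration_ms)
    .filter((d): d is number => d !== null);
  const avg = durations.length > 0
    ? durations.reduce((a, b) => a + b, 0) / durations.length
    : 0;

  // GROUP BY DATE(created_at): the first 10 characters are the UTC date.
  const byDay = new Map<string, DailyStat>();
  for (const r of rows) {
    const date = r.created_at.slice(0, 10);
    const day = byDay.get(date) ?? { date, triggers: 0, success: 0, failed: 0 };
    day.triggers += 1;
    if (r.status === 'success') day.success += 1;
    if (r.status === 'failed') day.failed += 1;
    byDay.set(date, day);
  }

  return {
    total_triggers: total,
    success_count: successCount,
    failed_count: countStatus('failed'),
    rolled_back_count: countStatus('rolled_back'),
    // Percentage with two decimals: 7 of 9 -> 7777.77... -> 7778 -> 77.78
    success_rate: total > 0 ? Math.round((successCount / total) * 10000) / 100 : 0,
    avg_duration_ms: Math.round(avg),
    daily_stats: Array.from(byDay.values()).sort((a, b) => a.date.localeCompare(b.date)),
  };
}
```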

16.7 Database Schema

16.7.1 Alerts Table

sql
CREATE TABLE alerts (
  id TEXT PRIMARY KEY,
  source TEXT NOT NULL,
  severity TEXT NOT NULL,
  title TEXT NOT NULL,
  content TEXT,
  status TEXT NOT NULL DEFAULT 'firing',
  external_id TEXT,
  host TEXT,
  metadata TEXT,
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

16.7.2 Alert Noise Reduction Table

sql
CREATE TABLE alert_noise_reduction (
  id TEXT PRIMARY KEY,
  alert_fingerprint TEXT NOT NULL UNIQUE,
  alert_source TEXT NOT NULL,
  alert_title TEXT NOT NULL,
  occurrence_count INTEGER NOT NULL DEFAULT 1,
  first_occurrence TEXT NOT NULL,
  last_occurrence TEXT NOT NULL,
  is_suppressed INTEGER NOT NULL DEFAULT 0,
  suppression_reason TEXT,
  suppression_until TEXT
);

16.7.3 Notifications Table

sql
CREATE TABLE notifications (
  id TEXT PRIMARY KEY,
  type TEXT NOT NULL,
  title TEXT NOT NULL,
  content TEXT,
  recipient TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending',
  related_alert_id TEXT,
  related_task_id TEXT,
  error_message TEXT,
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  sent_at TEXT
);

16.7.4 Remediation Policies Table

sql
CREATE TABLE remediation_policies (
  id TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  description TEXT,
  alert_source TEXT NOT NULL,
  alert_severity TEXT,
  alert_keywords TEXT,
  alert_tags TEXT,
  execution_mode TEXT NOT NULL,
  workflow_id TEXT,
  workflow_params TEXT,
  max_executions_per_hour INTEGER NOT NULL,
  cooldown_seconds INTEGER NOT NULL,
  require_confirmation INTEGER,
  enable_verification INTEGER NOT NULL DEFAULT 0,
  verification_workflow_id TEXT,
  verification_params TEXT,
  verification_timeout_seconds INTEGER,
  enable_rollback INTEGER NOT NULL DEFAULT 0,
  rollback_workflow_id TEXT,
  rollback_on_failure INTEGER,
  enabled INTEGER NOT NULL DEFAULT 1,
  created_by TEXT,
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

16.7.5 Remediation Executions Table

sql
CREATE TABLE remediation_executions (
  id TEXT PRIMARY KEY,
  policy_id TEXT NOT NULL,
  alert_id TEXT NOT NULL,
  alert_snapshot TEXT NOT NULL,
  status TEXT NOT NULL,
  status_reason TEXT,
  approval_required INTEGER DEFAULT 0,
  approved_by TEXT,
  approved_at TEXT,
  approval_comment TEXT,
  workflow_execution_id TEXT,
  started_at TEXT,
  completed_at TEXT,
  execution_duration_ms INTEGER,
  execution_result TEXT,
  verification_status TEXT,
  verification_result TEXT,
  verification_completed_at TEXT,
  rollback_triggered INTEGER DEFAULT 0,
  rollback_execution_id TEXT,
  rollback_completed_at TEXT,
  rollback_result TEXT,
  created_at TEXT NOT NULL DEFAULT (datetime('now'))
);

16.7.6 Cooldowns Table

sql
CREATE TABLE remediation_cooldowns (
  policy_id TEXT NOT NULL,
  alert_id TEXT NOT NULL,
  cooldown_until TEXT NOT NULL,
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  PRIMARY KEY (policy_id, alert_id)
);

16.8 End-to-End Alert Processing Pipeline

Taking an incoming Prometheus alert as an example:

1. Prometheus Alertmanager sends a POST webhook to the backend
2. The backend route receives the request and calls detectSourceType(payload) → 'prometheus'
3. It calls adaptPrometheus(payload) → NormalizedAlert[]
4. For each NormalizedAlert:
   ├── alertNoiseReductionService.processAlert(source, title, content, severity)
   │   ├── Generate the fingerprint: md5("prometheus:cpu usage on server is ")
   │   ├── Query the database: does this fingerprint already exist?
   │   │   ├── No → new record, shouldNotify=true
   │   │   └── Yes → occurrence count + 1
   │   │       ├── < 5 occurrences → shouldNotify=true
   │   │       └── ≥ 5 occurrences and severity is not critical → auto-suppress for 30 minutes
   ├── If shouldNotify=true:
   │   ├── Save to the alerts table
   │   ├── notificationService.sendAlertNotification(alert)
   │   │   ├── Send to WeCom (Markdown)
   │   │   ├── Send to DingTalk (Markdown)
   │   │   ├── Send email (HTML)
   │   │   └── Push over WebSocket (real-time display in the frontend)
   │   └── remediationService.matchAlertToPolicies(alert)
   │       ├── Find matching policies
   │       ├── For each matching policy:
   │       │   ├── isInCooldown? → skip if yes
   │       │   ├── isRateLimited? → skip if yes
   │       │   └── triggerRemediation(policy, alert)
   │       │       ├── execution_mode=auto → executeWorkflowAsync
   │       │       │   ├── executeWorkflow() → run the remediation workflow
   │       │       │   ├── Success → verify (if configured) → resolve alert → update cooldown
   │       │       │   └── Failure → roll back (if configured)
   │       │       ├── execution_mode=approval → wait for manual approval
   │       │       └── execution_mode=suggestion → send a suggestion notification
   └── If shouldNotify=false (suppressed):
       └── Record in the database only; no notification is sent
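
The noise-reduction branch of step 4 can be condensed into an in-memory sketch that follows the rules listed above: the first occurrence notifies, repeats below 5 notify, and from the 5th occurrence a non-critical alert is suppressed for 30 minutes. Assumptions: the title normalizer (lowercase, then strip digits, dots and '%') is a guess chosen to reproduce the example fingerprint input "prometheus:cpu usage on server is "; the key is kept as a plain string rather than its MD5 digest, which groups alerts identically; and the Map stands in for the alert_noise_reduction table. NoiseReducer and fingerprintKey are hypothetical names.

```typescript
type Severity = 'critical' | 'warning' | 'info';

interface NoiseEntry { count: number; suppressedUntil: number | null }

const SUPPRESS_AFTER = 5;            // occurrences before auto-suppression
const SUPPRESS_MS = 30 * 60 * 1000;  // 30-minute suppression window

// Assumed normalizer: "CPU usage on server is 95%" and "... is 97%" share one key.
function fingerprintKey(source: string, title: string): string {
  return `${source}:${title.toLowerCase().replace(/[\d.%]/g, '')}`;
}

class NoiseReducer {
  private entries = new Map<string, NoiseEntry>();

  /** Returns shouldNotify for one incoming alert at time `now` (epoch ms). */
  process(source: string, title: string, severity: Severity, now: number): boolean {
    const key = fingerprintKey(source, title);
    const entry = this.entries.get(key);
    if (!entry) {
      this.entries.set(key, { count: 1, suppressedUntil: null });
      return true;                   // first occurrence
    }
    entry.count += 1;
    if (entry.suppressedUntil !== null && now < entry.suppressedUntil) {
      return false;                  // still inside the suppression window
    }
    if (entry.count >= SUPPRESS_AFTER && severity !== 'critical') {
      entry.suppressedUntil = now + SUPPRESS_MS;
      return false;                  // threshold reached: suppress for 30 minutes
    }
    return true;                     // below threshold, or critical (exempt)
  }
}
```

With these rules alone, a repeat that arrives after the window expires is suppressed again, because its count is already at or above the threshold; whether the real service resets the counter is not shown in this chapter.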

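Chapter Summary

This chapter walked through the complete implementation of the ITOps Agent Platform alert center and notification system:

  • Alert source adapters: five adapters (Prometheus/Zabbix/Grafana/Aliyun/Tencent) normalize alerts of different formats into NormalizedAlert, including severity mapping and automatic source detection
  • Alert rule engine: in-memory Map-based rule storage, threshold checks, cooldown-based debouncing, and multi-channel notification dispatch
  • Alert noise reduction: MD5 fingerprinting (with title normalization), duplicate detection (persisted in SQLite), and automatic suppression (5-occurrence threshold, critical exemption, 30-minute TTL)
  • Notification service: concurrent multi-channel delivery (WeCom/DingTalk/email/Webhook/WebSocket), persisted records, retry on failure, and emoji-enhanced messages
  • Automatic remediation: policy matching (source/severity/keywords/tags), cooldown plus rate limiting, three execution modes (auto/approval/suggestion), workflow-driven remediation, verification workflows, and rollback workflows
  • Database design: six core tables (alerts, alert_noise_reduction, notifications, remediation_policies, remediation_executions, remediation_cooldowns)

Together, these components form a closed loop from alert ingestion to automatic remediation.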

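Exercises

Basic Exercises

  1. New alert source adapter: write an adaptCloudWatch adapter for AWS CloudWatch. The JSON of a CloudWatch SNS notification contains fields such as AlarmName, NewStateValue, NewStateReason, and AWSAccountId. Normalize it into a NormalizedAlert.

  2. Custom cooldown rules: in alertService.ts, allow each rule to configure a dynamic cooldown based on severity (a shorter cooldown for critical, a longer one for warning). Modify the AlertRule interface and the triggerAlert logic.

  3. Paginated notification history: add pagination to the getNotificationHistory method in notificationService.ts, with the parameters page, limit, and status (filter by sent/failed), returning {notifications, total, page, totalPages}.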

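Advanced Exercises

  1. Alert correlation analysis: design an alert correlation engine that identifies similar alerts across multiple servers within the same time window (for example, "CPU usage high" appearing on 10 servers within 5 minutes), aggregates them into a single "incident", and generates a root-cause analysis report.

  2. Remediation policy versioning: add version control to the remediation_policies table. On every policy update, save the previous version to a remediation_policy_versions table. Implement rolling a policy back to a historical version, as well as diffing two versions.

  3. Notification channel failover: when a notification channel fails 3 times in a row, automatically disable it and switch to a backup channel. Implement a channel health dashboard that shows each channel's delivery success rate, average latency, and most recent failure reason.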

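Discussion Questions

  1. Alert storm scenario design: suppose a core switch failure causes 500 servers to raise alerts at the same time (network unreachable). Analyze how the current noise-reduction system behaves in this scenario, discuss whether additional strategies are needed (such as topology-aware aggregation, time-window aggregation, or dependency inference), and propose a design.

  2. Safety boundaries for automatic remediation: the automatic remediation system can run any operation contained in a workflow (including dangerous server commands). Discuss how to establish safety boundaries in such a system, covering permission tiers for remediation actions, backups before remediation, real-time auditing during remediation, and fast rollback when remediation fails. How do you balance automation efficiency against safety?

Further Reading

Released under the MPL-2.0 license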