Skip to content

第11章 实时通信开发

作者

谭策 — 独立开发者 | AIOps 领域探索者

IT Online 微信公众号

许可证

MPL-2.0 © 谭策

本章导读

在现代 IT 运维平台中,实时通信是连接后端服务与前端用户的核心纽带。当运维人员通过 Web 终端远程管理服务器时,需要毫秒级的命令响应;当 Agent 执行多步任务时,需要实时推送执行进度;当系统产生新告警时,需要立即通知相关人员。传统的 HTTP 请求/响应模式无法满足这些场景的低延迟和双向通信需求。

本章将深入讲解 ITOps Agent Platform 如何使用 Socket.IO 构建完整的实时通信架构,涵盖服务端配置、事件设计、客户端集成、连接管理和 xterm.js 终端集成等核心主题。通过本章学习,你将掌握如何为运维平台设计高可靠、可扩展的 WebSocket 通信方案。

学习目标

  • 理解 WebSocket 协议和 Socket.IO 的工作原理
  • 掌握 Socket.IO 服务端配置和中间件使用
  • 学会设计结构化的 WebSocket 事件体系
  • 掌握 WebSocket 客户端集成和状态管理
  • 理解心跳检测、重连机制等连接管理策略
  • 掌握 xterm.js 与 WebSocket 的集成方式
  • 了解实时数据流在告警推送和任务进度中的应用

11.1 WebSocket 与 Socket.IO 基础

11.1.1 为什么选择 WebSocket

HTTP 协议采用请求/响应模型,客户端发起请求后必须等待服务器响应。这种模式存在三个关键局限:

局限说明运维场景影响
单向通信只能客户端 → 服务端服务器无法主动推送告警
连接开销每次请求新建 TCP 连接终端输入延迟高
无状态服务端不知道客户端状态无法感知终端是否在线

WebSocket 通过一次 HTTP 握手升级为持久化的 TCP 双向通道,解决了上述问题:

客户端                    服务端
  │                         │
  ├──── HTTP Upgrade ──────►│  (握手)
  │◄──── 101 Switching ─────┤  (升级成功)
  │                         │
  ├──── WebSocket Frame ───►│  (双向通信)
  │◄──── WebSocket Frame ───┤
  │         ...             │
  │◄──── Server Push ───────┤  (服务端主动推送)

11.1.2 Socket.IO 的优势

原生 WebSocket API 功能有限,缺少重连、房间、事件命名等高级特性。Socket.IO 在 WebSocket 之上提供了丰富的工程化能力:

┌─────────────────────────────────────────────┐
│            Socket.IO 应用层                   │
├─────────────────────────────────────────────┤
│  自动重连 │ 事件命名 │ 房间模型 │ 二进制支持    │
├─────────────────────────────────────────────┤
│            Socket.IO 传输层                   │
├─────────────────────────────────────────────┤
│  WebSocket │ HTTP Long-Polling │ Engine.IO    │
├─────────────────────────────────────────────┤
│            TCP / TLS 传输                     │
└─────────────────────────────────────────────┘

项目选择 Socket.IO 的核心原因:

  • 自动降级:WebSocket 不可用时自动切换到 HTTP 长轮询
  • 房间模型:天然的组播能力,适合任务/告警订阅场景
  • ACK 回调:服务端响应可返回结果到客户端回调函数
  • TypeScript 支持:完整的类型定义,减少运行时错误

11.2 Socket.IO 服务端实现

11.2.1 服务初始化与配置

Socket.IO 服务在应用入口 app.ts 中随 HTTP 服务器一起创建:

typescript
// backend/src/app.ts
import { createServer } from 'http';
import { Server as SocketIOServer } from 'socket.io';
import { env } from './utils/env';

const app = express();
const httpServer = createServer(app);

const io = new SocketIOServer(httpServer, {
  cors: {
    origin: env.ALLOWED_ORIGINS,       // 跨域白名单
    methods: ['GET', 'POST']
  },
  maxHttpBufferSize: 1e6,              // 1MB 消息大小限制
  pingTimeout: 60000,                  // 60秒无响应判定断开
  pingInterval: 25000                  // 25秒发送一次心跳
});

// Socket.IO 共享同一个 HTTP 服务器实例,共用端口
setupWebSocket(io);
setIOInstance(io);                      // 将 io 实例暴露给其他模块使用

httpServer.listen(PORT, HOST, () => {
  logger.info(`🚀 ITOps Agent Platform Backend running on ${HOST}:${PORT}`);
  logger.info(`📡 WebSocket server ready`);
});

关键配置说明

配置项默认值作用
cors.origin必须指定限制允许连接的前端域名
maxHttpBufferSize1MB防止恶意大消息耗尽内存
pingTimeout60s超过此时间无响应则判定断开
pingInterval25s每隔此时间发送一次心跳探测

11.2.2 JWT 认证中间件

WebSocket 连接需要与 REST API 使用相同的身份认证体系。Socket.IO 提供 io.use() 中间件机制,在连接建立时进行认证:

typescript
// backend/src/websocket/handler.ts
import { Server as SocketIOServer, Socket } from 'socket.io';
import jwt from 'jsonwebtoken';
import db from '../models/database';
import type { User } from '../types';

interface SocketWithUser extends Socket {
  user?: User;           // 认证成功后附加用户信息
  isAlive?: boolean;     // 心跳状态标记
}

function authenticateSocket(socket: Socket, next: (err?: Error) => void) {
  // 从 auth.token 或 Authorization header 获取 Token
  const token = socket.handshake.auth?.token || 
                socket.handshake.headers?.authorization?.replace('Bearer ', '');

  if (!token) {
    logger.error('❌ WebSocket 认证失败: 未提供 token');
    return next(new Error('未提供认证token'));
  }

  try {
    // 验证 JWT 签名和过期时间
    const decoded = jwt.verify(token, env.JWT_SECRET) as { id: string };
    
    // 从数据库查询用户状态
    const user = db.prepare(
      'SELECT id, username, email, role, enabled FROM users WHERE id = ?'
    ).get(decoded.id) as User | undefined;
    
    if (!user || !user.enabled) {
      return next(new Error('用户不存在或已禁用'));
    }

    // 将用户信息附加到 socket 对象
    (socket as SocketWithUser).user = user;
    logger.info(`✅ WebSocket 认证成功: ${user.username}`);
    next();
  } catch (error: unknown) {
    return next(new Error('无效的token'));
  }
}

export function setupWebSocket(io: SocketIOServer) {
  // 注册认证中间件
  io.use(authenticateSocket);
  // ... 后续事件处理
}

认证流程图

客户端连接


io.use(authenticateSocket)

    ├── 未提供 Token ──► next(Error) ──► 连接拒绝

    ├── Token 无效 ────► next(Error) ──► 连接拒绝

    ├── 用户不存在/禁用 ─► next(Error) ──► 连接拒绝

    └── 认证成功 ──────► socket.user = user ──► next() ──► 进入 connection 事件

11.2.3 心跳检测机制

网络环境复杂,TCP 连接可能已经断开但双方都不知道。心跳检测通过定期探测确保连接活性:

typescript
// backend/src/websocket/handler.ts
const HEARTBEAT_INTERVAL = 30000;   // 30秒发送一次探测
const HEARTBEAT_TIMEOUT = 5000;     // 5秒内必须响应

export function setupWebSocket(io: SocketIOServer) {
  io.use(authenticateSocket);

  // 定时心跳检测
  const heartbeatInterval = setInterval(() => {
    io.sockets.sockets.forEach((socket) => {
      const socketWithUser = socket as SocketWithUser;
      
      // 上次标记为 false 且未收到 pong → 判定失联
      if (socketWithUser.isAlive === false) {
        logger.warn(`💔 WebSocket client ${socket.id} did not respond to ping`);
        socket.disconnect();
        return;
      }
      
      // 标记为待验证,发送 ping 事件
      socketWithUser.isAlive = false;
      socket.emit('ping');
    });
  }, HEARTBEAT_INTERVAL);

  io.on('connection', (socket: Socket) => {
    (socket as SocketWithUser).isAlive = true;

    // 客户端收到 ping 后回复 pong
    socket.on('pong', () => {
      (socket as SocketWithUser).isAlive = true;
    });

    // Engine.IO 底层的 ping/pong 检测
    let pingTimeout: NodeJS.Timeout | null = null;
    
    socket.conn.on('ping', () => {
      pingTimeout = setTimeout(() => {
        logger.warn(`💔 WebSocket client ${socket.id} ping timeout`);
        socket.disconnect();
      }, HEARTBEAT_TIMEOUT);
    });

    socket.conn.on('pong', () => {
      if (pingTimeout) {
        clearTimeout(pingTimeout);
        pingTimeout = null;
      }
    });
    
    // ... 其他事件处理
  });

  io.on('close', () => {
    clearInterval(heartbeatInterval);
  });
}

心跳检测流程

时间线 ───────────────────────────────────────────────►

Server:  emit('ping')                emit('ping')
              │                           │
              │  30s 间隔                  │  30s 间隔
              ▼                           ▼
Client:  emit('pong') ◄──────────── emit('pong')
              │                           │
    isAlive = true              isAlive = true
              │                           │
              │                    (未收到 pong)
              │                           │
              │                    isAlive = false
              │                    下次检测 → disconnect

项目实现了双层心跳检测

  1. 应用层心跳:自定义 ping/pong 事件,30秒间隔
  2. 传输层心跳:Engine.IO 内置的 ping/pong,由 pingInterval/pingTimeout 配置控制

11.2.4 房间模型与订阅机制

Socket.IO 的房间(Room)是一个逻辑分组,可以向组内所有成员广播消息。项目定义了三种房间模型:

房间类型命名模式用途订阅方式
任务房间task:{taskId}订阅特定任务的执行进度task:subscribe
告警房间alerts订阅所有新告警通知alert:subscribe
终端房间terminal:{sessionId}单个终端会话terminal:open 自动加入
typescript
// backend/src/websocket/handler.ts
const taskRooms = new Map<string, Set<string>>();  // 追踪每个任务的订阅者

io.on('connection', (socket: Socket) => {

  // 订阅任务进度
  socket.on('task:subscribe', (taskId: string) => {
    socket.join(`task:${taskId}`);
    if (!taskRooms.has(taskId)) {
      taskRooms.set(taskId, new Set());
    }
    taskRooms.get(taskId)!.add(socket.id);
    logger.info(`📡 Client ${socket.id} subscribed to task ${taskId}`);
  });

  // 取消订阅任务
  socket.on('task:unsubscribe', (taskId: string) => {
    socket.leave(`task:${taskId}`);
    taskRooms.get(taskId)?.delete(socket.id);
  });

  // 订阅告警通知
  socket.on('alert:subscribe', () => {
    socket.join('alerts');
    logger.info(`🔔 Client ${socket.id} subscribed to alerts`);
  });

  // 断开连接时清理
  socket.on('disconnect', () => {
    taskRooms.forEach((sockets) => {
      sockets.delete(socket.id);
    });
  });
});

11.2.5 消息推送辅助函数

为了在不同模块中方便地推送消息,项目提供了三个辅助函数:

typescript
// backend/src/websocket/handler.ts
import { Server as SocketIOServer } from 'socket.io';

// 向特定任务房间推送
export function emitToTask(io: SocketIOServer, taskId: string, event: string, data: Record<string, unknown>) {
  io.to(`task:${taskId}`).emit(event, { taskId, ...data });
}

// 向告警房间推送
export function emitToAlerts(io: SocketIOServer, event: string, data: Record<string, unknown>) {
  io.to('alerts').emit(event, data);
}

// 向所有客户端广播
export function broadcast(io: SocketIOServer, event: string, data: Record<string, unknown>) {
  io.emit(event, data);
}

使用示例

typescript
// 多 Agent 协作任务完成时推送
// backend/src/routes/multiAgentRoutes.ts
import { emitToTask } from '../websocket/handler';
import { io } from '../app';

emitToTask(io!, taskId, 'task:completed', {
  result: finalResult,
  duration: Date.now() - startTime
});

// Webhook 收到新告警时推送
// backend/src/routes/webhookRoutes.ts
io.emit('alert:new', { 
  id, 
  source: alert.source, 
  severity, 
  title, 
  content, 
  taskId, 
  host: alert.host 
});

// 自动修复开始时推送
// backend/src/routes/alertRoutes.ts
import { emitToAlerts } from '../websocket/handler';

emitToAlerts(io!, 'remediation:started', {
  alertId,
  policyId,
  startTime: new Date().toISOString()
});

11.3 WebSocket 事件设计

11.3.1 事件分类体系

项目中的 WebSocket 事件按功能分为四大类:

WebSocket 事件体系
├── SSH 终端类 (terminal:*)
│   ├── terminal:open        (ACK)  创建终端会话
│   ├── terminal:data        (双向)  终端数据传输
│   ├── terminal:resize      (单向)  终端尺寸调整
│   ├── terminal:close       (单向)  关闭终端会话
│   └── terminal:close-session:{id} (单向) 特定会话关闭通知

├── 任务执行类 (task:*)
│   ├── task:subscribe       (单向)  订阅任务进度
│   ├── task:unsubscribe     (单向)  取消订阅
│   ├── task:progress        (推送)  执行进度更新
│   ├── task:completed       (推送)  任务完成
│   └── task:error           (推送)  任务出错

├── 告警通知类 (alert:*)
│   ├── alert:subscribe      (单向)  订阅告警
│   ├── alert:new            (推送)  新告警产生
│   ├── alert:resolved       (推送)  告警已解决
│   ├── remediation:started  (推送)  自动修复开始
│   ├── remediation:result   (推送)  修复执行结果
│   └── remediation:completed (推送) 修复流程完成

└── 连接管理类
    ├── ping                 (探测)  服务端心跳探测
    ├── pong                 (响应)  客户端心跳响应
    ├── connect              (系统)  连接建立
    ├── disconnect           (系统)  连接断开
    └── connect_error        (系统)  连接错误

11.3.2 SSH 终端事件详解

SSH Web 终端是项目中最重要的实时通信场景,涉及复杂的双向数据流:

事件交互时序

前端 WebTerminal              后端 handler.ts              terminalService         SSH Server
      │                          │                           │                       │
      │──── terminal:open ──────►│                           │                       │
      │   {serverId,cols,rows}   │                           │                       │
      │                          │── createTerminalSession ─►│                       │
      │                          │                           │── SSH connect ───────►│
      │                          │                           │◄── ready ─────────────│
      │                          │                           │── shell ─────────────►│
      │◄── callback ────────────│                           │◄── stream data ───────│
      │   {sessionId}            │                           │                       │
      │                          │                           │                       │
      │──── terminal:data ──────►│                           │                       │
      │   {sessionId, "ls\n"}    │── sendData ──────────────►│                       │
      │                          │── shell.write("ls\n") ──►│                       │
      │                          │                           │── stream data ───────►│
      │                          │◄── emit('terminal:data')─│◄── "total 42\n" ◄─────│
      │◄── terminal:data ───────│                           │                       │
      │   {sessionId, data}      │                           │                       │
      │                          │                           │                       │
      │──── terminal:resize ────►│                           │                       │
      │   {sessionId,cols,rows}  │── resizeTerminal ───────►│                       │
      │                          │── shell.setWindow() ────►│                       │
      │                          │                           │                       │
      │──── terminal:close ─────►│                           │                       │
      │   {sessionId}            │                           │                       │
      │                          │── closeTerminalSession ─►│── conn.end() ────────►│

服务端终端事件处理

typescript
// backend/src/websocket/handler.ts

// 打开终端会话(使用 ACK 回调返回结果)
socket.on('terminal:open', async (
  data: { serverId: string; cols: number; rows: number },
  callback: (result: { sessionId?: string; error?: string }) => void
) => {
  try {
    const result = await terminalService.createTerminalSession(
      data.serverId, data.cols, data.rows
    );
    
    if (result.error) {
      callback({ error: result.error });
      return;
    }

    // 加入终端房间
    socket.join(`terminal:${result.sessionId}`);

    // 绑定 shell 数据流转发
    const shellDataHandler = (shellData: Buffer) => {
      socket.emit('terminal:data', {
        sessionId: result.sessionId,
        data: shellData.toString('utf-8')
      });
    };
    result.shell.on('data', shellDataHandler);

    // 清理:终端断开时移除监听
    socket.on('terminal:disconnect', () => {
      result.shell.removeListener('data', shellDataHandler);
    });
    socket.on(`terminal:close-session:${result.sessionId}`, () => {
      result.shell.removeListener('data', shellDataHandler);
    });

    callback({ sessionId: result.sessionId });
  } catch (err) {
    callback({ error: err instanceof Error ? err.message : 'Unknown error' });
  }
});

// 接收终端输入数据
socket.on('terminal:data', (data: { sessionId: string; data: string }) => {
  const role = (socket as SocketWithUser).user?.role;
  // 包含角色权限的命令安全检查
  terminalService.sendData(data.sessionId, data.data, role);
});

// 终端窗口大小调整
socket.on('terminal:resize', (data: { sessionId: string; cols: number; rows: number }) => {
  terminalService.resizeTerminal(data.sessionId, data.cols, data.rows);
});

// 关闭终端
socket.on('terminal:close', (data: { sessionId: string }) => {
  socket.leave(`terminal:${data.sessionId}`);
  socket.emit(`terminal:close-session:${data.sessionId}`);
  terminalService.closeTerminalSession(data.sessionId);
});

11.3.3 任务进度事件

多 Agent 协作任务执行时,前端需要实时接收每个 Agent 的执行结果:

typescript
// backend/src/routes/multiAgentRoutes.ts
import { emitToTask } from '../websocket/handler';

// 任务完成时推送最终结果
emitToTask(io!, taskId, 'task:completed', {
  result: finalResult,
  duration: Date.now() - startTime
});

// 任务出错时推送错误信息
emitToTask(io!, taskId, 'task:error', {
  error: error.message,
  failedAt: currentAgent.name
});

11.3.4 告警推送事件

告警系统涉及多个推送场景:

事件名触发时机数据内容
alert:newWebhook 收到新告警{id, source, severity, title, host}
alert:resolved告警自动解决{source, title, host}
remediation:started自动修复开始{alertId, policyId, startTime}
remediation:result修复命令执行结果{alertId, command, output, success}
remediation:completed修复流程完成{alertId, totalSteps, successCount}
remediation:error修复流程出错{alertId, error, failedStep}

11.4 Socket.IO 客户端集成

11.4.1 连接建立与认证

客户端使用 socket.io-client 库建立连接,在连接配置中携带 JWT Token:

typescript
// frontend/src/components/WebTerminal.tsx
import { io, Socket } from 'socket.io-client';

const socket = io(import.meta.env.VITE_API_URL || 'http://localhost:3001', {
  auth: { token },              // JWT Token 认证
  transports: ['websocket']     // 强制使用 WebSocket,禁用降级
});

配置说明

配置项说明
auth.tokenJWT string与服务端认证中间件配合
transports['websocket']终端场景强制 WebSocket 保证低延迟
URLVITE_API_URL后端服务地址

11.4.2 状态管理与生命周期

Web 终端组件管理了 5 种连接状态:

typescript
const [status, setStatus] = useState<
  'connecting' |    // 正在建立连接
  'connected' |     // 连接成功,终端可用
  'error' |         // 连接失败,显示错误信息
  'disconnected'    // 连接已断开
>('connecting');

状态流转图

                    ┌─────────────┐
                    │ connecting  │
                    └──────┬──────┘

              ┌────────────┼────────────┐
              ▼            ▼            ▼
       ┌──────────┐ ┌──────────┐ ┌────────────┐
       │connected │ │  error   │ │disconnected│
       └──────────┘ └──────────┘ └────────────┘
            │            │            │
            │            │            │
            ▼            ▼            ▼
       终端可用      显示错误      提示重新连接

11.4.3 指数退避重连机制

网络不稳定时,客户端需要自动重连。项目实现了指数退避策略:

typescript
// frontend/src/components/WebTerminal.tsx
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const reconnectCountRef = useRef(0);
const maxReconnectAttempts = 3;

socket.on('disconnect', (reason) => {
  // 服务端主动断开,不重连
  if (reason === 'io server disconnect') {
    socket.disconnect();
    setStatus('disconnected');
    return;
  }

  // 异常断开,尝试重连
  if (reconnectCountRef.current < maxReconnectAttempts) {
    setStatus('connecting');
    reconnectCountRef.current++;
    
    // 指数退避:2s → 4s → 5s (上限)
    const delay = Math.min(
      1000 * Math.pow(2, reconnectCountRef.current), 
      5000
    );
    
    reconnectTimerRef.current = setTimeout(() => {
      socket.connect();
    }, delay);
  } else {
    setStatus('disconnected');
    setError('Terminal connection lost');
  }
});

重连时间计算

重试次数计算公式延迟时间
第 1 次min(1000 × 2¹, 5000)2 秒
第 2 次min(1000 × 2², 5000)4 秒
第 3 次min(1000 × 2³, 5000)5 秒 (到达上限)
超过 3 次-放弃重连,显示错误

11.4.4 完整的资源清理

WebSocket 连接涉及多个资源(Socket、xterm 实例、事件监听器),组件卸载时必须完整清理:

typescript
// frontend/src/components/WebTerminal.tsx
const cleanup = useCallback(() => {
  // 1. 清除重连定时器
  if (reconnectTimerRef.current) {
    clearTimeout(reconnectTimerRef.current);
    reconnectTimerRef.current = null;
  }
  reconnectCountRef.current = 0;

  const socket = socketRef.current;
  const sessionId = sessionIdRef.current;
  const xterm = xtermRef.current;
  const handler = terminalDataHandlerRef.current;

  // 2. 移除终端数据监听器
  if (handler && socket) {
    socket.removeListener('terminal:data', handler);
    terminalDataHandlerRef.current = null;
  }

  // 3. 关闭终端会话并断开连接
  if (socket && sessionId) {
    socket.emit('terminal:close', { sessionId });
    socket.disconnect();
    socketRef.current = null;
  }

  // 4. 销毁 xterm 实例
  if (xterm) {
    xterm.dispose();
    xtermRef.current = null;
  }
}, []);

// useEffect cleanup 中调用
useEffect(() => {
  // ... 初始化逻辑
  
  return () => {
    window.removeEventListener('resize', handleResize);
    cleanup();
  };
}, [serverId, token, cleanup, connect]);

11.5 xterm.js 集成

11.5.1 xterm.js 是什么

xterm.js 是一个基于 TypeScript 的 Web 终端组件,提供完整的终端模拟功能:

┌─────────────────────────────────────────────┐
│                 xterm.js                      │
├─────────────────────────────────────────────┤
│  ANSI 转义序列解析 │ VT100 兼容 │ UTF-8 支持   │
├─────────────────────────────────────────────┤
│  可组合 Addon 系统                            │
│  ├── FitAddon       自适应容器大小            │
│  ├── WebLinksAddon  自动识别可点击链接         │
│  ├── SearchAddon    终端内容搜索              │
│  └── UnicodeAddon   多语言字符支持            │
├─────────────────────────────────────────────┤
│               Canvas 渲染                     │
└─────────────────────────────────────────────┘

11.5.2 终端初始化

项目使用完整的主题配置创建终端实例:

typescript
// frontend/src/components/WebTerminal.tsx
import { Terminal } from 'xterm';
import { FitAddon } from 'xterm-addon-fit';
import { WebLinksAddon } from 'xterm-addon-web-links';
import 'xterm/css/xterm.css';

const term = new Terminal({
  cursorBlink: true,                    // 光标闪烁
  fontSize: 14,                         // 字体大小
  fontFamily: 'Menlo, Monaco, "Courier New", monospace',
  theme: {
    background: '#1e1e1e',             // VS Code 暗色背景
    foreground: '#d4d4d4',
    cursor: '#d4d4d4',
    selectionBackground: '#264f78',
    // 16 色完整配置
    black: '#000000', red: '#cd3131', green: '#0dbc79',
    yellow: '#e5e510', blue: '#2472c8', magenta: '#bc3fbc',
    cyan: '#11a8cd', white: '#e5e5e5',
    brightBlack: '#666666', brightRed: '#f14c4c',
    brightGreen: '#23d18b', brightYellow: '#f5f543',
    brightBlue: '#3b8eea', brightMagenta: '#d670d6',
    brightCyan: '#29b8db', brightWhite: '#e5e5e5'
  },
  allowProposedApi: true,
  scrollback: 5000                      // 回滚缓冲区行数
});

// 加载 Addon
const fitAddon = new FitAddon();
const webLinksAddon = new WebLinksAddon();
term.loadAddon(fitAddon);
term.loadAddon(webLinksAddon);

// 挂载到 DOM
term.open(terminalRef.current);
fitAddon.fit();                         // 自适应容器大小

11.5.3 双向数据流

xterm.js 与 WebSocket 之间通过事件驱动实现双向数据流:

用户键盘输入                    xterm.js                  WebSocket                  SSH Shell
     │                            │                          │                          │
     │── 输入 "ls" ──────────────►│                          │                          │
     │                            │── term.onData ──────────►│                          │
     │                            │   emit('terminal:data')  │                          │
     │                            │   {sessionId, "ls"}     │── terminal:data ────────►│
     │                            │                          │── shell.write("ls") ───►│
     │                            │                          │                          │
     │                            │                          │◄── stream data "total\n" │
     │                            │◄── emit('terminal:data')─│                          │
     │                            │   {sessionId, data}      │                          │
     │◄── term.write(data) ──────│                          │                          │
     │   显示 "total 42\n"        │                          │                          │

输入处理代码

typescript
// 用户输入 → WebSocket → SSH
term.onData((data) => {
  if (socketRef.current?.connected && sessionIdRef.current) {
    socketRef.current.emit('terminal:data', {
      sessionId: sessionIdRef.current,
      data   // 包含按键字符、控制字符等
    });
  }
});

// SSH 输出 → WebSocket → xterm 显示
const terminalDataHandler = (data: { sessionId: string; data: string }) => {
  if (data.sessionId === sessionIdRef.current && xtermRef.current) {
    xtermRef.current.write(data.data);  // 渲染终端输出
  }
};
socket.on('terminal:data', terminalDataHandler);

11.5.4 终端自适应

当浏览器窗口大小变化时,需要通知 xterm.js 重新计算行列数,并同步到后端 SSH:

typescript
// xterm 尺寸变化 → 通知后端调整 SSH 窗口
term.onResize(({ cols, rows }) => {
  if (socketRef.current?.connected && sessionIdRef.current) {
    socketRef.current.emit('terminal:resize', {
      sessionId: sessionIdRef.current,
      cols,
      rows
    });
  }
});

// 浏览器窗口 resize → 触发 xterm 重新 fit
const handleResize = () => {
  fitAddon.fit();
};
window.addEventListener('resize', handleResize);

11.5.5 Ref 管理策略

WebTerminal 组件管理了 8 个 Ref,合理区分了状态和引用:

Ref 名称类型为什么用 Ref 而不是 State
terminalRefHTMLDivElementDOM 引用,不需要触发重渲染
xtermRefTerminal实例引用,变化不需要重渲染
fitAddonRefFitAddonAddon 引用,变化不需要重渲染
socketRefSocket连接实例,变化不需要重渲染
sessionIdRefstring回调函数中需要最新值
terminalDataHandlerRefFunction清理时需要引用
reconnectTimerRefTimer定时器引用,不需要重渲染
reconnectCountRefnumber回调函数中需要最新值

state vs ref 选择原则

是否需要触发 UI 重渲染?

    ├── 是 ──► 使用 useState
    │          (status, error)

    └── 否 ──► 使用 useRef
               (实例、回调、定时器等)

11.5.6 UI 状态展示

终端组件通过状态指示器向用户展示当前连接状态:

tsx
<div className="flex items-center gap-2">
  <div className={`w-2 h-2 rounded-full ${
    status === 'connected' ? 'bg-green-500' :
    status === 'connecting' ? 'bg-yellow-500 animate-pulse' :
    status === 'error' ? 'bg-red-500' :
    'bg-gray-500'
  }`} />
  <span className="text-sm text-gray-300 font-medium">{serverName}</span>
  <span className="text-xs text-gray-500">({status})</span>
</div>

状态视觉效果

状态颜色动画含义
connected绿色终端正常可用
connecting黄色pulse 脉冲动画正在建立连接
error红色连接失败,显示错误
disconnected灰色连接已断开

11.6 优雅关闭与服务生命周期

11.6.1 服务关闭流程

应用退出时需要按顺序关闭所有服务,确保资源正确释放:

typescript
// backend/src/app.ts
const gracefulShutdown = async (signal: string) => {
  logger.info(`${signal} received, starting graceful shutdown...`);
  
  // 30 秒超时保护
  const shutdownTimeout = setTimeout(() => {
    logger.error('Graceful shutdown timed out, forcing exit');
    process.exit(1);
  }, 30000);

  try {
    // 1. 关闭 HTTP 和 WebSocket 服务器
    await Promise.all([
      new Promise<void>((resolve) => httpServer.close(() => resolve())),
      new Promise<void>((resolve) => io.close(() => {
        logger.info('WebSocket server closed');
        resolve();
      }))
    ]);

    // 2. 停止定时服务
    schedulerService.shutdown();
    backupService.stopAutoBackup();

    // 3. 关闭数据库
    db.close();

    logger.shutdown();
    clearTimeout(shutdownTimeout);
    process.exit(0);
  } catch (error) {
    clearTimeout(shutdownTimeout);
    process.exit(1);
  }
};

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));

关闭顺序图

SIGTERM / SIGINT


┌──────────────────────┐
│ 停止接收新连接         │  HTTP + WebSocket 关闭
│ (不再处理新请求)       │
└──────────┬───────────┘


┌──────────────────────┐
│ 等待现有请求完成       │  等待中的终端会话结束
│ (最多 30 秒)          │
└──────────┬───────────┘


┌──────────────────────┐
│ 停止定时任务           │  Scheduler, Backup
└──────────┬───────────┘


┌──────────────────────┐
│ 关闭数据库连接         │  SQLite close
└──────────┬───────────┘


      process.exit(0)

11.6.2 终端会话管理

终端服务通过定时清理机制管理会话生命周期:

typescript
// backend/src/services/terminalService.ts
const SESSION_TTL_MS = 30 * 60 * 1000;     // 30 分钟超时
const SESSION_MAX_COUNT = 100;             // 最多 100 个会话
const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // 每 5 分钟检查一次

const cleanupTimer = setInterval(() => {
  const now = Date.now();
  let cleaned = 0;
  
  // 超过最大数量时,按创建时间排序,清理最老的会话
  if (activeSessions.size > SESSION_MAX_COUNT) {
    const entries = Array.from(activeSessions.entries())
      .sort((a, b) => a[1].createdAt.getTime() - b[1].createdAt.getTime());
    
    const toRemove = entries.slice(0, activeSessions.size - SESSION_MAX_COUNT);
    for (const [id, session] of toRemove) {
      try { session.shell.end(); } catch { /* ignore */ }
      try { session.conn.end(); } catch { /* ignore */ }
      activeSessions.delete(id);
      cleaned++;
    }
  }
  
  // 清理超时会话
  for (const [id, session] of activeSessions.entries()) {
    if (now - session.createdAt.getTime() > SESSION_TTL_MS) {
      try { session.shell.end(); } catch { /* ignore */ }
      try { session.conn.end(); } catch { /* ignore */ }
      activeSessions.delete(id);
      cleaned++;
    }
  }
  
  if (cleaned > 0) {
    logger.info(`Cleaned up ${cleaned} expired/orphan terminal sessions`);
  }
}, CLEANUP_INTERVAL_MS);

cleanupTimer.unref();  // 不阻止 Node.js 进程退出

会话生命周期配置

配置项说明
SESSION_TTL_MS30 分钟单个会话最大存活时间
SESSION_MAX_COUNT100全局最大同时在线终端数
CLEANUP_INTERVAL_MS5 分钟清理检查间隔
unref()调用定时器不阻止进程退出

11.7 实时通信最佳实践

11.7.1 安全实践

实践实现方式代码位置
JWT 认证io.use(authenticateSocket)handler.ts
消息大小限制maxHttpBufferSize: 1e6app.ts
CORS 限制origin: env.ALLOWED_ORIGINSapp.ts
命令安全检查checkCommandSafety(data, userRole)terminalService.ts
会话数量限制SESSION_MAX_COUNT = 100terminalService.ts

11.7.2 性能实践

实践说明
传输协议选择终端场景强制 WebSocket,避免 Long-Polling 开销
事件监听器清理disconnect 时 removeListener 防止内存泄漏
Buffer 转字符串shell data 使用 toString('utf-8') 按需转换
房间订阅清理disconnect 时从 taskRooms Map 中删除
定时器 unrefcleanupTimer 不阻止进程退出

11.7.3 可靠性实践

实践实现
心跳检测应用层 + 传输层双层心跳
指数退避重连Math.min(1000 * 2^n, 5000)
优雅关闭按顺序关闭服务,30 秒超时保护
会话超时30 分钟自动清理闲置终端
ACK 回调terminal:open 使用 callback 返回结果

11.8 常见陷阱与解决方案

11.8.1 事件监听器泄漏

问题:每次重连时重复注册事件监听器,导致消息重复处理。

typescript
// 错误做法:每次 connect 都注册
socket.on('connect', () => {
  socket.on('terminal:data', handler);  // 重复注册!
});

// 正确做法:在外部注册一次
socket.on('terminal:data', handler);
socket.on('connect', () => {
  // 只处理连接成功后的逻辑
});

11.8.2 State 与 Ref 的混淆

问题:在回调函数中使用过期的 state 值。

typescript
// 错误做法:回调中 sessionId 可能过期
const [sessionId, setSessionId] = useState<string | null>(null);

socket.on('terminal:data', (data) => {
  // 这里的 sessionId 是闭包捕获的旧值
  if (data.sessionId === sessionId) { ... }
});

// 正确做法:使用 ref 保存最新值
const sessionIdRef = useRef<string | null>(null);
sessionIdRef.current = result.sessionId;

socket.on('terminal:data', (data) => {
  if (data.sessionId === sessionIdRef.current) { ... }  // 始终是最新值
});

11.8.3 强制传输协议的风险

问题:某些企业网络环境可能阻断 WebSocket 连接。

typescript
// 当前配置:强制 WebSocket
transports: ['websocket']

// 如需兼容受限网络,可改为:
transports: ['websocket', 'polling']

本章小结

本章系统讲解了 ITOps Agent Platform 的实时通信架构,核心要点包括:

  1. Socket.IO 服务端:通过中间件实现 JWT 认证,通过房间模型实现消息路由,通过双层心跳检测保障连接活性
  2. 事件设计体系:按 SSH 终端、任务执行、告警通知三大场景分类设计,使用命名空间区分事件类型
  3. 客户端集成:使用指数退避重连、ACK 回调、完整的资源清理确保连接可靠性
  4. xterm.js 集成:通过 WebSocket 双向数据流实现 Web SSH 终端,使用 FitAddon 实现自适应布局
  5. 连接管理:会话 TTL 超时、最大数量限制、优雅关闭流程保障系统稳定性

WebSocket 通信是运维平台的核心基础设施,合理的事件设计和连接管理策略是构建高可用系统的关键。

本章练习

基础练习

  1. 阅读 backend/src/websocket/handler.ts,画出完整的 WebSocket 事件处理流程图,标注每个事件的触发条件和响应动作。

  2. 在 WebTerminal 组件中,cleanup 函数按什么顺序清理资源?为什么这个顺序很重要?如果顺序颠倒可能产生什么问题?

  3. 解释 Socket.IO 的房间模型是如何工作的。如果 10 个用户同时订阅了同一个任务 task:abc123,当服务端调用 emitToTask(io, 'abc123', 'task:completed', {...}) 时,会发生什么?

进阶练习

  1. 当前的心跳检测使用 isAlive 布尔标记。请分析:如果客户端网络中断但没有完全断开(半开连接),服务端需要多长时间才能检测到?请计算最坏情况下的检测延迟时间。

  2. 终端服务的 cleanupTimer 使用 setInterval 实现。如果改为基于每个会话的独立 setTimeout,各有什么优缺点?在什么场景下你会选择哪种方案?

  3. 当前 WebSocket 认证只在连接时验证一次。如果用户的 Token 在连接期间过期或被拉入黑名单,如何实现连接期间的 Token 刷新或踢出机制?

思考题

  1. Socket.IO 的 maxHttpBufferSize: 1e6(1MB)限制了单条消息大小。如果一个终端命令的输出超过 1MB(例如 cat /var/log/syslog),会发生什么?如何设计分片传输机制支持大消息?

  2. 项目中的 WebSocket 使用内存 Map 存储房间订阅关系。如果部署多个后端实例(水平扩展),跨实例的房间广播如何实现?请提出至少两种方案并比较优缺点。

  3. 当前终端组件使用 3 次指数退避重连。在移动端网络环境下,这个策略是否合适?请分析 4G/5G/WiFi 切换场景下的网络特征,提出更优的重连策略。

延伸阅读

基于 MPL-2.0 许可证发布