IM SDK 可靠性增强

Reliability Enhancement Documentation
v2.0.0
Online
← 返回基础文档

IM SDK 可靠性增强 概览

IM SDK v2.0 针对直播间场景进行了全面的可靠性增强,解决了消息丢失、高并发卡顿、断线恢复等核心痛点。本次升级覆盖服务端(ASP.NET Core SignalR)和四端客户端 SDK(Flutter、iOS、Android、Web)。

✅ 本次升级完全向后兼容,现有 v1.x 客户端无需修改即可继续使用。新功能通过新增 API 和事件提供,不影响已有接口。

九大模块

模块功能解决的问题
NEW消息缓存与断连补发Redis Sorted Set 缓存最近 500 条消息,断连后自动补发网络抖动导致消息丢失
增强全局广播使用 SignalR Group 机制高效推送飘屏通知全局广播性能瓶颈
NEWRedis Backplane 集群多节点 SignalR 集群,共享状态存储在 Redis单节点并发上限
NEW消息频率限制滑动窗口算法限制用户发送频率(默认 5 条/秒)恶意刷屏
NEW高并发消息合并消息频率超阈值时自动批量推送(200ms 间隔)万人直播间客户端卡顿
NEW结构化消息类型TextMessage、GiftMessage、SystemNotice、CustomMessage 四种标准类型客户端需手动解析 JSON
NEW断线重连状态恢复Connection Snapshot 自动恢复群组关系和补发消息重连后需手动重新加入
NEW消息序列号与去重群组内单调递增序列号,客户端自动去重和间隙检测消息重复和乱序
增强连接状态增强新增 restoring 状态、重连事件、心跳机制连接状态不透明

技术栈

  • 后端:ASP.NET Core 8 + SignalR Hub(OrangeCloud.SignalR 项目)
  • 缓存/状态:StackExchange.Redis(Backplane、消息缓存、频率限制、连接快照、共享状态)
  • 集群:Microsoft.AspNetCore.SignalR.StackExchangeRedis
  • Flutter SDK:orangecloud_im_client(Dart Package)
  • iOS SDK:OrangeCloudIMClient(Swift Package)
  • Android SDK:orangecloud-im-client(AAR Library)
  • Web SDK:orangecloud-im-client(npm + TypeScript)

服务端 API

LiveHub 路由:/hubs/live,以下为新增和增强的 Hub 方法。

Hub 方法签名

方法参数说明
JoinGroup string groupId, long? lastSequenceNumber 加入群组。传入 lastSequenceNumber 时自动补发断连期间消息
QuitGroup string groupId 退出群组,清除连接快照
SendGroupMsg string groupId, string messageJson 发送群组消息。服务端自动注入 sequenceNumber 和 serverTimestamp
GetGroupMemberList string groupId 获取群组在线成员列表
RequestBackfill string groupId, long afterSeq NEW请求补发指定序列号之后的消息
Heartbeat NEW心跳包,维持连接活跃状态

JoinGroup 增强说明

lastSequenceNumber 参数为可选。传入时,服务端会从 Message Buffer 中查询该序列号之后的所有消息并补发:

// 客户端调用示例(SignalR invoke)
await connection.invoke("JoinGroup", "room_10001", 12345);

// 服务端处理逻辑:
// 1. 将连接加入 SignalR Group
// 2. 如果 lastSequenceNumber 有值,从 Buffer 查询 seq > 12345 的消息
// 3. 按序列号顺序逐条发送给该客户端
// 4. 如果 Buffer 中最小 seq > lastSequenceNumber,发送 BufferOverflow 事件

服务端事件列表

以下事件由服务端推送到客户端:

事件名数据格式触发时机
ReceiveMessage JSON string(含 sequenceNumber、serverTimestamp) 收到群组消息(逐条模式)
ReceiveBroadcast JSON string NEW收到全局广播消息(飘屏通知)
ReceiveBatchMessage JSON string(messages 数组) NEW收到批量消息(高并发合并模式)
StateRestored { groupIds: [], backfilledCount: N } NEW断线重连状态恢复完成
RateLimited { cooldownMs: N } NEW消息被频率限制拒绝
BufferOverflow { groupId: "..." } NEW消息缓冲区溢出(断连过久)
UserJoined JSON string(用户信息) 用户加入群组
UserLeft string(userId) 用户离开群组
OnlineCountChanged int(在线人数) 群组在线人数变化

ReceiveBatchMessage 数据格式

{
  "type": "batch",
  "messages": [
    {
      "messageType": "text",
      "sequenceNumber": 100,
      "serverTimestamp": 1700000000000,
      "senderInfo": { "userId": "u1", "nickName": "张三" },
      "data": { "content": "消息1" }
    },
    {
      "messageType": "text",
      "sequenceNumber": 101,
      "serverTimestamp": 1700000000100,
      "senderInfo": { "userId": "u2", "nickName": "李四" },
      "data": { "content": "消息2" }
    }
  ],
  "count": 2
}

配置参数(IMReliability)

appsettings.json 中配置 IMReliability 节:

{
  "IMReliability": {
    "BufferMaxSize": 500,
    "BufferTtlMinutes": 10,
    "MaxMessagesPerSecond": 5,
    "RateLimitWindowMs": 1000,
    "MergerThresholdPerSecond": 50,
    "MergerFlushIntervalMs": 200,
    "SnapshotTtlMinutes": 10,
    "EnableRedisBackplane": false,
    "RedisBackplaneConnection": ""
  }
}
参数类型默认值说明
BufferMaxSizeint500每个群组消息缓冲区最大条数
BufferTtlMinutesint10消息缓冲区 TTL(分钟),超时自动清理
MaxMessagesPerSecondint5单用户每秒最大发送消息数
RateLimitWindowMsint1000频率限制滑动窗口大小(毫秒)
MergerThresholdPerSecondint50消息合并触发阈值(条/秒)
MergerFlushIntervalMsint200批量消息刷新间隔(毫秒)
SnapshotTtlMinutesint10连接快照 TTL(分钟),超时不自动恢复
EnableRedisBackplaneboolfalse是否启用 Redis Backplane 集群模式
RedisBackplaneConnectionstring""Redis Backplane 连接字符串(集群模式必填)
💡 单节点部署时 EnableRedisBackplane 设为 false 可减少 Redis 开销。消息缓存、频率限制等功能仍使用 Redis,但不启用 Backplane 消息同步。

Flutter SDK 增强

Flutter SDK v2.0 新增类型安全 API、事件流、消息去重、断线恢复等功能。

结构化消息类型

新增四种标准消息类型,提供强类型的发送和接收 API:

import 'package:orangecloud_im_client/orangecloud_im_client.dart';

// 消息类型枚举
enum IMMessageType {
  text,        // 文本聊天
  gift,        // 礼物动画
  systemNotice, // 系统通知
  custom,      // 自定义扩展
}

// 消息基类
abstract class IMMessage {
  final IMMessageType messageType;
  final SenderInfo senderInfo;
  final int timestamp;
  final int sequenceNumber;
  final String groupId;
}

// 具体消息类型
class TextMessage extends IMMessage { ... }
class GiftMessage extends IMMessage { ... }
class SystemNotice extends IMMessage { ... }
class CustomMessage extends IMMessage { ... }

类型安全的发送方法

final client = OrangeCloudIMClient();

// 发送文本消息
await client.sendTextMessage('room_10001', '大家好!');

// 发送礼物消息
await client.sendGiftMessage('room_10001', GiftInfo(
  giftId: 'gift_001',
  giftName: '火箭',
  giftCount: 1,
  giftPrice: 100,
  animationUrl: 'https://cdn.example.com/rocket.json',
));

// 发送自定义消息
await client.sendCustomMessage('room_10001', 'like', {
  'count': 1,
  'emoji': '❤️',
});

事件流

新增多个类型安全的事件流,替代原有的单一 onMessageReceived

// === 类型安全的接收回调 ===
client.onTextMessageReceived.listen((TextMessage msg) {
  print('[${msg.senderInfo.nickName}]: ${msg.data.content}');
});

client.onGiftMessageReceived.listen((GiftMessage msg) {
  print('${msg.senderInfo.nickName} 送出 ${msg.data.giftName} x${msg.data.giftCount}');
  // 播放礼物动画
  playGiftAnimation(msg.data.animationUrl);
});

client.onSystemNoticeReceived.listen((SystemNotice msg) {
  print('系统通知: ${msg.data.content}');
});

client.onCustomMessageReceived.listen((CustomMessage msg) {
  print('自定义消息: ${msg.data.customType} - ${msg.data.payload}');
});

// === 新增事件流 ===
// 全局广播(飘屏通知)
client.onBroadcastReceived.listen((IMMessage msg) {
  showFloatingNotification(msg);
});

// 批量消息(高并发模式)
client.onBatchMessageReceived.listen((List<IMMessage> messages) {
  for (final msg in messages) {
    handleMessage(msg);
  }
});

// 状态恢复完成
client.onStateRestored.listen((StateRestoredInfo info) {
  print('已恢复 ${info.restoredGroupIds.length} 个群组');
  print('补发了 ${info.backfilledMessageCount} 条消息');
});

// 重连尝试
client.onReconnectAttempt.listen((ReconnectAttemptInfo info) {
  print('第 ${info.attemptNumber} 次重连,${info.delayMs}ms 后重试');
});

// 重连失败(超过最大次数)
client.onReconnectFailed.listen((_) {
  showDialog('连接已断开,请检查网络后手动重连');
});

// === 向后兼容 ===
// 原有的 onMessageReceived 仍然工作,接收所有类型的原始 JSON
client.onMessageReceived.listen((String messageJson) {
  // 所有消息都会触发此回调(向后兼容)
});

去重机制

SDK 自动处理消息去重,无需业务层干预:

// SDK 内部自动维护每个群组的 lastSequenceNumber
// 收到消息时:
// 1. 如果 msg.sequenceNumber <= lastSequenceNumber → 静默丢弃(去重)
// 2. 如果 msg.sequenceNumber == lastSequenceNumber + 1 → 正常处理
// 3. 如果 msg.sequenceNumber > lastSequenceNumber + 1 → 检测到间隙,自动请求补发

// 查询当前序列号
int lastSeq = client.getLastSequenceNumber('room_10001');
print('群组 room_10001 最后序列号: $lastSeq');
💡 去重和间隙检测完全由 SDK 内部处理。业务层收到的消息已经过去重,保证不会重复。间隙检测触发的补发请求也是自动的。

重连配置

final client = OrangeCloudIMClient();

// 配置最大重连次数(默认 -1 = 无限重连)
client.maxReconnectAttempts = 10;

// 重连策略:指数退避
// 第1次: 1秒后重试
// 第2次: 2秒后重试
// 第3次: 4秒后重试
// ...最大间隔 30秒

// 连接状态枚举(扩展)
// IMConnectionState.disconnected  - 已断开
// IMConnectionState.connecting    - 连接中
// IMConnectionState.connected     - 已连接
// IMConnectionState.reconnecting  - 重连中
// IMConnectionState.restoring     - 状态恢复中(NEW)

client.onConnectionStateChanged.listen((state) {
  switch (state) {
    case IMConnectionState.restoring:
      showStatus('正在恢复连接状态...');
      break;
    case IMConnectionState.connected:
      showStatus('已连接');
      break;
    case IMConnectionState.reconnecting:
      showStatus('重连中...');
      break;
    default:
      break;
  }
});

网络状态监听

// SDK 内部自动监听网络状态变化
// 当检测到网络从离线恢复为在线时,立即触发重连(不等待定时器)

// 心跳机制:
// - 连接成功后每 30 秒自动发送心跳包
// - 服务端超过 90 秒未收到心跳则主动断开
// - 心跳超时客户端会自动触发重连

// 无需额外配置,SDK 自动处理

iOS SDK 增强

iOS SDK v2.0 使用 Swift Concurrency 和 Combine 提供类型安全的 API。

结构化消息类型

import OrangeCloudIMClient

// 消息协议
protocol IMMessage: Codable {
    var messageType: IMMessageType { get }
    var senderInfo: SenderInfo { get }
    var timestamp: Int { get }
    var sequenceNumber: Int { get }
    var groupId: String { get }
}

// 具体消息类型
struct TextMessage: IMMessage { ... }
struct GiftMessage: IMMessage { ... }
struct SystemNotice: IMMessage { ... }
struct CustomMessage: IMMessage { ... }

// 类型安全的发送方法
try await client.sendTextMessage(groupId: "room_10001", content: "大家好!")
try await client.sendGiftMessage(groupId: "room_10001", giftInfo: GiftInfo(
    giftId: "gift_001",
    giftName: "火箭",
    giftCount: 1,
    giftPrice: 100
))
try await client.sendCustomMessage(groupId: "room_10001", customType: "like", payload: ["count": 1])

事件回调

// 类型安全的接收回调
client.onTextMessage = { (msg: TextMessage) in
    print("[\(msg.senderInfo.nickName)]: \(msg.data.content)")
}

client.onGiftMessage = { (msg: GiftMessage) in
    playGiftAnimation(msg.data.animationUrl)
}

client.onSystemNotice = { (msg: SystemNotice) in
    showSystemAlert(msg.data.content)
}

// 新增事件
client.onBroadcastReceived = { msg in /* 全局广播 */ }
client.onBatchMessageReceived = { messages in /* 批量消息 */ }
client.onStateRestored = { info in
    print("恢复了 \(info.restoredGroupIds.count) 个群组")
}
client.onReconnectAttempt = { info in
    print("第 \(info.attemptNumber) 次重连")
}
client.onReconnectFailed = {
    showReconnectFailedAlert()
}

// 向后兼容:原有 onMessageReceived 继续工作
client.onMessageReceived = { messageJson in /* 所有消息原始 JSON */ }

重连与恢复

// 配置最大重连次数
client.maxReconnectAttempts = 10  // 默认 -1(无限重连)

// 获取序列号
let lastSeq = client.getLastSequenceNumber(groupId: "room_10001")

// 连接状态枚举
enum IMConnectionState {
    case disconnected
    case connecting
    case connected
    case reconnecting
    case restoring  // NEW: 状态恢复中
}

// 网络恢复立即重连(使用 NWPathMonitor)
// 心跳:每 30 秒自动发送

Android SDK 增强

Android SDK v2.0 使用 Kotlin Coroutines 和 Flow 提供类型安全的 API。

结构化消息类型

import com.orangecloud.im.OrangeCloudIMClient
import com.orangecloud.im.models.*

// 消息基类(使用 Gson 注解)
abstract class IMMessage {
    abstract val messageType: IMMessageType
    abstract val senderInfo: SenderInfo
    abstract val timestamp: Long
    abstract val sequenceNumber: Long
    abstract val groupId: String
}

// 具体消息类型
data class TextMessage(...) : IMMessage()
data class GiftMessage(...) : IMMessage()
data class SystemNotice(...) : IMMessage()
data class CustomMessage(...) : IMMessage()

// 类型安全的发送方法
client.sendTextMessage("room_10001", "大家好!")
client.sendGiftMessage("room_10001", GiftInfo(
    giftId = "gift_001",
    giftName = "火箭",
    giftCount = 1,
    giftPrice = 100
))
client.sendCustomMessage("room_10001", "like", mapOf("count" to 1))

事件回调

// 类型安全的接收回调(Lambda)
client.onTextMessage = { msg: TextMessage ->
    Log.d("IM", "[${msg.senderInfo.nickName}]: ${msg.data.content}")
}

client.onGiftMessage = { msg: GiftMessage ->
    playGiftAnimation(msg.data.animationUrl)
}

client.onSystemNotice = { msg: SystemNotice ->
    showSystemAlert(msg.data.content)
}

// 新增事件
client.onBroadcastReceived = { msg -> /* 全局广播 */ }
client.onBatchMessageReceived = { messages -> /* 批量消息 */ }
client.onStateRestored = { info ->
    Log.d("IM", "恢复了 ${info.restoredGroupIds.size} 个群组")
}
client.onReconnectAttempt = { info ->
    Log.d("IM", "第 ${info.attemptNumber} 次重连")
}
client.onReconnectFailed = {
    showReconnectFailedDialog()
}

// 向后兼容
client.onMessageReceived = { messageJson -> /* 所有消息原始 JSON */ }

重连与恢复

// 配置最大重连次数
client.maxReconnectAttempts = 10  // 默认 -1(无限重连)

// 获取序列号
val lastSeq = client.getLastSequenceNumber("room_10001")

// 连接状态枚举
enum class IMConnectionState {
    DISCONNECTED,
    CONNECTING,
    CONNECTED,
    RECONNECTING,
    RESTORING  // NEW: 状态恢复中
}

// 网络恢复立即重连(使用 ConnectivityManager.NetworkCallback)
// 心跳:每 30 秒自动发送

Web SDK 增强

Web SDK v2.0 使用 TypeScript 提供完整的类型定义和类型安全 API。

结构化消息类型

import { OrangeCloudIMClient, IMMessageType, TextMessage, GiftMessage } from 'orangecloud-im-client';

// TypeScript 接口定义
interface IMMessage {
  messageType: IMMessageType;
  senderInfo: SenderInfo;
  timestamp: number;
  sequenceNumber: number;
  groupId: string;
}

interface TextMessage extends IMMessage {
  data: { content: string };
}

interface GiftMessage extends IMMessage {
  data: {
    giftId: string;
    giftName: string;
    giftCount: number;
    giftPrice: number;
    animationUrl?: string;
  };
}

// 类型安全的发送方法
const client = new OrangeCloudIMClient();
await client.sendTextMessage('room_10001', '大家好!');
await client.sendGiftMessage('room_10001', {
  giftId: 'gift_001',
  giftName: '火箭',
  giftCount: 1,
  giftPrice: 100,
});
await client.sendCustomMessage('room_10001', 'like', { count: 1 });

事件回调

// 类型安全的接收回调
client.onTextMessage((msg: TextMessage) => {
  console.log(`[${msg.senderInfo.nickName}]: ${msg.data.content}`);
});

client.onGiftMessage((msg: GiftMessage) => {
  playGiftAnimation(msg.data.animationUrl);
});

client.onSystemNotice((msg: SystemNotice) => {
  showSystemAlert(msg.data.content);
});

client.onCustomMessage((msg: CustomMessage) => {
  console.log('自定义消息:', msg.data.customType, msg.data.payload);
});

// 新增事件
client.onBroadcastReceived((msg) => { /* 全局广播 */ });
client.onBatchMessageReceived((messages) => { /* 批量消息 */ });
client.onStateRestored((info) => {
  console.log(`恢复了 ${info.restoredGroupIds.length} 个群组`);
});
client.onReconnectAttempt((info) => {
  console.log(`第 ${info.attemptNumber} 次重连`);
});
client.onReconnectFailed(() => {
  showReconnectFailedUI();
});

// 向后兼容
client.onMessageReceived((messageJson: string) => { /* 所有消息原始 JSON */ });

重连与恢复

// 配置最大重连次数
client.maxReconnectAttempts = 10; // 默认 -1(无限重连)

// 获取序列号
const lastSeq = client.getLastSequenceNumber('room_10001');

// 连接状态枚举
enum IMConnectionState {
  disconnected = 'disconnected',
  connecting = 'connecting',
  connected = 'connected',
  reconnecting = 'reconnecting',
  restoring = 'restoring',  // NEW: 状态恢复中
}

// 网络恢复立即重连(使用 navigator.onLine + online 事件)
// 心跳:每 30 秒自动发送(setInterval)

各平台实现差异

功能FlutteriOSAndroidWeb
网络监听connectivity_plusNWPathMonitorConnectivityManagernavigator.onLine
事件模型Stream (Dart)Closure / CombineLambda / FlowCallback function
JSON 解析dart:convertCodableGsonJSON.parse
心跳实现Timer.periodicDispatchSourceTimerHandler.postDelayedsetInterval
后台保活不保活不保活不保活不保活

架构图与时序图

以下使用 Mermaid 格式描述系统架构和关键流程。

系统架构图

graph TB
    subgraph Clients["客户端"]
        Flutter["Flutter SDK"]
        iOS["iOS SDK"]
        Android["Android SDK"]
        Web["Web SDK"]
    end

    subgraph SignalR_Cluster["SignalR 集群"]
        Node1["SignalR Node 1"]
        Node2["SignalR Node 2"]
        NodeN["SignalR Node N"]
    end

    subgraph Redis["Redis 存储层"]
        Backplane["Redis Backplane\nSignalR 消息同步"]
        MsgBuffer["Message Buffer\nSorted Set per Group"]
        SeqCounter["Sequence Counter\nINCR per Group"]
        RateLimiter["Rate Limiter\nSliding Window ZSET"]
        Snapshot["Connection Snapshot\nHash per User"]
        SharedState["Shared State\nUserConnections / RoomUsers"]
    end

    Flutter & iOS & Android & Web -->|"WebSocket"| Node1 & Node2 & NodeN
    Node1 & Node2 & NodeN <-->|"Pub/Sub"| Backplane
    Node1 & Node2 & NodeN --> MsgBuffer
    Node1 & Node2 & NodeN --> SeqCounter
    Node1 & Node2 & NodeN --> RateLimiter
    Node1 & Node2 & NodeN --> Snapshot
    Node1 & Node2 & NodeN --> SharedState

架构说明

  • 客户端层:四端 SDK 通过 WebSocket 连接到 SignalR 集群任一节点
  • SignalR 集群:多个 SignalR 节点通过 Redis Backplane 同步消息,支持水平扩展
  • Redis 存储层:所有共享状态存储在 Redis 中,确保多节点一致性

消息流转时序图

sequenceDiagram
    participant C as Client
    participant H as LiveHub
    participant R as Redis
    participant G as Group Clients

    C->>H: SendGroupMsg(groupId, messageJson)
    H->>R: ZRANGEBYSCORE (Rate Limiter Check)
    alt Rate Limited
        H-->>C: RateLimited Event { cooldownMs }
    else Allowed
        H->>R: INCR (Get Sequence Number)
        R-->>H: sequenceNumber
        H->>H: Inject sequenceNumber + serverTimestamp
        H->>R: ZADD Message_Buffer (score=seq)
        H->>H: Message Merger Check
        alt High Frequency (>50/sec)
            H->>H: Queue to Merger
            H-->>G: ReceiveBatchMessage (every 200ms)
        else Normal
            H-->>G: ReceiveMessage
        end
    end

消息处理流程说明

  1. 频率检查:使用 Redis Sorted Set 滑动窗口检查用户发送频率
  2. 序列号分配:通过 Redis INCR 原子操作获取群组内唯一递增序列号
  3. 元数据注入:服务端自动注入 sequenceNumber 和 serverTimestamp
  4. 缓冲区写入:消息写入 Redis Sorted Set(Score = sequenceNumber)
  5. 推送决策:根据当前群组消息频率决定逐条推送或批量合并

断线重连恢复时序图

sequenceDiagram
    participant C as Client
    participant H as LiveHub
    participant R as Redis

    Note over C: 网络断开
    H->>R: HSET Connection_Snapshot\n(groups, lastSeqNums, createdAt)
    Note over C: 网络恢复,自动重连
    C->>H: OnConnected (携带 userId)
    H->>R: HGETALL Connection_Snapshot
    alt Snapshot Exists (< 10min)
        H->>H: Auto JoinGroup for each group
        loop For each group
            H->>R: ZRANGEBYSCORE Buffer\n(seq > lastSeq)
            R-->>H: Missed messages
            H-->>C: Backfill messages (ReceiveMessage)
        end
        H-->>C: StateRestored Event\n{ groupIds, backfilledCount }
        H->>R: DEL Connection_Snapshot
    else Snapshot Expired (> 10min)
        H-->>C: No auto-restore
        Note over C: 需手动调用 JoinGroup
    end

断线恢复流程说明

  1. 断连检测:服务端在 OnDisconnectedAsync 中保存连接快照到 Redis
  2. 快照内容:用户所在群组列表 + 每个群组的 lastSequenceNumber
  3. 重连恢复:重连后自动查询快照,恢复群组关系并补发消息
  4. 快照过期:TTL 10 分钟,超时后不自动恢复,需客户端手动加入
  5. 恢复通知:恢复完成后发送 StateRestored 事件通知客户端

迁移指南

从 IM SDK v1.x 升级到 v2.0 的步骤和注意事项。

✅ v2.0 完全向后兼容 v1.x。现有代码无需修改即可继续工作。新功能通过新增 API 提供,可按需逐步迁移。

升级步骤

服务端升级

  1. 更新 NuGet 包
    # 新增依赖
    dotnet add package Microsoft.AspNetCore.SignalR.StackExchangeRedis
  2. 添加配置节

    appsettings.json 中添加 IMReliability 配置(见上方配置参数章节)

  3. 注册服务

    Program.cs 中的服务注册已自动完成,无需手动修改

  4. 确保 Redis 可用

    消息缓存、频率限制等功能依赖 Redis。确保 Redis 连接字符串配置正确

客户端升级

  1. 更新 SDK 版本:将各平台 SDK 更新到 v2.0
  2. (可选)使用类型安全 API:将 sendGroupMsg 替换为 sendTextMessage
  3. (可选)监听新事件:添加 onBroadcastReceivedonStateRestored 等监听
  4. (可选)配置重连参数:设置 maxReconnectAttempts

Breaking Changes

⚠️ v2.0 没有 Breaking Changes。以下列出的是行为变化,不影响现有代码运行。
变化项v1.x 行为v2.0 行为影响
消息 JSON 新增字段 无 sequenceNumber/serverTimestamp 服务端自动注入这两个字段 无影响,新增字段不影响已有解析逻辑
重连后自动恢复 重连后需手动 JoinGroup 自动恢复群组关系和补发消息 正面影响,减少客户端代码
全局广播事件名 无独立广播事件 新增 ReceiveBroadcast 事件 无影响,不监听则不触发
高并发批量推送 始终逐条推送 高频时自动切换为批量推送 需监听 ReceiveBatchMessage 才能接收批量消息
⚠️ 注意:如果您的直播间可能触发高并发(>50条/秒),建议同时监听 ReceiveBatchMessage 事件,否则在高频模式下可能丢失部分消息。

向后兼容保证

  • onMessageReceived 回调继续工作,接收所有类型消息的原始 JSON
  • sendGroupMsg(groupId, messageJson) 方法继续可用
  • joinGroup(groupId) 不传 lastSequenceNumber 时行为与 v1.x 一致
  • ✅ 连接状态枚举新增值不影响已有的 switch/if 判断
  • ✅ 服务端配置全部有默认值,不配置 IMReliability 节也能正常运行

配置迁移

最小配置(单节点,使用默认参数)

// appsettings.json - 无需添加任何配置
// 所有参数使用默认值,功能自动启用(需要 Redis 连接)

推荐配置(生产环境)

{
  "IMReliability": {
    "BufferMaxSize": 500,
    "BufferTtlMinutes": 10,
    "MaxMessagesPerSecond": 5,
    "RateLimitWindowMs": 1000,
    "MergerThresholdPerSecond": 50,
    "MergerFlushIntervalMs": 200,
    "SnapshotTtlMinutes": 10,
    "EnableRedisBackplane": true,
    "RedisBackplaneConnection": "redis-cluster:6379,password=xxx,abortConnect=false"
  }
}

高并发场景配置(万人直播间)

{
  "IMReliability": {
    "BufferMaxSize": 1000,
    "BufferTtlMinutes": 5,
    "MaxMessagesPerSecond": 3,
    "RateLimitWindowMs": 1000,
    "MergerThresholdPerSecond": 30,
    "MergerFlushIntervalMs": 150,
    "SnapshotTtlMinutes": 5,
    "EnableRedisBackplane": true,
    "RedisBackplaneConnection": "redis-cluster:6379,password=xxx,abortConnect=false"
  }
}
💡 高并发场景建议降低 MergerThresholdPerSecond(更早触发合并)和 MergerFlushIntervalMs(更频繁刷新),同时降低 MaxMessagesPerSecond 限制刷屏。

Redis Key 格式参考

用途Key 格式类型TTL
消息缓冲区oms:signalr:live:{appId}:buffer:{groupId}Sorted Set10min
序列号计数器oms:signalr:live:{appId}:seq:{groupId}String (INCR)10min
频率限制oms:signalr:live:{appId}:rate:{userId}Sorted Set2s
连接快照oms:signalr:live:{appId}:snapshot:{userId}Hash10min
用户连接oms:signalr:live:{appId}:conns:{userId}Set-
群组成员oms:signalr:live:{appId}:room:{groupId}:usersHash-
连接-群组映射oms:signalr:live:conngroup:{connectionId}String-