IM SDK 可靠性增强 概览
IM SDK v2.0 针对直播间场景进行了全面的可靠性增强,解决了消息丢失、高并发卡顿、断线恢复等核心痛点。本次升级覆盖服务端(ASP.NET Core SignalR)和四端客户端 SDK(Flutter、iOS、Android、Web)。
九大模块
| 模块 | 功能 | 解决的问题 |
|---|---|---|
| NEW消息缓存与断连补发 | Redis Sorted Set 缓存最近 500 条消息,断连后自动补发 | 网络抖动导致消息丢失 |
| 增强全局广播 | 使用 SignalR Group 机制高效推送飘屏通知 | 全局广播性能瓶颈 |
| NEWRedis Backplane 集群 | 多节点 SignalR 集群,共享状态存储在 Redis | 单节点并发上限 |
| NEW消息频率限制 | 滑动窗口算法限制用户发送频率(默认 5 条/秒) | 恶意刷屏 |
| NEW高并发消息合并 | 消息频率超阈值时自动批量推送(200ms 间隔) | 万人直播间客户端卡顿 |
| NEW结构化消息类型 | TextMessage、GiftMessage、SystemNotice、CustomMessage 四种标准类型 | 客户端需手动解析 JSON |
| NEW断线重连状态恢复 | Connection Snapshot 自动恢复群组关系和补发消息 | 重连后需手动重新加入 |
| NEW消息序列号与去重 | 群组内单调递增序列号,客户端自动去重和间隙检测 | 消息重复和乱序 |
| 增强连接状态增强 | 新增 restoring 状态、重连事件、心跳机制 | 连接状态不透明 |
技术栈
- 后端:ASP.NET Core 8 + SignalR Hub(OrangeCloud.SignalR 项目)
- 缓存/状态:StackExchange.Redis(Backplane、消息缓存、频率限制、连接快照、共享状态)
- 集群:Microsoft.AspNetCore.SignalR.StackExchangeRedis
- Flutter SDK:orangecloud_im_client(Dart Package)
- iOS SDK:OrangeCloudIMClient(Swift Package)
- Android SDK:orangecloud-im-client(AAR Library)
- Web SDK:orangecloud-im-client(npm + TypeScript)
服务端 API
LiveHub 路由:/hubs/live,以下为新增和增强的 Hub 方法。
Hub 方法签名
| 方法 | 参数 | 说明 |
|---|---|---|
JoinGroup |
string groupId, long? lastSequenceNumber |
加入群组。传入 lastSequenceNumber 时自动补发断连期间消息 |
QuitGroup |
string groupId |
退出群组,清除连接快照 |
SendGroupMsg |
string groupId, string messageJson |
发送群组消息。服务端自动注入 sequenceNumber 和 serverTimestamp |
GetGroupMemberList |
string groupId |
获取群组在线成员列表 |
RequestBackfill |
string groupId, long afterSeq |
NEW请求补发指定序列号之后的消息 |
Heartbeat |
无 | NEW心跳包,维持连接活跃状态 |
JoinGroup 增强说明
lastSequenceNumber 参数为可选。传入时,服务端会从 Message Buffer 中查询该序列号之后的所有消息并补发:
// 客户端调用示例(SignalR invoke)
await connection.invoke("JoinGroup", "room_10001", 12345);
// 服务端处理逻辑:
// 1. 将连接加入 SignalR Group
// 2. 如果 lastSequenceNumber 有值,从 Buffer 查询 seq > 12345 的消息
// 3. 按序列号顺序逐条发送给该客户端
// 4. 如果 Buffer 中最小 seq > lastSequenceNumber,发送 BufferOverflow 事件
服务端事件列表
以下事件由服务端推送到客户端:
| 事件名 | 数据格式 | 触发时机 |
|---|---|---|
ReceiveMessage |
JSON string(含 sequenceNumber、serverTimestamp) | 收到群组消息(逐条模式) |
ReceiveBroadcast |
JSON string | NEW收到全局广播消息(飘屏通知) |
ReceiveBatchMessage |
JSON string(messages 数组) | NEW收到批量消息(高并发合并模式) |
StateRestored |
{ groupIds: [], backfilledCount: N } |
NEW断线重连状态恢复完成 |
RateLimited |
{ cooldownMs: N } |
NEW消息被频率限制拒绝 |
BufferOverflow |
{ groupId: "..." } |
NEW消息缓冲区溢出(断连过久) |
UserJoined |
JSON string(用户信息) | 用户加入群组 |
UserLeft |
string(userId) | 用户离开群组 |
OnlineCountChanged |
int(在线人数) | 群组在线人数变化 |
ReceiveBatchMessage 数据格式
{
"type": "batch",
"messages": [
{
"messageType": "text",
"sequenceNumber": 100,
"serverTimestamp": 1700000000000,
"senderInfo": { "userId": "u1", "nickName": "张三" },
"data": { "content": "消息1" }
},
{
"messageType": "text",
"sequenceNumber": 101,
"serverTimestamp": 1700000000100,
"senderInfo": { "userId": "u2", "nickName": "李四" },
"data": { "content": "消息2" }
}
],
"count": 2
}
配置参数(IMReliability)
在 appsettings.json 中配置 IMReliability 节:
{
"IMReliability": {
"BufferMaxSize": 500,
"BufferTtlMinutes": 10,
"MaxMessagesPerSecond": 5,
"RateLimitWindowMs": 1000,
"MergerThresholdPerSecond": 50,
"MergerFlushIntervalMs": 200,
"SnapshotTtlMinutes": 10,
"EnableRedisBackplane": false,
"RedisBackplaneConnection": ""
}
}
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
BufferMaxSize | int | 500 | 每个群组消息缓冲区最大条数 |
BufferTtlMinutes | int | 10 | 消息缓冲区 TTL(分钟),超时自动清理 |
MaxMessagesPerSecond | int | 5 | 单用户每秒最大发送消息数 |
RateLimitWindowMs | int | 1000 | 频率限制滑动窗口大小(毫秒) |
MergerThresholdPerSecond | int | 50 | 消息合并触发阈值(条/秒) |
MergerFlushIntervalMs | int | 200 | 批量消息刷新间隔(毫秒) |
SnapshotTtlMinutes | int | 10 | 连接快照 TTL(分钟),超时不自动恢复 |
EnableRedisBackplane | bool | false | 是否启用 Redis Backplane 集群模式 |
RedisBackplaneConnection | string | "" | Redis Backplane 连接字符串(集群模式必填) |
Flutter SDK 增强
Flutter SDK v2.0 新增类型安全 API、事件流、消息去重、断线恢复等功能。
结构化消息类型
新增四种标准消息类型,提供强类型的发送和接收 API:
import 'package:orangecloud_im_client/orangecloud_im_client.dart';
// 消息类型枚举
enum IMMessageType {
text, // 文本聊天
gift, // 礼物动画
systemNotice, // 系统通知
custom, // 自定义扩展
}
// 消息基类
abstract class IMMessage {
final IMMessageType messageType;
final SenderInfo senderInfo;
final int timestamp;
final int sequenceNumber;
final String groupId;
}
// 具体消息类型
class TextMessage extends IMMessage { ... }
class GiftMessage extends IMMessage { ... }
class SystemNotice extends IMMessage { ... }
class CustomMessage extends IMMessage { ... }
类型安全的发送方法
final client = OrangeCloudIMClient();
// 发送文本消息
await client.sendTextMessage('room_10001', '大家好!');
// 发送礼物消息
await client.sendGiftMessage('room_10001', GiftInfo(
giftId: 'gift_001',
giftName: '火箭',
giftCount: 1,
giftPrice: 100,
animationUrl: 'https://cdn.example.com/rocket.json',
));
// 发送自定义消息
await client.sendCustomMessage('room_10001', 'like', {
'count': 1,
'emoji': '❤️',
});
事件流
新增多个类型安全的事件流,替代原有的单一 onMessageReceived:
// === 类型安全的接收回调 ===
client.onTextMessageReceived.listen((TextMessage msg) {
print('[${msg.senderInfo.nickName}]: ${msg.data.content}');
});
client.onGiftMessageReceived.listen((GiftMessage msg) {
print('${msg.senderInfo.nickName} 送出 ${msg.data.giftName} x${msg.data.giftCount}');
// 播放礼物动画
playGiftAnimation(msg.data.animationUrl);
});
client.onSystemNoticeReceived.listen((SystemNotice msg) {
print('系统通知: ${msg.data.content}');
});
client.onCustomMessageReceived.listen((CustomMessage msg) {
print('自定义消息: ${msg.data.customType} - ${msg.data.payload}');
});
// === 新增事件流 ===
// 全局广播(飘屏通知)
client.onBroadcastReceived.listen((IMMessage msg) {
showFloatingNotification(msg);
});
// 批量消息(高并发模式)
client.onBatchMessageReceived.listen((List<IMMessage> messages) {
for (final msg in messages) {
handleMessage(msg);
}
});
// 状态恢复完成
client.onStateRestored.listen((StateRestoredInfo info) {
print('已恢复 ${info.restoredGroupIds.length} 个群组');
print('补发了 ${info.backfilledMessageCount} 条消息');
});
// 重连尝试
client.onReconnectAttempt.listen((ReconnectAttemptInfo info) {
print('第 ${info.attemptNumber} 次重连,${info.delayMs}ms 后重试');
});
// 重连失败(超过最大次数)
client.onReconnectFailed.listen((_) {
showDialog('连接已断开,请检查网络后手动重连');
});
// === 向后兼容 ===
// 原有的 onMessageReceived 仍然工作,接收所有类型的原始 JSON
client.onMessageReceived.listen((String messageJson) {
// 所有消息都会触发此回调(向后兼容)
});
去重机制
SDK 自动处理消息去重,无需业务层干预:
// SDK 内部自动维护每个群组的 lastSequenceNumber
// 收到消息时:
// 1. 如果 msg.sequenceNumber <= lastSequenceNumber → 静默丢弃(去重)
// 2. 如果 msg.sequenceNumber == lastSequenceNumber + 1 → 正常处理
// 3. 如果 msg.sequenceNumber > lastSequenceNumber + 1 → 检测到间隙,自动请求补发
// 查询当前序列号
int lastSeq = client.getLastSequenceNumber('room_10001');
print('群组 room_10001 最后序列号: $lastSeq');
重连配置
final client = OrangeCloudIMClient();
// 配置最大重连次数(默认 -1 = 无限重连)
client.maxReconnectAttempts = 10;
// 重连策略:指数退避
// 第1次: 1秒后重试
// 第2次: 2秒后重试
// 第3次: 4秒后重试
// ...最大间隔 30秒
// 连接状态枚举(扩展)
// IMConnectionState.disconnected - 已断开
// IMConnectionState.connecting - 连接中
// IMConnectionState.connected - 已连接
// IMConnectionState.reconnecting - 重连中
// IMConnectionState.restoring - 状态恢复中(NEW)
client.onConnectionStateChanged.listen((state) {
switch (state) {
case IMConnectionState.restoring:
showStatus('正在恢复连接状态...');
break;
case IMConnectionState.connected:
showStatus('已连接');
break;
case IMConnectionState.reconnecting:
showStatus('重连中...');
break;
default:
break;
}
});
网络状态监听
// SDK 内部自动监听网络状态变化
// 当检测到网络从离线恢复为在线时,立即触发重连(不等待定时器)
// 心跳机制:
// - 连接成功后每 30 秒自动发送心跳包
// - 服务端超过 90 秒未收到心跳则主动断开
// - 心跳超时客户端会自动触发重连
// 无需额外配置,SDK 自动处理
iOS SDK 增强
iOS SDK v2.0 使用 Swift Concurrency 和 Combine 提供类型安全的 API。
结构化消息类型
import OrangeCloudIMClient
// 消息协议
protocol IMMessage: Codable {
var messageType: IMMessageType { get }
var senderInfo: SenderInfo { get }
var timestamp: Int { get }
var sequenceNumber: Int { get }
var groupId: String { get }
}
// 具体消息类型
struct TextMessage: IMMessage { ... }
struct GiftMessage: IMMessage { ... }
struct SystemNotice: IMMessage { ... }
struct CustomMessage: IMMessage { ... }
// 类型安全的发送方法
try await client.sendTextMessage(groupId: "room_10001", content: "大家好!")
try await client.sendGiftMessage(groupId: "room_10001", giftInfo: GiftInfo(
giftId: "gift_001",
giftName: "火箭",
giftCount: 1,
giftPrice: 100
))
try await client.sendCustomMessage(groupId: "room_10001", customType: "like", payload: ["count": 1])
事件回调
// 类型安全的接收回调
client.onTextMessage = { (msg: TextMessage) in
print("[\(msg.senderInfo.nickName)]: \(msg.data.content)")
}
client.onGiftMessage = { (msg: GiftMessage) in
playGiftAnimation(msg.data.animationUrl)
}
client.onSystemNotice = { (msg: SystemNotice) in
showSystemAlert(msg.data.content)
}
// 新增事件
client.onBroadcastReceived = { msg in /* 全局广播 */ }
client.onBatchMessageReceived = { messages in /* 批量消息 */ }
client.onStateRestored = { info in
print("恢复了 \(info.restoredGroupIds.count) 个群组")
}
client.onReconnectAttempt = { info in
print("第 \(info.attemptNumber) 次重连")
}
client.onReconnectFailed = {
showReconnectFailedAlert()
}
// 向后兼容:原有 onMessageReceived 继续工作
client.onMessageReceived = { messageJson in /* 所有消息原始 JSON */ }
重连与恢复
// 配置最大重连次数
client.maxReconnectAttempts = 10 // 默认 -1(无限重连)
// 获取序列号
let lastSeq = client.getLastSequenceNumber(groupId: "room_10001")
// 连接状态枚举
enum IMConnectionState {
case disconnected
case connecting
case connected
case reconnecting
case restoring // NEW: 状态恢复中
}
// 网络恢复立即重连(使用 NWPathMonitor)
// 心跳:每 30 秒自动发送
Android SDK 增强
Android SDK v2.0 使用 Kotlin Coroutines 和 Flow 提供类型安全的 API。
结构化消息类型
import com.orangecloud.im.OrangeCloudIMClient
import com.orangecloud.im.models.*
// 消息基类(使用 Gson 注解)
abstract class IMMessage {
abstract val messageType: IMMessageType
abstract val senderInfo: SenderInfo
abstract val timestamp: Long
abstract val sequenceNumber: Long
abstract val groupId: String
}
// 具体消息类型
data class TextMessage(...) : IMMessage()
data class GiftMessage(...) : IMMessage()
data class SystemNotice(...) : IMMessage()
data class CustomMessage(...) : IMMessage()
// 类型安全的发送方法
client.sendTextMessage("room_10001", "大家好!")
client.sendGiftMessage("room_10001", GiftInfo(
giftId = "gift_001",
giftName = "火箭",
giftCount = 1,
giftPrice = 100
))
client.sendCustomMessage("room_10001", "like", mapOf("count" to 1))
事件回调
// 类型安全的接收回调(Lambda)
client.onTextMessage = { msg: TextMessage ->
Log.d("IM", "[${msg.senderInfo.nickName}]: ${msg.data.content}")
}
client.onGiftMessage = { msg: GiftMessage ->
playGiftAnimation(msg.data.animationUrl)
}
client.onSystemNotice = { msg: SystemNotice ->
showSystemAlert(msg.data.content)
}
// 新增事件
client.onBroadcastReceived = { msg -> /* 全局广播 */ }
client.onBatchMessageReceived = { messages -> /* 批量消息 */ }
client.onStateRestored = { info ->
Log.d("IM", "恢复了 ${info.restoredGroupIds.size} 个群组")
}
client.onReconnectAttempt = { info ->
Log.d("IM", "第 ${info.attemptNumber} 次重连")
}
client.onReconnectFailed = {
showReconnectFailedDialog()
}
// 向后兼容
client.onMessageReceived = { messageJson -> /* 所有消息原始 JSON */ }
重连与恢复
// 配置最大重连次数
client.maxReconnectAttempts = 10 // 默认 -1(无限重连)
// 获取序列号
val lastSeq = client.getLastSequenceNumber("room_10001")
// 连接状态枚举
enum class IMConnectionState {
DISCONNECTED,
CONNECTING,
CONNECTED,
RECONNECTING,
RESTORING // NEW: 状态恢复中
}
// 网络恢复立即重连(使用 ConnectivityManager.NetworkCallback)
// 心跳:每 30 秒自动发送
Web SDK 增强
Web SDK v2.0 使用 TypeScript 提供完整的类型定义和类型安全 API。
结构化消息类型
import { OrangeCloudIMClient, IMMessageType, TextMessage, GiftMessage } from 'orangecloud-im-client';
// TypeScript 接口定义
interface IMMessage {
messageType: IMMessageType;
senderInfo: SenderInfo;
timestamp: number;
sequenceNumber: number;
groupId: string;
}
interface TextMessage extends IMMessage {
data: { content: string };
}
interface GiftMessage extends IMMessage {
data: {
giftId: string;
giftName: string;
giftCount: number;
giftPrice: number;
animationUrl?: string;
};
}
// 类型安全的发送方法
const client = new OrangeCloudIMClient();
await client.sendTextMessage('room_10001', '大家好!');
await client.sendGiftMessage('room_10001', {
giftId: 'gift_001',
giftName: '火箭',
giftCount: 1,
giftPrice: 100,
});
await client.sendCustomMessage('room_10001', 'like', { count: 1 });
事件回调
// 类型安全的接收回调
client.onTextMessage((msg: TextMessage) => {
console.log(`[${msg.senderInfo.nickName}]: ${msg.data.content}`);
});
client.onGiftMessage((msg: GiftMessage) => {
playGiftAnimation(msg.data.animationUrl);
});
client.onSystemNotice((msg: SystemNotice) => {
showSystemAlert(msg.data.content);
});
client.onCustomMessage((msg: CustomMessage) => {
console.log('自定义消息:', msg.data.customType, msg.data.payload);
});
// 新增事件
client.onBroadcastReceived((msg) => { /* 全局广播 */ });
client.onBatchMessageReceived((messages) => { /* 批量消息 */ });
client.onStateRestored((info) => {
console.log(`恢复了 ${info.restoredGroupIds.length} 个群组`);
});
client.onReconnectAttempt((info) => {
console.log(`第 ${info.attemptNumber} 次重连`);
});
client.onReconnectFailed(() => {
showReconnectFailedUI();
});
// 向后兼容
client.onMessageReceived((messageJson: string) => { /* 所有消息原始 JSON */ });
重连与恢复
// 配置最大重连次数
client.maxReconnectAttempts = 10; // 默认 -1(无限重连)
// 获取序列号
const lastSeq = client.getLastSequenceNumber('room_10001');
// 连接状态枚举
enum IMConnectionState {
disconnected = 'disconnected',
connecting = 'connecting',
connected = 'connected',
reconnecting = 'reconnecting',
restoring = 'restoring', // NEW: 状态恢复中
}
// 网络恢复立即重连(使用 navigator.onLine + online 事件)
// 心跳:每 30 秒自动发送(setInterval)
各平台实现差异
| 功能 | Flutter | iOS | Android | Web |
|---|---|---|---|---|
| 网络监听 | connectivity_plus | NWPathMonitor | ConnectivityManager | navigator.onLine |
| 事件模型 | Stream (Dart) | Closure / Combine | Lambda / Flow | Callback function |
| JSON 解析 | dart:convert | Codable | Gson | JSON.parse |
| 心跳实现 | Timer.periodic | DispatchSourceTimer | Handler.postDelayed | setInterval |
| 后台保活 | 不保活 | 不保活 | 不保活 | 不保活 |
架构图与时序图
以下使用 Mermaid 格式描述系统架构和关键流程。
系统架构图
graph TB
subgraph Clients["客户端"]
Flutter["Flutter SDK"]
iOS["iOS SDK"]
Android["Android SDK"]
Web["Web SDK"]
end
subgraph SignalR_Cluster["SignalR 集群"]
Node1["SignalR Node 1"]
Node2["SignalR Node 2"]
NodeN["SignalR Node N"]
end
subgraph Redis["Redis 存储层"]
Backplane["Redis Backplane\nSignalR 消息同步"]
MsgBuffer["Message Buffer\nSorted Set per Group"]
SeqCounter["Sequence Counter\nINCR per Group"]
RateLimiter["Rate Limiter\nSliding Window ZSET"]
Snapshot["Connection Snapshot\nHash per User"]
SharedState["Shared State\nUserConnections / RoomUsers"]
end
Flutter & iOS & Android & Web -->|"WebSocket"| Node1 & Node2 & NodeN
Node1 & Node2 & NodeN <-->|"Pub/Sub"| Backplane
Node1 & Node2 & NodeN --> MsgBuffer
Node1 & Node2 & NodeN --> SeqCounter
Node1 & Node2 & NodeN --> RateLimiter
Node1 & Node2 & NodeN --> Snapshot
Node1 & Node2 & NodeN --> SharedState
架构说明
- 客户端层:四端 SDK 通过 WebSocket 连接到 SignalR 集群任一节点
- SignalR 集群:多个 SignalR 节点通过 Redis Backplane 同步消息,支持水平扩展
- Redis 存储层:所有共享状态存储在 Redis 中,确保多节点一致性
消息流转时序图
sequenceDiagram
participant C as Client
participant H as LiveHub
participant R as Redis
participant G as Group Clients
C->>H: SendGroupMsg(groupId, messageJson)
H->>R: ZRANGEBYSCORE (Rate Limiter Check)
alt Rate Limited
H-->>C: RateLimited Event { cooldownMs }
else Allowed
H->>R: INCR (Get Sequence Number)
R-->>H: sequenceNumber
H->>H: Inject sequenceNumber + serverTimestamp
H->>R: ZADD Message_Buffer (score=seq)
H->>H: Message Merger Check
alt High Frequency (>50/sec)
H->>H: Queue to Merger
H-->>G: ReceiveBatchMessage (every 200ms)
else Normal
H-->>G: ReceiveMessage
end
end
消息处理流程说明
- 频率检查:使用 Redis Sorted Set 滑动窗口检查用户发送频率
- 序列号分配:通过 Redis INCR 原子操作获取群组内唯一递增序列号
- 元数据注入:服务端自动注入 sequenceNumber 和 serverTimestamp
- 缓冲区写入:消息写入 Redis Sorted Set(Score = sequenceNumber)
- 推送决策:根据当前群组消息频率决定逐条推送或批量合并
断线重连恢复时序图
sequenceDiagram
participant C as Client
participant H as LiveHub
participant R as Redis
Note over C: 网络断开
H->>R: HSET Connection_Snapshot\n(groups, lastSeqNums, createdAt)
Note over C: 网络恢复,自动重连
C->>H: OnConnected (携带 userId)
H->>R: HGETALL Connection_Snapshot
alt Snapshot Exists (< 10min)
H->>H: Auto JoinGroup for each group
loop For each group
H->>R: ZRANGEBYSCORE Buffer\n(seq > lastSeq)
R-->>H: Missed messages
H-->>C: Backfill messages (ReceiveMessage)
end
H-->>C: StateRestored Event\n{ groupIds, backfilledCount }
H->>R: DEL Connection_Snapshot
else Snapshot Expired (> 10min)
H-->>C: No auto-restore
Note over C: 需手动调用 JoinGroup
end
断线恢复流程说明
- 断连检测:服务端在 OnDisconnectedAsync 中保存连接快照到 Redis
- 快照内容:用户所在群组列表 + 每个群组的 lastSequenceNumber
- 重连恢复:重连后自动查询快照,恢复群组关系并补发消息
- 快照过期:TTL 10 分钟,超时后不自动恢复,需客户端手动加入
- 恢复通知:恢复完成后发送 StateRestored 事件通知客户端
迁移指南
从 IM SDK v1.x 升级到 v2.0 的步骤和注意事项。
升级步骤
服务端升级
-
更新 NuGet 包
# 新增依赖 dotnet add package Microsoft.AspNetCore.SignalR.StackExchangeRedis -
添加配置节
在
appsettings.json中添加IMReliability配置(见上方配置参数章节) -
注册服务
Program.cs 中的服务注册已自动完成,无需手动修改
-
确保 Redis 可用
消息缓存、频率限制等功能依赖 Redis。确保 Redis 连接字符串配置正确
客户端升级
- 更新 SDK 版本:将各平台 SDK 更新到 v2.0
- (可选)使用类型安全 API:将
sendGroupMsg替换为sendTextMessage等 - (可选)监听新事件:添加
onBroadcastReceived、onStateRestored等监听 - (可选)配置重连参数:设置
maxReconnectAttempts
Breaking Changes
| 变化项 | v1.x 行为 | v2.0 行为 | 影响 |
|---|---|---|---|
| 消息 JSON 新增字段 | 无 sequenceNumber/serverTimestamp | 服务端自动注入这两个字段 | 无影响,新增字段不影响已有解析逻辑 |
| 重连后自动恢复 | 重连后需手动 JoinGroup | 自动恢复群组关系和补发消息 | 正面影响,减少客户端代码 |
| 全局广播事件名 | 无独立广播事件 | 新增 ReceiveBroadcast 事件 | 无影响,不监听则不触发 |
| 高并发批量推送 | 始终逐条推送 | 高频时自动切换为批量推送 | 需监听 ReceiveBatchMessage 才能接收批量消息 |
ReceiveBatchMessage 事件,否则在高频模式下可能丢失部分消息。向后兼容保证
- ✅
onMessageReceived回调继续工作,接收所有类型消息的原始 JSON - ✅
sendGroupMsg(groupId, messageJson)方法继续可用 - ✅
joinGroup(groupId)不传 lastSequenceNumber 时行为与 v1.x 一致 - ✅ 连接状态枚举新增值不影响已有的 switch/if 判断
- ✅ 服务端配置全部有默认值,不配置 IMReliability 节也能正常运行
配置迁移
最小配置(单节点,使用默认参数)
// appsettings.json - 无需添加任何配置
// 所有参数使用默认值,功能自动启用(需要 Redis 连接)
推荐配置(生产环境)
{
"IMReliability": {
"BufferMaxSize": 500,
"BufferTtlMinutes": 10,
"MaxMessagesPerSecond": 5,
"RateLimitWindowMs": 1000,
"MergerThresholdPerSecond": 50,
"MergerFlushIntervalMs": 200,
"SnapshotTtlMinutes": 10,
"EnableRedisBackplane": true,
"RedisBackplaneConnection": "redis-cluster:6379,password=xxx,abortConnect=false"
}
}
高并发场景配置(万人直播间)
{
"IMReliability": {
"BufferMaxSize": 1000,
"BufferTtlMinutes": 5,
"MaxMessagesPerSecond": 3,
"RateLimitWindowMs": 1000,
"MergerThresholdPerSecond": 30,
"MergerFlushIntervalMs": 150,
"SnapshotTtlMinutes": 5,
"EnableRedisBackplane": true,
"RedisBackplaneConnection": "redis-cluster:6379,password=xxx,abortConnect=false"
}
}
Redis Key 格式参考
| 用途 | Key 格式 | 类型 | TTL |
|---|---|---|---|
| 消息缓冲区 | oms:signalr:live:{appId}:buffer:{groupId} | Sorted Set | 10min |
| 序列号计数器 | oms:signalr:live:{appId}:seq:{groupId} | String (INCR) | 10min |
| 频率限制 | oms:signalr:live:{appId}:rate:{userId} | Sorted Set | 2s |
| 连接快照 | oms:signalr:live:{appId}:snapshot:{userId} | Hash | 10min |
| 用户连接 | oms:signalr:live:{appId}:conns:{userId} | Set | - |
| 群组成员 | oms:signalr:live:{appId}:room:{groupId}:users | Hash | - |
| 连接-群组映射 | oms:signalr:live:conngroup:{connectionId} | String | - |