项目背景
构建一个支持百万级智能设备并发连接的物联网数据采集平台,实时接收、处理和分析来自全国各地的智能传感器数据(如智能电表、环境监测设备、工业传感器等)。
核心功能
设备连接管理:支持百万级设备长连接,设备认证与会话保持
实时数据采集:接收设备上报的遥测数据(温度、湿度、电量等)
指令下发:远程控制设备(开关控制、参数配置等)
数据处理分析:实时数据清洗、聚合、告警
数据持久化:历史数据存储供后续分析
技术架构设计
1. 负载均衡层 - Nginx
作用:
TCP/WebSocket负载均衡,将百万设备连接分散到多个Netty节点
支持IP Hash保证设备连接稳定性(同一设备始终连接到同一节点)
健康检查,自动剔除故障节点
配置示例:
stream {
upstream netty_backend {
hash $remote_addr consistent; # IP Hash保证会话粘性
server netty-node1:8883 max_fails=3 fail_timeout=30s;
server netty-node2:8883 max_fails=3 fail_timeout=30s;
server netty-node3:8883 max_fails=3 fail_timeout=30s;
}
server {
listen 1883; # MQTT标准端口
proxy_pass netty_backend;
proxy_timeout 600s;
proxy_connect_timeout 10s;
}
}2. 接入层 - Netty三节点集群
职责:
处理MQTT协议连接(或自定义TCP协议)
设备认证(连接时验证设备ID + Token)
心跳检测(每30秒检测设备在线状态)
消息编解码(Protobuf/JSON)
数据初步校验与过滤
核心实现:
设备认证流程
// 1. 设备连接时触发
@Override
public void channelActive(ChannelHandlerContext ctx) {
String deviceId = parseDeviceId(ctx);
// 从Redis获取设备Token验证
String token = redisCluster.get("device:token:" + deviceId);
if (!validateToken(token)) {
ctx.close();
return;
}
// 保存连接到Redis(用于后续指令下发)
String nodeId = getLocalNodeId();
redisCluster.hset("device:online", deviceId, nodeId);
// 更新在线设备计数
redisCluster.incr("stats:online:count");
}数据接收处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
DeviceMessage message = (DeviceMessage) msg;
// 1. 数据校验
if (!validateMessage(message)) {
return;
}
// 2. 构建Kafka消息
DataPayload payload = DataPayload.builder()
.deviceId(message.getDeviceId())
.timestamp(System.currentTimeMillis())
.dataType(message.getType())
.data(message.getData())
.build();
// 3. 异步发送到Kafka(解耦,防止阻塞IO线程)
kafkaProducer.send("topic-device-data", payload);
// 4. 更新设备最后活跃时间到Redis
redisCluster.setex("device:last_seen:" + message.getDeviceId(),
3600, String.valueOf(System.currentTimeMillis()));
}指令下发流程
// 从Redis订阅指令通道
redisCluster.subscribe("channel:device:command", (channel, message) -> {
CommandMessage cmd = JSON.parseObject(message, CommandMessage.class);
// 查找设备连接在哪个节点
String nodeId = redisCluster.hget("device:online", cmd.getDeviceId());
if (nodeId.equals(getLocalNodeId())) {
// 设备在本节点,直接下发
Channel deviceChannel = getDeviceChannel(cmd.getDeviceId());
if (deviceChannel != null && deviceChannel.isActive()) {
deviceChannel.writeAndFlush(cmd);
}
}
// 如果不在本节点,其他节点会处理
});高性能优化:
使用Netty的EventLoopGroup,每个节点支持30-40万并发连接
零拷贝技术减少内存复制
对象池复用(ByteBuf池化)
批量刷写(减少系统调用)
3. 缓存层 - Redis Cluster
作用:
设备会话管理:存储设备在线状态、连接节点信息
设备元数据缓存:设备配置信息、认证Token
实时统计:在线设备数、区域分布等
消息队列:Pub/Sub实现跨节点指令下发
分布式锁:防止设备重复连接
数据结构设计:
# 设备在线信息(Hash)
device:online -> {
"device001": "netty-node1",
"device002": "netty-node2",
...
}
# 设备认证Token(String)
device:token:device001 -> "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
# 设备最后上报时间(String)
device:last_seen:device001 -> "1701234567890"
# 实时统计(String/HyperLogLog)
stats:online:count -> "1000000"
stats:region:beijing -> "150000"4. 消息队列 - Kafka
作用:
数据解耦:Netty接收数据后立即发送到Kafka,避免阻塞
削峰填谷:应对设备数据上报峰值
数据分发:多个消费者处理不同业务
Topic设计:
topic-device-data # 原始设备数据
topic-device-data-parsed # 清洗后的数据
topic-device-alert # 告警数据
topic-device-event # 设备事件(上线/下线/故障)数据流向:
Netty接收数据 → Kafka
topic-device-data数据处理服务消费 → 数据清洗、校验、转换
处理后数据 → Kafka
topic-device-data-parsed多个下游服务消费:
实时告警服务
数据分析服务
数据存储服务
完整业务流程
场景1:设备数据上报
1. 智能电表每分钟上报电量数据
↓
2. 设备通过TCP连接发送数据到Nginx(1883端口)
↓
3. Nginx根据IP Hash转发到Netty Node2
↓
4. Netty解析MQTT数据包,提取:
- deviceId: "meter_12345"
- timestamp: 1701234567
- voltage: 220V
- current: 5A
- power: 1100W
↓
5. Netty异步发送消息到Kafka topic-device-data
↓
6. 实时处理服务消费Kafka消息:
- 数据校验(电压是否在正常范围)
- 数据聚合(计算当前小时用电量)
- 异常检测(功率突然升高触发告警)
↓
7. 处理后数据分流:
- 告警数据 → topic-device-alert → 告警推送服务(短信/邮件)
- 正常数据 → topic-device-data-parsed → 存储服务
↓
8. 存储服务将数据写入:
- 时序数据库(InfluxDB/TDengine):原始时序数据
- MySQL:设备统计信息、告警记录
- HBase/Elasticsearch:历史数据归档,供大数据分析场景2:远程控制设备
1. 用户在管理后台点击"关闭设备"
↓
2. API服务构建控制指令,查询Redis获取设备在线节点
redis> HGET device:online "meter_12345"
返回: "netty-node2"
↓
3. API服务发布消息到Redis Pub/Sub:
redis> PUBLISH channel:device:command '{
"deviceId": "meter_12345",
"command": "switch_off"
}'
↓
4. Netty Node2订阅到消息,查找设备连接的Channel
↓
5. 通过Channel直接写入指令到设备
↓
6. 设备执行并返回响应(成功/失败)
↓
7. Netty将响应结果发送到Kafka topic-device-event
↓
8. API服务消费事件,更新数据库并通知前端数据最终去向
1. 实时数据(热数据)
Redis:最近1小时的设备状态、实时统计
时序数据库(InfluxDB/TDengine):最近7-30天的时序数据
用于实时监控看板
快速查询设备历史趋势
2. 业务数据
MySQL:
设备基础信息表(device_info)
告警记录表(alert_records)
用户操作日志(operation_logs)
设备统计报表(daily_statistics)
3. 历史数据(冷数据)
HBase/Elasticsearch:
全量历史原始数据(保存1-3年)
用于大数据分析、机器学习训练
支持复杂查询和数据挖掘
4. 数据分析
数据仓库(Hive/ClickHouse):
离线批处理分析
设备运行报告、故障统计
用户行为分析
性能指标
并发连接数:100万+ 长连接
消息吞吐量:50万条/秒
消息延迟:P99 < 100ms
系统可用性:99.95%
数据可靠性:零丢失(Kafka持久化 + 消费确认)
面试亮点
技术难点与解决方案
百万连接如何管理?
Netty集群 + Nginx负载均衡
单节点30-40万连接,3节点可支撑120万
Redis集中管理设备会话,支持跨节点指令下发
如何保证消息不丢失?
Kafka持久化(acks=all)
Netty异步发送 + 重试机制
消费者手动提交offset
如何应对流量峰值?
Kafka削峰填谷
Netty背压(Backpressure)控制
限流熔断(Sentinel)
如何实现秒级告警?
Flink实时流处理
规则引擎(Drools)
告警聚合去重
系统如何扩展?
Netty节点无状态,可水平扩展
Kafka分区扩容
Redis Cluster动态添加节点
项目收益
支撑公司百万级IoT设备接入
实时数据处理延迟降低60%(从300ms降至100ms)
系统故障率下降80%
节省服务器成本40%(相比传统架构)
部署架构图
[智能设备层]
100万+ IoT终端设备
|
↓
[Nginx集群]
TCP负载均衡(1883端口)
IP Hash策略
|
┌─────────────────────┼─────────────────────┐
↓ ↓ ↓
[Netty Node1] [Netty Node2] [Netty Node3]
30万连接 35万连接 35万连接
| | |
└─────────────────────┴─────────────────────┘
|
┌─────────────┼─────────────┐
↓ ↓
[Redis Cluster] [Kafka Cluster]
设备会话/缓存/锁 消息队列/数据总线
|
┌─────────────────────┼─────────────────────┐
↓ ↓ ↓
[实时处理服务] [告警服务] [存储服务]
Flink/Spark 规则引擎/推送 数据持久化
| | |
└─────────────────────┴─────────────────────┘
|
┌─────────────────────┼─────────────────────┐
↓ ↓ ↓
[InfluxDB] [MySQL] [HBase/ES]
时序数据 业务数据 历史归档项目总结
该系统是一个典型的高并发、低延迟、高可靠的IoT平台架构,涵盖了从设备接入、数据处理到存储分析的完整链路。通过Nginx+Netty+Redis+Kafka的组合,实现了百万级设备的稳定接入和实时数据处理,是物联网、车联网、工业互联网领域的经典解决方案。