摘要
本文系统性地阐述了某车企在百万级车联网设备背景下,如何通过构建"流优先、存算分离、统一分析、可观测、可治理"的云原生数据平台,完成从传统大数据架构向现代化实时数仓的跃迁。重点介绍了 Apache Doris 3.0 存算分离架构的生产级落地实践,并结合 Flink CDC、Kubernetes Operator、Prometheus 等技术,打造高可用、高性能、易运维的下一代数据底座。
1. 引言:从"批处理思维"到"流优先 + 云原生"
随着智能网联汽车在全球范围的快速部署,某北欧车企已实现超过100万辆车辆的实时数据联网。每辆车每秒上报位置、电池、驾驶行为等数十个维度数据,日均数据量突破PB级别,峰值写入吞吐达10万条/秒。
原有以 Hive 批处理为核心、Airflow 调度的 T+1 架构,已无法满足远程诊断、电池健康预警、用户画像等关键业务对秒级延迟、高并发查询、弹性伸缩的需求。为此,我们启动了数据平台的全面云原生化升级,目标是构建一个以流为先、存算分离、统一分析、全链路可观测、支持自动化治理的现代化数据平台。
1.1 架构升级全景图
原架构:
Kafka → Flume → HDFS → Hive (T+1) → 报表
↑
Airflow 定时调度
新架构:
Kafka → Flink CDC → Flink Stream → Doris 3.0 (存算分离) → 实时报表
↓
S3 存储
↑
Kubernetes + Prometheus
1.2 核心升级内容
主要升级:
实时优先:Flink 流式计算全面替代 Hive 批量处理,实现端到端秒级延迟
实时采集:Flink CDC 替代传统 Batch 拉取模式,实现全量+增量一体化同步
统一分析:基于 Apache Doris 3.0,统一替代 HBase 点查与 Hive 离线报表
存算分离:Doris 3.0 + S3 构建弹性可扩展的数据底座
可观测性:构建基于 Prometheus + Grafana + PushGateway 的全链路监控体系
云原生运维:基于 Kubernetes 与 StreamPark,实现 Flink 任务的容器化、可视化、自动化管理
次要升级:
Flink 版本升级:从 1.15 升级至 1.19
代码优化:自研容错反序列化器、异步 IO 调用、精确一次 Sink 实现
权限治理:引入 Apache Ranger,实现统一权限管控
2. 实时优先:Flink 流式计算全面替代 Hive 批量处理
2.1 原有架构与问题
原架构:
Kafka → Flume → HDFS → Hive (T+1) → 报表
↑
Airflow 定时调度
存在问题:
延迟高:数据从产生到可用需1~24小时,无法支持实时业务
链路长:组件多,故障定位耗时长
资源利用率低:批处理任务集中在凌晨执行,形成"潮汐负载"
扩展性差:突发流量无法快速响应
2.2 技术选型对比
2.3 选择 Flink 的原因
Flink 凭借其精确一次语义(Exactly-Once)、低延迟、强大的状态管理能力,已成为实时数仓的事实标准:
阿里巴巴:日均处理超10万亿事件,支撑双十一实时大屏、风控、推荐
腾讯:日均处理PB级数据,支撑广告推荐与实时监控
Uber:实现实时计费与司机调度
2.4 生产级 Flink 配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ✅ 启用精确一次语义
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);
env.getCheckpointConfig().setCheckpointTimeout(60_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// ✅ 大状态使用 RocksDB
StateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend();
env.setStateBackend(rocksDBBackend);
// ✅ Checkpoint 存储于 S3
env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/prod/");
// ✅ 容错配置
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.of(1, TimeUnit.SECONDS),
Time.of(10, TimeUnit.MINUTES),
1.2
));
2.5 Flink 1.15 → 1.19 升级收益
升级效果对比:
2.6 升级价值
数据延迟:从 T+1 → 3~8 秒
Hive 任务减少:70%+
支持 CEP:复杂事件处理,如急加速、急刹车模式识别
资源利用率提升:消除"潮汐负载",计算资源持续高效利用
3. 实时数据采集:Flink CDC 实现全量+增量一体化同步
3.1 原有模式问题
通过定时 SQL 拉取 MySQL:
SELECT * FROM vehicle_status WHERE update_time > '...'
存在问题:
延迟高:5~15分钟,无法满足实时需求
一致性差:拉取过程中数据变更无法感知,易漏读或重复
性能影响:大表全表扫描导致数据库负载升高
无法处理删除:物理删除无法捕获
3.2 技术选型对比
3.3 选择 Flink CDC 的原因
无缝集成:作为 Flink Source,与 Checkpoint 机制深度集成,保障 Exactly-Once
全量+增量一体化:自动完成快照与 binlog 消费的切换
无锁读取:通过 RR 隔离级别 + GTID 实现一致性快照
Schema 变更支持:实验性支持 ADD COLUMN 等 DDL
3.4 生产级 Flink CDC 配置
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("mysql-prod.internal")
.port(3306)
.databaseList("vehicle_db")
.tableList("vehicle_db.vehicle_status", "vehicle_db.driver_profile")
.username("flink_reader")
.password("secure_password")
// ✅ 从初始快照开始,实现全量+增量
.startupOptions(StartupOptions.initial())
// ✅ 并行快照配置
.splitSize(8096)
.splitMetaGroupSize(1000)
// ✅ 服务器时区配置
.serverTimeZone("UTC")
.build();
DataStreamSource<String> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"MySQL CDC Source"
);
3.5 生产环境优化配置
MySQL 用户权限配置:
-- 创建专用同步用户
CREATE USER 'flink_reader'@'%' IDENTIFIED BY 'secure_password';
-- 授予必要权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_reader'@'%';
-- 针对具体库表的权限
GRANT SELECT ON vehicle_db.* TO 'flink_reader'@'%';
FLUSH PRIVILEGES;
MySQL 服务器配置:
# my.cnf
[mysqld]
# 启用 binlog
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
# GTID 配置(推荐)
gtid-mode=ON
enforce-gtid-consistency=ON
# 服务器时区
default-time-zone='+00:00'
3.6 常见问题及解决方案
问题1:快照阶段耗时过长
// 解决方案:启用并行快照
.splitSize(4096) // 调小 split 大小
.splitMetaGroupSize(1000) // 增加并行度
问题2:时区问题导致时间字段错误
// 解决方案:显式设置时区
.serverTimeZone("UTC")
.jdbcProperties(Properties.of("serverTimezone", "UTC"))
问题3:DDL 变更导致任务失败
// 解决方案:升级至 CDC 3.1+,启用 Schema Evolution
.includeSchemaChanges(true)
.option("scan.incremental.snapshot.enabled", "true")
3.7 升级价值
同步延迟:从15分钟 → <2秒
源库负载:CPU使用率下降60%
变更捕获:完整支持 insert/update/delete
运维成本:减少70%的ETL脚本维护
4. 统一分析引擎:Apache Doris 3.0 存算分离架构生产实践
4.1 原有架构痛点
采用 HBase(点查) + Hive(分析) 双引擎:
存在问题:
查询割裂:同一数据需维护两套逻辑
一致性难保障:ETL延迟导致数据不一致
开发成本高:需编写两套SQL
性能瓶颈:Hive查询延迟120s+,HBase并发有限
4.2 选择 Apache Doris 的原因
Doris 是 MPP 架构的实时 OLAP 引擎,具备以下优势:
统一分析:同时支持高并发点查与复杂分析
高性能:列式存储 + 向量化执行 + CBO 优化
易用性:兼容 MySQL 协议,支持标准 SQL
云原生:自3.0版本起,正式支持存算分离架构
行业实践:小米、京东、腾讯音乐、B站等均已采用Doris作为统一OLAP引擎。
4.3 Doris 3.0 存算分离架构
┌─────────────────┐ ┌─────────────────┐
│ Frontend (FE) │ │ Frontend (FE) │
│ SQL Gateway │ │ SQL Gateway │
└─────────────────┘ └─────────────────┘
│ │
└───────────┬───────────┘
│
┌─────────────────────────────────────────┐
│ Meta Service (MS) │
│ FoundationDB Cluster │
└─────────────────────────────────────────┘
│
┌───────────┼───────────┐
│ │ │
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Backend(BE) │ │ Backend(BE) │ │ Backend(BE) │
│ Compute │ │ Compute │ │ Compute │
└─────────────┘ └─────────────┘ └─────────────┘
│
┌───────────┼───────────┐
│ │ │
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ S3 Bucket │ │ S3 Bucket │ │ S3 Bucket │
│ Storage │ │ Storage │ │ Storage │
└─────────────┘ └─────────────┘ └─────────────┘
4.4 核心组件说明
4.5 生产环境部署规划
FoundationDB 集群:
# FoundationDB 配置
version: 1
cluster:
redundancy_mode: triple # 三副本保证高可用
storage_engine: ssd-2
processes:
- class: storage
count: 6 # 至少6个存储进程
- class: log
count: 3 # 3个日志进程
- class: stateless
count: 3 # 3个无状态进程
Meta Service + FE 部署:
# ms-fe-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: doris-ms-fe
spec:
replicas: 3
template:
spec:
containers:
- name: meta-service
image: apache/doris:3.0.1-meta-service
env:
- name: FDB_CLUSTER_FILE
value: /etc/foundationdb/fdb.cluster
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
- name: frontend
image: apache/doris:3.0.1-fe
env:
- name: DORIS_ROOT
value: /opt/apache-doris
- name: FE_SERVERS
value: doris-fe-0:9010,doris-fe-1:9010,doris-fe-2:9010
ports:
- containerPort: 8030 # HTTP端口
- containerPort: 9010 # RPC端口
resources:
requests:
memory: "8Gi"
cpu: "4"
limits:
memory: "16Gi"
cpu: "8"
Backend 弹性计算节点:
# be-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: doris-be
spec:
replicas: 6 # 可根据负载动态调整
template:
spec:
containers:
- name: backend
image: apache/doris:3.0.1-be
env:
- name: FE_SERVERS
value: doris-fe:8030
- name: BE_ADDR
valueFrom:
fieldRef:
fieldPath: status.podIP
resources:
requests:
memory: "16Gi"
cpu: "8"
limits:
memory: "32Gi"
cpu: "16"
volumeMounts:
- name: be-data
mountPath: /opt/apache-doris/be/storage
volumes:
- name: be-data
emptyDir: {} # 存算分离架构下,BE无需持久化存储
4.6 存算分离模式表设计
-- 创建存算分离模式的数据表
CREATE TABLE dws.dws_vehicle_summary (
cds_id VARCHAR(64) COMMENT '车辆ID',
vin VARCHAR(17) COMMENT 'VIN码',
battery_level DECIMAL(5,2) COMMENT '电量百分比',
last_location STRING COMMENT '最后位置(JSON格式)',
mileage_total BIGINT COMMENT '总里程(米)',
last_charge_time DATETIME COMMENT '最后充电时间',
update_time DATETIME COMMENT '数据更新时间'
)
DUPLICATE KEY(cds_id, update_time) -- 存算分离下推荐DUPLICATE KEY
DISTRIBUTED BY HASH(cds_id) BUCKETS 32 -- 根据数据量选择合适bucket数
PROPERTIES (
"storage_medium" = "S3", -- 使用S3存储
"storage_cooldown_time" = "2023-12-31 23:59:59", -- 冷却时间
"enable_unique_key_merge_on_write" = "false", -- 关闭MOW提升写入性能
"compression_type" = "ZSTD", -- 压缩算法
"enable_single_replica_compaction" = "true" -- 单副本压缩
);
-- 创建分区表支持历史数据管理
CREATE TABLE dwd.dwd_vehicle_events (
event_id VARCHAR(64),
cds_id VARCHAR(64),
event_type VARCHAR(32),
event_data JSON,
event_time DATETIME
)
DUPLICATE KEY(event_id)
PARTITION BY RANGE(event_time) (
PARTITION p202312 VALUES LESS THAN ("2024-01-01 00:00:00"),
PARTITION p202401 VALUES LESS THAN ("2024-02-01 00:00:00")
-- 自动分区管理
)
DISTRIBUTED BY HASH(cds_id) BUCKETS 32
PROPERTIES (
"storage_medium" = "S3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.start" = "-12", -- 保留12个月历史数据
"dynamic_partition.end" = "3", -- 预创建3个月分区
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
);
4.7 Flink 写入 Doris 精确一次配置
// Doris Sink 精确一次配置
DorisOptions options = DorisOptions.builder()
.setFenodes("doris-fe-svc:8030") // K8s Service
.setTableIdentifier("dws.dws_vehicle_summary")
.setUsername("root")
.setPassword("")
.build();
DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
.setLabelPrefix("flink_job_" + System.currentTimeMillis())
.setEnable2PC(true) // 启用两阶段提交保证精确一次
.setBufferSize(8192)
.setBufferCount(3)
.setMaxRetries(3)
.setStreamLoadProp(Properties.of(
"format", "json",
"read_json_by_line", "true",
"load_to_single_tablet", "false"
))
.build();
DorisReadOptions readOptions = DorisReadOptions.builder().build();
vehicleDataStream
.map(new VehicleDataSerializer()) // 序列化为JSON
.addSink(DorisSink.sink(readOptions, executionOptions, options));
4.8 性能调优实践
写入性能优化:
-- 调整Stream Load参数
SET enable_insert_strict = false;
SET enable_http_server_v2 = true;
SET max_filter_ratio = 0.1;
SET load_mem_limit = 4294967296; -- 4GB
查询性能优化:
-- 创建Rollup表加速聚合查询
ALTER TABLE dws.dws_vehicle_summary
ADD ROLLUP vehicle_daily_summary (
cds_id,
battery_level,
mileage_total,
DATE(update_time)
);
-- 创建物化视图
CREATE MATERIALIZED VIEW mv_vehicle_battery_trend AS
SELECT
cds_id,
DATE(update_time) as stat_date,
AVG(battery_level) as avg_battery_level,
MIN(battery_level) as min_battery_level,
MAX(battery_level) as max_battery_level
FROM dws.dws_vehicle_summary
GROUP BY cds_id, DATE(update_time);
4.9 升级价值
查询延迟:从Hive的120s → Doris的1.5s
并发能力:支持1000+并发查询
弹性计算:BE节点可独立扩缩,应对查询高峰
成本优化:存储成本降低40%+(S3 + 压缩)
统一出口:"一数一源",数据一致性显著提升
5. 云原生基础设施:Kubernetes + StreamPark 实现高效运维
5.1 原有模式问题
Flink作业通过Shell脚本提交至YARN:
存在问题:
无版本管理:作业升级回滚困难
无可视化界面:需要命令行操作
资源隔离差:多任务共享YARN资源池
运维复杂:手动管理配置文件
5.2 选择 Kubernetes 的原因
K8s已成云原生基础设施标准:
弹性调度:HPA自动伸缩
资源隔离:Namespace天然隔离
CI/CD友好:Helm/ArgoCD集成
统一编排:声明式管理
5.3 Flink on Kubernetes 部署架构
┌─────────────────────┐ ┌──────────────────┐
│ StreamPark │ │ Flink Operator │
│ Management UI │ │ K8s Operator │
└─────────────────────┘ └──────────────────┘
│ │
└──────────┬─────────────────┘
│
┌───────────────────────────┐
│ Kubernetes Cluster │
│ │
│ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ JM │ │ TM │ │ TM │ │
│ │Pod │ │Pod │ │Pod │ │
│ └─────┘ └─────┘ └─────┘ │
│ │
└───────────────────────────┘
5.4 Flink Kubernetes Operator 配置
# flink-operator.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: vehicle-realtime-etl
namespace: flink-jobs
spec:
image: flink:1.19.1-scala_2.12-java11
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
state.backend: rocksdb
state.checkpoints.dir: s3://flink-checkpoints/vehicle-etl/
state.savepoints.dir: s3://flink-savepoints/vehicle-etl/
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://flink-ha/vehicle-etl/
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 10min
restart-strategy.exponential-delay.backoff-multiplier: 1.2
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 30s
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "4096m"
cpu: 2
replicas: 3
job:
jarURI: s3://flink-jars/vehicle-realtime-etl-1.0.0.jar
parallelism: 12
upgradeMode: savepoint
allowNonRestoredState: false
args:
- --kafka.bootstrap.servers
- kafka-cluster:9092
- --doris.fenodes
- doris-fe-svc:8030
5.5 StreamPark 集成配置
StreamPark 部署:
# streampark-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: streampark-console
spec:
replicas: 2
template:
spec:
containers: