夜航星
Lila's dream
Lila's dream
Published on 2025-03-29 / 2 Visits
0
0

基于 Flink + Doris + Kubernetes 的实时数仓升级实践

摘要

本文系统性地阐述了某车企在百万级车联网设备背景下,如何通过构建"流优先、存算分离、统一分析、可观测、可治理"的云原生数据平台,完成从传统大数据架构向现代化实时数仓的跃迁。重点介绍了 Apache Doris 3.0 存算分离架构的生产级落地实践,并结合 Flink CDC、Kubernetes Operator、Prometheus 等技术,打造高可用、高性能、易运维的下一代数据底座。

1. 引言:从"批处理思维"到"流优先 + 云原生"

随着智能网联汽车在全球范围的快速部署,某北欧车企已实现超过100万辆车辆的实时数据联网。每辆车每秒上报位置、电池、驾驶行为等数十个维度数据,日均数据量突破PB级别,峰值写入吞吐达10万条/秒。

原有以 Hive 批处理为核心、Airflow 调度的 T+1 架构,已无法满足远程诊断、电池健康预警、用户画像等关键业务对秒级延迟、高并发查询、弹性伸缩的需求。为此,我们启动了数据平台的全面云原生化升级,目标是构建一个以流为先、存算分离、统一分析、全链路可观测、支持自动化治理的现代化数据平台。

1.1 架构升级全景图

原架构:
Kafka → Flume → HDFS → Hive (T+1) → 报表
           ↑
    Airflow 定时调度

新架构:
Kafka → Flink CDC → Flink Stream → Doris 3.0 (存算分离) → 实时报表
                                      ↓
                                   S3 存储
                                      ↑
                              Kubernetes + Prometheus

1.2 核心升级内容

主要升级:

  • 实时优先:Flink 流式计算全面替代 Hive 批量处理,实现端到端秒级延迟

  • 实时采集:Flink CDC 替代传统 Batch 拉取模式,实现全量+增量一体化同步

  • 统一分析:基于 Apache Doris 3.0,统一替代 HBase 点查与 Hive 离线报表

  • 存算分离:Doris 3.0 + S3 构建弹性可扩展的数据底座

  • 可观测性:构建基于 Prometheus + Grafana + PushGateway 的全链路监控体系

  • 云原生运维:基于 Kubernetes 与 StreamPark,实现 Flink 任务的容器化、可视化、自动化管理

次要升级:

  • Flink 版本升级:从 1.15 升级至 1.19

  • 代码优化:自研容错反序列化器、异步 IO 调用、精确一次 Sink 实现

  • 权限治理:引入 Apache Ranger,实现统一权限管控

2.1 原有架构与问题

原架构:

Kafka → Flume → HDFS → Hive (T+1) → 报表
           ↑
    Airflow 定时调度

存在问题:

  • 延迟高:数据从产生到可用需1~24小时,无法支持实时业务

  • 链路长:组件多,故障定位耗时长

  • 资源利用率低:批处理任务集中在凌晨执行,形成"潮汐负载"

  • 扩展性差:突发流量无法快速响应

2.2 技术选型对比

方案

代表产品

优势

劣势

批处理主导

Hive + Airflow

成熟、稳定

延迟高,无法实时

微批处理

Spark Streaming

比批处理快

最小延迟100ms+,非真正流式

流式计算

Flink / Kafka Streams

真正流式,精确一次语义

学习曲线陡峭

流批一体

Flink / Spark

统一API,降低复杂度

对状态管理要求高

Flink 凭借其精确一次语义(Exactly-Once)、低延迟、强大的状态管理能力,已成为实时数仓的事实标准:

  • 阿里巴巴:日均处理超10万亿事件,支撑双十一实时大屏、风控、推荐

  • 腾讯:日均处理PB级数据,支撑广告推荐与实时监控

  • Uber:实现实时计费与司机调度

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// ✅ 启用精确一次语义
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);
env.getCheckpointConfig().setCheckpointTimeout(60_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// ✅ 大状态使用 RocksDB
StateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend();
env.setStateBackend(rocksDBBackend);

// ✅ Checkpoint 存储于 S3
env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/prod/");

// ✅ 容错配置
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
    Time.of(1, TimeUnit.SECONDS),
    Time.of(10, TimeUnit.MINUTES),
    1.2
));

版本

关键特性

对我们的价值

Flink 1.15

支持批模式下 Window TVF

批处理更灵活,资源利用率更高

Flink 1.16

异步 Barrier 轮询机制

提升高并行作业稳定性

Flink 1.17

StateBackend 内存管理优化

优化内存使用,减少 OOM

Flink 1.18

非阻塞 Barrier 轮询

提升作业稳定性

Flink 1.19

支持流模式 SESSION Window TVF

提升SQL灵活性,配置更现代化

升级效果对比:

指标

Flink 1.15

Flink 1.19

提升

吞吐量(万条/秒)

8.2

11.6

+41%

Checkpoint 成功率

85%

99.8%

+14.8pp

平均延迟(ms)

850

520

-39%

TaskManager 内存占用(GB)

32

26

-19%

2.6 升级价值

  • 数据延迟:从 T+1 → 3~8 秒

  • Hive 任务减少:70%+

  • 支持 CEP:复杂事件处理,如急加速、急刹车模式识别

  • 资源利用率提升:消除"潮汐负载",计算资源持续高效利用

3.1 原有模式问题

通过定时 SQL 拉取 MySQL:

SELECT * FROM vehicle_status WHERE update_time > '...'

存在问题:

  • 延迟高:5~15分钟,无法满足实时需求

  • 一致性差:拉取过程中数据变更无法感知,易漏读或重复

  • 性能影响:大表全表扫描导致数据库负载升高

  • 无法处理删除:物理删除无法捕获

3.2 技术选型对比

方案

说明

优势

劣势

定时 SQL 拉取

自定义脚本

简单易实现

延迟高,一致性差

Canal + Kafka

阿里开源

成熟稳定

需额外维护 Canal 集群

Debezium

Kafka Connect

开源生态好

配置复杂,与 Flink 集成需定制

Flink CDC

Flink 官方 Connector

与 Flink 深度集成,Exactly-Once

对 MySQL 权限要求高

  • 无缝集成:作为 Flink Source,与 Checkpoint 机制深度集成,保障 Exactly-Once

  • 全量+增量一体化:自动完成快照与 binlog 消费的切换

  • 无锁读取:通过 RR 隔离级别 + GTID 实现一致性快照

  • Schema 变更支持:实验性支持 ADD COLUMN 等 DDL

MySqlSource<String> source = MySqlSource.<String>builder()
    .hostname("mysql-prod.internal")
    .port(3306)
    .databaseList("vehicle_db")
    .tableList("vehicle_db.vehicle_status", "vehicle_db.driver_profile")
    .username("flink_reader")
    .password("secure_password")
    // ✅ 从初始快照开始,实现全量+增量
    .startupOptions(StartupOptions.initial())
    // ✅ 并行快照配置
    .splitSize(8096)
    .splitMetaGroupSize(1000)
    // ✅ 服务器时区配置
    .serverTimeZone("UTC")
    .build();

DataStreamSource<String> stream = env.fromSource(
    source, 
    WatermarkStrategy.noWatermarks(), 
    "MySQL CDC Source"
);

3.5 生产环境优化配置

MySQL 用户权限配置:

-- 创建专用同步用户
CREATE USER 'flink_reader'@'%' IDENTIFIED BY 'secure_password';

-- 授予必要权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_reader'@'%';

-- 针对具体库表的权限
GRANT SELECT ON vehicle_db.* TO 'flink_reader'@'%';

FLUSH PRIVILEGES;

MySQL 服务器配置:

# my.cnf
[mysqld]
# 启用 binlog
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL

# GTID 配置(推荐)
gtid-mode=ON
enforce-gtid-consistency=ON

# 服务器时区
default-time-zone='+00:00'

3.6 常见问题及解决方案

问题1:快照阶段耗时过长

// 解决方案:启用并行快照
.splitSize(4096)  // 调小 split 大小
.splitMetaGroupSize(1000)  // 增加并行度

问题2:时区问题导致时间字段错误

// 解决方案:显式设置时区
.serverTimeZone("UTC")
.jdbcProperties(Properties.of("serverTimezone", "UTC"))

问题3:DDL 变更导致任务失败

// 解决方案:升级至 CDC 3.1+,启用 Schema Evolution
.includeSchemaChanges(true)
.option("scan.incremental.snapshot.enabled", "true")

3.7 升级价值

  • 同步延迟:从15分钟 → <2秒

  • 源库负载:CPU使用率下降60%

  • 变更捕获:完整支持 insert/update/delete

  • 运维成本:减少70%的ETL脚本维护

4. 统一分析引擎:Apache Doris 3.0 存算分离架构生产实践

4.1 原有架构痛点

采用 HBase(点查) + Hive(分析) 双引擎:

存在问题:

  • 查询割裂:同一数据需维护两套逻辑

  • 一致性难保障:ETL延迟导致数据不一致

  • 开发成本高:需编写两套SQL

  • 性能瓶颈:Hive查询延迟120s+,HBase并发有限

4.2 选择 Apache Doris 的原因

Doris 是 MPP 架构的实时 OLAP 引擎,具备以下优势:

  • 统一分析:同时支持高并发点查与复杂分析

  • 高性能:列式存储 + 向量化执行 + CBO 优化

  • 易用性:兼容 MySQL 协议,支持标准 SQL

  • 云原生:自3.0版本起,正式支持存算分离架构

行业实践:小米、京东、腾讯音乐、B站等均已采用Doris作为统一OLAP引擎。

4.3 Doris 3.0 存算分离架构

┌─────────────────┐    ┌─────────────────┐
│   Frontend (FE) │    │   Frontend (FE) │
│  SQL Gateway    │    │  SQL Gateway    │
└─────────────────┘    └─────────────────┘
         │                       │
         └───────────┬───────────┘
                     │
┌─────────────────────────────────────────┐
│            Meta Service (MS)             │
│        FoundationDB Cluster             │
└─────────────────────────────────────────┘
                     │
         ┌───────────┼───────────┐
         │           │           │
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Backend(BE) │ │ Backend(BE) │ │ Backend(BE) │
│  Compute    │ │  Compute    │ │  Compute    │
└─────────────┘ └─────────────┘ └─────────────┘
                     │
         ┌───────────┼───────────┐
         │           │           │
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│  S3 Bucket  │ │  S3 Bucket  │ │  S3 Bucket  │
│   Storage   │ │   Storage   │ │   Storage   │
└─────────────┘ └─────────────┘ └─────────────┘

4.4 核心组件说明

组件

角色

说明

Frontend (FE)

SQL网关

处理SQL请求,查询规划,元数据缓存

Meta Service (MS)

元数据服务

基于FoundationDB的共享元数据服务

Backend (BE)

无状态计算节点

执行查询任务,可弹性扩缩容

S3 存储

数据存储

统一数据存储,支持多副本和冷热分层

4.5 生产环境部署规划

FoundationDB 集群:

# FoundationDB 配置
version: 1
cluster:
  redundancy_mode: triple  # 三副本保证高可用
  storage_engine: ssd-2
  processes:
    - class: storage
      count: 6  # 至少6个存储进程
    - class: log  
      count: 3  # 3个日志进程
    - class: stateless
      count: 3  # 3个无状态进程

Meta Service + FE 部署:

# ms-fe-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: doris-ms-fe
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: meta-service
        image: apache/doris:3.0.1-meta-service
        env:
        - name: FDB_CLUSTER_FILE
          value: /etc/foundationdb/fdb.cluster
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "8Gi"
            cpu: "4"
      - name: frontend
        image: apache/doris:3.0.1-fe
        env:
        - name: DORIS_ROOT
          value: /opt/apache-doris
        - name: FE_SERVERS
          value: doris-fe-0:9010,doris-fe-1:9010,doris-fe-2:9010
        ports:
        - containerPort: 8030  # HTTP端口
        - containerPort: 9010  # RPC端口
        resources:
          requests:
            memory: "8Gi"
            cpu: "4"
          limits:
            memory: "16Gi"
            cpu: "8"

Backend 弹性计算节点:

# be-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: doris-be
spec:
  replicas: 6  # 可根据负载动态调整
  template:
    spec:
      containers:
      - name: backend
        image: apache/doris:3.0.1-be
        env:
        - name: FE_SERVERS
          value: doris-fe:8030
        - name: BE_ADDR
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        resources:
          requests:
            memory: "16Gi"
            cpu: "8"
          limits:
            memory: "32Gi"
            cpu: "16"
        volumeMounts:
        - name: be-data
          mountPath: /opt/apache-doris/be/storage
      volumes:
      - name: be-data
        emptyDir: {}  # 存算分离架构下,BE无需持久化存储

4.6 存算分离模式表设计

-- 创建存算分离模式的数据表
CREATE TABLE dws.dws_vehicle_summary (
    cds_id VARCHAR(64) COMMENT '车辆ID',
    vin VARCHAR(17) COMMENT 'VIN码',
    battery_level DECIMAL(5,2) COMMENT '电量百分比',
    last_location STRING COMMENT '最后位置(JSON格式)',
    mileage_total BIGINT COMMENT '总里程(米)',
    last_charge_time DATETIME COMMENT '最后充电时间',
    update_time DATETIME COMMENT '数据更新时间'
) 
DUPLICATE KEY(cds_id, update_time)  -- 存算分离下推荐DUPLICATE KEY
DISTRIBUTED BY HASH(cds_id) BUCKETS 32  -- 根据数据量选择合适bucket数
PROPERTIES (
    "storage_medium" = "S3",  -- 使用S3存储
    "storage_cooldown_time" = "2023-12-31 23:59:59",  -- 冷却时间
    "enable_unique_key_merge_on_write" = "false",  -- 关闭MOW提升写入性能
    "compression_type" = "ZSTD",  -- 压缩算法
    "enable_single_replica_compaction" = "true"  -- 单副本压缩
);

-- 创建分区表支持历史数据管理
CREATE TABLE dwd.dwd_vehicle_events (
    event_id VARCHAR(64),
    cds_id VARCHAR(64),
    event_type VARCHAR(32),
    event_data JSON,
    event_time DATETIME
)
DUPLICATE KEY(event_id)
PARTITION BY RANGE(event_time) (
    PARTITION p202312 VALUES LESS THAN ("2024-01-01 00:00:00"),
    PARTITION p202401 VALUES LESS THAN ("2024-02-01 00:00:00")
    -- 自动分区管理
)
DISTRIBUTED BY HASH(cds_id) BUCKETS 32
PROPERTIES (
    "storage_medium" = "S3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.start" = "-12",  -- 保留12个月历史数据
    "dynamic_partition.end" = "3",      -- 预创建3个月分区
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32"
);
// Doris Sink 精确一次配置
DorisOptions options = DorisOptions.builder()
    .setFenodes("doris-fe-svc:8030")  // K8s Service
    .setTableIdentifier("dws.dws_vehicle_summary")
    .setUsername("root")
    .setPassword("")
    .build();

DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
    .setLabelPrefix("flink_job_" + System.currentTimeMillis())
    .setEnable2PC(true)  // 启用两阶段提交保证精确一次
    .setBufferSize(8192)
    .setBufferCount(3)
    .setMaxRetries(3)
    .setStreamLoadProp(Properties.of(
        "format", "json",
        "read_json_by_line", "true",
        "load_to_single_tablet", "false"
    ))
    .build();

DorisReadOptions readOptions = DorisReadOptions.builder().build();

vehicleDataStream
    .map(new VehicleDataSerializer())  // 序列化为JSON
    .addSink(DorisSink.sink(readOptions, executionOptions, options));

4.8 性能调优实践

写入性能优化:

-- 调整Stream Load参数
SET enable_insert_strict = false;
SET enable_http_server_v2 = true;
SET max_filter_ratio = 0.1;
SET load_mem_limit = 4294967296;  -- 4GB

查询性能优化:

-- 创建Rollup表加速聚合查询
ALTER TABLE dws.dws_vehicle_summary 
ADD ROLLUP vehicle_daily_summary (
    cds_id, 
    battery_level, 
    mileage_total,
    DATE(update_time)
);

-- 创建物化视图
CREATE MATERIALIZED VIEW mv_vehicle_battery_trend AS
SELECT 
    cds_id,
    DATE(update_time) as stat_date,
    AVG(battery_level) as avg_battery_level,
    MIN(battery_level) as min_battery_level,
    MAX(battery_level) as max_battery_level
FROM dws.dws_vehicle_summary
GROUP BY cds_id, DATE(update_time);

4.9 升级价值

  • 查询延迟:从Hive的120s → Doris的1.5s

  • 并发能力:支持1000+并发查询

  • 弹性计算:BE节点可独立扩缩,应对查询高峰

  • 成本优化:存储成本降低40%+(S3 + 压缩)

  • 统一出口:"一数一源",数据一致性显著提升

5. 云原生基础设施:Kubernetes + StreamPark 实现高效运维

5.1 原有模式问题

Flink作业通过Shell脚本提交至YARN:

存在问题:

  • 无版本管理:作业升级回滚困难

  • 无可视化界面:需要命令行操作

  • 资源隔离差:多任务共享YARN资源池

  • 运维复杂:手动管理配置文件

5.2 选择 Kubernetes 的原因

K8s已成云原生基础设施标准:

  • 弹性调度:HPA自动伸缩

  • 资源隔离:Namespace天然隔离

  • CI/CD友好:Helm/ArgoCD集成

  • 统一编排:声明式管理

┌─────────────────────┐    ┌──────────────────┐
│   StreamPark        │    │  Flink Operator  │
│   Management UI     │    │   K8s Operator   │
└─────────────────────┘    └──────────────────┘
           │                          │
           └──────────┬─────────────────┘
                     │
         ┌───────────────────────────┐
         │    Kubernetes Cluster     │
         │                           │
         │  ┌─────┐ ┌─────┐ ┌─────┐  │
         │  │ JM  │ │ TM  │ │ TM  │  │
         │  │Pod  │ │Pod  │ │Pod  │  │
         │  └─────┘ └─────┘ └─────┘  │
         │                           │
         └───────────────────────────┘
# flink-operator.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: vehicle-realtime-etl
  namespace: flink-jobs
spec:
  image: flink:1.19.1-scala_2.12-java11
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: rocksdb
    state.checkpoints.dir: s3://flink-checkpoints/vehicle-etl/
    state.savepoints.dir: s3://flink-savepoints/vehicle-etl/
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://flink-ha/vehicle-etl/
    restart-strategy: exponential-delay
    restart-strategy.exponential-delay.initial-backoff: 1s
    restart-strategy.exponential-delay.max-backoff: 10min
    restart-strategy.exponential-delay.backoff-multiplier: 1.2
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.interval: 30s
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
    replicas: 3
  job:
    jarURI: s3://flink-jars/vehicle-realtime-etl-1.0.0.jar
    parallelism: 12
    upgradeMode: savepoint
    allowNonRestoredState: false
    args:
      - --kafka.bootstrap.servers
      - kafka-cluster:9092
      - --doris.fenodes
      - doris-fe-svc:8030

5.5 StreamPark 集成配置

StreamPark 部署:

# streampark-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: streampark-console
spec:
  replicas: 2
  template:
    spec:
      containers:


Comment