Flink Fluss的概念和测试 Concepts and Testing of Flink Fluss
Flink Fluss是一个正在孵化中的专为实时分析打造的流式存储系统,本文介绍其概念并进行简单使用和测试 Flink Fluss is an incubating streaming storage system purpose-built for real-time analytics. This article introduces its core concepts and demonstrates a basic usage and testing scenario.
背景 Background
使用 Kafka 做消息队列的不足:
更新:需要使用 upsert 模式
获取&查询数据困难:需要通过流式读取拉取全量数据才能分析
行存储&查询成本高:无法单独只读部分列进行分析,需要读取整行
总结:Kafka 的目的是流存储,没有为流分析单独优化
Limitations of using Kafka as a message queue:
Updates: require using the upsert mode
Data retrieval & querying is difficult: the full dataset must be pulled as a stream before it can be analyzed
High cost for row-based storage & queries: cannot read only selected columns for analysis; entire rows must be read
Summary: Kafka is built for stream storage and is not specifically optimized for stream analytics
而Fluss 的价值在于:
更高效地支持流实时分析:列裁剪查询、kv查询、delta join(双流驱动的维表join)
统一了流和批管理:有更强的数据一致性保证(有统一的底层格式和自动转换服务)
节省成本、提升开发体验:
网络节省
不再需要 Kafka + Flink + Hudi + Druid 这一堆系统;相当于kafka+mysql
上层只需基于 Flink SQL 查询,底层数据变化对用户无感
The value of Fluss lies in:
More efficient support for real-time stream analytics:
Column pruning queries
KV queries
Delta join (dimension table join powered by dual streams)
Unified stream and batch management:
Stronger data consistency guarantees
Unified underlying format and automatic conversion services
Cost saving and improved developer experience:
Network savings
No longer need the whole Kafka + Flink + Hudi + Druid stack; roughly equivalent to Kafka + MySQL
The upper layer only needs to query via Flink SQL; changes in the underlying data are transparent to the user
架构 Architecture
Cluster:由 Coordinator Server 和 Tablet Server 组成。
Coordinator Server:作为集群的中心控制节点,负责元数据管理、Leader 分配和权限管理。
Tablet Server:数据存储节点,包含 Log Store 和 KV Store。
KV Store:支持更新和点查操作。
Log Store:存储更新产生的 Change Logs。
Fluss Client:客户端组件,用于与 Fluss 集群交互。
ZK(ZooKeeper):用于集群协调和元数据管理。
Remote Storage:远程存储,用于冷数据的归档。
Cluster: Composed of a Coordinator Server and Tablet Servers.
Coordinator Server: Acts as the central control node of the cluster. Responsible for metadata management, leader assignment, and access control.
Tablet Server: Data storage node containing a Log Store and a KV Store.
KV Store: Supports update and point lookup operations.
Log Store: Stores change logs generated by updates.
Fluss Client: Client component used to interact with the Fluss cluster.
ZK (ZooKeeper): Used for cluster coordination and metadata management.
Remote Storage: Remote archive storage for cold data.
核心功能及其实现原理 Core Features and Implementation Principles
(1)流式列存储 Stream-based Columnar Storage
Fluss 采用流式列存储格式,重点优化了列裁剪能力。具体实现如下:
列裁剪:Fluss 采用 Apache Arrow 的 IPC Streaming Format 协议进行文件存储,基于 Arrow 实现高效的列裁剪性能,保持流式读写的毫秒级延迟,节省大量网络成本。读取时支持端到端的 Zero-copy。
列压缩:从 V0.6 版本开始,Fluss 支持列压缩,压缩比可达 7 倍,支持 ZSTD 和 LZ4 压缩算法。
Fluss adopts a stream-based columnar storage format, with a focus on optimizing column pruning. The implementation details are as follows:
Column Pruning: Fluss uses the Apache Arrow IPC Streaming Format for file storage. Based on Arrow, it achieves highly efficient column pruning while maintaining millisecond-level latency for stream read/write operations, significantly reducing network cost. Supports end-to-end zero-copy during reads.
Column Compression: Starting from version V0.6, Fluss supports column compression with a compression ratio of up to 7x, supporting ZSTD and LZ4 compression algorithms.
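As a rough illustration of how column pruning is used from Flink SQL (a minimal sketch, not from the original article: it assumes a Fluss catalog is already in use, and the click_events table and its columns are invented for demonstration):

-- Hypothetical Fluss log table (no primary key); names are illustrative only.
CREATE TABLE click_events (
  `user_id`    BIGINT,
  `page_url`   STRING,
  `referrer`   STRING,
  `user_agent` STRING,
  `event_time` TIMESTAMP(3)
);

-- Only two columns are selected; because the log is stored in Arrow columnar
-- format, Fluss only needs to read and ship these columns instead of full rows.
SELECT `user_id`, `event_time` FROM click_events;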
(2)实时更新与 CDC Real-time Updates and CDC
为了支持高效的实时更新,Fluss 结合了 Log Tablet 和 KV Tablet,具体机制如下:
底层结构:以 Log Tablet 为基础,在其上构建 KV 索引,支持大规模实时更新。KV 更新会生成 Changelogs,写入 Log Tablet。
故障恢复:在故障恢复时,Log Tablet 数据用于恢复 KV Tablet。
流读 Changelog 无需去重:KV 表生成的 Changelog 可直接流式读取,无需额外去重,节省计算资源并促进数据复用。
To support efficient real-time updates, Fluss combines Log Tablet and KV Tablet. The detailed mechanism is as follows:
Underlying structure: Based on the Log Tablet, Fluss builds a KV index on top of it to support large-scale real-time updates. KV updates generate changelogs, which are written into the Log Tablet.
Failure recovery: During failure recovery, data in the Log Tablet is used to restore the KV Tablet.
No deduplication needed for streaming changelog reads: Changelogs generated by the KV table can be streamed directly without additional deduplication, saving compute resources and enhancing data reusability.
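A minimal sketch of how this looks from Flink SQL (the user_profile table and its data are invented for illustration; the exact changelog row kinds displayed by the SQL client may vary by version): writes to a primary-key table behave as upserts, and reading the same table in streaming mode returns its changelog directly, with no deduplication operator in between.

-- Hypothetical primary-key table: updates hit the KV Tablet, and the resulting
-- changelog entries are appended to the Log Tablet.
CREATE TABLE user_profile (
  `user_id` INT NOT NULL,
  `city`    STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
);

-- Writing the same key twice results in an update rather than a duplicate row.
INSERT INTO user_profile VALUES (1, 'Beijing');
INSERT INTO user_profile VALUES (1, 'Hangzhou');

-- In streaming mode, this emits the changelog (insert and update events) as-is,
-- so downstream jobs can consume it without an extra deduplication step.
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM user_profile;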
(3)Delta Join
与传统 Join 方式将所有数据缓存在 Flink 状态后端不同,Delta Join 转而依赖外部存储系统(例如基于 RocksDB 构建的 Apache Fluss),将数据存于外部,实现真正的无状态计算。
Unlike the traditional join approach, which caches all data in Flink's state backend, Delta Join relies on external storage systems (such as Apache Fluss, built on RocksDB) to store data externally, thereby enabling true stateless computation.
其工作原理如下:
How It Works:
Fluss 会持续发送变更日志(changelog)更新,确保 Join 数据始终最新。每当有新事件到达时,Delta Join 只需在 Fluss 中执行一次索引查询——就像访问一个键值存储(key-value store)一样简单高效。
Fluss continuously sends changelog updates to ensure that the join data remains up to date. When a new event arrives, Delta Join performs just a single index lookup in Fluss—making the operation as simple and efficient as accessing a key-value store.
可以看到,现在的 Delta Join 已经完全无状态。此前困扰流处理的各种“大状态”问题也随之消失。这使得过去难以实现的大规模 Join 任务,如今成为可能。Fluss 结合 Flink 推出了 Delta Join 算子,利用 Fluss 的 CDC 流读和索引点查能力,实现双边驱动的维表 Join。
As a result, Delta Join is now fully stateless. The long-standing challenge of managing "large state" in stream processing is effectively resolved. This breakthrough makes it possible to perform large-scale join operations that were previously difficult or even unfeasible. Powered by Fluss, Flink introduces the Delta Join operator, which leverages Fluss’s CDC stream reading and point-index lookup capabilities to implement a dual-driven dimension table join.
实现逻辑:左右流均为 Fluss 表,当 Log 到达时,根据 Join Key 进行点查。如果 Local Cache 未命中,则通过 Flink 的 Async Lookup 算子查询 Fluss。
性能优化:Delta Join 可以显著减少 Flink 的状态大小,降低资源消耗。以淘宝成交作业为例,该作业原本需要消耗 50TB 的状态,迁移到 Delta Join 后,不仅减免了大状态带来的成本,还提高了作业的稳定性和效率。Flink 的资源从 2300 CU 降低到 200 CU。
回溯优化:利用归档到 Paimon 表的冷数据和 Flink Batch join 进行高效的历史数据回溯,大幅缩短处理时间。例如,将一天的数据回溯时间从以前的 4 小时降低到 0.5 小时。
灵活性提升:Delta Join 将状态与作业解耦,使用户可以更灵活地修改作业逻辑而不需重新计算状态。且支持可探查和可分析的数据,提升业务灵活性和开发效率。
Flink Delta Join 算子: Flink 侧引入 Delta Join 算子,支持流式 Join 操作。 算子内部通过 Async Lookup 算子异步查询 Fluss,确保低延迟和高吞吐量。
Implementation logic: Both left and right streams are Fluss tables. When logs arrive, a point lookup is performed based on the join key. If the local cache misses, Flink's Async Lookup operator is used to query Fluss.
Performance optimization: Delta Join significantly reduces Flink state size and resource usage. For example, in a Taobao transaction job, the original setup required 50TB of state. After switching to Delta Join, not only were large-state costs eliminated, but job stability and efficiency also improved; Flink resource usage dropped from 2300 CU to 200 CU.
Backfill optimization: By using cold data archived into Paimon tables together with Flink batch joins, historical data backfill becomes much more efficient, reducing one day's backfill time from 4 hours to 0.5 hours.
Flexibility improvement: Delta Join decouples state from the job, allowing users to modify job logic more flexibly without recomputing state. It also keeps the data queryable and analyzable, enhancing business agility and developer productivity.
Flink Delta Join operator: On the Flink side, the Delta Join operator supports streaming joins. Internally, it uses the Async Lookup operator to asynchronously query Fluss, ensuring low latency and high throughput.
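At the SQL level, such a job is just an ordinary dual-stream join between two Fluss primary-key tables. The sketch below is illustrative (table names and columns are invented), and whether the planner actually chooses the Delta Join operator depends on the Flink and Fluss versions in use and on the join keys matching the tables' indexed keys.

-- Hypothetical left and right Fluss tables joined on their primary keys.
-- In supporting versions, Flink can execute this as a Delta Join: each side
-- consumes the other's changelog and answers probes with index lookups in
-- Fluss, instead of keeping both streams in Flink state.
CREATE TABLE orders (
  `order_id` BIGINT NOT NULL,
  `buyer_id` INT,
  `amount`   DECIMAL(15, 2),
  PRIMARY KEY (`order_id`) NOT ENFORCED
);

CREATE TABLE payments (
  `order_id` BIGINT NOT NULL,
  `pay_time` TIMESTAMP(3),
  PRIMARY KEY (`order_id`) NOT ENFORCED
);

SELECT o.order_id, o.buyer_id, o.amount, p.pay_time
FROM orders o
JOIN payments p ON o.order_id = p.order_id;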
(4)湖流一体 Unified Lake and Stream Architecture
为了解决传统架构中湖和流割裂的问题,Fluss 推出了湖流一体特性,旨在统一管理湖存储和流存储的数据。 在底层,Fluss 会维护一个 Compaction Service,自动将 Fluss 数据转为湖格式数据,并且保证两边元数据的一致,在读的时候可以通过 Fluss 直接完成对 lakehouse 上数据的操作。因此,Fluss 湖流一体简化了整体架构,降低了成本,并提高了数据的一致性和实时性。
To address the separation between lake and stream in traditional architectures, Fluss introduces the integrated lake-stream feature, aiming to unify the management of data in both lake storage and stream storage. At the underlying level, Fluss maintains a Compaction Service that automatically converts Fluss data into lake-format data and ensures metadata consistency between the two. During data reads, Fluss can directly operate on data in the lakehouse. As a result, Fluss’s lake-stream integration simplifies the overall architecture, reduces costs, and improves data consistency and real-time performance.
在 Fluss 的架构中,虽然使用 Lakehouse storage(如 Paimon)作为冷数据管理层是一种优化策略,但它并非强制要求。Fluss 集群本身具备 remote storage 功能,可以作为冷数据的存储位置。Remote storage 不仅充当了冷数据的存储角色,而且其存储成本相对较低,比存储在 Tablet Server 上更加经济。
In Fluss’s architecture, while using lakehouse storage (such as Paimon) as the cold data management layer is an optimization strategy, it is not mandatory. The Fluss cluster itself supports remote storage, which can serve as the storage location for cold data. Remote storage not only plays the role of storing cold data, but also offers lower storage costs, making it more economical than storing data on the Tablet Server.
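As a quick sketch of what this looks like at query time (the orders_dl table name is invented; the $lake suffix is the same syntax used in the hands-on section below, and it assumes the table was created with the data lake enabled): querying the plain table name returns the union of the fresh data in Fluss and the data already tiered into the lake, while the $lake suffix restricts the query to the Paimon-format data.

-- Full view: lake data plus the fresh tail that is still only in Fluss.
SELECT count(*) FROM orders_dl;

-- Lake-only view: just the portion already converted to Paimon format.
SELECT count(*) FROM orders_dl$lake;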
实战Fluss使用 Hands-on with Fluss
启动demo,并进入http://localhost:8083/#/overview查看flink是否启动成功
Start the demo and go to http://localhost:8083/#/overview to check whether Flink has started successfully.
进入 Flink SQL Client
Enter the Flink SQL client.
docker compose exec jobmanager ./sql-client
查看默认已经建好的表,模拟数据的来源
View the tables that are already created by default and serve as the simulated data sources.
SHOW CREATE TABLE source_customer;
SHOW CREATE TABLE source_order;
SHOW CREATE TABLE source_nation;
创建Fluss上下文
Create the Fluss context
CREATE CATALOG fluss_catalog WITH (
  'type' = 'fluss',
  'bootstrap.servers' = 'coordinator-server:9123'
);
USE CATALOG fluss_catalog;
创建Fluss表
Create Fluss tables
CREATE TABLE fluss_order (
  `order_key` BIGINT,
  `cust_key` INT NOT NULL,
  `total_price` DECIMAL(15, 2),
  `order_date` DATE,
  `order_priority` STRING,
  `clerk` STRING,
  `ptime` AS PROCTIME(),
  PRIMARY KEY (`order_key`) NOT ENFORCED
);

CREATE TABLE fluss_customer (
  `cust_key` INT NOT NULL,
  `name` STRING,
  `phone` STRING,
  `nation_key` INT NOT NULL,
  `acctbal` DECIMAL(15, 2),
  `mktsegment` STRING,
  PRIMARY KEY (`cust_key`) NOT ENFORCED
);

CREATE TABLE fluss_nation (
  `nation_key` INT NOT NULL,
  `name` STRING,
  PRIMARY KEY (`nation_key`) NOT ENFORCED
);

CREATE TABLE enriched_orders (
  `order_key` BIGINT,
  `cust_key` INT NOT NULL,
  `total_price` DECIMAL(15, 2),
  `order_date` DATE,
  `order_priority` STRING,
  `clerk` STRING,
  `cust_name` STRING,
  `cust_phone` STRING,
  `cust_acctbal` DECIMAL(15, 2),
  `cust_mktsegment` STRING,
  `nation_name` STRING,
  PRIMARY KEY (`order_key`) NOT ENFORCED
);
将模拟数据源数据同步到Fluss
Synchronize the simulated source data into Fluss
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO fluss_nation SELECT * FROM `default_catalog`.`default_database`.source_nation;
  INSERT INTO fluss_customer SELECT * FROM `default_catalog`.`default_database`.source_customer;
  INSERT INTO fluss_order SELECT * FROM `default_catalog`.`default_database`.source_order;
END;
将模拟数据源进行打宽,然后写入Fluss
Denormalize the simulated source data into a wide table, then write it into Fluss
INSERT INTO enriched_orders
SELECT o.order_key, o.cust_key, o.total_price, o.order_date, o.order_priority, o.clerk,
       c.name, c.phone, c.acctbal, c.mktsegment, n.name
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
  ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
  ON c.nation_key = n.nation_key;
任务提交后,Fluss便开始进行同步了,我们可以直接用批模式查询里面的实时数据
Once the task is submitted, Fluss begins synchronization immediately, and we can directly query the real-time data in batch mode
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';
-- use limit to query the enriched_orders table
SELECT * FROM enriched_orders LIMIT 2;
-- lookup by primary key
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
也可以直接修改或删除里面的数据
You can also directly modify or delete the data inside.
-- update by primary key
UPDATE fluss_customer SET `name` = 'fluss_updated' WHERE `cust_key` = 1;
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
DELETE FROM fluss_customer WHERE `cust_key` = 1;
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
使用Paimon配合Fluss Using Paimon with Fluss
首先需要开启Lakehouse Tiering Service这个服务,新开一个窗口执行
First, you need to start the Lakehouse Tiering Service. Open a new terminal window and run:
docker compose exec jobmanager \
  /opt/flink/bin/flink run \
  /opt/flink/opt/fluss-flink-tiering-0.7.0.jar \
  --fluss.bootstrap.servers coordinator-server:9123 \
  --datalake.format paimon \
  --datalake.paimon.metastore filesystem \
  --datalake.paimon.warehouse /tmp/paimon
由于默认Fluss数据不写到datalake里面,所以要开启table.datalake.enabled = true重新将Fluss数据写到datalake
Since Fluss data is not written to the data lake by default, you need to set table.datalake.enabled = true so that Fluss data is also written to the data lake.
CREATE TABLE datalake_enriched_orders (
  `order_key` BIGINT,
  `cust_key` INT NOT NULL,
  `total_price` DECIMAL(15, 2),
  `order_date` DATE,
  `order_priority` STRING,
  `clerk` STRING,
  `cust_name` STRING,
  `cust_phone` STRING,
  `cust_acctbal` DECIMAL(15, 2),
  `cust_mktsegment` STRING,
  `nation_name` STRING,
  PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
  'table.datalake.enabled' = 'true',
  'table.datalake.freshness' = '30s'
);

-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- insert tuples into datalake_enriched_orders
INSERT INTO datalake_enriched_orders
SELECT o.order_key, o.cust_key, o.total_price, o.order_date, o.order_priority, o.clerk,
       c.name, c.phone, c.acctbal, c.mktsegment, n.name
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
  ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
  ON c.nation_key = n.nation_key;
数据被写入Fluss之后,可以从Fluss查全量实时数据,也可以从Paimon查历史数据
After the data is written to Fluss, the full real-time data can be queried from Fluss, while the historical data can be queried from Paimon.
使用批查湖里的数据
Use batch mode to query the data in the lake.
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';
-- query snapshots in paimon
SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapshots;
-- to sum prices of all orders in paimon
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
查全量实时数据
Query the full real-time data (from both Fluss and Paimon).
-- to sum prices of all orders in fluss and paimon
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders;
同时可以查看Paimon的文件
We can also view the files Paimon has written.
docker compose exec taskmanager tree /tmp/paimon/fluss.db
清理数据
Clean up the data and tear down the demo environment.
docker compose down -v
参考文献 References
https://zhuanlan.zhihu.com/p/32862491988
Fluss:新一代流存储核心技术解析
Fluss: An In-Depth Analysis of Next-Generation Stream Storage Core Technologies
https://mp.weixin.qq.com/s/3BoE3IhHjGBVi6tqvYhHGg
淘天AB实验分析平台Fluss落地实践:更适合实时OLAP的消息队列
Practical Implementation of Fluss in Taotian’s A/B Testing Analytics Platform: A Message Queue More Suitable for Real-Time OLAP
https://zhuanlan.zhihu.com/p/11971775236
Fluss:面向实时分析设计的下一代流存储
Fluss: A Next-Generation Stream Storage Designed for Real-Time Analytics