The Delta Join
Apache Flink 2.1 introduced a new join operator called delta join. Compared to traditional streaming joins, the delta join operator significantly reduces the amount of state that must be maintained during execution. This helps mitigate several common problems caused by large state, including:
- Excessive consumption of compute and storage resources
- Long checkpointing durations
- Extended recovery times after failures or restarts
- Heavy state bootstrap overhead after changing job logic
Starting with Apache Fluss 0.8, streaming join jobs running on Flink 2.1 or later will be automatically optimized into delta joins whenever applicable. This optimization happens transparently at query planning time, requiring no manual configuration.
How Delta Join Works
Traditional streaming joins in Flink must keep both input sides entirely in state so that each incoming record can be matched against the other stream. Delta join instead uses an index-key lookup mechanism: rather than querying its own state for matching records, the operator queries the Fluss source table directly by key. This avoids storing the same data twice, once in the Fluss source table and once in Flink state, which drastically reduces state size and improves performance for many streaming analytics and enrichment workloads.
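To make the lookup idea concrete, take the left_src and right_src tables from the example that follows. The queries below are only a conceptual illustration of the key lookups the operator performs when a new row arrives on one side. They are not how delta join is invoked: the operator issues these lookups internally against the Fluss tables, and you never write them yourself.

```sql
-- Conceptual illustration only; not part of the job you submit.

-- A new row (city_id = 1, order_id = 100, ...) arrives from left_src.
-- Instead of reading a copy of right_src kept in Flink state, the operator
-- fetches the matching row from right_src by its primary key, roughly like:
SELECT `city_id`, `city_name`
FROM `fluss_catalog`.`my_db`.`right_src`
WHERE `city_id` = 1;

-- A new row with city_id = 1 arrives from right_src.
-- The operator looks up left_src by its bucket key (city_id), which is a
-- prefix of left_src's primary key (city_id, order_id), roughly like:
SELECT `city_id`, `order_id`, `content`
FROM `fluss_catalog`.`my_db`.`left_src`
WHERE `city_id` = 1;
```

This is why the left table below declares city_id as its bucket key: a lookup by city_id alone must be answerable from the Fluss table without scanning it.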

Example: Delta Join in Flink 2.1
Below is an example demonstrating a delta join query supported by Flink 2.1.
Create Source and Sink Tables
USE CATALOG fluss_catalog;
CREATE DATABASE my_db;
USE my_db;
Create Left Source Table
CREATE TABLE `fluss_catalog`.`my_db`.`left_src` (
`city_id` INT NOT NULL,
`order_id` INT NOT NULL,
`content` VARCHAR NOT NULL,
PRIMARY KEY (city_id, order_id) NOT ENFORCED
) WITH (
-- bucket key: city_id is a prefix of the primary key (prefix key),
-- so rows can be looked up by city_id alone
'bucket.key' = 'city_id',
-- in Flink 2.1, delta join only supports append-only sources;
-- first_row keeps only the first row per primary key, so the table only emits inserts
'table.merge-engine' = 'first_row'
);
Create Right Source Table
CREATE TABLE `fluss_catalog`.`my_db`.`right_src` (
`city_id` INT NOT NULL,
`city_name` VARCHAR NOT NULL,
PRIMARY KEY (city_id) NOT ENFORCED
) WITH (
-- in Flink 2.1, delta join only supports append-only sources;
-- first_row keeps only the first row per primary key, so the table only emits inserts
'table.merge-engine' = 'first_row'
);
Create Sink Table
CREATE TABLE `fluss_catalog`.`my_db`.`snk` (
`city_id` INT NOT NULL,
`order_id` INT NOT NULL,
`content` VARCHAR NOT NULL,
`city_name` VARCHAR NOT NULL,
PRIMARY KEY (city_id, order_id) NOT ENFORCED
);
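With the tables in place, a join query over them might look like the following sketch. It is an inner equi-join on city_id, which is both the primary key of right_src and the bucket (prefix) key of left_src; we assume these properties are what allow the planner to choose delta join here. Running EXPLAIN first lets you check the optimized plan: when the optimization applies, we expect a DeltaJoin node in place of a regular Join node, although the exact plan text may vary between Flink versions.

```sql
-- Delta join applies to streaming join jobs, so run in streaming mode.
SET 'execution.runtime-mode' = 'streaming';

-- Optional: inspect the optimized plan before submitting the job.
-- If the optimization applied, the plan is expected to show a DeltaJoin node.
EXPLAIN
INSERT INTO `fluss_catalog`.`my_db`.`snk`
SELECT T1.`city_id`, T1.`order_id`, T1.`content`, T2.`city_name`
FROM `fluss_catalog`.`my_db`.`left_src` AS T1
JOIN `fluss_catalog`.`my_db`.`right_src` AS T2
  ON T1.`city_id` = T2.`city_id`;

-- Submit the streaming join job; each result row is keyed by
-- (city_id, order_id), matching the primary key of snk.
INSERT INTO `fluss_catalog`.`my_db`.`snk`
SELECT T1.`city_id`, T1.`order_id`, T1.`content`, T2.`city_name`
FROM `fluss_catalog`.`my_db`.`left_src` AS T1
JOIN `fluss_catalog`.`my_db`.`right_src` AS T2
  ON T1.`city_id` = T2.`city_id`;
```

No delta-join-specific option appears in the query: the rewrite happens at planning time, so the same INSERT statement runs as a regular streaming join if the optimization does not apply.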