Flink Reads
Fluss supports streaming and batch reads with Apache Flink's SQL & Table API. Use the following SQL commands to switch the execution mode between streaming and batch:
-- Execute the Flink job in streaming mode for current session context
SET 'execution.runtime-mode' = 'streaming';
-- Execute the Flink job in batch mode for current session context
SET 'execution.runtime-mode' = 'batch';
Streaming Read
By default, a streaming read produces the latest snapshot of the table on first startup and then continues to read the subsequent changes.
Fluss by default ensures that your startup is properly processed with all data included.
Fluss Source in streaming mode is unbounded, like a queue that never ends.
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM my_table ;
To perform a streaming read without reading the snapshot data, use the latest scan mode, which reads only the changelogs (or logs) from the latest offset:
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;
Column Pruning
Column pruning minimizes I/O by reading only the columns used in a query and ignoring unused ones at the storage layer. In Fluss, column pruning is implemented using Apache Arrow as the default log format to optimize streaming reads from Log Tables and the changelogs of PrimaryKey Tables. Benchmark results show that column pruning can improve read performance by up to 10x and reduce unnecessary network traffic (for example, I/O drops by about 80% when 80% of the columns are unused).
- Column pruning is only available when the table uses the Arrow log format ('table.log.format' = 'arrow'), which is enabled by default.
- Reading log data from remote storage currently does not support column pruning.
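As a minimal sketch of the first requirement, the log format can be spelled out in the table definition. The table name and columns below are hypothetical and used only for illustration; because Arrow is already the default, the WITH clause is optional and only makes the dependency explicit.

```sql
-- Hypothetical table used only for illustration.
-- 'arrow' is the default log format, so this WITH clause simply states it explicitly.
CREATE TABLE `pruning_demo` (
  `id` INT NOT NULL,
  `payload` STRING NOT NULL
) WITH (
  'table.log.format' = 'arrow'
);
```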
Example
1. Create a table
CREATE TABLE `log_table` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` STRING NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` STRING NOT NULL,
`c_comment` STRING NOT NULL
);
2. Query a single column:
SELECT `c_name` FROM `log_table`;
3. Verify with EXPLAIN:
EXPLAIN SELECT `c_name` FROM `log_table`;
Output:
== Optimized Execution Plan ==
TableSourceScan(table=[[fluss_catalog, fluss, log_table, project=[c_name]]], fields=[c_name])
This confirms that only the c_name column is being read from storage.
Partition Pruning
Partition pruning is an optimization technique for Fluss partitioned tables. It reduces the number of partitions scanned during a query by filtering on partition keys. This optimization is especially useful in streaming scenarios for Multi-Field Partitioned Tables that have many partitions. Partition pruning also dynamically prunes newly created partitions during a streaming read.
The supported filter operators for partition pruning on the partition fields are:
- =
- >
- <
- >=
- <=
- <>
- IN (...)
- NOT IN (...)
- IS NULL
- IS NOT NULL
- IS TRUE
- IS FALSE
- LIKE 'abc%' for prefix matching
- LIKE '%abc' for suffix matching
- LIKE '%abc%' for substring matching
- OR conjunctions of filter conditions
- AND conjunctions of filter conditions
Example
1. Create a partitioned table:
CREATE TABLE `log_partitioned_table` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` STRING NOT NULL,
`c_phone` STRING NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` STRING NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL
) PARTITIONED BY (`c_nationkey`,`dt`);
2. Query with partition filter:
SELECT * FROM `log_partitioned_table` WHERE `c_nationkey` = 'US';
Fluss source will scan only the partitions where c_nationkey = 'US'.
For example, if the following partitions exist:
- US,2025-06-13
- China,2025-06-13
- US,2025-06-14
- China,2025-06-14
Only US,2025-06-13 and US,2025-06-14 will be read.
As new partitions such as US,2025-06-15 and China,2025-06-15 are created, partition US,2025-06-15 is automatically included in the stream, while China,2025-06-15 is dynamically filtered out by the partition pruning condition.
3. Verify with EXPLAIN:
EXPLAIN SELECT * FROM `log_partitioned_table` WHERE `c_nationkey` = 'US';
Output:
== Optimized Execution Plan ==
TableSourceScan(table=[[fluss_catalog, fluss, log_partitioned_table, filter=[=(c_nationkey, _UTF-16LE'US':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]]], fields=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, dt])
This confirms that only partitions matching c_nationkey = 'US' will be scanned.
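The other supported operators can be combined in the same way. The following sketch reuses log_partitioned_table and assumes the partition values shown above (nation keys such as 'US' and 'China', and dt values formatted as yyyy-MM-dd); it filters on both partition fields at once. The plan output is not reproduced here, so run the EXPLAIN yourself to check which predicates appear in the source's filter=[...] clause.

```sql
-- Keep two nation keys and only the partitions whose dt starts with 2025-06.
SELECT * FROM `log_partitioned_table`
WHERE `c_nationkey` IN ('US', 'China')
  AND `dt` LIKE '2025-06%';

-- Inspect the optimized plan to see which predicates reach the table source.
EXPLAIN SELECT * FROM `log_partitioned_table`
WHERE `c_nationkey` IN ('US', 'China')
  AND `dt` LIKE '2025-06%';
```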
Filter Pushdown
Filter pushdown is a server-side optimization for Log Tables (non-primary-key tables). When enabled, the server evaluates filter predicates against per-batch column statistics (min, max, null count) and skips entire record batches that cannot contain matching rows. This reduces network I/O and deserialization cost without changing query semantics — Flink still applies the filter on the client side as a safety net.
- Filter pushdown requires the Arrow log format ('table.log.format' = 'arrow'), which is enabled by default.
- Column statistics must be explicitly enabled via the table.statistics.columns table property. Without this configuration, no filters will be pushed down.
- Only data written after enabling statistics will contain batch-level statistics. Historical data will not benefit from filter pushdown.
Enabling Column Statistics
Set the table.statistics.columns property when creating or altering a table. It is recommended to specify only the columns used in your filter conditions to minimize overhead:
-- Recommended: collect statistics only for columns used in filter conditions
CREATE TABLE sensor_data (
sensor_id INT NOT NULL,
temperature DOUBLE NOT NULL,
humidity DOUBLE NOT NULL,
location STRING NOT NULL,
ts TIMESTAMP NOT NULL
) WITH (
'table.statistics.columns' = 'temperature,humidity,location'
);
-- Or use '*' to collect statistics for all supported columns (higher overhead)
ALTER TABLE sensor_data SET ('table.statistics.columns' = '*');
Statistics collection supports the following types: BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, STRING, CHAR, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ. Unsupported types (BYTES, BINARY, ARRAY, MAP, ROW) are automatically excluded.
Supported Filter Operators
The following filter operators can be pushed down to the server:
- =, <>, >, >=, <, <=
- IN (...)
- IS NULL, IS NOT NULL
- BETWEEN ... AND ...
- LIKE 'abc%' (prefix), LIKE '%abc' (suffix), LIKE '%abc%' (contains)
- AND / OR conjunctions
All columns referenced in a filter expression must have statistics enabled. If any referenced column lacks statistics, that filter will not be pushed down.
Example
1. Create a table with statistics enabled:
CREATE TABLE sensor_data (
sensor_id INT NOT NULL,
temperature DOUBLE NOT NULL,
humidity DOUBLE NOT NULL,
location STRING NOT NULL,
ts TIMESTAMP NOT NULL
) WITH (
'table.statistics.columns' = 'temperature,location'
);
2. Query with filter:
SELECT * FROM sensor_data WHERE temperature > 30.0 AND location = 'warehouse-A';
3. Verify with EXPLAIN:
EXPLAIN SELECT * FROM sensor_data WHERE temperature > 30.0 AND location = 'warehouse-A';
If filter pushdown is active, the TableSourceScan node in the execution plan will contain a filter=[...] clause showing the pushed-down predicates. For example:
TableSourceScan(table=[[..., sensor_data, filter=[and(>(temperature, 30.0:DOUBLE), =(location, ...))]]], fields=[...])
The server evaluates these predicates against per-batch column statistics and skips entire record batches that cannot contain matching rows. Note that the filter also appears in a Calc node above the source — this is expected because Flink retains all filters for client-side verification as a safety net.
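The statistics requirement can be checked the same way. In the sketch below, humidity is not listed in 'table.statistics.columns' for the example table, so a filter on it is not expected to be pushed down. Adding the column with ALTER TABLE should make the filter eligible, but only for batches written after the change. The exact plan text is not shown because it may differ between versions.

```sql
-- humidity has no statistics in the example table, so this predicate
-- is not expected to appear in the source's filter=[...] clause.
EXPLAIN SELECT * FROM sensor_data WHERE humidity > 50.0;

-- Extend the statistics columns to include humidity.
-- Only data written after this change carries humidity statistics.
ALTER TABLE sensor_data SET ('table.statistics.columns' = 'temperature,location,humidity');

-- Re-check the plan after the property change.
EXPLAIN SELECT * FROM sensor_data WHERE humidity > 50.0;
```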
Batch Read
Limit Read
The Fluss source supports limiting reads for both primary-key tables and log tables, making it convenient to preview the latest N records in a table.
Example
- Create a table and prepare data
CREATE TABLE log_table (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` STRING NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` STRING NOT NULL,
`c_comment` STRING NOT NULL
);
INSERT INTO log_table
VALUES (1, 'Customer1', 'IVhzIApeRb ot,c,E', 15, '25-989-741-2988', 711.56, 'BUILDING', 'comment1'),
(2, 'Customer2', 'XSTf4,NCwDVaWNe6tEgvwfmRchLXak', 13, '23-768-687-3665', 121.65, 'AUTOMOBILE', 'comment2'),
(3, 'Customer3', 'MG9kdTD2WBHm', 1, '11-719-748-3364', 7498.12, 'AUTOMOBILE', 'comment3');
- Query from table.
-- Execute the flink job in batch mode for current session context
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT * FROM log_table LIMIT 10;
Point Query
The Fluss source supports point queries for primary-key tables, allowing you to inspect specific records efficiently. Currently, this functionality is exclusive to primary-key tables.
Example
- Create a table and prepare data
CREATE TABLE pk_table (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` STRING NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` STRING NOT NULL,
`c_comment` STRING NOT NULL,
PRIMARY KEY (c_custkey) NOT ENFORCED
);
INSERT INTO pk_table
VALUES (1, 'Customer1', 'IVhzIApeRb ot,c,E', 15, '25-989-741-2988', 711.56, 'BUILDING', 'comment1'),
(2, 'Customer2', 'XSTf4,NCwDVaWNe6tEgvwfmRchLXak', 13, '23-768-687-3665', 121.65, 'AUTOMOBILE', 'comment2'),
(3, 'Customer3', 'MG9kdTD2WBHm', 1, '11-719-748-3364', 7498.12, 'AUTOMOBILE', 'comment3');
- Query from table.
-- Execute the flink job in batch mode for current session context
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT * FROM pk_table WHERE c_custkey = 1;
Aggregations
The Fluss source supports pushing down COUNT(*) aggregations in batch mode for both Log Tables and Primary Key Tables. This enables efficient row counting without scanning all records.
Example for Log Table
-- Execute the flink job in batch mode for current session context
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT COUNT(*) FROM log_table;
Example for Primary Key Table
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT COUNT(*) FROM pk_table;
COUNT(*) pushdown for Primary Key Tables requires the table to use the default changelog mode ('table.changelog.image' = 'FULL'). Tables configured with 'table.changelog.image' = 'WAL' do not support this feature.
Read Options
Start Reading Position
The config option scan.startup.mode enables you to specify the starting point for data consumption. Fluss currently supports the following scan.startup.mode options:
- full (default): For primary key tables, it first consumes the full data set and then consumes incremental data. For log tables, it starts consuming from the earliest offset.
- earliest: For primary key tables, it starts consuming from the earliest changelog offset; for log tables, it starts consuming from the earliest log offset.
- latest: For primary key tables, it starts consuming from the latest changelog offset; for log tables, it starts consuming from the latest log offset.
- timestamp: For primary key tables, it starts consuming the changelog from a specified time (defined by the configuration item scan.startup.timestamp); for log tables, it starts consuming from the offset corresponding to the specified time.
You can dynamically apply the scan parameters via SQL hints. For instance, the following SQL statement temporarily sets the scan.startup.mode to latest when consuming the log_table table.
SELECT * FROM log_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;
Also, the following SQL statement temporarily sets the scan.startup.mode to timestamp when consuming the log_table table.
-- timestamp mode with epoch milliseconds.
SELECT * FROM log_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' = '1678883047356') */;
-- timestamp mode with a time string format
SELECT * FROM log_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
'scan.startup.timestamp' = '2023-12-09 23:09:12') */;
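The remaining two modes can be applied with the same hint syntax. This sketch reuses pk_table from the Point Query example and assumes a streaming session. With earliest, the read starts from the earliest changelog offset instead of reading the full data set first; full states the default explicitly.

```sql
SET 'execution.runtime-mode' = 'streaming';

-- Start from the earliest changelog offset of the primary key table.
SELECT * FROM pk_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

-- Same as the default: consume the full data set first, then incremental changes.
SELECT * FROM pk_table /*+ OPTIONS('scan.startup.mode' = 'full') */;
```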