Iceberg

Introduction

Apache Iceberg is an open table format for huge analytic datasets. It provides ACID transactions, schema evolution, and efficient data organization for data lakes. To integrate Fluss with Iceberg, you must enable lakehouse storage and configure Iceberg as the lakehouse storage. For more details, see Enable Lakehouse Storage.

NOTE: Iceberg requires JDK 11 or later. Ensure that both your Fluss deployment and the Flink cluster used for tiering services run on JDK 11 or later.

Configure Iceberg as LakeHouse Storage

Configure Iceberg in Cluster Configurations

To configure Iceberg as the lakehouse storage, set the following options in server.yaml:

# Iceberg configuration
datalake.format: iceberg

# Iceberg catalog configuration, assuming a Hadoop catalog
datalake.iceberg.type: hadoop
datalake.iceberg.warehouse: /tmp/iceberg

Fluss processes Iceberg configurations by stripping the datalake.iceberg. prefix and using the remaining keys to initialize the Iceberg catalog. This lets you pass custom configurations through to Iceberg catalog initialization. See the Iceberg Catalog Properties for the available catalog configurations.
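The prefix handling described above can be pictured with a small Python sketch. This is only an illustration of the stripping rule, not Fluss's actual implementation; the function name to_iceberg_catalog_properties and the plain dict standing in for server.yaml are assumptions made for the example.

```python
# Illustrative only: mimics the "strip the datalake.iceberg. prefix" rule.
ICEBERG_PREFIX = "datalake.iceberg."


def to_iceberg_catalog_properties(server_conf: dict) -> dict:
    """Keep only keys under the Iceberg prefix and drop the prefix itself."""
    return {
        key[len(ICEBERG_PREFIX):]: value
        for key, value in server_conf.items()
        if key.startswith(ICEBERG_PREFIX)
    }


# A dict standing in for the server.yaml entries shown earlier.
server_conf = {
    "datalake.format": "iceberg",
    "datalake.iceberg.type": "hadoop",
    "datalake.iceberg.warehouse": "/tmp/iceberg",
}

catalog_props = to_iceberg_catalog_properties(server_conf)
print(catalog_props)  # {'type': 'hadoop', 'warehouse': '/tmp/iceberg'}
```

Note that datalake.format does not carry the Iceberg prefix, so it is not forwarded to the catalog.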

Fluss supports all Iceberg-compatible catalog types. For catalogs such as hive, hadoop, rest, glue, nessie, and jdbc, you can specify them using the configuration datalake.iceberg.type with the corresponding value (e.g., hive, hadoop, etc.). For other types of catalogs, you can use datalake.iceberg.catalog-impl: <your_iceberg_catalog_impl_class_name> to specify the catalog implementation. For example, configure with datalake.iceberg.catalog-impl: org.apache.iceberg.snowflake.SnowflakeCatalog to use Snowflake catalog.

NOTE:
1: Some catalogs, such as the hadoop and hive catalogs, require Hadoop-related classes. Make sure these classes are on your classpath. You can get them from the pre-bundled Hadoop jar or from hadoop.tar.gz, which must be extracted first. Then put the Hadoop-related jars into FLUSS_HOME/plugins/iceberg.
2: Fluss bundles only the catalog implementations included in the iceberg-core module. For any catalog implementation that is not part of iceberg-core (e.g., Hive Catalog), you must place the corresponding JAR file into FLUSS_HOME/plugins/iceberg.
3: The Iceberg version that Fluss bundles is based on 1.9.2. Make sure the jars you add are compatible with Iceberg 1.9.2.

Start Tiering Service to Iceberg

Next, start the datalake tiering service to tier Fluss data to Iceberg. For guidance, refer to Start The Datalake Tiering Service. Although that example uses Paimon, the process also applies to Iceberg.

However, for the Prepare required jars step, adhere to the dependency management guidelines listed below:

Additionally, when following the Start Datalake Tiering Service guide, make sure to use Iceberg-specific configurations as parameters when starting the Flink tiering job:

<FLINK_HOME>/bin/flink run /path/to/fluss-flink-tiering-0.8-SNAPSHOT.jar \
--fluss.bootstrap.servers localhost:9123 \
--datalake.format iceberg \
--datalake.iceberg.type hadoop \
--datalake.iceberg.warehouse /tmp/iceberg

Table Mapping Between Fluss and Iceberg

When a Fluss table is created or altered with the option 'table.datalake.enabled' = 'true' and configured with Iceberg as the datalake format, Fluss will automatically create a corresponding Iceberg table with the same table path.

The schema of the Iceberg table matches that of the Fluss table, except for the addition of three system columns at the end: __bucket, __offset, and __timestamp.
These system columns help Fluss clients consume data from Iceberg in a streaming fashion, such as seeking by a specific bucket using an offset or timestamp.

Here is an example using Flink SQL to create a table with data lake enabled:

Flink SQL
USE CATALOG fluss_catalog;

CREATE TABLE fluss_order_with_lake (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`ptime` AS PROCTIME(),
PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
'table.datalake.enabled' = 'true',
'table.datalake.freshness' = '30s'
);

You can also specify Iceberg table properties when creating a datalake-enabled Fluss table by using the iceberg. prefix within the Fluss table properties clause. The following example changes the Iceberg file format to ORC and sets commit.retry.num-retries to 5:

Flink SQL
CREATE TABLE fluss_order_with_lake (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`ptime` AS PROCTIME(),
PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
'table.datalake.enabled' = 'true',
'table.datalake.freshness' = '30s',
'table.datalake.auto-compaction' = 'true',
'iceberg.write.format.default' = 'orc',
'iceberg.commit.retry.num-retries' = '5'
);

Primary Key Tables

Primary key tables in Fluss are mapped to Iceberg tables with:

  • Primary key constraints: The Iceberg table maintains the same primary key definition
  • Merge-on-read (MOR) strategy: Updates and deletes are handled efficiently using Iceberg's MOR capabilities
  • Bucket partitioning: Automatically partitioned by the primary key using Iceberg's bucket transform, with the same number of buckets as the Fluss table so that the two layouts align
  • Sorted by system column __offset: Sorted by the system column __offset (which is derived from the Fluss change log) to preserve the data order and facilitate mapping back to the original Fluss change log
Primary Key Table Example
CREATE TABLE user_profiles (
`user_id` BIGINT,
`username` STRING,
`email` STRING,
`last_login` TIMESTAMP,
`profile_data` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'table.datalake.enabled' = 'true',
'bucket.num' = '4',
'bucket.key' = 'user_id'
);

Corresponding Iceberg table structure:

CREATE TABLE user_profiles (
user_id BIGINT,
username STRING,
email STRING,
last_login TIMESTAMP,
profile_data STRING,
__bucket INT,
__offset BIGINT,
__timestamp TIMESTAMP_LTZ,
PRIMARY KEY (user_id) NOT ENFORCED
) PARTITIONED BY (bucket(user_id, 4))
SORTED BY (__offset ASC);

Log Tables

The table mapping for Fluss log tables differs slightly depending on whether a bucket key is specified.

No Bucket Key

Log tables without a bucket key in Fluss are mapped to Iceberg tables with:

  • Identity partitioning: Uses identity partitioning on the __bucket system column, which makes it possible to locate the Iceberg data files that belong to a given Fluss bucket
  • Sorted by system column __offset: Sorted by the system column __offset (which is derived from the Fluss log data) to preserve the data order and facilitate mapping back to the original Fluss log data
Log Table without Bucket Key
CREATE TABLE access_logs (
`timestamp` TIMESTAMP,
`user_id` BIGINT,
`action` STRING,
`ip_address` STRING
) WITH (
'table.datalake.enabled' = 'true',
'bucket.num' = '3'
);

Corresponding Iceberg table:

CREATE TABLE access_logs (
timestamp TIMESTAMP,
user_id BIGINT,
action STRING,
ip_address STRING,
__bucket INT,
__offset BIGINT,
__timestamp TIMESTAMP_LTZ
) PARTITIONED BY (IDENTITY(__bucket))
SORTED BY (__offset ASC);

Single Bucket Key

Log tables with a single bucket key in Fluss are mapped to Iceberg tables with:

  • Bucket partitioning: Automatically partitioned by the bucket key using Iceberg's bucket transform, with the same number of buckets as the Fluss table so that the two layouts align
  • Sorted by system column __offset: Sorted by the system column __offset (which is derived from the Fluss log data) to preserve the data order and facilitate mapping back to the original Fluss log data
Log Table with Bucket Key
CREATE TABLE order_events (
`order_id` BIGINT,
`item_id` BIGINT,
`amount` INT,
`event_time` TIMESTAMP
) WITH (
'table.datalake.enabled' = 'true',
'bucket.num' = '5',
'bucket.key' = 'order_id'
);

Corresponding Iceberg table:

CREATE TABLE order_events (
order_id BIGINT,
item_id BIGINT,
amount INT,
event_time TIMESTAMP,
__bucket INT,
__offset BIGINT,
__timestamp TIMESTAMP_LTZ
) PARTITIONED BY (bucket(order_id, 5))
SORTED BY (__offset ASC);

Partitioned Tables

For Fluss partitioned tables, the Iceberg table is partitioned first by the Fluss partition keys and then according to the rules above:

Partitioned Table Example
CREATE TABLE daily_sales (
`sale_id` BIGINT,
`amount` DECIMAL(10,2),
`customer_id` BIGINT,
`sale_date` STRING,
PRIMARY KEY (`sale_id`) NOT ENFORCED
) PARTITIONED BY (`sale_date`)
WITH (
'table.datalake.enabled' = 'true',
'bucket.num' = '4',
'bucket.key' = 'sale_id'
);

Corresponding Iceberg table:

CREATE TABLE daily_sales (
sale_id BIGINT,
amount DECIMAL(10,2),
customer_id BIGINT,
sale_date STRING,
__bucket INT,
__offset BIGINT,
__timestamp TIMESTAMP_LTZ,
PRIMARY KEY (sale_id) NOT ENFORCED
) PARTITIONED BY (IDENTITY(sale_date), bucket(sale_id, 4))
SORTED BY (__offset ASC);
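The mapping rules from this and the preceding sections can be summarized in a short Python sketch. It is a reading aid under stated assumptions, not Fluss code: the function iceberg_partition_spec and its string output are hypothetical, and it takes the bucket key as an explicit input, as in the examples on this page.

```python
def iceberg_partition_spec(partition_keys, bucket_keys, bucket_num):
    """Return the Iceberg partition fields implied by the rules on this page."""
    if len(bucket_keys) > 1:
        # See Limitations: multiple bucket keys are not supported.
        raise ValueError("multiple bucket keys are not supported")

    # Fluss partition keys come first, as identity partitions.
    spec = [f"IDENTITY({key})" for key in partition_keys]

    if bucket_keys:
        # One bucket key: Iceberg bucket transform with the Fluss bucket count.
        spec.append(f"bucket({bucket_keys[0]}, {bucket_num})")
    else:
        # No bucket key: identity partitioning on the __bucket system column.
        spec.append("IDENTITY(__bucket)")
    return spec


print(iceberg_partition_spec(["sale_date"], ["sale_id"], 4))
# ['IDENTITY(sale_date)', 'bucket(sale_id, 4)']
print(iceberg_partition_spec([], [], 3))
# ['IDENTITY(__bucket)']
print(iceberg_partition_spec([], ["order_id"], 5))
# ['bucket(order_id, 5)']
```

The three calls reproduce the PARTITIONED BY clauses of the daily_sales, access_logs, and order_events examples.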

Read Tables

Reading with other Engines

Since data tiered to Iceberg from Fluss is stored as standard Iceberg tables, you can use any Iceberg-compatible engine. Below is an example using StarRocks:

StarRocks with Hadoop Catalog

StarRocks SQL
CREATE EXTERNAL CATALOG iceberg_catalog
PROPERTIES (
"type" = "iceberg",
"iceberg.catalog.type" = "hadoop",
"iceberg.catalog.warehouse" = "/tmp/iceberg"
);
Query Examples
-- Basic query
SELECT COUNT(*) FROM iceberg_catalog.fluss.orders;

-- Time travel query
SELECT * FROM iceberg_catalog.fluss.orders
FOR VERSION AS OF 123456789;

-- Query with bucket filtering for efficiency
SELECT * FROM iceberg_catalog.fluss.orders
WHERE __bucket = 1 AND __offset >= 100;

NOTE: The configuration values must match those used when configuring Iceberg as the lakehouse storage for Fluss in server.yaml.

Data Type Mapping

When integrating with Iceberg, Fluss automatically converts between Fluss data types and Iceberg data types:

| Fluss Data Type | Iceberg Data Type | Notes |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| TINYINT | INTEGER | Promoted to INT |
| SMALLINT | INTEGER | Promoted to INT |
| INT | INTEGER | |
| BIGINT | LONG | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DECIMAL | DECIMAL | |
| STRING | STRING | |
| CHAR | STRING | Converted to STRING |
| DATE | DATE | |
| TIME | TIME | |
| TIMESTAMP | TIMESTAMP (without timezone) | |
| TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone) | |
| BINARY | BINARY | |
| BYTES | BINARY | Converted to BINARY |

Maintenance and Optimization

Auto Compaction

The table option table.datalake.auto-compaction (disabled by default) provides per-table control over automatic compaction. When enabled for a specific table, compaction is automatically triggered during write operations to that table by the tiering service.

Snapshot Metadata

Fluss adds specific metadata to Iceberg snapshots for traceability:

  • commit-user: Set to __fluss_lake_tiering to identify Fluss-generated snapshots
  • fluss-bucket-offset: JSON string containing the Fluss bucket offset mapping to track the tiering progress:
    [
    {"bucket": 0, "offset": 1234},
    {"bucket": 1, "offset": 5678},
    {"bucket": 2, "offset": 9012}
    ]
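As a rough illustration of how this metadata could be consumed, the Python sketch below parses the fluss-bucket-offset value into a bucket-to-offset mapping. It assumes the snapshot properties have already been read into a plain dict by some Iceberg client, which is outside the scope of the sketch; the function name parse_bucket_offsets and the sample dict are hypothetical.

```python
import json


def parse_bucket_offsets(raw: str) -> dict:
    """Turn the fluss-bucket-offset JSON string into {bucket: offset}."""
    return {entry["bucket"]: entry["offset"] for entry in json.loads(raw)}


# Stand-in for snapshot properties already fetched from Iceberg.
snapshot_props = {
    "commit-user": "__fluss_lake_tiering",
    "fluss-bucket-offset": json.dumps([
        {"bucket": 0, "offset": 1234},
        {"bucket": 1, "offset": 5678},
        {"bucket": 2, "offset": 9012},
    ]),
}

# Only snapshots committed by the Fluss tiering service carry the mapping.
if snapshot_props.get("commit-user") == "__fluss_lake_tiering":
    offsets = parse_bucket_offsets(snapshot_props["fluss-bucket-offset"])
    print(offsets)  # {0: 1234, 1: 5678, 2: 9012}
```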

Limitations

When using Iceberg as the lakehouse storage layer with Fluss, the following limitations currently exist:

  • Union Read: Union read of data from both Fluss and Iceberg layers is not supported
  • Complex Types: Array, Map, and Row types are not supported
  • Multiple bucket keys: Not supported until Iceberg implements multi-argument partition transforms