Real-Time Analytics With Flink
This guide gets you up and running with Apache Flink for real-time analytics on Fluss, covering some of Fluss's most powerful features. The example is derived from TPC-H Q5.
For more information on working with Flink, refer to the Apache Flink Engine section.
Environment Setup
Prerequisites
Before proceeding with this guide, ensure that Docker and the Docker Compose plugin are installed on your machine. All commands were tested with Docker version 27.4.0 and Docker Compose version v2.30.3.
We encourage you to use a recent version of Docker and Compose v2 (however, Compose v1 might work with a few adaptions).
Starting required components
We will use docker compose to spin up the required components for this tutorial.
- Create a working directory for this guide.
mkdir fluss-quickstart-flink
cd fluss-quickstart-flink
- Create a `docker-compose.yml` file with the following content:
services:
  #begin Fluss cluster
  coordinator-server:
    image: apache/fluss:0.8.0-incubating
    command: coordinatorServer
    depends_on:
      - zookeeper
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        bind.listeners: FLUSS://coordinator-server:9123
        remote.data.dir: /tmp/fluss/remote-data
  tablet-server:
    image: apache/fluss:0.8.0-incubating
    command: tabletServer
    depends_on:
      - coordinator-server
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        bind.listeners: FLUSS://tablet-server:9123
        data.dir: /tmp/fluss/data
        remote.data.dir: /tmp/fluss/remote-data
        kv.snapshot.interval: 0s
  zookeeper:
    restart: always
    image: zookeeper:3.9.2
  #end
  #begin Flink cluster
  jobmanager:
    image: apache/fluss-quickstart-flink:1.20-0.8.0-incubating
    ports:
      - "8083:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: apache/fluss-quickstart-flink:1.20-0.8.0-incubating
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        taskmanager.memory.process.size: 2048m
        taskmanager.memory.framework.off-heap.size: 256m
  #end
The Docker Compose environment consists of the following containers:
- Fluss Cluster: a Fluss `CoordinatorServer`, a Fluss `TabletServer`, and a `ZooKeeper` server.
- Flink Cluster: a Flink `JobManager` and a Flink `TaskManager` container to execute queries.
Note: The `apache/fluss-quickstart-flink` image is based on `flink:1.20.1-java17` and includes `fluss-flink` and `flink-connector-faker` to simplify this guide.
- To start all containers, run:
docker compose up -d
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode.
Run
docker container ls -a
to check whether all containers are running properly.
You can also visit http://localhost:8083/ to see if Flink is running normally.
- If you want to additionally use an observability stack, follow one of the provided quickstart guides here and then continue with this guide.
- If you want to run with your own Flink environment, remember to download the fluss-flink connector jar and the flink-connector-faker jar, and put them into `FLINK_HOME/lib/`.
- All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
Congratulations, you are all set!
Enter into SQL-Client
First, use the following command to enter the Flink SQL CLI Container:
docker compose exec jobmanager ./sql-client
Note:
To simplify this guide, three temporary tables have been pre-created with the faker connector to generate data.
You can view their schemas by running the following commands:
SHOW CREATE TABLE source_customer;
SHOW CREATE TABLE source_order;
SHOW CREATE TABLE source_nation;
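For reference, a faker-backed table definition looks roughly like the sketch below: the `flink-connector-faker` connector generates rows from Datafaker expressions declared per column. The table name and expressions here are illustrative assumptions, not the actual pre-created schema, so rely on `SHOW CREATE TABLE` for the real definitions.

```sql
-- Hypothetical sketch of a faker-backed source table; the actual
-- pre-created schemas may differ (verify with SHOW CREATE TABLE).
CREATE TEMPORARY TABLE sketch_source_nation (
  `nation_key` INT NOT NULL,
  `name`       STRING
) WITH (
  'connector' = 'faker',
  'rows-per-second' = '1',
  'fields.nation_key.expression' = '#{number.numberBetween ''0'',''24''}',
  'fields.name.expression' = '#{Country.name}'
);
```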
Create Fluss Tables
Create Fluss Catalog
Use the following SQL to create a Fluss catalog:
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'coordinator-server:9123'
);
USE CATALOG fluss_catalog;
By default, catalog configurations are not persisted across Flink SQL client sessions. For information on how to persist catalog configurations, see Flink's Catalog Store.
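As one example, recent Flink versions (1.18+) ship a file-based catalog store that can be enabled in the Flink configuration; the path below is a placeholder, not a value taken from this setup.

```yaml
# Sketch: enable Flink's file-based catalog store so that catalog
# definitions survive SQL client sessions. The path is a placeholder;
# any directory writable by the JobManager works.
table.catalog-store.kind: file
table.catalog-store.file.path: file:///tmp/flink/catalog-store
```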
Create Tables
Run the following SQL to create the Fluss tables used in this guide:
CREATE TABLE fluss_order (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`ptime` AS PROCTIME(),
PRIMARY KEY (`order_key`) NOT ENFORCED
);
CREATE TABLE fluss_customer (
`cust_key` INT NOT NULL,
`name` STRING,
`phone` STRING,
`nation_key` INT NOT NULL,
`acctbal` DECIMAL(15, 2),
`mktsegment` STRING,
PRIMARY KEY (`cust_key`) NOT ENFORCED
);
CREATE TABLE fluss_nation (
`nation_key` INT NOT NULL,
`name` STRING,
PRIMARY KEY (`nation_key`) NOT ENFORCED
);
CREATE TABLE enriched_orders (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`cust_name` STRING,
`cust_phone` STRING,
`cust_acctbal` DECIMAL(15, 2),
`cust_mktsegment` STRING,
`nation_name` STRING,
PRIMARY KEY (`order_key`) NOT ENFORCED
);
Streaming into Fluss
First, run the following SQL to sync data from source tables to Fluss tables:
EXECUTE STATEMENT SET
BEGIN
INSERT INTO fluss_nation SELECT * FROM `default_catalog`.`default_database`.source_nation;
INSERT INTO fluss_customer SELECT * FROM `default_catalog`.`default_database`.source_customer;
INSERT INTO fluss_order SELECT * FROM `default_catalog`.`default_database`.source_order;
END;
Fluss primary-key tables support high-QPS point lookups on their primary keys, which makes lookup joins very efficient. You can use one to enrich
the fluss_order table with information from the fluss_customer and fluss_nation primary-key tables.
INSERT INTO enriched_orders
SELECT o.order_key,
o.cust_key,
o.total_price,
o.order_date,
o.order_priority,
o.clerk,
c.name,
c.phone,
c.acctbal,
c.mktsegment,
n.name
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
ON c.nation_key = n.nation_key;
Run Ad-hoc Queries on Fluss Tables
You can now perform real-time analytics directly on Fluss tables. For instance, execute the following query to inspect a sample of the enriched orders and obtain instant, real-time results.
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';
-- execute DML job synchronously
SET 'table.dml-sync' = 'true';
-- use limit to query the enriched_orders table
SELECT * FROM enriched_orders LIMIT 2;
Sample Output
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
| order_key | cust_key | total_price | order_date | order_priority | clerk | cust_name | cust_phone | cust_acctbal | cust_mktsegment | nation_name |
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
| 23199744 | 9 | 266.44 | 2024-08-29 | high | Clerk1 | Joe King | 908.207.8513 | 124.28 | FURNITURE | JORDAN |
| 10715776 | 2 | 924.43 | 2024-11-04 | medium | Clerk3 | Rita Booke | (925) 775-0717 | 172.39 | FURNITURE | UNITED |
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
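Aggregations work the same way in batch mode. For example, counting the orders placed by a single customer could look like the sketch below; `cust_key = 9` is taken from the sample output above, and the result will vary because the data is randomly generated.

```sql
-- Count orders for one customer; the count depends on the generated data.
SELECT COUNT(*) AS order_cnt
FROM enriched_orders
WHERE `cust_key` = 9;
```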
If you are interested in a specific customer, you can retrieve their details by performing a lookup on the cust_key.
-- lookup by primary key
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
Sample Output
+----------+---------------+--------------+------------+---------+------------+
| cust_key | name | phone | nation_key | acctbal | mktsegment |
+----------+---------------+--------------+------------+---------+------------+
| 1 | Al K. Seltzer | 817-617-7960 | 1 | 533.41 | AUTOMOBILE |
+----------+---------------+--------------+------------+---------+------------+
Note: The query results are returned very quickly, as Fluss enables efficient primary-key lookups for tables with defined primary keys.
Update/Delete rows on Fluss Tables
You can use UPDATE and DELETE statements to update/delete rows on Fluss tables.
Update
-- update by primary key
UPDATE fluss_customer SET `name` = 'fluss_updated' WHERE `cust_key` = 1;
Then you can lookup the specific row:
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
Sample Output
+----------+---------------+--------------+------------+---------+------------+
| cust_key | name | phone | nation_key | acctbal | mktsegment |
+----------+---------------+--------------+------------+---------+------------+
| 1 | fluss_updated | 817-617-7960 | 1 | 533.41 | AUTOMOBILE |
+----------+---------------+--------------+------------+---------+------------+
Notice that the name column has been updated to fluss_updated.
Delete
DELETE FROM fluss_customer WHERE `cust_key` = 1;
The following SQL query should return an empty result.
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
Clean up
After finishing the tutorial, run `exit` to quit the Flink SQL CLI container, and then run
docker compose down -v
to stop all containers.
Learn more
Now that you're up and running with Fluss and Flink, check out the Apache Flink Engine docs to learn about more Flink features, or follow this guide to learn how to set up an observability stack for Fluss and Flink.