Getting Started with Spark Engine
Supported Spark Versions
| Fluss Connector Versions | Supported Spark Versions |
|---|---|
| 0.10 | 3.4, 3.5 |
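The compatibility table above can be encoded as a small lookup, for example as a pre-flight check in a deployment script. This is a minimal sketch; the names `SUPPORTED_SPARK` and `is_supported` are hypothetical and are not part of Fluss or Spark.

```python
# Compatibility data copied from the table above; extend it as new connector versions appear.
SUPPORTED_SPARK = {"0.10": ("3.4", "3.5")}


def is_supported(connector_version: str, spark_version: str) -> bool:
    """Return True if the Spark release line (major.minor) is listed for the connector version."""
    # Reduce a full version such as "3.5.7" to its release line "3.5".
    release_line = ".".join(spark_version.split(".")[:2])
    return release_line in SUPPORTED_SPARK.get(connector_version, ())
```

A call such as `is_supported("0.10", "3.5.7")` compares only the major and minor components, since the table lists release lines rather than patch versions.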
Feature Support
Fluss supports Apache Spark's SQL API and Spark Structured Streaming.
| Feature | Spark | Notes |
|---|---|---|
| SQL Create Catalog | ✔️ | |
| SQL Create Database | ✔️ | |
| SQL Drop Database | ✔️ | |
| SQL Create Table | ✔️ | |
| SQL Drop Table | ✔️ | |
| SQL Describe Table | ✔️ | |
| SQL Show Tables | ✔️ | |
| SQL Alter Table | ✔️ | SET/UNSET TBLPROPERTIES |
| SQL Show Partitions | ✔️ | |
| SQL Add Partition | ✔️ | |
| SQL Drop Partition | ✔️ | |
| SQL Select (Batch) | ✔️ | Log table and primary-key table |
| SQL Insert Into | ✔️ | Log table and primary-key table |
| Structured Streaming Read | ✔️ | Log table and primary-key table |
| Structured Streaming Write | ✔️ | Log table and primary-key table |
Preparation when using Spark SQL
- Download Spark
Spark runs on UNIX-like environments such as Linux and macOS. Download a binary release of Spark from the Apache Spark Downloads page, then extract the archive:
tar -xzf spark-3.5.7-bin-hadoop3.tgz
- Copy Fluss Spark Bundled Jar
Download the Fluss Spark bundled jar and copy it to the jars directory of your Spark home:
cp fluss-spark-3.5_2.12-0.10-SNAPSHOT.jar <SPARK_HOME>/jars/
- Start Spark SQL
To quickly start the Spark SQL CLI, you can use the provided script:
<SPARK_HOME>/bin/spark-sql \
--conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \
--conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \
--conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions
Or start Spark Shell:
<SPARK_HOME>/bin/spark-shell \
--conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \
--conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \
--conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions
Creating a Catalog
The Fluss catalog can be configured in spark-defaults.conf or passed as command-line arguments.
Using spark-defaults.conf:
spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog
spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123
spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions
Or configure programmatically in Scala:
import org.apache.spark.sql.SparkSession

// Register the Fluss catalog and session extensions on a new or existing session.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.fluss_catalog", "org.apache.fluss.spark.SparkCatalog")
  .config("spark.sql.catalog.fluss_catalog.bootstrap.servers", "localhost:9123")
  .config("spark.sql.extensions", "org.apache.fluss.spark.FlussSparkSessionExtensions")
  .getOrCreate()
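The same configuration can be expressed in PySpark. The sketch below assumes PySpark is installed and the Fluss Spark bundled jar is already on the Spark classpath; `FLUSS_CATALOG_CONF` and `build_session` are hypothetical names used for illustration only.

```python
# The three settings shown above, keyed exactly as in spark-defaults.conf.
FLUSS_CATALOG_CONF = {
    "spark.sql.catalog.fluss_catalog": "org.apache.fluss.spark.SparkCatalog",
    "spark.sql.catalog.fluss_catalog.bootstrap.servers": "localhost:9123",
    "spark.sql.extensions": "org.apache.fluss.spark.FlussSparkSessionExtensions",
}


def build_session(conf=FLUSS_CATALOG_CONF):
    """Create (or reuse) a SparkSession with the Fluss catalog configured."""
    # Imported lazily so the settings can be inspected without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Keeping the settings in a dictionary makes it easy to override `bootstrap.servers` per environment before calling `build_session`.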
- The spark.sql.catalog.fluss_catalog.bootstrap.servers option specifies the Fluss server address. Start the Fluss server before configuring bootstrap.servers; see Deploying Fluss for how to set up a Fluss cluster. The examples here assume a Fluss cluster running on your local machine with the CoordinatorServer listening on port 9123.
- The spark.sql.catalog.fluss_catalog.bootstrap.servers configuration is used to discover all nodes within the Fluss cluster. It accepts one to three Fluss server addresses (either CoordinatorServer or TabletServer), separated by commas.
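To illustrate the expected shape of the bootstrap.servers value, the following sketch validates a list of addresses and joins them with commas. The function name `join_bootstrap_servers` is hypothetical, and the host:port check is a simplifying assumption (it does not handle IPv6 literals); Fluss itself may apply different validation.

```python
def join_bootstrap_servers(addresses):
    """Validate one to three host:port addresses and join them with commas."""
    if not 1 <= len(addresses) <= 3:
        raise ValueError("expected between one and three server addresses")
    for address in addresses:
        # Split on the last colon so the port is always the final component.
        host, sep, port = address.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {address!r}")
    return ",".join(addresses)
```

The returned string can be used directly as the value of spark.sql.catalog.fluss_catalog.bootstrap.servers.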
Creating a Database
USE fluss_catalog;
CREATE DATABASE fluss_db;
USE fluss_db;
Creating a Table
CREATE TABLE pk_table (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT
) TBLPROPERTIES (
'primary.key' = 'shop_id,user_id',
'bucket.num' = '4'
);
Data Writing
To append new data to a table, you can use INSERT INTO:
INSERT INTO pk_table VALUES
(1234, 1234, 1, 1),
(12345, 12345, 2, 2),
(123456, 123456, 3, 3);
Data Reading
To retrieve data, you can use a SELECT statement:
SELECT * FROM pk_table ORDER BY shop_id;
To read a subset of the data with a projection and a filter:
SELECT shop_id, total_amount FROM pk_table WHERE num_orders > 1;
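The write and read statements above can also be issued from PySpark through `spark.sql`. This is a sketch under stated assumptions: `spark` is a session already configured with fluss_catalog, and fluss_db and pk_table exist as created earlier. `QUICKSTART_STATEMENTS` and `write_and_read` are hypothetical names.

```python
# Statements taken from the sections above; the INSERT is written on one line.
QUICKSTART_STATEMENTS = [
    "USE fluss_catalog",
    "USE fluss_db",
    "INSERT INTO pk_table VALUES "
    "(1234, 1234, 1, 1), (12345, 12345, 2, 2), (123456, 123456, 3, 3)",
]


def write_and_read(spark):
    """Run the INSERT, then return rows from the filtered, projected SELECT."""
    for statement in QUICKSTART_STATEMENTS:
        spark.sql(statement)
    query = "SELECT shop_id, total_amount FROM pk_table WHERE num_orders > 1"
    # collect() brings the result to the driver, which is fine for a small preview.
    return spark.sql(query).collect()
```

For large tables, prefer `show()` or a `LIMIT` clause over `collect()` so the driver does not have to hold the full result.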
Type Conversion
Fluss's integration for Spark automatically converts between Spark and Fluss types.
Fluss -> Apache Spark
| Fluss | Spark |
|---|---|
| BOOLEAN | BooleanType |
| TINYINT | ByteType |
| SMALLINT | ShortType |
| INT | IntegerType |
| BIGINT | LongType |
| FLOAT | FloatType |
| DOUBLE | DoubleType |
| CHAR | CharType |
| STRING | StringType |
| DECIMAL | DecimalType |
| DATE | DateType |
| TIMESTAMP | TimestampNTZType |
| TIMESTAMP_LTZ | TimestampType |
| BYTES | BinaryType |
| ARRAY | ArrayType |
| MAP | MapType |
| ROW | StructType |
The MAP type is currently supported for table creation and schema mapping, but read and write operations on MAP columns are not yet supported. Full MAP type read/write support will be available soon.
Apache Spark -> Fluss
| Spark | Fluss |
|---|---|
| BooleanType | BOOLEAN |
| ByteType | TINYINT |
| ShortType | SMALLINT |
| IntegerType | INT |
| LongType | BIGINT |
| FloatType | FLOAT |
| DoubleType | DOUBLE |
| CharType | CHAR |
| StringType | STRING |
| VarcharType | STRING |
| DecimalType | DECIMAL |
| DateType | DATE |
| TimestampType | TIMESTAMP_LTZ |
| TimestampNTZType | TIMESTAMP |
| BinaryType | BYTES |
| ArrayType | ARRAY |
| MapType | MAP (read/write not yet supported) |
| StructType | ROW |
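The two tables are inverses of each other except for VarcharType, which maps to STRING and therefore comes back as StringType. The sketch below encodes the tables as plain strings (not PySpark type objects) to make that round trip explicit; `FLUSS_TO_SPARK`, `SPARK_TO_FLUSS`, and `round_trip` are hypothetical names.

```python
# Fluss -> Spark, copied from the first table.
FLUSS_TO_SPARK = {
    "BOOLEAN": "BooleanType", "TINYINT": "ByteType", "SMALLINT": "ShortType",
    "INT": "IntegerType", "BIGINT": "LongType", "FLOAT": "FloatType",
    "DOUBLE": "DoubleType", "CHAR": "CharType", "STRING": "StringType",
    "DECIMAL": "DecimalType", "DATE": "DateType",
    "TIMESTAMP": "TimestampNTZType", "TIMESTAMP_LTZ": "TimestampType",
    "BYTES": "BinaryType", "ARRAY": "ArrayType", "MAP": "MapType",
    "ROW": "StructType",
}

# Spark -> Fluss is the inverse mapping plus the extra VarcharType entry.
SPARK_TO_FLUSS = {spark: fluss for fluss, spark in FLUSS_TO_SPARK.items()}
SPARK_TO_FLUSS["VarcharType"] = "STRING"


def round_trip(spark_type: str) -> str:
    """Return the Spark type obtained after storing a column in Fluss and reading it back."""
    return FLUSS_TO_SPARK[SPARK_TO_FLUSS[spark_type]]
```

Under these tables, a column declared as VARCHAR in Spark is stored as STRING in Fluss, so any length constraint from the Spark side is not expected to be reflected in the type read back.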