
Getting Started with Spark Engine

Supported Spark Versions

| Fluss Connector Versions | Supported Spark Versions |
|--------------------------|--------------------------|
| 0.10                     | 3.4, 3.5                 |

Feature Support

Fluss supports Apache Spark's SQL API and Spark Structured Streaming.

| Feature Support            | Spark | Notes                           |
|----------------------------|-------|---------------------------------|
| SQL Create Catalog         | ✔️    |                                 |
| SQL Create Database        | ✔️    |                                 |
| SQL Drop Database          | ✔️    |                                 |
| SQL Create Table           | ✔️    |                                 |
| SQL Drop Table             | ✔️    |                                 |
| SQL Describe Table         | ✔️    |                                 |
| SQL Show Tables            | ✔️    |                                 |
| SQL Alter Table            | ✔️    | SET/UNSET TBLPROPERTIES         |
| SQL Show Partitions        | ✔️    |                                 |
| SQL Add Partition          | ✔️    |                                 |
| SQL Drop Partition         | ✔️    |                                 |
| SQL Select (Batch)         | ✔️    | Log table and primary-key table |
| SQL Insert Into            | ✔️    | Log table and primary-key table |
| Structured Streaming Read  | ✔️    | Log table and primary-key table |
| Structured Streaming Write | ✔️    | Log table and primary-key table |
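
Once the catalog is configured (see the setup steps below), a streaming pipeline can be wired up from spark-shell. This is a minimal sketch, assuming Fluss tables are reachable through Spark's standard readStream.table and writeStream.toTable APIs; the table names and checkpoint path are hypothetical:

// Minimal sketch: continuously copy rows from one Fluss table to another.
// Assumes a configured fluss_catalog (see below); names are hypothetical.
val stream = spark.readStream
  .table("fluss_catalog.fluss_db.source_table")

val query = stream.writeStream
  .option("checkpointLocation", "/tmp/fluss-checkpoint") // Spark requires a checkpoint dir
  .toTable("fluss_catalog.fluss_db.sink_table")

query.awaitTermination()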

Preparation when using Spark SQL

  • Download Spark

Spark runs on UNIX-like environments such as Linux and macOS. You can download the binary release of Spark from the Apache Spark Downloads page, then extract the archive:

tar -xzf spark-3.5.7-bin-hadoop3.tgz
  • Copy Fluss Spark Bundled Jar

Download the Fluss Spark bundled jar and copy it to the jars directory of your Spark home.

cp fluss-spark-3.5_2.12-0.10-SNAPSHOT.jar <SPARK_HOME>/jars/
  • Start Spark SQL

To quickly start the Spark SQL CLI, you can use the provided script:

<SPARK_HOME>/bin/spark-sql \
--conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \
--conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \
--conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions

Or start Spark Shell:

<SPARK_HOME>/bin/spark-shell \
--conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \
--conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \
--conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions

Creating a Catalog

The Fluss catalog can be configured in spark-defaults.conf or passed as command-line arguments.

Using spark-defaults.conf:

spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog
spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123
spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions

Or configure it programmatically when building the SparkSession (Scala):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.catalog.fluss_catalog", "org.apache.fluss.spark.SparkCatalog")
  .config("spark.sql.catalog.fluss_catalog.bootstrap.servers", "localhost:9123")
  .config("spark.sql.extensions", "org.apache.fluss.spark.FlussSparkSessionExtensions")
  .getOrCreate()
note
  1. spark.sql.catalog.fluss_catalog.bootstrap.servers is the address of the Fluss server. Start the Fluss server before configuring bootstrap.servers; see Deploying Fluss for how to set up a Fluss cluster. The examples here assume a Fluss cluster running on your local machine with the CoordinatorServer listening on port 9123.
  2. The spark.sql.catalog.fluss_catalog.bootstrap.servers configuration is used to discover all nodes within the Fluss cluster. It can be set to one or more (up to three) Fluss server addresses (either CoordinatorServer or TabletServer), separated by commas, as shown below.
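
For example, in spark-defaults.conf a three-node cluster could be addressed as follows; the hostnames are placeholders:

spark.sql.catalog.fluss_catalog.bootstrap.servers=fluss-node1:9123,fluss-node2:9123,fluss-node3:9123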

Creating a Database

Spark SQL
USE fluss_catalog;
Spark SQL
CREATE DATABASE fluss_db;
USE fluss_db;

Creating a Table

Spark SQL
CREATE TABLE pk_table (
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT
) TBLPROPERTIES (
  'primary.key' = 'shop_id,user_id',
  'bucket.num' = '4'
);
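
The table above is a primary-key table. Fluss also offers append-only log tables; assuming the Spark connector follows Fluss's convention that a table declared without the primary.key property is a log table, one could be created from spark-shell like this (the table name and columns are hypothetical):

// Hypothetical log table: no 'primary.key' property, so the table is append-only.
spark.sql(
  """CREATE TABLE log_table (
    |  shop_id BIGINT,
    |  event STRING
    |) TBLPROPERTIES (
    |  'bucket.num' = '4'
    |)""".stripMargin)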

Data Writing

To append new data to a table, you can use INSERT INTO:

Spark SQL
INSERT INTO pk_table VALUES
(1234, 1234, 1, 1),
(12345, 12345, 2, 2),
(123456, 123456, 3, 3);
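
The same rows can also be appended through the DataFrame API from spark-shell. A minimal sketch, assuming the connector's batch write is reachable through Spark's standard DataFrameWriterV2 (writeTo ... append); the values are arbitrary:

import spark.implicits._

// Columns must match pk_table's schema: shop_id, user_id, num_orders, total_amount.
val rows = Seq((1234567L, 1234567L, 4, 4))
  .toDF("shop_id", "user_id", "num_orders", "total_amount")

rows.writeTo("fluss_catalog.fluss_db.pk_table").append()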

Data Reading

To retrieve data, you can use a SELECT statement:

Spark SQL
SELECT * FROM pk_table ORDER BY shop_id;

To preview a subset of the data, apply a projection and a filter:

Spark SQL
SELECT shop_id, total_amount FROM pk_table WHERE num_orders > 1;
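
The equivalent read through the DataFrame API, again as a sketch for spark-shell (same assumptions as the write example above); the filter is applied before the projection so num_orders is still in scope:

// Filter first, then project, through the DataFrame API.
val df = spark.table("fluss_catalog.fluss_db.pk_table")
  .where("num_orders > 1")
  .select("shop_id", "total_amount")

df.show()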

Type Conversion

Fluss's integration for Spark automatically converts between Spark and Fluss types.

Fluss -> Apache Spark

| Fluss         | Spark            |
|---------------|------------------|
| BOOLEAN       | BooleanType      |
| TINYINT       | ByteType         |
| SMALLINT      | ShortType        |
| INT           | IntegerType      |
| BIGINT        | LongType         |
| FLOAT         | FloatType        |
| DOUBLE        | DoubleType       |
| CHAR          | CharType         |
| STRING        | StringType       |
| DECIMAL       | DecimalType      |
| DATE          | DateType         |
| TIMESTAMP     | TimestampNTZType |
| TIMESTAMP_LTZ | TimestampType    |
| BYTES         | BinaryType       |
| ARRAY         | ArrayType        |
| MAP           | MapType          |
| ROW           | StructType       |
note

The MAP type is currently supported for table creation and schema mapping, but read and write operations on MAP columns are not yet supported. Full MAP type read/write support will be available soon.

Apache Spark -> Fluss

| Spark            | Fluss                              |
|------------------|------------------------------------|
| BooleanType      | BOOLEAN                            |
| ByteType         | TINYINT                            |
| ShortType        | SMALLINT                           |
| IntegerType      | INT                                |
| LongType         | BIGINT                             |
| FloatType        | FLOAT                              |
| DoubleType       | DOUBLE                             |
| CharType         | CHAR                               |
| StringType       | STRING                             |
| VarcharType      | STRING                             |
| DecimalType      | DECIMAL                            |
| DateType         | DATE                               |
| TimestampType    | TIMESTAMP_LTZ                      |
| TimestampNTZType | TIMESTAMP                          |
| BinaryType       | BYTES                              |
| ArrayType        | ARRAY                              |
| MapType          | MAP (read/write not yet supported) |
| StructType       | ROW                                |
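
A quick way to confirm these mappings from spark-shell is to print a table's Spark-side schema; per the first table above, the BIGINT columns of the pk_table created earlier should surface as LongType and the INT columns as IntegerType:

// Expect shop_id/user_id as LongType and num_orders/total_amount as IntegerType.
spark.table("fluss_catalog.fluss_db.pk_table").printSchema()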