Fluss Java Client
Overview
Fluss Admin API that supports asynchronous operations for managing and inspecting Fluss resources. It communicates with the Fluss cluster and provides methods for:
- Managing databases (create, drop, list)
- Managing tables (create, drop, list)
- Managing partitions (create, drop, list)
- Retrieving metadata (schemas, snapshots, server information)
Fluss Table API allows you to interact with Fluss tables for reading and writing data.
Dependency
In order to use the client, you need to add the following dependency to your pom.xml file.
<!-- https://mvnrepository.com/artifact/org.apache.fluss/fluss-client -->
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-client</artifactId>
<version>0.8.0-incubating</version>
</dependency>
Initialization
Connection is the main entry point for the Fluss Java client. It is used to create Admin and Table instances.
The Connection object is created using the ConnectionFactory class, which takes a Configuration object as an argument.
The Configuration object contains the necessary configuration parameters for connecting to the Fluss cluster, such as the bootstrap servers.
The Connection object is thread-safe and can be shared across multiple threads. It is recommended to create a
single Connection instance per application and use it to create multiple Admin and Table instances.
Table and Admin instances, on the other hand, are not thread-safe and should be created for each thread that needs to access them.
Caching or pooling of Table and Admin is not recommended.
Create a new Admin instance :
// creating Connection object to connect with Fluss cluster
Configuration conf = new Configuration();
conf.setString("bootstrap.servers", "localhost:9123");
Connection connection = ConnectionFactory.createConnection(conf);
// obtain Admin instance from the Connection
Admin admin = connection.getAdmin();
admin.listDatabases().get().forEach(System.out::println);
// obtain Table instance from the Connection
Table table = connection.getTable(TablePath.of("my_db", "my_table"));
System.out.println(table.getTableInfo());
if you are using SASL authentication, you need to set the following properties:
// creating Connection object to connect with Fluss cluster
Configuration conf = new Configuration();
conf.setString("bootstrap.servers", "localhost:9123");
conf.setString("client.security.protocol", "sasl");
conf.setString("client.security.sasl.mechanism", "PLAIN");
conf.setString("client.security.sasl.username", "alice");
conf.setString("client.security.sasl.password", "alice-secret");
Connection connection = ConnectionFactory.createConnection(conf);
// obtain Admin instance from the Connection
Admin admin = connection.getAdmin();
admin.listDatabases().get().forEach(System.out::println);
// obtain Table instance from the Connection
Table table = connection.getTable(TablePath.of("my_db", "my_table"));
System.out.println(table.getTableInfo());
Working Operations
All methods in FlussAdmin return CompletableFuture objects. You can handle these in two ways:
Blocking Operations
For synchronous behavior, use the get() method:
// Blocking call
List<String> databases = admin.listDatabases().get();
Asynchronous Operations
For non-blocking behavior, use the thenAccept, thenApply, or other methods:
admin.listDatabases()
.thenAccept(databases -> {
System.out.println("Available databases:");
databases.forEach(System.out::println);
})
.exceptionally(ex -> {
System.err.println("Failed to list databases: " + ex.getMessage());
return null;
});