This is the 21st day of my participation in the August Gwen Challenge.



Cassandra Client for Vert.x

A Vert.x client allowing applications to interact with an Apache Cassandra service.

Getting started

To use this module, add the following to the dependencies section of your Maven POM file:

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-cassandra-client</artifactId>
  <version>4.1.2</version>
</dependency>

Or, if you use Gradle:

compile 'io.vertx:vertx-cassandra-client:4.1.2'

Creating a client

Client options

Cassandra is a distributed system that can have many nodes. To connect to it, specify the addresses of some cluster nodes when you create the CassandraClientOptions object:

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.address", 9142)
  .addContactPoint("node2.address", 9142)
  .addContactPoint("node3.address", 9142);
CassandraClient client = CassandraClient.create(vertx, options);

By default, the Cassandra client for Vert.x connects to port 9042 on the local machine and is not bound to any specific keyspace. You can set either or both of these options:

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("localhost", 9142)
  .setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.create(vertx, options);

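Shared clients

If you deploy multiple instances of a verticle, or have several verticles that use the same cluster and keyspace, create a shared client. Clients created with the same name share a single underlying session: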

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.address", 9142)
  .addContactPoint("node2.address", 9142)
  .addContactPoint("node3.address", 9142)
  .setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.createShared(vertx, "sharedClientName", options);

Client lifecycle

After the client is created, it does not connect until the first query is executed.

A client created inside a verticle is stopped automatically when the verticle is undeployed, so there is no need to call close in the verticle's stop method. See CassandraClient in the Vert.x 4.1.2 API documentation (vertx.io).

In all other cases, you must close the client manually by calling close.

API

Streaming

The streaming API is best suited to results that you consume iteratively, row by row. It is especially efficient for large result sets.

cassandraClient.queryStream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'", queryStream -> {
  if (queryStream.succeeded()) {
    CassandraRowStream stream = queryStream.result();

    // resume the row stream when the write queue can accept buffers again
    response.drainHandler(v -> stream.resume());

    stream.handler(row -> {
      String value = row.getString("my_string_col");
      response.write(value);

      // pause the row stream when the write queue is full
      if (response.writeQueueFull()) {
        stream.pause();
      }
    });

    // end the response when the end of the stream is reached
    stream.endHandler(end -> response.end());
  } else {
    queryStream.cause().printStackTrace();
    // respond with an internal server error if the query cannot be executed
    response
      .setStatusCode(500)
      .end("Unable to execute the query");
  }
});

In the above code, the query is executed and the results are streamed over HTTP.
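The pause/resume logic in the streaming example can be easier to follow in isolation. The following is a minimal single-threaded sketch that uses only the JDK, not the Vert.x API: BoundedSink, pump and Result are hypothetical names standing in for the HTTP response, the row handler and the outcome of a run. It also assumes that the sink drains completely each time it fills up, which a real network connection does not guarantee.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BackpressureSketch {

  /** Hypothetical stand-in for a write stream such as an HTTP response. */
  static final class BoundedSink {
    private final int maxQueueSize;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private int peakQueueSize = 0;

    BoundedSink(int maxQueueSize) {
      this.maxQueueSize = maxQueueSize;
    }

    void write(String chunk) {
      queue.add(chunk);
      peakQueueSize = Math.max(peakQueueSize, queue.size());
    }

    boolean writeQueueFull() {
      return queue.size() >= maxQueueSize;
    }

    /** Simulates the network emptying the queue (assumption: it empties fully). */
    void drain() {
      delivered.addAll(queue);
      queue.clear();
    }
  }

  /** Outcome of one simulated run. */
  public static final class Result {
    public final List<String> delivered;
    public final int pauses;
    public final int peakQueueSize;

    Result(List<String> delivered, int pauses, int peakQueueSize) {
      this.delivered = delivered;
      this.pauses = pauses;
      this.peakQueueSize = peakQueueSize;
    }
  }

  /** Pushes rows into a bounded sink, pausing whenever the sink reports it is full. */
  public static Result pump(List<String> rows, int maxQueueSize) {
    BoundedSink sink = new BoundedSink(maxQueueSize);
    int pauses = 0;
    for (String row : rows) {
      sink.write(row);               // plays the role of response.write(value)
      if (sink.writeQueueFull()) {
        pauses++;                    // plays the role of stream.pause()
        sink.drain();                // drain handler fires, i.e. stream.resume()
      }
    }
    sink.drain();                    // end of stream: flush whatever is left
    return new Result(sink.delivered, pauses, sink.peakQueueSize);
  }

  public static void main(String[] args) {
    Result r = pump(List.of("a", "b", "c", "d", "e"), 2);
    // prints "[a, b, c, d, e] pauses=2 peak=2"
    System.out.println(r.delivered + " pauses=" + r.pauses + " peak=" + r.peakQueueSize);
  }
}
```

With five rows and a queue limit of 2, the source is paused twice and the queue never holds more than two items, which is the purpose of pausing the row stream while the response catches up.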

Bulk fetching

The bulk fetching API loads all rows of the result into a list at once:

cassandraClient.executeWithFullFetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", executeWithFullFetch -> {
  if (executeWithFullFetch.succeeded()) {
    List<Row> rows = executeWithFullFetch.result();
    for (Row row : rows) {
      // handle each row here
    }
  } else {
    System.out.println("Unable to execute the query");
    executeWithFullFetch.cause().printStackTrace();
  }
});

Note: use bulk fetching only when the complete result set fits in memory.

Collector queries

You can pass a java.util.stream.Collector to execute to reduce the rows to a single value:

// listCollector is a Collector<Row, ?, String> built by the caller
cassandraClient.execute("SELECT * FROM users", listCollector, ar -> {
  if (ar.succeeded()) {
    // Get the string created by the collector
    String list = ar.result();
    System.out.println("Got " + list);
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});
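The listCollector passed to execute is an ordinary java.util.stream.Collector. As a rough sketch of how such a collector might be built, the example below uses only the JDK and substitutes a Map for the driver's Row type. The last_name column and the CollectorSketch class are assumptions made for illustration. With the real client the mapper would read from a Row, for example row.getString("last_name").

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectorSketch {

  // Hypothetical stand-in for a Cassandra Row: column name -> value.
  // The real client hands driver Row objects to the collector instead.
  static final Collector<Map<String, String>, ?, String> LIST_COLLECTOR =
      Collectors.mapping(
          row -> row.get("last_name"),          // extract one column from each row
          Collectors.joining(",", "(", ")"));   // join the values as "(a,b,c)"

  /** Reduces all rows to a single string using the collector above. */
  public static String collect(List<Map<String, String>> rows) {
    return rows.stream().collect(LIST_COLLECTOR);
  }

  public static void main(String[] args) {
    List<Map<String, String>> rows = List.of(
        Map.of("last_name", "Lovelace"),
        Map.of("last_name", "Hopper"));
    System.out.println("Got " + collect(rows)); // prints "Got (Lovelace,Hopper)"
  }
}
```

Because the reduction happens inside the collector, the result handler receives the finished string and never has to hold the list of rows itself.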

Prepared queries

Use prepared statements for every query that is executed more than once:

cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ?", preparedStatementResult -> {
  if (preparedStatementResult.succeeded()) {
    System.out.println("The query has successfully been prepared");
    PreparedStatement preparedStatement = preparedStatementResult.result();
    // now you can use this PreparedStatement object for the next queries
  } else {
    System.out.println("Unable to prepare the query");
    preparedStatementResult.cause().printStackTrace();
  }
});

Then reuse the PreparedStatement for all subsequent queries, with whichever execution API you prefer:

cassandraClient.execute(preparedStatement.bind("my_value"), done -> {
  ResultSet results = done.result();
  // handle results here
});

// Bulk fetching API
cassandraClient.executeWithFullFetch(preparedStatement.bind("my_value"), done -> {
  List<Row> results = done.result();
  // handle results here
});

// Streaming API
cassandraClient.queryStream(preparedStatement.bind("my_value"), done -> {
  CassandraRowStream results = done.result();
  // handle results here
});

RxJava 3 API

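Create an Rxified client

Import the Rxified CassandraClient class (io.vertx.rxjava3.cassandra.CassandraClient), then use one of its create methods: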

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.corp.int", 7000)
  .addContactPoint("node2.corp.int", 7000)
  .addContactPoint("node3.corp.int", 7000);
CassandraClient cassandraClient = CassandraClient.createShared(vertx, options);

Querying

Streaming

cassandraClient.rxQueryStream("SELECT my_key FROM my_keyspace.my_table where my_key = 'my_value'")
  // Convert the stream to a Flowable
  .flatMapPublisher(CassandraRowStream::toFlowable)
  .subscribe(row -> {
    // Handle single row
  }, t -> {
    // Handle failure
  }, () -> {
    // End of stream
  });

Bulk fetching

cassandraClient.rxExecuteWithFullFetch("SELECT my_key FROM my_keyspace.my_table where my_key = 'my_value'")
  .subscribe(rows -> {
    // Handle list of rows
  }, throwable -> {
    // Handle failure
  });