Overview

In the previous article, we introduced how to use the STREAM keyword in a SELECT statement to perform a streaming query, and simulated it with a simple data structure; interested readers can refer to that article. This article extends the case by combining Calcite with Kafka, using Kafka as the data provider and running SQL queries against it.

What is Kafka?

Kafka is a distributed message queue. It offers high performance, persistence, multi-replica backup, and horizontal scalability. Producers write messages to queues and consumers fetch messages from queues to run their business logic. In architecture design it typically plays the roles of decoupling, peak shaving, and asynchronous processing. Kafka exposes the concept of a topic: producers write messages to a topic and consumers read messages from it. To achieve horizontal scaling, a topic is actually composed of multiple partitions; when you hit a bottleneck, you can increase the number of partitions to scale out. Message order is guaranteed only within a single partition. Every new message is appended to the corresponding log file, which is why performance is very high. Kafka’s overall data flow works as follows:

Producers write messages to a specified topic on the brokers, and consumers pull messages from that topic on the brokers and then run their business processing.

The content above is referenced from: https://www.jianshu.com/p/d3e963ff8b70

What is ZooKeeper? If you are interested, please do your own search.

Kafka environment setup

This chapter uses a Kafka setup on Windows as an example. If you are already familiar with this part, skip this chapter. There are many ways to build a test environment; here we use one that is relatively convenient and has a high success rate.

Creating a ZooKeeper environment

  • Download ZooKeeper from http://zookeeper.apache.org/releases.html#download and unzip it

  • Go to the conf directory of the unzipped folder, copy zoo_sample.cfg, and rename it to zoo.cfg

  • Edit zoo.cfg and change dataDir, for example dataDir=<ZooKeeper unzip path>\data; the path is configurable, as long as you have write permission

  • Add an environment variable ZOOKEEPER_HOME pointing to the ZooKeeper unzip path

  • Add ZOOKEEPER_HOME\bin to the PATH variable

  • Open a command prompt and run zkServer

Kafka environment setup

  • Download Kafka from http://kafka.apache.org/downloads and unzip it; pay attention to the Scala version, as it may affect subsequent development

  • Go to the config directory of the decompressed folder

  • Edit the server.properties file and change log.dirs, for example log.dirs=<Kafka unzip path>\kafka-logs; the path is configurable, as long as you have write permission

  • Run .\bin\windows\kafka-server-start.bat .\config\server.properties in the Kafka unzip directory; it is recommended to save the command as start.cmd in that directory for future use

Kafka environment test

Now that we have built a simple Kafka environment, we need to test it.

First, add the Kafka dependencies to the project from the previous article:

compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.1.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.1.0'
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.1.0'

Then create the topic.

Create a topic

package com.dafei1288.calcite.stream.kafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopic {
    public static void main(String[] args) {
        // Create the topic
        Properties props = new Properties();
        // The AdminClient needs the broker address (9092 by default), not the ZooKeeper address
        props.put("bootstrap.servers", "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
        NewTopic newTopic = new NewTopic("calcitekafka", 1, (short) 1);
        topics.add(newTopic);
        CreateTopicsResult result = adminClient.createTopics(topics);
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
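If you want to double-check that the topic exists, the same AdminClient API can list topic names. Below is a minimal sketch of such a check; the ListTopics class is our own addition, not part of the original project.

package com.dafei1288.calcite.stream.kafka;

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Properties;
import java.util.Set;

public class ListTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The AdminClient talks to the Kafka broker, not to ZooKeeper
        props.put("bootstrap.servers", "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        // listTopics().names() returns a future holding the set of topic names
        Set<String> names = adminClient.listTopics().names().get();
        System.out.println(names); // should contain "calcitekafka"
        adminClient.close();
    }
}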

After creating the topic, let’s build a basic producer.

Create a producer

package com.dafei1288.calcite.stream.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class Producter {
    private static KafkaProducer<String, String> producer;
    private final static String TOPIC = "calcitekafka";

    public Producter() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
    }

    public void produce() {
        int i = 0;
        Random r = new Random();
        for (;;) {
            // Emit a random Boolean value every second, with an incrementing key
            producer.send(new ProducerRecord<String, String>(TOPIC, i++ + "", r.nextBoolean() + ""));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // producer.close();
    }

    public static void main(String[] args) {
        new Producter().produce();
    }
}

Since there is no formal business scenario here, we just run a simple simulation that generates a random Boolean value every second. With a producer in place, let’s build a consumer.

Create a consumer

package com.dafei1288.calcite.stream.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    private static KafkaConsumer<String, String> consumer;
    private final static String TOPIC = "calcitekafka";

    public Consumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumer group id
        props.put("group.id", "test2");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // How often to commit the offsets of consumed messages
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout; after this Kafka may stop the consumer or move on to the next member
        props.put("session.timeout.ms", "30000");
        // Where to start reading when there is no committed offset
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
    }

    public void consume() {
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }

    public static void main(String[] args) {
        new Consumer().consume();
    }
}

The consumer simply prints the data to the console; a fragment of the output looks like this:

offset = 328, key = 0, value = false
offset = 329, key = 1, value = false
offset = 330, key = 2, value = true
offset = 331, key = 3, value = true
offset = 332, key = 4, value = false
offset = 333, key = 5, value = false
offset = 334, key = 6, value = true
offset = 335, key = 7, value = true
offset = 336, key = 8, value = false
offset = 337, key = 9, value = true
offset = 338, key = 10, value = true
offset = 339, key = 11, value = true

This shows that the Kafka environment we set up works. Now let’s integrate it with Calcite, replacing the storage we wrote in the previous example.

Integrating Calcite with Kafka

Instead of using a Java class to store data, we now want Kafka to be the data provider. First we need to rebuild the schema file.

Create KafkaStream.json

{"version": "1.0", "defaultSchema": "bookshopstream", "schemas": [{"name": "bookshopstream", "tables": [{"name": "bookshopstream", "tables": [{"name": "bookshopstream", "tables": [{"name": "KF", "type": "custom", "factory": "com.dafei1288.calcite.stream.kafka.KafkaStreamTableFactory", "stream": { "stream": true }, "operand": { "topic": "calcitekafka", "bootstrap.servers": "localhost:9092", "group.id": "test2", "enable.auto.commit": "true", "auto.commit.interval.ms": "1000", "session.timeout.ms": "30000", "auto.offset.reset":"earliest", "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer", "colnames": "KK,VV", "timeouts": "2000" } } ] } ] }Copy the code

Here we need to build a factory class, com.dafei1288.calcite.stream.kafka.KafkaStreamTableFactory; its contents are described in detail below.

Note that in the operand configuration we add a series of settings; for generality, we put the Kafka connection properties and all other necessary configuration there.

Next, let’s look at what com.dafei1288.calcite.stream.kafka.KafkaStreamTableFactory does.

KafkaStreamTableFactory

package com.dafei1288.calcite.stream.kafka;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TableFactory;

import java.util.Map;

public class KafkaStreamTableFactory implements TableFactory {
    @Override
    public Table create(SchemaPlus schema, String name, Map operand, RelDataType rowType) {
        System.out.println(operand);
        System.out.println(name);
        return new KafkaStreamTable(name, operand);
    }
}

The key line here is return new KafkaStreamTable(name, operand);. Unlike the previous cases, we pass operand directly as a parameter to the Table implementation, handing that responsibility over to it so the table becomes more flexible.

KafkaStreamTable

The KafkaStreamTable class is the most interesting part of this case, so let’s take a look at the code.

package com.dafei1288.calcite.stream.kafka;

import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

public class KafkaStreamTable extends AbstractTable implements StreamableTable, ScannableTable {
    private String name;
    private RelDataType dataType;
    private Map operand;
    private static KafkaConsumer<String, String> consumer;

    public KafkaStreamTable(String name) {
        System.out.println("KafkaStreamTable !!!!!! " + name);
        this.name = name;
    }

    public KafkaStreamTable(String name, Map operand) {
        System.out.println("KafkaStreamTable !!!!!! " + name + " , " + operand);
        this.name = name;
        this.operand = operand;
    }

    @Override
    public Table stream() {
        return this;
    }

    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        // System.out.println("RelDataType !!!!!!");
        if (dataType == null) {
            RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
            // The stream table's metadata (column names) comes from the "colnames"
            // entry of the operand configuration in KafkaStream.json
            for (String col : operand.get("colnames").toString().split(",")) {
                RelDataType sqlType = typeFactory.createJavaType(String.class);
                sqlType = SqlTypeUtil.addCharsetAndCollation(sqlType, typeFactory);
                fieldInfo.add(col, sqlType);
            }
            this.dataType = typeFactory.createStructType(fieldInfo);
        }
        return this.dataType;
    }

    @Override
    public Enumerable<Object[]> scan(DataContext root) {
        System.out.println("scan ...... ");
        consumer = new KafkaConsumer<String, String>(operand);
        consumer.subscribe(Arrays.asList(operand.get("topic").toString()));
        return new AbstractEnumerable<Object[]>() {
            public Enumerator<Object[]> enumerator() {
                return new Enumerator<Object[]>() {
                    // Poll a batch of records; "timeouts" comes from the operand configuration
                    ConsumerRecords<String, String> records =
                            consumer.poll(Integer.parseInt(operand.get("timeouts").toString()));
                    Iterator it = records.iterator();
                    private int cur = 0;

                    @Override
                    public Object[] current() {
                        ConsumerRecord<String, String> reco = (ConsumerRecord<String, String>) it.next();
                        return new String[]{reco.key(), reco.value()};
                    }

                    @Override
                    public boolean moveNext() {
                        return it.hasNext();
                    }

                    @Override
                    public void reset() {
                    }

                    @Override
                    public void close() {
                        consumer.close();
                    }
                };
            }
        };
    }
}

The responsibility of this class is similar to that of the earlier InMemoryTable: it defines how the data is traversed and how the data types are mapped.

As mentioned earlier, assigning the responsibility for the streaming details to this class is intended to increase flexibility: if Kafka is not used to provide the data, it is relatively easy to plug in another streaming tool instead.
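To make that concrete, here is a hypothetical sketch of the same pattern backed by an in-memory java.util.concurrent.BlockingQueue instead of Kafka. The QueueStreamTable class and its fixed column layout are our own assumptions for illustration, not part of the original project.

package com.dafei1288.calcite.stream.kafka;

import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractTable;

import java.util.concurrent.BlockingQueue;

public class QueueStreamTable extends AbstractTable implements StreamableTable, ScannableTable {
    private final BlockingQueue<Object[]> queue; // rows are pushed here by some other thread
    private RelDataType dataType;

    public QueueStreamTable(BlockingQueue<Object[]> queue) {
        this.queue = queue;
    }

    @Override public Table stream() { return this; }

    @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (dataType == null) {
            RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
            fieldInfo.add("KK", typeFactory.createJavaType(String.class));
            fieldInfo.add("VV", typeFactory.createJavaType(String.class));
            dataType = typeFactory.createStructType(fieldInfo);
        }
        return dataType;
    }

    @Override public Enumerable<Object[]> scan(DataContext root) {
        return new AbstractEnumerable<Object[]>() {
            public Enumerator<Object[]> enumerator() {
                return new Enumerator<Object[]>() {
                    private Object[] current;
                    @Override public Object[] current() { return current; }
                    @Override public boolean moveNext() {
                        try {
                            current = queue.take(); // block until the next row arrives
                            return true;
                        } catch (InterruptedException e) {
                            return false;
                        }
                    }
                    @Override public void reset() { }
                    @Override public void close() { }
                };
            }
        };
    }
}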

In the public RelDataType getRowType(RelDataTypeFactory typeFactory) method, we need to provide a type mapping for the stream’s metadata. As mentioned earlier, that metadata comes from the operand section of the KafkaStream.json file. Since the producer emits only a key and a Boolean value, we create just two columns, KK and VV, and for the sake of illustration we crudely define both data types as string.
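If you wanted more faithful types, the same method could declare VV as a SQL BOOLEAN instead. The variant below is our own sketch, not the article’s code; it hard-codes the two-column KK/VV layout, and current() in scan() would then need to convert the value with Boolean.valueOf(reco.value()).

// Hypothetical replacement for KafkaStreamTable.getRowType(), giving VV a real BOOLEAN type.
// Requires an extra import: org.apache.calcite.sql.type.SqlTypeName
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    if (dataType == null) {
        RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
        fieldInfo.add("KK", typeFactory.createSqlType(SqlTypeName.VARCHAR)); // message key
        fieldInfo.add("VV", typeFactory.createSqlType(SqlTypeName.BOOLEAN)); // message value
        this.dataType = typeFactory.createStructType(fieldInfo);
    }
    return this.dataType;
}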

Next, in the public Enumerable<Object[]> scan(DataContext root) method, we subscribe to the Kafka topic and consume the data it emits. Since our producer generates one record per second, consumer.poll(Integer.parseInt(operand.get("timeouts").toString())) determines how long each poll waits for data; we can also control how many records come back per batch by adding a parameter such as "max.poll.records": 20 to operand.
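One practical note: operand holds not only the Kafka consumer settings but also keys used by the table itself (topic, colnames, timeouts) and entries Calcite adds (modelUri, baseDirectory), so the KafkaConsumer built directly from it will log warnings about unrecognized configuration entries. Below is a minimal sketch of filtering those keys out before building the consumer; it is our own variation, not the original code.

// Hypothetical helper for KafkaStreamTable: copy only the Kafka-relevant entries
// out of operand before handing them to the KafkaConsumer.
// Requires extra imports: java.util.HashSet, java.util.Set, java.util.Properties
private KafkaConsumer<String, String> buildConsumer(Map operand) {
    Set<String> tableOnlyKeys = new HashSet<>(
            Arrays.asList("topic", "colnames", "timeouts", "modelUri", "baseDirectory"));
    Properties kafkaProps = new Properties();
    for (Object key : operand.keySet()) {
        if (!tableOnlyKeys.contains(key.toString())) {
            kafkaProps.put(key.toString(), operand.get(key).toString());
        }
    }
    return new KafkaConsumer<String, String>(kafkaProps);
}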

Now that we’re done with the basics, let’s test it out.

Test

package com.dafei1288.calcite.stream.kafka;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.util.ConversionUtil;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class TestKafkaStreamJDBC {
    public static void main(String[] args) {
        try {
            Class.forName("org.apache.calcite.jdbc.Driver");
        } catch (ClassNotFoundException e1) {
            e1.printStackTrace();
        }
        System.setProperty("saffron.default.charset", ConversionUtil.NATIVE_UTF16_CHARSET_NAME);
        System.setProperty("saffron.default.nationalcharset", ConversionUtil.NATIVE_UTF16_CHARSET_NAME);
        System.setProperty("saffron.default.collation.name", ConversionUtil.NATIVE_UTF16_CHARSET_NAME + "$en_US");

        Properties info = new Properties();
        // Path to the model file; adjust it to your local path
        String jsonmodle = "E:\\working\\others\\writing\\calcitetutorial\\src\\main\\resources\\KafkaStream.json";
        try {
            Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + jsonmodle, info);
            CalciteConnection calciteConn = connection.unwrap(CalciteConnection.class);

            ResultSet result = null;
            Statement st = connection.createStatement();

            result = st.executeQuery("select stream kf.kk,kf.vv from kf as kf");
            while (result.next()) {
                System.out.println(result.getString(1) + " \t " + result.getString(2));
            }
            result.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Our test statement is select stream kf.kk,kf.vv from kf as kf, and the console output looks like this:

{topic=calcitekafka, bootstrap.servers=localhost:9092, group.id=test2, enable.auto.commit=true, auto.commit.interval.ms=1000, session.timeout.ms=30000, auto.offset.reset=earliest, key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, colnames=KK,VV, timeouts=2000, modelUri=E:\working\others\writing\calcitetutorial\src\main\resources\KafkaStream.json, baseDirectory=E:\working\others\writing\calcitetutorial\src\main\resources}
KF
KafkaStreamTable !!!!!! KF , {topic=calcitekafka, bootstrap.servers=localhost:9092, group.id=test2, enable.auto.commit=true, auto.commit.interval.ms=1000, session.timeout.ms=30000, auto.offset.reset=earliest, key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, colnames=KK,VV, timeouts=2000, modelUri=E:\working\others\writing\calcitetutorial\src\main\resources\KafkaStream.json, baseDirectory=E:\working\others\writing\calcitetutorial\src\main\resources}
scan ...... 
283 	 false
284 	 false
285 	 false
286 	 true
287 	 true
288 	 true
289 	 false
290 	 false
291 	 false
292 	 true
293 	 false
294 	 true
295 	 false
296 	 true
297 	 true

At this point, the basic integration is complete.

In closing

This case has only completed a preliminary integration. We will continue to expand it, for example with sliding time windows, so stay tuned.

TBD