* Author: Ni Ze, RocketMQ Senior contributor, one of the maintainers of RocketMQ-Streams, ali Cloud technology expert. * RocketMQ-Streams is a lightweight streaming engine that is embedded and launched as an SDK for streaming computation, independent of other components, with a minimum of 1 gb for deployment, and great advantages in resource-sensitive scenarios. It also supports UTF/UTAF/UTDF calculation types. At present, it has been widely used in security, risk control, edge computing and other scenarios.

This installment will take you through the rocketMQ-Streams build and data flow process from a source code perspective. It also discusses how RocketMQ-Streams implements failover and scaling.

Use the sample

Code examples:

public class RocketMQWindowExample {
    public static void main(String[] args) {
        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
        source.fromRocketmq(
                "topicName",
                "groupName",
                false,
                "namesrvAddr")
                .map(message -> JSONObject.parseObject((String) message))
                .window(TumblingWindow.of(Time.seconds(10)))
                .groupBy("groupByKey")
                .sum(" Field name ", "output alias ").count("total")
                .waterMark(5)
                .setLocalStorageOnly(true)
                .toDataSteam(a).toPrint(1)
                .start();
    }
}
Copy the code

Pom file dependencies:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-streams-clients</artifactId>
  <version>1.0.1-preview</version>
</dependency>
Copy the code

The above code is a simple example of use. Its main function is to read data from RocketMQ specified topic, convert to JSON format, group with groupByKey field values, 10 second window, accumulate OutFlow field values, output to total field and print to console. In the above calculation, input is allowed to be out-of-order for 5 seconds, that is, it will not be triggered immediately after the window time reaches, but wait 5 seconds. If the window data arrives within this period, it is still valid. If setLocalStorageOnly is true, RocksDB is used for local storage instead of remote storage. The current rocketMQ-Streams release of 1.0.1 still uses Mysql as the remote state store, and the next release will use RocketMQ as the remote state store.

RocketMQ overall architecture diagram

Rocketmq-streams, a lightweight stream processing engine, essentially consumes data as a client of RocketMQ. A stream processing instance can process multiple queues, whereas a queue can only be consumed by one instance. Several RocketMQ-Streams instances form consumer groups to consume data together. Expand the number of instances to increase processing power. Reduce the number of instances and rebalance the consumption. From the above figure, we can also see that there is no need to exchange any data directly between computing instances, which can independently complete all computing processes. This architecture simplifies the design of RocketMQ-Streams itself, while also making it easy to scale up instances.

Processing topology A processor topology defines the computational logic of a stream processing process for an application. It consists of a series of processor nodes and data streams. For example, in the code example at the beginning, the entire processing topology consists of source, Map, groupBy, sum, Count, print, and other processing nodes. There are two special processing nodes:

• The source node does not have any upstream nodes and reads data externally into RocketMQ-Streams for downstream processing. • Sink node He does not have any downstream nodes, and he writes out the processed data externally.

The processing topology is simply a logical abstraction of the flow processing code that will be instantiated when the flow computation is started. For simplicity of design, there is currently only one computational topology in a stream processing instance. Among all stream operators, there are two special operators, one is groupBy, which involves data grouping, and the other is stateful calculation such as count. These two operators affect the construction of the entire computational topology, and how RocketMQ-Streams deals with them is discussed below.

For example, if sum is added after groupBy(” grade “), it is a sum of groups by grade. If sum is added after groupBy(” grade “), it is a sum of groups by grade. This requires rerouting data of the same “grade” to a stream compute instance, otherwise the results on each instance will be incomplete and the overall output will be incorrect.

Rocketmq-streams uses shuffle Topic. Specifically, the compute instance resends groupBy data back to a RocketMQ topic, selects the target queue based on the hash value of the key, and reads data from this topic for subsequent streaming processing. The same key must be in a queue after the key hash, and a queue will only be consumed by one stream processing instance. In this way, the same key will be routed to one instance for processing.

Stateful operators Stateful operators are the opposite of stateless operators. If the calculation result is only related to the current input and has nothing to do with the last input, it is a stateless operator. For example, the results of filter, map and foreach are only related to the current input. There is also an operator whose output depends not only on the current operator but also on the last input, such as sum, which needs to be summed over a period of time, which is a stateful operator. Rocketmq-streams uses RocksDB as a local store and Mysql as a remote store to hold state data. His specific approach is:

  1. When a message is found to be coming from a new queue, check whether the state needs to be loaded, and if asynchronously to RocksDB.
  2. When the data arrives at the stateful operator, the state in RocksDB is used for calculation if the load is complete; if not, the state in Mysql is used for calculation.
  3. After the calculation is complete, the status data is saved to RocksDB and Mysql.
  4. When the window fires, the status data is queried from RocksDB and the result is passed to the downstream operator.

The overall data flow diagram is as follows:

Capacity expansion and fault recovery

Scaling and failover are two sides of the same coin, namely, two expressions of the same thing. If a computing cluster can scale correctly, it is capable of failover, and vice versa. As you know, RocketMQ-Streams has excellent capacity scaling performance. You only need to deploy a new stream compute instance for capacity scaling and stop the compute instance for capacity scaling. Stateless computing is simpler. After capacity expansion, data calculation does not need the previous state. The scaling of stateful computing involves state migration. Stateful expansion and contraction capacity can be represented by the following figure:

When a compute instance shrinks from 3 to 2, MQ is redistributed between the compute instances using RocketMQ rebalance.

MQ2 and MQ3 consumed on Instance1 are allocated to Instance2 and Instance3, and the state data for these two MQS also needs to be migrated to Instance2 and Instance3, which implies that the state data is saved against the source data sharding; Expansion is the opposite process.

Implementationally, RocketMQ-Streams uses system messages to trigger state loading and persistence.

System message type:

RemoveSplitMessage // The client persists the consumption point to the MQ CheckPointMessageCopy the code

When a message is found to be coming from a new RocketMQ queue (MessageQueue), RocketMQ-Streams, which has not previously processed messages from this queue, sends the NewSplitMessage message before the data, which is delivered by processing the topology downstream operator, When the stateful operator receives the message, it will load the state corresponding to the new queue into local memory RocksDB. When the data actually arrives, it will continue to calculate according to this state.

After rebalance, the RemoveSplitMessage is sent when the instance is no longer consuming a MessageQueue because of an increase in instances or a change in the RocketMQ cluster. The ststate operator removes the state in the local RocksDB.

CheckPointMessage is a special system message whose role is related to implementing exactly-once. We need to achieve exactly-once during capacity expansion and reduction to ensure that capacity expansion or fault recovery has no impact on the calculation results. Rocketmq-streams will generate a CheckPointMessage before submitting the consumption offset to the broker, which will be transmitted to the downstream topology. Rocketmq-streams will ensure that all messages about to submit the consumption offset have been processed by Sink.

Rocketmq-streams repository:Github.com/apache/rock…RocketMQ Warehouse address:Github.com/apache/rock…

Join the Apache RocketMQ community

In the next release, you will be a contributor to Apache RocketMQ. In the community, you will not only meet the community leaders, improve your technical skills, but also increase your personal influence and promote your own growth.

Community 5.0 is in full development, and there are nearly 30 SIG (interest groups) waiting for you to join. Welcome students who aspire to build world-class distributed systems to join the community. Rocketmq666 can join the group to contribute to the next generation of message, event and stream fusion processing platform.