
RocketMQ has a configuration item, maxMessageSize, that controls the size limit for sending and receiving messages. This article shows a way to handle oversized messages without changing that configuration item.
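For reference, the limit mentioned above lives in the broker's configuration file. The snippet below is a sketch only; verify the key and its default against your RocketMQ version:

```properties
# broker.conf (sketch): maximum allowed message size in bytes, 4 MB by default
maxMessageSize=4194304
```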

The RocketMQ documentation gives an example of splitting a batch of messages, but that approach only handles the case where the batch as a whole exceeds 4 MB while each individual message stays under 4 MB. It does not work when a single message itself exceeds 4 MB; this article discusses a splitting method for that case.

Basic approach

To split a single oversized message, the producer cuts it into fragments and sends them, and the consumer reassembles the fragments back into the original message. We use MessageBuilder to build each message and rocketMQTemplate to send it. Taking asynchronous sending (asyncSend) as an example, let's inspect how a message is processed before it is sent, to determine what preparation is needed before splitting. The source code is as follows:

public static Message convertToRocketMessage(MessageConverter messageConverter, String charset, String destination, org.springframework.messaging.Message<?> message) {
    Object payloadObj = message.getPayload();
    byte[] payloads;
    try {
        if (null == payloadObj) {
            throw new RuntimeException("the message cannot be empty");
        }
        if (payloadObj instanceof String) {
            payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
        } else if (payloadObj instanceof byte[]) {
            payloads = (byte[]) ((byte[])message.getPayload());
        } else {
            String jsonObj = (String) messageConverter.fromMessage(message, payloadObj.getClass());
            if (null == jsonObj) {
                throw new RuntimeException(String.format(
                        "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
                        messageConverter.getClass(), payloadObj.getClass(), payloadObj));
            }
            payloads = jsonObj.getBytes(Charset.forName(charset));
        }
    } catch (Exception var7) {
        throw new RuntimeException("convert to RocketMQ message failed.", var7);
    }
    return getAndWrapMessage(destination, message.getHeaders(), payloads);
}

The source code above shows the processing every message payload goes through before sending. As you can see, the payload is eventually converted to byte[], so we should convert our message to byte[] before splitting it.
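The two simple branches of that conversion can be mirrored in a small standalone sketch (the helper name PayloadToBytes is hypothetical; other payload types would go through a JSON converter as in the source above):

```java
import java.nio.charset.StandardCharsets;

public class PayloadToBytes {
    // Mirrors the String and byte[] branches of convertToRocketMessage
    static byte[] toBytes(Object payload) {
        if (payload instanceof String) {
            return ((String) payload).getBytes(StandardCharsets.UTF_8);
        } else if (payload instanceof byte[]) {
            return (byte[]) payload;
        }
        throw new IllegalArgumentException("use a JSON converter for other types");
    }

    public static void main(String[] args) {
        System.out.println(toBytes("abc").length);           // 3
        System.out.println(toBytes(new byte[]{1, 2}).length); // 2
    }
}
```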

Splitting and sending

With the splitting approach clear, the message first needs to be converted to byte[]; here we use com.alibaba.fastjson.JSON.toJSONString() together with getBytes(). Referring to the splitter class in the documentation, we build a suitable Iterator implementation:

public class SplitMessage implements Iterator<byte[]> {
    // Maximum size of each fragment (4 MB minus room for headers)
    public static final int SPLIT_SIZE = 1024 * 1024 * 4 - 80;
    public final byte[] message;
    private int cursor = 0;

    public SplitMessage(byte[] message) {
        this.message = message;
    }

    @Override
    public boolean hasNext() {
        return cursor < size();
    }

    @Override
    public byte[] next() {
        int len;
        if (cursor < size() - 1) {
            len = SPLIT_SIZE;
        } else {
            // Last fragment carries the remainder
            len = message.length - cursor * SPLIT_SIZE;
        }
        byte[] r = new byte[len];
        for (int i = 0; i < len; i++) {
            r[i] = message[i + cursor * SPLIT_SIZE];
        }
        cursor++;
        return r;
    }

    public int size() {
        int s = message.length / SPLIT_SIZE;
        if (message.length % SPLIT_SIZE != 0) {
            s++;
        }
        return s;
    }
}
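As a quick sanity check of the fragment arithmetic (standalone, no RocketMQ required), the following sketch splits a buffer exactly as next() does, reassembles the fragments, and compares with the original; the class name SplitCheck is invented for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class SplitCheck {
    // Same fragment size as SplitMessage
    static final int SPLIT_SIZE = 1024 * 1024 * 4 - 80;

    public static void main(String[] args) {
        // Two full fragments plus a 123-byte remainder: expect 3 fragments
        byte[] original = new byte[SPLIT_SIZE * 2 + 123];
        for (int i = 0; i < original.length; i++) original[i] = (byte) i;

        ByteArrayOutputStream rebuilt = new ByteArrayOutputStream();
        int fragments = 0;
        for (int cursor = 0; cursor * SPLIT_SIZE < original.length; cursor++) {
            int len = Math.min(SPLIT_SIZE, original.length - cursor * SPLIT_SIZE);
            rebuilt.write(original, cursor * SPLIT_SIZE, len);
            fragments++;
        }
        System.out.println(fragments);                                     // 3
        System.out.println(Arrays.equals(original, rebuilt.toByteArray())); // true
    }
}
```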

The sending method can then be written as:

public String sendRefundExmMessage(Map<String, String> map, int id) {
    byte[] m = JSON.toJSONString(map).getBytes();
    SplitMessage sm = new SplitMessage(m);
    int len = sm.size();
    int i = 0;
    while (sm.hasNext()) {
        byte[] now = sm.next();
        // Put "id-position-total" in the message key so the consumer can reassemble
        Message msg = MessageBuilder.withPayload(now)
                .setHeader(MessageConst.PROPERTY_KEYS, id + "-" + i + "-" + len)
                .build();
        // Send asynchronously
        rocketMQTemplate.asyncSend("testTopic:tag", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // Handle success
            }

            @Override
            public void onException(Throwable throwable) {
                // Handle failure
            }
        });
        i++;
    }
    return "done";
}
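Both sides depend on the "id-position-total" key format shown above. A tiny standalone sketch of building and parsing that key (the helper name FragmentKey is hypothetical; note that split("-") assumes a non-negative id):

```java
public class FragmentKey {
    // Build the key exactly as the sender does: id + "-" + position + "-" + total
    public static String build(int id, int position, int total) {
        return id + "-" + position + "-" + total;
    }

    // Parse the key back into {id, position, total}, as the consumer does
    public static int[] parse(String key) {
        String[] p = key.split("-");
        return new int[]{Integer.parseInt(p[0]), Integer.parseInt(p[1]), Integer.parseInt(p[2])};
    }

    public static void main(String[] args) {
        String k = build(42, 1, 3);
        System.out.println(k); // 42-1-3
        int[] parts = parse(k);
        System.out.println(parts[0] + " " + parts[1] + " " + parts[2]); // 42 1 3
    }
}
```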
Receiving and splicing

Since the messages are sent asynchronously, fragments may arrive at the consumer in a different order than they were sent. Each fragment therefore needs to be cached until all of them have been received. The consumer can be written as follows:

@Component
@RocketMQMessageListener(consumerGroup = "test", topic = "testTopic",
        selectorExpression = "tag", messageModel = MessageModel.CLUSTERING,
        selectorType = SelectorType.TAG)
public class ContentConsumer implements RocketMQListener<MessageExt> {
    // Inject the Redis service
    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(MessageExt msg) {
        String key = msg.getKeys();
        String[] keyValues = key.split("-");
        int id = Integer.parseInt(keyValues[0]);
        boolean repeat = false;
        try {
            // Check whether this message has already been fully consumed
            repeat = redisTemplate.opsForValue().get(id) == null ? false
                    : (boolean) redisTemplate.opsForValue().get(id);
        } catch (Exception e) {
            // Error handling
        }
        if (repeat) {
            // Already consumed; skip to avoid duplicate consumption
            return;
        }
        // Position of the current fragment
        int cursor = Integer.parseInt(keyValues[1]);
        // Total number of fragments
        int size = Integer.parseInt(keyValues[2]);
        if (cursor < size - 1) {
            // Not the last fragment: cache it
            redisTemplate.opsForValue().set(id + "-" + cursor, msg.getBody());
            // Set the retention time
            redisTemplate.boundValueOps(id + "-" + cursor).expire(5L, TimeUnit.MINUTES);
        } else {
            // Last fragment: reassemble the full message
            byte[] all = new byte[0];
            for (int i = 0; i < size - 1; i++) {
                // Poll Redis every 500 milliseconds until fragment i arrives
                int max = 100;
                while (redisTemplate.opsForValue().get(id + "-" + i) == null && max > 0) {
                    try {
                        Thread.sleep(500L);
                        max--;
                    } catch (InterruptedException e) {
                        // Error handling
                    }
                }
                // If a fragment is still missing after 50 seconds, give up
                if (max == 0) return;
                // Fetch the i-th fragment and append it
                byte[] temp = (byte[]) redisTemplate.opsForValue().get(id + "-" + i);
                all = ByteBuffer.allocate(all.length + temp.length).put(all).put(temp).array();
                redisTemplate.delete(id + "-" + i);
            }
            all = ByteBuffer.allocate(all.length + msg.getBody().length).put(all).put(msg.getBody()).array();
            JSONObject json = JSON.parseObject(new String(all));
            Map<String, String> map = (Map<String, String>) json;
            // Message processing

            // Mark the message as successfully consumed
            redisTemplate.opsForValue().set(id, true);
            redisTemplate.boundValueOps(id).expire(5L, TimeUnit.MINUTES);
        }
    }
}
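The consumer's reassembly step, appending each fragment via ByteBuffer, can be exercised on its own. A minimal sketch (the class name Concat is invented for illustration):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class Concat {
    // Append one fragment to the accumulated bytes, as the consumer does
    static byte[] append(byte[] all, byte[] next) {
        return ByteBuffer.allocate(all.length + next.length).put(all).put(next).array();
    }

    public static void main(String[] args) {
        byte[] all = new byte[0];
        for (byte[] frag : new byte[][]{{1, 2}, {3}, {4, 5, 6}}) {
            all = append(all, frag);
        }
        System.out.println(Arrays.toString(all)); // [1, 2, 3, 4, 5, 6]
    }
}
```

Note that this allocates a fresh buffer per fragment; for many fragments a single ByteArrayOutputStream would avoid the repeated copies.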

The main requirement on the consumer side is a reliable way to verify and concatenate the fragments; the method shown here is for reference only. It can be extended to a deployment with multiple servers, as long as all instances share one Redis service.

This article discusses only one possible solution. Other approaches exist for handling large messages, and the one presented here is not necessarily optimal.