Seata communication module analysis

Server

Overall overview

In the Seata project, the client and server communicate over TCP using Netty. AbstractNettyRemoting implements the basic functions: remote message processing, synchronous/asynchronous sending, and timeout management for pending requests. AbstractNettyRemotingServer builds on AbstractNettyRemoting, defines the ServerHandler that processes remote messages, and implements the business interface RemotingServer. NettyRemotingServer is responsible for assembling the business processors.

AbstractNettyRemoting

In AbstractNettyRemoting, every message sent synchronously is placed in a ConcurrentHashMap whose key is the message ID and whose value is a custom MessageFuture. When a synchronous message is sent, a MessageFuture is initialized first; the CompletableFuture inside it provides blocking waits for the asynchronous result and a way to set that result (much like ChannelPromise in Netty). The AbstractNettyRemoting constructor also starts a scheduled task that periodically removes timed-out requests from this map.
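
MessageFuture is essentially a thin wrapper around CompletableFuture. A minimal sketch of the idea, matching the usage in sendSync below (not the verbatim Seata source; getters and some details are omitted):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Minimal sketch of MessageFuture: the sender blocks on get(), and the
// I/O thread completes the future when the response (or an error) arrives.
public class MessageFuture {
    private RpcMessage requestMessage;
    private long timeout;
    private final long start = System.currentTimeMillis();
    private final transient CompletableFuture<Object> origin = new CompletableFuture<>();

    public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
        Object result;
        try {
            result = origin.get(timeout, unit);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
        if (result instanceof Throwable) {
            // e.g. the write failed and setResultMessage(future.cause()) was called
            throw new RuntimeException((Throwable) result);
        }
        return result;
    }

    // Called from the Netty I/O thread when the response (or a failure) comes back.
    public void setResultMessage(Object obj) {
        origin.complete(obj);
    }

    // Used by the scheduled cleanup task to drop requests that waited too long.
    public boolean isTimeout() {
        return System.currentTimeMillis() - start > timeout;
    }

    public void setRequestMessage(RpcMessage requestMessage) { this.requestMessage = requestMessage; }
    public void setTimeout(long timeout) { this.timeout = timeout; }
}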

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    if (timeoutMillis <= 0) {
        throw new FrameworkException("timeout should more than 0ms");
    }
    if (channel == null) {
        LOGGER.warn("sendSync nothing, caused by null channel.");
        return null;
    }

    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);

    channelWritableCheck(channel, rpcMessage.getBody());

    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    doBeforeRpcHooks(remoteAddr, rpcMessage);

    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                messageFuture1.setResultMessage(future.cause());
            }
            destroyChannel(future.channel());
        }
    });

    try {
        Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        doAfterRpcHooks(remoteAddr, rpcMessage, result);
        return result;
    } catch (Exception exx) {
        LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
            rpcMessage.getBody());
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        } else {
            throw new RuntimeException(exx);
        }
    }
}

Before sending the request, the channel is checked for writability, and then custom hooks are invoked; the hooks are loaded via SPI using EnhancedServiceLoader (a sketch follows below). Finally, a send listener is registered that closes the channel when the write fails. It is worth noting that the writability check synchronizes on a lock: when the channel is not writable, the sender calls wait, releasing the lock, and the channelWritabilityChanged callback later notifies all waiting writers. Globally, all channels share one lock. This sacrifices some concurrency, but an unwritable channel usually means I/O has hit a bottleneck; writing to it without restraint would pile data up in the channel's ChannelOutboundBuffer, creating a vicious cycle of backlog that ends in an OOM. Sharing a single lock avoids this problem in the simplest, most direct way.
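
The hook mechanism itself is small. A minimal sketch of how loading and invocation fit together, assuming Seata's RpcHook SPI interface (simplified, not the verbatim source):

// Hooks are discovered via Seata's SPI loader; each hook sees the request
// before it is written and the result after it returns.
protected final List<RpcHook> rpcHooks = EnhancedServiceLoader.loadAll(RpcHook.class);

protected void doBeforeRpcHooks(String remoteAddr, RpcMessage request) {
    for (RpcHook rpcHook : rpcHooks) {
        rpcHook.doBeforeRequest(remoteAddr, request);
    }
}

protected void doAfterRpcHooks(String remoteAddr, RpcMessage request, Object response) {
    for (RpcHook rpcHook : rpcHooks) {
        rpcHook.doAfterResponse(remoteAddr, request, response);
    }
}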

private void channelWritableCheck(Channel channel, Object msg) {
    int tryTimes = 0;
    synchronized (lock) {
        while (!channel.isWritable()) {
            try {
                tryTimes++;
                if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
                    destroyChannel(channel);
                    throw new FrameworkException("msg:" + (msg == null ? "null" : msg.toString()),
                        FrameworkErrorCode.ChannelIsNotWritable);
                }
                lock.wait(NOT_WRITEABLE_CHECK_MILLS);
            } catch (InterruptedException exx) {
                LOGGER.error(exx.getMessage());
            }
        }
    }
}
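
The wake-up side of this wait lives in the channel handler: when the outbound buffer drains below the low water mark, Netty fires channelWritabilityChanged, and the senders parked in channelWritableCheck are notified. A sketch, assuming the handler shares the same lock object:

// Paired with channelWritableCheck above: wake up waiting senders once
// the channel becomes writable again.
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
    synchronized (lock) {
        if (ctx.channel().isWritable()) {
            lock.notifyAll();
        }
    }
    ctx.fireChannelWritabilityChanged();
}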

Asynchronous sending is similar to synchronous sending, minus putting the MessageFuture into the futures map. Message processing is much like Sentinel's: take the type code of the message body, then look up the corresponding processor in the processor table. Unlike Sentinel, which processes requests directly on the Netty I/O thread, Seata Server can pair each processor with an ExecutorService and run it there, mainly so that blocking I/O such as database access does not stall the communication I/O threads.

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        int idx = new Random().nextInt(100);
                        try {
                            Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

AbstractNettyRemotingServer

AbstractNettyRemotingServer inherits from AbstractNettyRemoting and is responsible for initializing the Netty server endpoint (including registering the ServerHandler) and registering message processors. Two things deserve attention here: the ServerBootstrap setup and the ChannelManager used for connection management.

Compared to Sentinel, Seata adds TCP keepalive settings and an IdleStateHandler that cooperate to remove idle TCP connections. Note that Seata also sets write-buffer water marks: in AbstractNettyRemoting, a channel's writability is determined by comparing the currently buffered outbound data against these marks. Once the buffer rises above the high water mark, the channel stays unwritable until the buffer drops below the low water mark.

this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
    .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
    .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
    .option(ChannelOption.SO_REUSEADDR, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childOption(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
    .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
        new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
            nettyServerConfig.getWriteBufferHighWaterMark()))
    .localAddress(new InetSocketAddress(listenPort))
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                .addLast(new ProtocolV1Decoder())
                .addLast(new ProtocolV1Encoder());
            if (channelHandlers != null) {
                addChannelPipelineLast(ch, channelHandlers);
            }
        }
    });

ChannelManager is what the Server uses to manage remote connections, held in three ConcurrentHashMaps: IDENTIFIED_CHANNELS, RM_CHANNELS, and TM_CHANNELS. The value in these maps is not the Channel itself but an RpcContext. Each RpcContext corresponds to one remote Channel and carries its basic information, such as the application ID and client ID of that Channel. Pay particular attention to the ConcurrentMaps inside RpcContext: clientIDHolderMap, clientTMHolderMap, and clientRMHolderMap. clientIDHolderMap stores all channels and their corresponding RpcContexts; clientTMHolderMap stores port numbers and their corresponding RpcContexts; clientRMHolderMap stores resource IDs mapped to (port -> RpcContext) maps. These holder maps are in fact the same maps held by ChannelManager.

private static final ConcurrentMap<Channel, RpcContext> IDENTIFIED_CHANNELS = new ConcurrentHashMap<>();

/**
 * resourceId -> applicationId -> ip -> port -> RpcContext
 */
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String,
    ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS = new ConcurrentHashMap<>();

/**
 * ip+appname -> port -> RpcContext
 */
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS
    = new ConcurrentHashMap<>();
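For reference, the holder maps inside RpcContext can be sketched like this (declarations only, following the description above; other fields such as applicationId and clientId are omitted):

// Sketch of RpcContext's holder maps; their entries are shared with the
// three ChannelManager maps above rather than copied.
public class RpcContext {
    // Channel -> RpcContext, shared with IDENTIFIED_CHANNELS
    private ConcurrentMap<Channel, RpcContext> clientIDHolderMap;
    // for a TM: port -> RpcContext, shared with the inner map of TM_CHANNELS
    private ConcurrentMap<Integer, RpcContext> clientTMHolderMap;
    // for an RM: resourceId -> (port -> RpcContext), shared with the inner maps of RM_CHANNELS
    private ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> clientRMHolderMap;
    // applicationId, clientId, version, channel, etc. omitted
}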

In practice, these maps are used in Seata's business flow to find the Channel of a remote application. For example, when remote RMs must be notified to perform the phase-two commit, every RM involved in the current transaction has to be located so the commit request can be sent.

public static Channel getChannel(String resourceId, String clientId) {
    Channel resultChannel = null;

    // clientId consists of applicationId:ip:port
    String[] clientIdInfo = readClientId(clientId);

    if (clientIdInfo == null || clientIdInfo.length != 3) {
        throw new FrameworkException("Invalid Client ID: " + clientId);
    }

    String targetApplicationId = clientIdInfo[0];
    String targetIP = clientIdInfo[1];
    int targetPort = Integer.parseInt(clientIdInfo[2]);

    // First find all the applications that registered the resource
    ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
        RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);

    if (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("No channel is available for resource[{}]", resourceId);
        }
        return null;
    }

    ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);

    if (ipMap != null && !ipMap.isEmpty()) {
        // Firstly, try to find the original channel through which the branch was registered,
        // based on the clientId recorded at registration time.
        ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
        if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {
            RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
            if (exactRpcContext != null) {
                Channel channel = exactRpcContext.getChannel();
                if (channel.isActive()) {
                    resultChannel = channel;
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Just got exactly the one {} for {}", channel, clientId);
                    }
                } else {
                    if (portMapOnTargetIP.remove(targetPort, exactRpcContext)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Removed inactive {}", channel);
                        }
                    }
                }
            }

            // The original channel was broken, try an available one on another port of the same IP.
            if (resultChannel == null) {
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP
                    .entrySet()) {
                    Channel channel = portMapOnTargetIPEntry.getValue().getChannel();

                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info(
                                "Choose {} on the same IP[{}] as alternative of {}", channel, targetIP, clientId);
                        }
                        break;
                    } else {
                        if (portMapOnTargetIP.remove(portMapOnTargetIPEntry.getKey(),
                            portMapOnTargetIPEntry.getValue())) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive {}", channel);
                            }
                        }
                    }
                }
            }
        }

        // No channel on this app node, try the other IPs of the same application.
        if (resultChannel == null) {
            for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap
                .entrySet()) {
                if (ipMapEntry.getKey().equals(targetIP)) { continue; }

                ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
                if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
                    continue;
                }

                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
                    Channel channel = portMapOnOtherIPEntry.getValue().getChannel();

                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Choose {} on the same application[{}] as alternative of {}", channel, targetApplicationId, clientId);
                        }
                        break;
                    } else {
                        if (portMapOnOtherIP.remove(portMapOnOtherIPEntry.getKey(),
                            portMapOnOtherIPEntry.getValue())) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive {}", channel);
                            }
                        }
                    }
                }

                if (resultChannel != null) { break; }
            }
        }
    }

    // If no channel is found in the target application, try other applications that registered
    // the same resource. This is fine for AT mode; in TCC mode, a different application with a
    // different resource will not have the phase-two code for this branch.
    if (resultChannel == null) {
        resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);

        if (resultChannel == null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("No channel is available for resource[{}] as alternative of {}", resourceId, clientId);
            }
        } else {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Choose {} on the same resource[{}] as alternative of {}", resultChannel, resourceId, clientId);
            }
        }
    }

    return resultChannel;
}

NettyRemotingServer is the concrete class that actually assembles the message processors and starts the server endpoint. The code is relatively simple and won't be covered in detail. One thing worth noting: in both Seata and Sentinel, the class responsible for starting the server uses an atomic variable to ensure it starts only once.
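
The start-once guard boils down to a compare-and-set on an atomic flag. A minimal sketch of the pattern (names are illustrative, not the exact Seata source):

// Only the first caller's compareAndSet succeeds, so the endpoint is
// started exactly once even if init() is invoked concurrently.
private final AtomicBoolean initialized = new AtomicBoolean(false);

@Override
public void init() {
    // assemble the message processors
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();  // start the Netty server endpoint
    }
}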

Client

Overall overview

On the client side, basic message sending and message processing also use the functionality provided by AbstractNettyRemoting. Unlike AbstractNettyRemotingServer, AbstractNettyRemotingClient adds some client-side optimizations around message sending and connection establishment. Compared with RemotingServer, RemotingClient adds an extra interface for processing registration messages, implemented by TmNettyRemotingClient and RmNettyRemotingClient respectively.

AbstractNettyRemotingClient

Two things are added in the initialization function: a scheduled reconnect task, and, when batch sending is enabled, the thread pool that drives batched requests. In Seata's design, each client establishes a connection to every current server. The server list is usually dynamic, obtained from the service registry, so the client periodically pulls the list and maintains connections by checking whether each one exists, rather than listening for disconnect events as Sentinel does.

@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

When sending a synchronous message, the client first selects a target server address from the server list. If batch sending is enabled, the message is put not only into the futures map but also into the blocking queue (basket) for that server address; MergedSendRunnable eventually drains the map of baskets and sends the messages. Notice the mergeLock: the sending thread waits on it, avoiding CPU load from spinning on empty queues, and is woken by notifyAll when new messages arrive. So the messages that actually get merged are the ones that accumulate between the thread waking up and the completion of the send.

@Override
public Object sendSyncRequest(Object msg) throws TimeoutException {
    String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
    int timeoutMillis = NettyClientConfig.getRpcRequestTimeout();
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

    // send batch message
    // put message into basketMap, @see MergedSendRunnable
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {

        // send batch message is sync request, needs to create messageFuture and put it in futures.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        // put message into basketMap
        BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
            key -> new LinkedBlockingQueue<>());
        if (!basket.offer(rpcMessage)) {
            LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                    serverAddress, rpcMessage);
            return null;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: {}", rpcMessage.getBody());
        }
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}",
                exx.getMessage(), serverAddress, rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    } else {
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        return super.sendSync(channel, rpcMessage, timeoutMillis);
    }
}
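
The draining side of the batch path can be sketched as follows (condensed from the behavior described above, not the verbatim Seata source; MAX_MERGE_SEND_MILLS is a small merge window):

// Condensed sketch of the merged-send loop.
private class MergedSendRunnable implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (mergeLock) {
                try {
                    // Park until sendSyncRequest calls notifyAll, or the merge window
                    // elapses; this avoids burning CPU on empty queues.
                    mergeLock.wait(MAX_MERGE_SEND_MILLS);
                } catch (InterruptedException ignore) {
                }
            }
            isSending = true;
            basketMap.forEach((address, basket) -> {
                if (basket.isEmpty()) {
                    return;
                }
                // Everything queued for this server since the last pass
                // goes out as one merged message.
                MergedWarpMessage mergeMessage = new MergedWarpMessage();
                while (!basket.isEmpty()) {
                    RpcMessage msg = basket.poll();
                    mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                    mergeMessage.msgIds.add(msg.getId());
                }
                Channel sendChannel = clientChannelManager.acquireChannel(address);
                // One write carries the whole batch; each response completes its own
                // MessageFuture in the futures map by message ID.
                sendAsyncRequest(sendChannel, mergeMessage);
            });
            isSending = false;
        }
    }
}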

Note that if the client enables native transport (enableNative) and is not running on macOS, epoll is set to edge-triggered mode. Besides edge-triggered there is also level-triggered mode: edge-triggered notifies each event only once, while level-triggered keeps notifying as long as the event has not been fully consumed. For example, if readable data remains in the buffer but has not been completely read, edge-triggered mode will not fire readable again, while level-triggered mode will. The client also enables the TCP_QUICKACK option to speed up the response of ACK packets.

this.bootstrap.group(this.eventLoopGroupWorker)
    .channel(nettyClientConfig.getClientChannelClazz())
    .option(ChannelOption.TCP_NODELAY, true)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
    .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
    .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());

if (nettyClientConfig.enableNative()) {
    if (PlatformDependent.isOsx()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("client run on macOS");
        }
    } else {
        bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
            .option(EpollChannelOption.TCP_QUICKACK, true);
    }
}

Finally, look at NettyClientChannelManager, which manages connections on the client side. It maintains a channel pool via GenericKeyedObjectPool and holds three maps. channelLocks stores a lock per server address, used to ensure only one connection is established to the same address at a time. channels stores all live channels. poolKeyMap stores the mapping from server address to NettyPoolKey, which is also the key of the channel pool. A NettyPoolKey is generated from the server address (different client classes supply different generation functions) and contains the current client's role, the address, and a message, namely the registration request sent on first connect.
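
The fields can be summarized as declarations (sketched from the description above; pool configuration omitted):

// Sketch of NettyClientChannelManager's state.
private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();      // one lock per server address
private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<>();  // address -> pool key
private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();         // address -> live channel
private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;            // channel pool keyed by NettyPoolKey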

Let's walk through the ChannelManager's main flow by acquiring a connection. It first fetches the channel for the address from channels and checks whether it is alive. In getExistAliveChannel, if the channel is not active, the caller does not immediately enter the connect flow; it waits a short time in case a reconnect is already in progress (sketched after the code below). Failing that, it takes the lock object for the server address and connects.

Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null) {
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to " + serverAddress);
    }
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) {
        return doConnect(serverAddress);
    }
}

In doConnect, the channel is checked once more before connecting; if it is active, it is returned directly. Otherwise a connection is borrowed from the object pool; before borrowing, the NettyPoolKey for the address is computed, and if a previous key exists whose message is an RM registration request, its resource IDs are refreshed from the newly computed key.

private Channel doConnect(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null && channelToServer.isActive()) {
        return channelToServer;
    }
    Channel channelFromPool;
    try {
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
        LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}

NettyPoolableFactory is the object factory that actually creates and destroys pooled connections. When creating a connection, it also sends the registration request carried in the NettyPoolKey and checks whether registration succeeded.

@Override
public Channel makeObject(NettyPoolKey key) {
    InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyPool create channel to " + key);
    }
    Channel tmpChannel = clientBootstrap.getNewChannel(address);
    long start = System.currentTimeMillis();
    Object response;
    Channel channelToServer = null;
    if (key.getMessage() == null) {
        throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
    }
    try {
        response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
        if (!isRegisterSuccess(response, key.getTransactionRole())) {
            rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
        } else {
            channelToServer = tmpChannel;
            rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
        }
    } catch (Exception exx) {
        if (tmpChannel != null) {
            tmpChannel.close();
        }
        throw new FrameworkException(
            "register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(
            response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:"
            + channelToServer);
    }
    return channelToServer;
}

TmNettyRemotingClient and RmNettyRemotingClient do little beyond defining some basic parameters, much like their server-side counterparts, so they won't be detailed here.

Protocol

In Seata, the messages exchanged between client and server are ultimately abstracted into an RpcMessage object, which contains the message ID, message type, codec, compressor, header map, and the actual message body.
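
Piecing together the reads in decodeFrame below, the v1 frame layout can be reconstructed roughly as follows (shown as a comment sketch; the 16-byte fixed header corresponds to ProtocolConstants.V1_HEAD_LENGTH):

// v1 frame layout, reconstructed from the decoder below:
//
// | magic code (2B) | version (1B) | full length (4B)  | head length (2B) |
// | msg type (1B)   | codec (1B)   | compressor (1B)   | request id (4B)  |
// | optional head map (head length - 16 bytes)                            |
// | body (full length - head length), compressed/serialized per the flags |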

ProtocolV1Decoder, used for decoding, actually inherits from LengthFieldBasedFrameDecoder. The decoding process is straightforward: parse the header fields first, then read the message body, decompressing it if the compressor field requires and picking the deserializer according to the serialization (codec) field.

public Object decodeFrame(ByteBuf frame) {
    byte b0 = frame.readByte();
    byte b1 = frame.readByte();
    if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
            || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
        throw new IllegalArgumentException("Unknown magic code: " + b0 + "," + b1);
    }

    byte version = frame.readByte();
    // TODO check version compatible here

    int fullLength = frame.readInt();
    short headLength = frame.readShort();
    byte messageType = frame.readByte();
    byte codecType = frame.readByte();
    byte compressorType = frame.readByte();
    int requestId = frame.readInt();

    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setCodec(codecType);
    rpcMessage.setId(requestId);
    rpcMessage.setCompressor(compressorType);
    rpcMessage.setMessageType(messageType);

    // direct read head with zero-copy
    int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
    if (headMapLength > 0) {
        Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
        rpcMessage.getHeadMap().putAll(map);
    }

    // read body
    if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
        rpcMessage.setBody(HeartbeatMessage.PING);
    } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
        rpcMessage.setBody(HeartbeatMessage.PONG);
    } else {
        int bodyLength = fullLength - headLength;
        if (bodyLength > 0) {
            byte[] bs = new byte[bodyLength];
            frame.readBytes(bs);
            Compressor compressor = CompressorFactory.getCompressor(compressorType);
            bs = compressor.decompress(bs);
            Serializer serializer = EnhancedServiceLoader.load(Serializer.class,
                SerializerType.getByCode(rpcMessage.getCodec()).name());
            rpcMessage.setBody(serializer.deserialize(bs));
        }
    }
    return rpcMessage;
}

Regarding Protobuf, Seata differs from Sentinel: Sentinel serializes the outer layer with Protobuf and defines the message body with the Any type, while Seata uses a custom outer-layer serialization and lets the inner content choose its own serialization method, which is more flexible. With Protobuf, different serialization methods can be used for different message contents, making transfers more efficient.

@Override
public <T> T deserialize(byte[] bytes) {
    if (bytes == null) {
        throw new NullPointerException();
    }
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    // Get the class name
    int clazzNameLength = byteBuffer.getInt();
    byte[] clazzName = new byte[clazzNameLength];
    byteBuffer.get(clazzName);
    // Get object data
    byte[] body = new byte[bytes.length - clazzNameLength - 4];
    byteBuffer.get(body);
    final String descriptorName = new String(clazzName, UTF8);
    Class protobufClazz = ProtobufConvertManager.getInstance().fetchProtoClass(descriptorName);
    Object protobufObject = ProtobufInnerSerializer.deserializeContent(protobufClazz.getName(), body);
    //translate back to core model
    final PbConvertor pbConvertor = ProtobufConvertManager.getInstance().fetchReversedConvertor(protobufClazz.getName());
    Object newBody = pbConvertor.convert2Model(protobufObject);
    return (T)newBody;
}
Copy the code