>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE Backup: 👉 gitee.com/antblack/ca…

1. Preface

While going through the flow, I noticed that the part where the Server accepts requests was missing. That part is a bit convoluted, since several thread pools are built to process requests, so this article fills the gap.

Seata’s Server and Client communicate mainly through Netty

The main purpose of communication between Client and Server is to create a transaction GlobalSession and register Branch into GlobalSession.

2. Client-side request

The previous article described the Client call flow

// C- DefaultTransactionManager: initiates the GlobalSession begin
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    // Initiate a Netty request
    // timeout=300000,transactionName=dubbo-gts-seata-example
    return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
}


3. Seata Server processing

Seata sends a GlobalLockRequest to the Server via Netty. This section looks at how the Server receives and processes it.


// Entry class: AbstractNettyRemotingServer
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    // ...
}

// Call process:
C- AbstractNettyRemotingServer # channelRead
C- AbstractNettyRemoting # processMessage


// RM registration
RegRmProcessor
    
// 
BatchLogHandler
DefaultCoordinator    
AbstractNettyRemotingServer
    

3.1 Request entry

// Step 1: Netty request entry
C- AbstractNettyRemotingServer
    M- channelRead(final ChannelHandlerContext ctx, Object msg)
        -  processMessage(ctx, (RpcMessage) msg)
    
    
// Step 2: Process Message
C- AbstractNettyRemoting
    M- processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)
    
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        
        // Prepare the Pair object, get the execution class -> 3.2
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    // Step 1: Submit the task to the ExecutorService, which supplies the thread
                    pair.getSecond().execute(() -> {
                        try {
                            // Step 2: Execute process
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            // logging omitted
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        int idx = new Random().nextInt(100);
                        try {
                            Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
                        } catch (IOException exx) {
                            // logging omitted
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // No executor registered: process on the current thread
                pair.getFirst().process(ctx, rpcMessage);
            }
        }
    }
}
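
To make the dispatch idea above easier to follow, here is a minimal sketch of a processor table keyed by message type code. It is not Seata's code: `SketchProcessor`, `SketchPair` and `DispatchSketch` are hypothetical stand-ins for `RemotingProcessor`, `Pair` and `AbstractNettyRemoting`, and messages are plain strings. It only shows the two branches: run on the registered executor, or run on the calling thread when no executor was registered.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RemotingProcessor (not the real Seata interface).
interface SketchProcessor {
    void process(String message);
}

// Hypothetical stand-in for Pair<RemotingProcessor, ExecutorService>.
final class SketchPair {
    final SketchProcessor processor;
    final ExecutorService executor; // null means: process on the calling thread

    SketchPair(SketchProcessor processor, ExecutorService executor) {
        this.processor = processor;
        this.executor = executor;
    }
}

class DispatchSketch {
    private final Map<Integer, SketchPair> processorTable = new HashMap<>();

    void register(int typeCode, SketchProcessor processor, ExecutorService executor) {
        processorTable.put(typeCode, new SketchPair(processor, executor));
    }

    /** Returns false when no processor is registered for the type code. */
    boolean dispatch(int typeCode, String message) {
        SketchPair pair = processorTable.get(typeCode);
        if (pair == null) {
            return false;
        }
        if (pair.executor != null) {
            // Hand the work to the pool so the calling (I/O) thread is not blocked
            pair.executor.execute(() -> {
                try {
                    pair.processor.process(message);
                } catch (Throwable th) {
                    System.err.println("process failed: " + th);
                }
            });
        } else {
            // No executor: run inline on the calling thread
            pair.processor.process(message);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        DispatchSketch sketch = new DispatchSketch();
        sketch.register(1, msg -> System.out.println(Thread.currentThread().getName() + " handles " + msg), pool);
        sketch.register(2, msg -> System.out.println(Thread.currentThread().getName() + " handles " + msg), null);
        sketch.dispatch(1, "pooled request");
        sketch.dispatch(2, "inline request");
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Running `main` prints the handling thread for each message, which makes the difference between the pooled branch and the inline branch visible.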

3.2 ExecutorService execution

Initialization process of the ExecutorService

The top-level abstract class is AbstractNettyRemoting, and two ExecutorService instances are involved:

// Scheduled executor that runs periodically after init
ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));

// Used for Netty Request processing. This object is passed in through the constructor at Server initialization
ThreadPoolExecutor messageExecutor;

// PS: Here's a refresher
C- Server # main
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
        NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
        new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
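
The pool above combines a bounded queue with `CallerRunsPolicy`. As a rough illustration of what that policy does in the JDK (this is a standalone sketch with made-up sizes, not Seata's configuration), the following code saturates a one-thread pool with a one-slot queue and reports which thread ends up running the overflow task. `CallerRunsSketch` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsSketch {
    /** Returns the name of the thread that ran the task submitted after worker and queue were full. */
    static String threadOfOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the single worker thread
        pool.execute(blocker); // fills the single queue slot
        String[] ranOn = new String[1];
        // Worker busy and queue full: CallerRunsPolicy runs this task on the submitting thread
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadOfOverflowTask());
    }
}
```

The design consequence is that a saturated pool slows down the submitter instead of dropping work, which acts as a simple form of back pressure.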



The process of using ExecutorService

// As shown above, the pool's threads are created by NamedThreadFactory
public Thread newThread(Runnable r) {
    String name = prefix + "_" + counter.incrementAndGet();
    if (totalSize > 1) {
        name += "_" + totalSize;
    }
    // Group : java.lang.ThreadGroup[name=main,maxpri=10]
    // name : ServerHandlerThread_1_18_500
    Thread thread = new FastThreadLocalThread(group, r, name);

    thread.setDaemon(makeDaemons);
    if (thread.getPriority() != Thread.NORM_PRIORITY) {
        thread.setPriority(Thread.NORM_PRIORITY);
    }
    return thread;
}

// PS: Why FastThreadLocalThread? FastThreadLocal throughput is about 3 times that of ThreadLocal.
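
The naming rule in newThread can be tried out in isolation. The sketch below is an assumption-laden simplification: `NamedFactorySketch` is a hypothetical class, it uses a plain `java.lang.Thread` instead of Netty's `FastThreadLocalThread`, and it leaves out the thread group. It only reproduces the `prefix_counter_totalSize` naming and the daemon flag.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplification of a named thread factory; not Seata's NamedThreadFactory.
class NamedFactorySketch implements ThreadFactory {
    private final String prefix;
    private final int totalSize;
    private final boolean makeDaemons;
    private final AtomicInteger counter = new AtomicInteger(0);

    NamedFactorySketch(String prefix, int totalSize, boolean makeDaemons) {
        this.prefix = prefix;
        this.totalSize = totalSize;
        this.makeDaemons = makeDaemons;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name pattern: prefix_counter, plus _totalSize when the pool has more than one thread
        String name = prefix + "_" + counter.incrementAndGet();
        if (totalSize > 1) {
            name += "_" + totalSize;
        }
        Thread thread = new Thread(r, name); // plain Thread here, an assumption for the sketch
        thread.setDaemon(makeDaemons);
        if (thread.getPriority() != Thread.NORM_PRIORITY) {
            thread.setPriority(Thread.NORM_PRIORITY);
        }
        return thread;
    }

    public static void main(String[] args) {
        NamedFactorySketch factory = new NamedFactorySketch("Worker", 4, true);
        System.out.println(factory.newThread(() -> { }).getName());
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

Descriptive thread names like these make thread dumps much easier to read when a pool misbehaves.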

3.3 Processor processing

3.3.1 Processor loading process

Processor loading is performed in NettyRemotingServer and takes 3 steps:

Step 1: Server # main calls nettyRemotingServer init
Step 2: Register the various processors
Step 3: Build the Pairs and put them into a collection for runtime use

// Step 1 :Server # main
public static void main(String[] args) throws IOException {
     
    // ... other logic omitted; Netty is initialized here
    try {
        nettyRemotingServer.init();
    } catch (Throwable e) {
        System.exit(-1);
    }
    System.exit(0);
}


// Step 2: init initialization
public void init() {
    // register the processors
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}

// Step 3: Register the Processor
private void registerProcessor() {
    ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
    
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    
    //------
    ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

    //------
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

    //------
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

    //------
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    
}   

// PS: As you can see, there are roughly the following types of processors:
// - ServerOnRequestProcessor: processes request messages from RM/TM clients
// - ServerOnResponseProcessor: processes response messages from RM/TM clients
// - RegRmProcessor: RM registration processor
// - RegTmProcessor: TM registration processor
// - ServerHeartbeatProcessor: processes heartbeat messages
// PS: You can also see that a messageExecutor (ThreadPoolExecutor) is passed in; it supplies the threads that run the processors later


// Step End: As you can see, the Pair has already been built for later use
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
    Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
    this.processorTable.put(messageType, pair);
}

// The collection that holds the Pairs (in AbstractNettyRemoting)
protected final HashMap<Integer, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
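
The init method in Step 2 relies on `AtomicBoolean.compareAndSet` so that the parent initialization runs only once. Here is a small sketch of that guard on its own; `InitOnceSketch` and its counter are hypothetical and only stand in for the real `super.init()` call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class InitOnceSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();

    void init() {
        // Only the first caller flips false -> true; later callers skip the body
        if (initialized.compareAndSet(false, true)) {
            startCount.incrementAndGet(); // stands in for super.init()
        }
    }

    int startCount() {
        return startCount.get();
    }

    public static void main(String[] args) {
        InitOnceSketch server = new InitOnceSketch();
        server.init();
        server.init();
        System.out.println("started " + server.startCount() + " time(s)");
    }
}
```

Note that in the excerpt registerProcessor() sits outside the guard, so a repeated init would register the processors again; since registration is a map put with the same keys, that only overwrites existing entries.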


3.3.2 Processing requests

// Step 1: Enter process handling (ServerOnRequestProcessor)
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    onRequestMessage(ctx, rpcMessage);
}

// Step 2: Process Request
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        // log output omitted
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        if (message instanceof MergedWarpMessage) {
            AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];
            // If multiple requests are made
            for (int i = 0; i < results.length; i++) {
                final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);
                // 3.3 Call Handler to process Message
                results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
            }
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results);
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
        } else {
            // If it is a single request
            final AbstractMessage msg = (AbstractMessage) message;
            // 3.3 Call Handler to process Message
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
}
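
The merged-message branch above boils down to: handle each sub-message in order and put the result at the same index. A minimal sketch of that loop, with strings in place of Seata's message classes and a hypothetical `MergeSketch` helper, could look like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class MergeSketch {
    /** Handles each sub-message in order; results[i] belongs to msgs.get(i). */
    static String[] handleMerged(List<String> msgs, Function<String, String> handler) {
        String[] results = new String[msgs.size()];
        for (int i = 0; i < results.length; i++) {
            results[i] = handler.apply(msgs.get(i));
        }
        return results;
    }

    public static void main(String[] args) {
        String[] results = handleMerged(Arrays.asList("begin", "commit"), msg -> msg + ":ok");
        System.out.println(Arrays.toString(results));
    }
}
```

Keeping the result order aligned with the request order is what lets the receiver match each result back to its original request by position.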

3.3 Handlers process each Message

As you can see, there are multiple MessageHandlers for processing

// The core handler is TransactionMessageHandler: it processes received RPC messages at the upper layer
I- TransactionMessageHandler
	M- AbstractResultMessage onRequest(AbstractMessage request, RpcContext context)
	M- void onResponse(AbstractResultMessage response, RpcContext context)

C- DefaultCoordinator    
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
	if (!(request instanceof AbstractTransactionRequestToTC)) {
		throw new IllegalArgumentException();
	}
	AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
	transactionRequest.setTCInboundHandler(this);

	return transactionRequest.handle(context);
}


C- AbstractTCInboundHandler
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                try {
                    // Start Global processing
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
                        e);
                }
            }
        }, request, response);
        return response;
}
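
The onRequest / handle pair above is a form of double dispatch: the coordinator hands itself to the request, and the request calls back the handle overload that matches its own type. The sketch below illustrates only that pattern. All class names ending in `Sketch` are hypothetical, the return values are plain strings, and the RpcContext parameter is left out.

```java
// Hypothetical stand-in for the inbound handler interface, one overload per request type.
interface InboundHandlerSketch {
    String handle(BeginRequestSketch request);
    String handle(CommitRequestSketch request);
}

abstract class RequestSketch {
    protected InboundHandlerSketch handler;

    void setHandler(InboundHandlerSketch handler) {
        this.handler = handler;
    }

    // Each concrete request picks the overload that matches its own static type
    abstract String handle();
}

class BeginRequestSketch extends RequestSketch {
    @Override
    String handle() {
        return handler.handle(this);
    }
}

class CommitRequestSketch extends RequestSketch {
    @Override
    String handle() {
        return handler.handle(this);
    }
}

class CoordinatorSketch implements InboundHandlerSketch {
    @Override
    public String handle(BeginRequestSketch request) {
        return "begin";
    }

    @Override
    public String handle(CommitRequestSketch request) {
        return "commit";
    }

    // Mirrors the shape of onRequest: inject the handler, then let the request dispatch
    String onRequest(RequestSketch request) {
        request.setHandler(this);
        return request.handle();
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch();
        System.out.println(coordinator.onRequest(new BeginRequestSketch()));
        System.out.println(coordinator.onRequest(new CommitRequestSketch()));
    }
}
```

The benefit of this shape is that onRequest needs no instanceof chain per request type; adding a new request type means adding one class and one handle overload.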


3.4 Starting Processing

As you can see, doGlobalBegin is invoked in 3.3. This is where the flow links up with the Session part.

public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
    // ...
}

4. Supplement

4.1 Seata RM registration

// On the Server, RM registration is handled by RegRmProcessor

@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    onRegRmMessage(ctx, rpcMessage);
}

// Initiate Server registration
private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
    // RegisterRMRequest{resourceIds='null', applicationId='business-seata-example', transactionServiceGroup='business-service-seata-service-group'}
    RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
    String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
    boolean isSuccess = false;
    String errorInfo = StringUtils.EMPTY;
    try {
        if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
            // ChannelManager Registers the current channel
            ChannelManager.registerRMChannel(message, ctx.channel());
            // Control the Channel version
            Version.putChannelVersion(ctx.channel(), message.getVersion());
            isSuccess = true;
        }
    } catch (Exception exx) {
        isSuccess = false;
        errorInfo = exx.getMessage();
    }
    // Build the response
    RegisterRMResponse response = new RegisterRMResponse(isSuccess);
    if (StringUtils.isNotEmpty(errorInfo)) {
        response.setMsg(errorInfo);
    }
    remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
}

RegTmProcessor works the same way, so I won't go through it here. The channel and version handling can be examined later.

4.2 ServerHeartbeatProcessor Heartbeat detection

public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    try {
        // Reply with PONG; beyond that it only writes a log
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), HeartbeatMessage.PONG);
    } catch (Throwable throwable) {
        LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
    }
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("received PING from {}", ctx.channel().remoteAddress());
    }
}

Conclusion

With that, the chain is complete and the whole Seata flow has been walked through. I don't plan to start new articles next; instead I will polish the existing ones and add details.

To summarize: ThreadPoolExecutor and NettyRemotingServer are initialized in Server # main, and incoming Netty requests are processed in AbstractNettyRemotingServer.