Preface

This article walks through Dubbo's AllDispatcher, the default Dispatcher implementation.

Dispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Dispatcher.java

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    /**
     * dispatch the message to threadpool.
     *
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // The last two parameters are reserved for compatibility with the old configuration
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}
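  • The Dispatcher interface is an SPI whose default extension is AllDispatcher. It defines a single dispatch method, which takes a ChannelHandler and a URL and returns a ChannelHandler.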
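The effect of the @Adaptive keys and the @SPI default can be sketched as a lookup over URL parameters. This is a minimal, self-contained illustration and not Dubbo's generated adaptive code: the class DispatcherKeySketch, the method resolveName, and the plain Map standing in for the URL are all hypothetical, and it assumes Constants.DISPATCHER_KEY has the value "dispatcher". The extension names "direct" and "message" are used only as sample values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: picks a dispatcher name from URL-like parameters.
class DispatcherKeySketch {

    // Keys are tried in order; the last two mirror the legacy keys in @Adaptive.
    // Assumption: Constants.DISPATCHER_KEY is "dispatcher".
    static final String[] KEYS = {"dispatcher", "dispather", "channel.handler"};

    // Mirrors @SPI(AllDispatcher.NAME), where NAME is "all".
    static final String DEFAULT_NAME = "all";

    static String resolveName(Map<String, String> params) {
        for (String key : KEYS) {
            String value = params.get(key);
            if (value != null && !value.isEmpty()) {
                return value; // first configured key wins
            }
        }
        return DEFAULT_NAME; // nothing configured: fall back to the SPI default
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(resolveName(params)); // prints "all"

        params.put("channel.handler", "message");
        System.out.println(resolveName(params)); // prints "message"

        params.put("dispatcher", "direct");
        System.out.println(resolveName(params)); // prints "direct"
    }
}
```

When none of the keys is set, the sketch falls back to "all", which is why AllDispatcher is the one that applies without extra configuration.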

AllDispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllDispatcher.java

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}
  • AllDispatcher implements the Dispatcher interface; its dispatch method returns a new AllChannelHandler that wraps the given handler and URL.
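The idea of returning a wrapper that hands events to a thread pool can be sketched without any Dubbo types. Everything below is hypothetical (the Handler interface, the dispatch helper, and the thread name "sketch-pool-1"); it only illustrates the decorator shape, under the assumption that the wrapper's job is to move handling off the calling thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a dispatcher that wraps a handler; not Dubbo code.
class WrappingDispatchSketch {

    // Simplified stand-in for ChannelHandler with a single event type.
    interface Handler {
        void received(String message);
    }

    // Returns a handler that forwards each event to the executor
    // instead of running the delegate on the caller's thread.
    static Handler dispatch(Handler delegate, ExecutorService executor) {
        return message -> executor.execute(() -> delegate.received(message));
    }

    // Sends one message through the wrapper and reports which thread handled it.
    static String handlingThreadName() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-pool-1"));
        AtomicReference<String> seen = new AtomicReference<>();
        Handler wrapped = dispatch(
                message -> seen.set(Thread.currentThread().getName()), executor);

        wrapped.received("hello");

        executor.shutdown(); // already submitted tasks still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(handlingThreadName()); // prints "sketch-pool-1"
    }
}
```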

AllChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
        		if(request.isTwoWay()){
        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}
  • AllChannelHandler extends WrappedChannelHandler. Its connected, disconnected, received, and caught methods each obtain the thread pool from the parent class's getExecutorService method and submit a newly created ChannelEventRunnable to it. If received catches a RejectedExecutionException, the message is a Request, and the request is two-way, it sends back a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR and returns instead of throwing, so the consumer does not have to wait for a timeout.
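The rejection path in received can be reproduced with a plain java.util.concurrent pool. The sketch below is a simplified stand-in and not Dubbo code: the class RejectedDispatchSketch, the dispatch helper, and the List used in place of channel.send are hypothetical. It assumes a pool with one worker and no queue capacity, so a second task is rejected while the first is still running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "reply with an error when the pool is exhausted".
class RejectedDispatchSketch {

    static final String EXHAUSTED = "SERVER_THREADPOOL_EXHAUSTED_ERROR";

    // Submits the task. If the pool rejects it and the request is two-way,
    // records an error reply (standing in for channel.send) instead of throwing.
    static boolean dispatch(ExecutorService executor, Runnable task,
                            boolean twoWay, List<String> sent) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            if (twoWay) {
                sent.add(EXHAUSTED);
                return false;
            }
            throw e; // one-way requests have nobody waiting for a reply
        }
    }

    static List<String> demo() {
        // One worker and a SynchronousQueue: no task can wait in the queue.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        List<String> sent = new ArrayList<>();
        try {
            // First task occupies the only worker until released.
            dispatch(executor, () -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, true, sent);
            started.await();
            // Second task is rejected, so an error reply is recorded.
            dispatch(executor, () -> { }, true, sent);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [SERVER_THREADPOOL_EXHAUSTED_ERROR]
    }
}
```

Replying immediately trades a silent drop for an explicit failure: the caller learns the server is saturated right away rather than discovering it through its own timeout.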

Summary

  • The Dispatcher interface defines the dispatch method, which returns a ChannelHandler.
  • AllChannelHandler extends WrappedChannelHandler. Its connected, disconnected, received, and caught methods each obtain the thread pool from the parent class's getExecutorService method and submit a newly created ChannelEventRunnable to it.
  • When AllChannelHandler's received method catches a RejectedExecutionException and the message is a two-way Request, it returns a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR.

doc

  • AllDispatcher