Dubbo performance tuning parameters and principles

  • 1 Dubbo call model
  • 2 Common performance tuning parameters
  • 3 Source code and principle analysis
    • 3.1 threads
    • 3.2 iothreads
    • 3.3 queues
    • 3.4 connections
    • 3.5 actives
    • 3.6 accepts
    • 3.7 executes

1 Dubbo call model

2 Common performance tuning parameters

| Parameter | Scope | Default | Description | Note |
| --- | --- | --- | --- | --- |
| threads | provider | 200 | Size of the business-processing thread pool | |
| iothreads | provider | CPU count + 1 | Size of the I/O thread pool | |
| queues | provider | 0 | Size of the queue that holds requests waiting to execute once the thread pool is full | It is recommended not to set this: when the thread pool is full, the call should fail immediately and be retried on another provider machine rather than queue, unless there is a special requirement |
| connections | consumer | 0 | Maximum number of connections per provider. For short-connection protocols such as RMI, HTTP, and Hessian this limits the number of connections; for the Dubbo protocol it is the number of long connections established | The Dubbo protocol shares a single long connection by default |
| actives | consumer | 0 | Maximum number of concurrent calls per method, per service, per consumer | 0 means no limit |
| accepts | provider | 0 | Maximum number of connections the service provider accepts | 0 means no limit |
| executes | provider | 0 | Maximum number of requests a service provider executes in parallel, per method, per service | 0 means no limit |
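For orientation before the source analysis, the sketch below (an illustration, not Dubbo source) shows how these parameters end up as key/value pairs on a provider URL and are read back with their defaults via com.alibaba.dubbo.common.URL#getParameter, which is exactly what the code analysed in Section 3 does. The address, service name, and values are hypothetical.

import com.alibaba.dubbo.common.URL;

public class TuningParamsSketch {
    public static void main(String[] args) {
        // hypothetical provider URL carrying some of the tuning parameters from the table above
        URL providerUrl = URL.valueOf(
                "dubbo://10.0.0.1:20880/com.example.DemoService?threads=200&iothreads=9&queues=0&accepts=1000");
        // each component reads its own parameter, falling back to the default when it is absent
        int threads  = providerUrl.getParameter("threads", 200);  // business thread pool size (3.1)
        int queues   = providerUrl.getParameter("queues", 0);     // 0 -> SynchronousQueue (3.1 / 3.3)
        int accepts  = providerUrl.getParameter("accepts", 0);    // 0 -> unlimited connections (3.6)
        int executes = providerUrl.getParameter("executes", 0);   // 0 -> unlimited concurrency (3.7)
        System.out.println(threads + ", " + queues + ", " + accepts + ", " + executes);
    }
}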

3 Source code and principle analysis

3.1 threads

FixedThreadPool.java

public Executor getExecutor(URL url) {
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
            queues == 0 ? new SynchronousQueue<Runnable>() : 
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() : 
                            new LinkedBlockingQueue<Runnable>(queues)),
            new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

LimitedThreadPool.java

public Executor getExecutor(URL url) {
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
            queues == 0 ? new SynchronousQueue<Runnable>() :
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() :
                            new LinkedBlockingQueue<Runnable>(queues)),
            new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

Constants.DEFAULT_THREADS is 200 and Constants.DEFAULT_QUEUES is 0. The threads parameter configures the maximum (and, in FixedThreadPool, also the core) number of threads in the business-processing thread pool.
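To make the rejection behaviour concrete, here is a self-contained JDK sketch (no Dubbo classes; the pool sizes are made up) of the pool FixedThreadPool builds when threads=2 and queues=0: a fixed-size pool over a SynchronousQueue, so a task submitted while all workers are busy is rejected immediately instead of queueing. Dubbo plugs in AbortPolicyWithReport rather than the plain AbortPolicy used here.

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolRejectionSketch {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),          // queues == 0
                new ThreadPoolExecutor.AbortPolicy());     // Dubbo uses AbortPolicyWithReport here
        Runnable slow = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
        };
        pool.execute(slow);
        pool.execute(slow);
        try {
            pool.execute(slow);                            // no idle thread, no queue slot -> rejected
        } catch (RejectedExecutionException e) {
            System.out.println("rejected immediately: " + e);
        }
        pool.shutdown();
    }
}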

3.2 iothreads

NettyServer.java

@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker,
            getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}
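In the code above, iothreads is read with getPositiveParameter and passed to NioServerSocketChannelFactory as its worker-thread count, i.e. it sizes the Netty I/O worker pool. The exact definition of Constants.DEFAULT_IO_THREADS is not shown in this article; going by the table in Section 2 ("CPU count + 1"), it is assumed to be derived roughly as follows.

public class DefaultIoThreadsSketch {
    public static void main(String[] args) {
        // assumption based on the table in Section 2, not copied from Dubbo source
        int defaultIoThreads = Runtime.getRuntime().availableProcessors() + 1;
        System.out.println("default iothreads = " + defaultIoThreads);
    }
}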

3.3 queues

The queues parameter is read in FixedThreadPool.java, LimitedThreadPool.java, and CachedThreadPool.java; see Section 3.1 for the code. As the code shows, the default value of 0 means a SynchronousQueue is used; a value less than 0 means an unbounded LinkedBlockingQueue (capacity Integer.MAX_VALUE); any other value means a LinkedBlockingQueue of the specified size.
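As a companion to the sketch in 3.1, this JDK-only snippet (sizes made up for illustration) shows the effect of a positive queues value: with a bounded LinkedBlockingQueue, an extra task waits in the queue instead of being rejected.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1));     // queues == 1
        Runnable slow = () -> {
            try { Thread.sleep(500); } catch (InterruptedException ignored) { }
        };
        pool.execute(slow);
        pool.execute(slow);
        pool.execute(slow);                                // buffered in the queue, not rejected
        System.out.println("tasks waiting in queue: " + pool.getQueue().size());
        pool.shutdown();
    }
}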

3.4 connections

DubboProtocol.java

private ExchangeClient[] getClients(URL url) {
    // whether to share the connection
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // if connections is not configured, share a single connection
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}

DubboInvoker.java

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

The default value of connections is 0, meaning all invocations to a given provider share a single long connection; otherwise the specified number of long connections is established. At call time, if there is more than one long connection, one is selected by round-robin polling.
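A minimal sketch (illustration only, not Dubbo code) of the round-robin selection that DubboInvoker uses when more than one long connection is configured: an AtomicInteger is incremented on every call and taken modulo the number of connections.

import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPollingSketch {
    private static final AtomicInteger index = new AtomicInteger();

    // same expression as clients[index.getAndIncrement() % clients.length] in doInvoke()
    static int pickConnection(int connections) {
        return index.getAndIncrement() % connections;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            System.out.println("call " + i + " uses connection " + pickConnection(3));
        }
    }
}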

3.5 actives

ActiveLimitFilter.java

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    if (max > 0) {
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        long start = System.currentTimeMillis();
        long remain = timeout;
        int active = count.getActive();
        if (active >= max) {
            synchronized (count) {
                while ((active = count.getActive()) >= max) {
                    try {
                        count.wait(remain);
                    } catch (InterruptedException e) {
                    }
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    if (remain <= 0) {
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
                                + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                                + ", elapsed: " + elapsed + ", timeout: " + timeout
                                + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }
    }
    try {
        long begin = System.currentTimeMillis();
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
            return result;
        } catch (RuntimeException t) {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
            throw t;
        }
    } finally {
        if (max > 0) {
            synchronized (count) {
                count.notify();
            }
        }
    }
}

When a consumer makes a call, invocations are counted per service and per method. If the number of concurrent calls has reached the configured maximum, the calling thread blocks until an in-flight request completes and frees a slot; if the wait exceeds the method's timeout, an RpcException is thrown.
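The following simplified, self-contained analogue (illustration only, not Dubbo code; names are made up) captures the consumer-side behaviour described above: callers block on the shared counter while the active count is at the limit, are woken when a request finishes, and give up once the timeout elapses.

public class ActiveLimiterSketch {
    private final int max;
    private final long timeoutMillis;
    private int active;

    ActiveLimiterSketch(int max, long timeoutMillis) {
        this.max = max;
        this.timeoutMillis = timeoutMillis;
    }

    // corresponds to the wait loop plus RpcStatus.beginCount(...) in ActiveLimitFilter
    synchronized void acquire() throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (active >= max) {
            long remain = deadline - System.currentTimeMillis();
            if (remain <= 0) {
                throw new IllegalStateException("Waiting concurrent invoke timeout in client-side");
            }
            wait(remain);
        }
        active++;
    }

    // corresponds to RpcStatus.endCount(...) plus the notify() in the filter's finally block
    synchronized void release() {
        active--;
        notify();
    }

    public static void main(String[] args) throws Exception {
        ActiveLimiterSketch limiter = new ActiveLimiterSketch(1, 200);
        limiter.acquire();                       // first call takes the only slot
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            limiter.release();                   // finishing the first call wakes the waiter
        }).start();
        limiter.acquire();                       // blocks briefly, then proceeds
        System.out.println("second call admitted after the first one finished");
    }
}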

3.6 accepts

AbstractServer.java

@Override
public void connected(Channel ch) throws RemotingException {
    Collection<Channel> channels = getChannels();
    if (accepts > 0 && channels.size() > accepts) {
        logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
        ch.close();
        return;
    }
    super.connected(ch);
}

When the number of established connections exceeds the configured maximum, the newly accepted connection is closed immediately.

3.7 executes

ExecuteLimitFilter.java

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
    if (max > 0) {
        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
        if (count.getActive() >= max) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
        }
    }
    long begin = System.currentTimeMillis();
    boolean isException = false;
    RpcStatus.beginCount(url, methodName);
    try {
        Result result = invoker.invoke(invocation);
        return result;
    } catch (Throwable t) {
        isException = true;
        if(t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
    }
    finally {
        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);
    }
}

When the provider processes a request, invocations are counted per method. If the number of concurrent executions has reached the configured maximum, the provider does not queue the request but throws an RpcException immediately.
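By contrast with the consumer-side actives limit, the provider-side check fails fast. A simplified analogue (illustration only, not Dubbo code; names are made up):

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecuteLimiterSketch {
    private final int max;
    private final AtomicInteger active = new AtomicInteger();

    ExecuteLimiterSketch(int max) {
        this.max = max;
    }

    <T> T call(Callable<T> body) throws Exception {
        // fail fast, mirroring the RpcException thrown by ExecuteLimitFilter (the check and the
        // count are not atomic here either, just like getActive()/beginCount in the filter)
        if (max > 0 && active.get() >= max) {
            throw new IllegalStateException("executes limit " + max + " exceeded");
        }
        active.incrementAndGet();              // corresponds to RpcStatus.beginCount(...)
        try {
            return body.call();
        } finally {
            active.decrementAndGet();          // corresponds to RpcStatus.endCount(...)
        }
    }

    public static void main(String[] args) throws Exception {
        ExecuteLimiterSketch limiter = new ExecuteLimiterSketch(1);
        System.out.println(limiter.call(() -> "first call runs"));
        // a second call issued while another one is still active would fail immediately
    }
}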