I. Introduction to the underlying communication layer

xxl-job communicates over Netty HTTP. Mina, Jetty, and Netty TCP are also supported, but Netty HTTP is hard-coded in the code.

II. Overall communication process

Taking the case where the scheduler notifies the executor to run a task as an example, I drew the following activity diagram:

(Figure: activity diagram)

III. Impressive design

After reading the code for the whole process, I find the design distinctive: Netty and multi-threading are used with real fluency. The design points that stand out are summarized below:

1. Use the dynamic proxy pattern to hide communication details

The ExecutorBiz interface contains operations such as heartbeat, pause, and trigger execution, while the AdminBiz interface contains callback, registration, and unregistration operations. The implementation classes of these interfaces contain no communication-related processing. Instead, the getObject() method of the XxlRpcReferenceBean class generates a proxy class, and that proxy performs the remote communication.
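To make the idea concrete, here is a minimal sketch of the dynamic proxy pattern using the JDK's java.lang.reflect.Proxy. It is not the XxlRpcReferenceBean implementation: the DemoBiz interface, the createProxy helper, and the SENT list are all hypothetical names, and the "network send" is simulated by recording a string.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class ProxySketch {

    // Hypothetical business interface, standing in for something like ExecutorBiz.
    public interface DemoBiz {
        String run(String jobParam);
        String beat();
    }

    // Records what would be sent over the wire. A real client would serialize
    // the request and hand it to the transport (e.g. Netty) instead.
    public static final List<String> SENT = new ArrayList<>();

    // Builds a proxy that turns every interface method call into a "remote request".
    @SuppressWarnings("unchecked")
    public static <T> T createProxy(Class<T> iface, String address) {
        InvocationHandler handler = (proxy, method, args) -> {
            // args is null for methods that take no parameters
            String request = address + "/" + method.getName()
                    + (args == null ? "" : "?" + args[0]);
            SENT.add(request);                // placeholder for the network send
            return "OK:" + method.getName();  // placeholder for the remote response
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        DemoBiz biz = createProxy(DemoBiz.class, "http://127.0.0.1:9999");
        // The caller only sees a plain interface call; transport details stay in the handler.
        System.out.println(biz.run("job-1"));
        System.out.println(SENT);
    }
}
```

The caller works against the interface only, so the transport could be swapped without touching business code, which is the benefit the section describes.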

2. Asynchronous processing

The executor receives the message and deserializes it. Instead of running the task code synchronously, it stores the task information in a LinkedBlockingQueue, and asynchronous threads fetch task information from this queue and execute it. The result of the task is not returned synchronously either: it is put into the blocking queue of the callback thread and sent back asynchronously. The benefit is that Netty worker threads spend less time on each request, which increases throughput.
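The hand-off described above can be sketched roughly as follows. This is an illustration under assumptions, not the executor's actual code: AsyncQueueSketch, onTrigger, and pollCallback are hypothetical names, a String stands in for the task information, and a single daemon thread plays the role of the job thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AsyncQueueSketch {

    // Tasks accepted from the network but not yet executed.
    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    // Finished results waiting to be reported back by a callback thread.
    private final LinkedBlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
    private final Thread worker;

    public AsyncQueueSketch() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    String job = triggerQueue.take();   // blocks until a task arrives
                    callbackQueue.put("done:" + job);   // "execute" the task, queue the result
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // exit the loop on shutdown
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Called on the I/O thread: only enqueues, so it returns almost immediately.
    public String onTrigger(String job) {
        triggerQueue.add(job);
        return "accepted";
    }

    // Called by the callback side; returns null if no result arrives in time.
    public String pollCallback(long timeoutMillis) {
        try {
            return callbackQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public void shutdown() {
        worker.interrupt();
    }
}
```

Because onTrigger only enqueues, the I/O thread is free to serve the next request while the task itself runs elsewhere.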

3. Wrapping asynchronous processing

The asynchronous processing is wrapped so that, to the calling code, it looks like a synchronous invocation.

The code in the XxlJobTrigger class that triggers task execution:

    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            // looks like an ordinary synchronous call
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        return runResult;
    }

executorBiz.run goes through the dynamic proxy, which communicates with the executor. The executor returns its result asynchronously, yet the run method waits synchronously for that result to come back.

Let’s look at how xxl-job synchronously obtains processing results:

After the scheduler sends a message to the executor, the calling thread blocks. When the executor finishes processing and returns the result, the blocked thread is woken up and picks up the return value at the call site.

The dynamic proxy code is as follows:

    if (CallType.SYNC == callType) {
        // future-response set
        XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
        try {
            // do invoke
            client.asyncSend(finalAddress, xxlRpcRequest);

            // future get
            XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
            if (xxlRpcResponse.getErrorMsg() != null) {
                throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
            }
            return xxlRpcResponse.getResult();
        } catch (Exception e) {
            logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
            throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
        } finally {
            // future-response remove
            futureResponse.removeInvokerFuture();
        }
    }

The XxlRpcFutureResponse class implements thread waiting and thread awakening:

    public void setResponse(XxlRpcResponse response) {
        this.response = response;
        synchronized (lock) {
            done = true;
            lock.notifyAll();
        }
    }

    @Override
    public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!done) {
            synchronized (lock) {
                try {
                    if (timeout < 0) {
                        // thread blocking
                        lock.wait();
                    } else {
                        long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
                        lock.wait(timeoutMillis);
                    }
                } catch (InterruptedException e) {
                    throw e;
                }
            }
        }

        if (!done) {
            throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
        }
        return response;
    }
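For readers who want to experiment with this wait/notify hand-off in isolation, the following is a small standalone sketch of the same technique. It is not the XxlRpcFutureResponse source: BlockingFutureSketch and demo are hypothetical names, and unlike the excerpt above, this version re-checks the done flag in a loop while holding the lock, which is the usual guard against spurious wakeups and against a notification that arrives just before the wait begins.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingFutureSketch<T> {

    private final Object lock = new Object();
    private boolean done = false;   // guarded by lock
    private T response;             // guarded by lock

    // Called by the thread that receives the reply (e.g. an I/O thread).
    public void setResponse(T value) {
        synchronized (lock) {
            response = value;
            done = true;
            lock.notifyAll();       // wake up every thread blocked in get()
        }
    }

    // Called by the requesting thread; blocks until a reply arrives or the timeout expires.
    public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (!done) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException("request timed out");
                }
                // Releases the lock while waiting and re-acquires it before returning.
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
            }
            return response;
        }
    }

    // Demo: a second thread plays the role of the thread delivering the reply.
    public static String demo() {
        BlockingFutureSketch<String> future = new BlockingFutureSketch<>();
        new Thread(() -> future.setResponse("pong")).start();
        try {
            return future.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

From the caller's point of view, get() behaves like a normal blocking method, even though the reply is produced on another thread.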

Some of you might ask, how does the scheduler decide which thread to wake up when it receives the return result?

Each remote call generates a UUID as its request ID, and this ID is carried through the whole call, like the key that opens your door when you get home. When a response arrives, the request ID is the key used to find the corresponding XxlRpcFutureResponse. Its setResponse method is then called, which sets the return value and wakes up the waiting thread.

    public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
        // find the XxlRpcFutureResponse by requestId
        final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
        if (futureResponse == null) {
            return;
        }

        if (futureResponse.getInvokeCallback() != null) {
            // callback type
            try {
                executeResponseCallback(new Runnable() {
                    @Override
                    public void run() {
                        if (xxlRpcResponse.getErrorMsg() != null) {
                            futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                        } else {
                            futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                        }
                    }
                });
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else {
            // setResponse calls lock.notifyAll() internally
            futureResponse.setResponse(xxlRpcResponse);
        }

        // do remove
        futureResponsePool.remove(requestId);
    }
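The request ID lookup can be reduced to a small pending-request table. The sketch below is an assumption-laden illustration rather than xxl-rpc code: PendingRequestsSketch, register, and onResponse are hypothetical names, and the JDK's CompletableFuture is swapped in for XxlRpcFutureResponse to keep the example short.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PendingRequestsSketch {

    // requestId -> future that the calling thread is waiting on
    private final ConcurrentMap<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    // Caller side: create a request ID and remember which future belongs to it.
    public String register(CompletableFuture<String> future) {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, future);
        return requestId;
    }

    // Receiving side: the response carries the request ID back, so the
    // matching future (and therefore the matching waiter) can be found.
    public boolean onResponse(String requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;          // unknown ID, or the entry was already removed
        }
        future.complete(result);   // wakes up whoever is blocked on this future
        return true;
    }

    public int pendingCount() {
        return pending.size();
    }
}
```

Responses may arrive in any order; because each one is matched by ID, completing one request never wakes a thread that is waiting on another.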