One, foreword

Some time ago, I read a good article, "After reading this article, you will be able to write an RPC framework", which got me interested in implementing one myself, and I felt there were many places that could be optimized.

The main changes are as follows:

  • In addition to the Java serialization protocol, the protobuf and Kryo serialization protocols have been added and can be switched purely by configuration.
  • Added multiple load balancing algorithms (random, round-robin, weighted round-robin, and smooth weighted round-robin).
  • Added a local service list cache on the client to improve performance.
  • Fixed a memory leak caused by Netty under high concurrency.
  • Instead of establishing a new connection for every request, TCP long connections are established and reused across requests.
  • Added a thread pool on the server to improve message processing capacity.

Two, introduction

RPC, or Remote Procedure Call, invokes services on a remote computer as if they were local, which helps decouple a system. For example, WebService is a kind of RPC based on the HTTP protocol.

In general, there are the following steps:

  1. When the client (ServerA) calls a remote method, the Client stub is invoked and passed information such as the class name, method name, and parameters (see the sketch after this list).
  2. The Client stub serializes the parameters and other information into a binary stream and sends it to ServerB via a Socket.
  3. After the server receives the packet, the Server stub parses it and deserializes it into the class name, method name, and parameters.
  4. The Server stub invokes the corresponding local method and returns the execution result to the client.
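
To make steps 1 and 2 concrete, here is a minimal sketch of a client stub built with a JDK dynamic proxy. The UserService interface, the stub() helper, and the printed "send" step are illustrative assumptions only, not the framework's actual code.

import java.lang.reflect.Proxy;
import java.util.Arrays;

public class ClientStubDemo {

    // Hypothetical remote interface, only for illustration
    interface UserService {
        String findName(long id);
    }

    @SuppressWarnings("unchecked")
    static <T> T stub(Class<T> iface) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[]{iface},
                (proxy, method, args) -> {
                    // Step 1: collect the class name, method name and parameters
                    // Step 2: serialize them and send them over the network (omitted here)
                    System.out.printf("would send: %s#%s(%s)%n",
                            iface.getName(), method.getName(), Arrays.toString(args));
                    // Step 4: normally this would be the deserialized server response
                    return "stub-result";
                });
    }

    public static void main(String[] args) {
        UserService userService = stub(UserService.class);
        // The caller uses the remote service as if it were a local object
        System.out.println(userService.findName(1L));
    }
}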

So an RPC framework has the following roles:

Service consumer

The caller of the remote method, the client. A service can be either a consumer or a provider.

Service provider

The provider of the remote service, namely the server. A service can be either a consumer or a provider.

The registry

Saves information such as the service provider's address; it is generally implemented with ZooKeeper or Redis.

Monitoring O&M (Optional)

Monitor the response time of interfaces, count the number of requests, discover system problems in time, and send alarm notifications.

Three, implementation

The RPC framework rpc-spring-boot-starter involves the following technology stack:

  • Use ZooKeeper as the registry
  • Use Netty as the communication framework
  • Message codecs: Protostuff, Kryo, Java
  • Spring
  • Use SPI to dynamically select the load balancing algorithm based on configuration, etc.

Since there is a lot of code, I'll cover only a few key changes here.

3.1 Dynamic load balancing algorithm

1. Write an implementation class for LoadBalance

2. Custom annotation @LoadBalanceAno

/**
 * Load balancing annotation
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalanceAno {

    String value() default "";
}

/**
 * Round-robin implementation
 */
@LoadBalanceAno(RpcConstant.BALANCE_ROUND)
public class FullRoundBalance implements LoadBalance {

    private static Logger logger = LoggerFactory.getLogger(FullRoundBalance.class);

    private volatile int index;

    @Override
    public synchronized Service chooseOne(List<Service> services) {
        // wrap around when the index reaches the end of the list
        if (index == services.size()) {
            index = 0;
        }
        return services.get(index++);
    }
}
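
The foreword also mentions a smooth weighted round-robin strategy, whose code is not shown above. Below is a sketch of that algorithm under a few assumptions: Service exposes a getWeight() method, instances can be identified by toString(), and a constant such as RpcConstant.BALANCE_SMOOTH_WEIGHT_ROUND exists as the annotation value (these names are assumptions, not taken from the framework's code).

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of a smooth weighted round-robin strategy (names are assumptions)
 */
@LoadBalanceAno(RpcConstant.BALANCE_SMOOTH_WEIGHT_ROUND)
public class SmoothWeightRoundBalance implements LoadBalance {

    /** key: service identity, value: the node's current weight in this round */
    private final Map<String, Integer> currentWeightMap = new HashMap<>();

    @Override
    public synchronized Service chooseOne(List<Service> services) {
        int totalWeight = 0;
        Service selected = null;
        int maxCurrentWeight = Integer.MIN_VALUE;
        for (Service service : services) {
            int weight = service.getWeight();
            totalWeight += weight;
            // each round every node's current weight grows by its configured weight
            int current = currentWeightMap.merge(service.toString(), weight, Integer::sum);
            if (current > maxCurrentWeight) {
                maxCurrentWeight = current;
                selected = service;
            }
        }
        // the chosen node "pays back" the total weight, smoothing the selection sequence
        currentWeightMap.computeIfPresent(selected.toString(), (k, v) -> v - totalWeight);
        return selected;
    }
}

Compared with plain weighted round-robin, this variant avoids sending long bursts of consecutive requests to the highest-weighted node.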

3. Create the META-INF/services folder in the resources directory and create the SPI configuration file (see the example below)
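
The file name must be the fully qualified name of the SPI interface, and each line lists one implementation class, which is what ServiceLoader.load(LoadBalance.class) discovers at runtime. Assuming the interface and its implementations live under a package like cn.sp.rpc.client.balance (an assumption based on the cn.sp.rpc package seen later), the file would look roughly like this:

# file: src/main/resources/META-INF/services/cn.sp.rpc.client.balance.LoadBalance
cn.sp.rpc.client.balance.RandomBalance
cn.sp.rpc.client.balance.FullRoundBalance
cn.sp.rpc.client.balance.WeightRoundBalance
cn.sp.rpc.client.balance.SmoothWeightRoundBalance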

4. RpcConfig adds the loadBalance configuration item

/**
 * @author 2YSP
 * @date 2020/7/26 15:13
 */
@ConfigurationProperties(prefix = "sp.rpc")
public class RpcConfig {

    /**
     * Registry address
     */
    private String registerAddress = "127.0.0.1:2181";

    /**
     * Server port
     */
    private Integer serverPort = 9999;

    /**
     * Serialization protocol
     */
    private String protocol = "java";

    /**
     * Load balancing algorithm
     */
    private String loadBalance = "random";

    /**
     * Weight
     */
    private Integer weight = 1;

    // getters and setters omitted
}
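
For reference, a minimal application.properties that exercises these fields could look like the following. The property names follow Spring Boot's relaxed binding of the fields above; the concrete values for protocol and loadBalance must match the names registered by the framework's message protocols and @LoadBalanceAno annotations, so treat them as placeholders.

# registry address and exposed server port
sp.rpc.register-address=127.0.0.1:2181
sp.rpc.server-port=9999
# serialization protocol, "java" is the default
sp.rpc.protocol=java
# load balancing algorithm, "random" is the default
sp.rpc.load-balance=random
# weight of this provider, used by the weighted algorithms
sp.rpc.weight=1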

5. In the RpcAutoConfiguration class, select the algorithm implementation class based on the configuration

/**
 * Use SPI to match the configured load balancing algorithm
 *
 * @param name
 * @return
 */
private LoadBalance getLoadBalance(String name) {
    ServiceLoader<LoadBalance> loader = ServiceLoader.load(LoadBalance.class);
    Iterator<LoadBalance> iterator = loader.iterator();
    while (iterator.hasNext()) {
        LoadBalance loadBalance = iterator.next();
        LoadBalanceAno ano = loadBalance.getClass().getAnnotation(LoadBalanceAno.class);
        Assert.notNull(ano, "load balance name can not be empty!");
        if (name.equals(ano.value())) {
            return loadBalance;
        }
    }
    throw new RpcException("invalid load balance config");
}

@Bean
public ClientProxyFactory proxyFactory(@Autowired RpcConfig rpcConfig) {
    ClientProxyFactory clientProxyFactory = new ClientProxyFactory();
    // Set the service discovery implementation
    clientProxyFactory.setServerDiscovery(new ZookeeperServerDiscovery(rpcConfig.getRegisterAddress()));
    // Set the supported message protocols
    Map<String, MessageProtocol> supportMessageProtocols = buildSupportMessageProtocols();
    clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols);
    // Set the load balancing algorithm
    LoadBalance loadBalance = getLoadBalance(rpcConfig.getLoadBalance());
    clientProxyFactory.setLoadBalance(loadBalance);
    // Set the network layer implementation
    clientProxyFactory.setNetClient(new NettyNetClient());
    return clientProxyFactory;
}

3.2 Local service list cache

Use maps to cache data

/**
 * Local cache of the service list
 */
public class ServerDiscoveryCache {

    /**
     * key: serviceName
     */
    private static final Map<String, List<Service>> SERVER_MAP = new ConcurrentHashMap<>();

    /**
     * Names of the service classes injected on the client side
     */
    public static final List<String> SERVICE_CLASS_NAMES = new ArrayList<>();

    public static void put(String serviceName, List<Service> serviceList) {
        SERVER_MAP.put(serviceName, serviceList);
    }

    /**
     * Remove a single service node
     *
     * @param serviceName
     * @param service
     */
    public static void remove(String serviceName, Service service) {
        SERVER_MAP.computeIfPresent(serviceName, (key, value) ->
                value.stream()
                        .filter(o -> !o.toString().equals(service.toString()))
                        .collect(Collectors.toList())
        );
    }

    public static void removeAll(String serviceName) {
        SERVER_MAP.remove(serviceName);
    }

    public static boolean isEmpty(String serviceName) {
        return SERVER_MAP.get(serviceName) == null || SERVER_MAP.get(serviceName).size() == 0;
    }

    public static List<Service> get(String serviceName) {
        return SERVER_MAP.get(serviceName);
    }
}

ClientProxyFactory checks the local cache first and only queries ZooKeeper when the cache is empty.

/**
 * Get the service list by name: use the local cache first, query ZooKeeper when the cache is empty
 *
 * @param serviceName
 * @return
 */
private List<Service> getServiceList(String serviceName) {
    List<Service> services;
    synchronized (serviceName) {
        if (ServerDiscoveryCache.isEmpty(serviceName)) {
            services = serverDiscovery.findServiceList(serviceName);
            if (services == null || services.size() == 0) {
                throw new RpcException("No provider available!");
            }
            ServerDiscoveryCache.put(serviceName, services);
        } else {
            services = ServerDiscoveryCache.get(serviceName);
        }
    }
    return services;
}

Problem: if a provider goes offline because of a crash or a network failure while its entry is still cached, the client keeps sending requests to a server that is no longer available, which raises the request failure rate.

Solution: since providers register as ephemeral (temporary) nodes, the node is removed when the provider goes offline. By listening to the ZooKeeper child nodes, the client can simply clear the local cache whenever a child node is added or deleted.

DefaultRpcProcessor

/**
 * RPC processor: exposes services on startup and injects Service proxies
 *
 * @author 2YSP
 * @date 2020/7/26 14:46
 */
public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Spring sends this notification after the context has started
        if (Objects.isNull(event.getApplicationContext().getParent())) {
            ApplicationContext context = event.getApplicationContext();
            // Start the server
            startServer(context);
            // Inject service proxies
            injectService(context);
        }
    }

    private void injectService(ApplicationContext context) {
        String[] names = context.getBeanDefinitionNames();
        for (String name : names) {
            Class<?> clazz = context.getType(name);
            if (Objects.isNull(clazz)) {
                continue;
            }
            Field[] declaredFields = clazz.getDeclaredFields();
            for (Field field : declaredFields) {
                // Inject a proxy for every field annotated with @InjectService
                InjectService injectService = field.getAnnotation(InjectService.class);
                if (injectService == null) {
                    continue;
                }
                Class<?> fieldClass = field.getType();
                Object object = context.getBean(name);
                field.setAccessible(true);
                try {
                    field.set(object, clientProxyFactory.getProxy(fieldClass));
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
                // Record the service class name for the local cache
                ServerDiscoveryCache.SERVICE_CLASS_NAMES.add(fieldClass.getName());
            }
        }
        // Register child node listeners
        if (clientProxyFactory.getServerDiscovery() instanceof ZookeeperServerDiscovery) {
            ZookeeperServerDiscovery serverDiscovery = (ZookeeperServerDiscovery) clientProxyFactory.getServerDiscovery();
            ZkClient zkClient = serverDiscovery.getZkClient();
            ServerDiscoveryCache.SERVICE_CLASS_NAMES.forEach(name -> {
                String servicePath = RpcConstant.ZK_SERVICE_PATH + RpcConstant.PATH_DELIMITER + name + "/service";
                zkClient.subscribeChildChanges(servicePath, new ZkChildListenerImpl());
            });
            logger.info("subscribe service zk node successfully");
        }
    }

    private void startServer(ApplicationContext context) {
        ...
    }
}

ZkChildListenerImpl

/**
 * Listener for ZooKeeper child node change events
 */
public class ZkChildListenerImpl implements IZkChildListener {

    private static Logger logger = LoggerFactory.getLogger(ZkChildListenerImpl.class);

    /**
     * Fired when a child node is deleted or added
     *
     * @param parentPath /rpc/serviceName/service
     * @param childList
     * @throws Exception
     */
    @Override
    public void handleChildChange(String parentPath, List<String> childList) throws Exception {
        logger.debug("Child change parentPath:[{}] -- childList:[{}]", parentPath, childList);
        // Only clear the cache of the service under this path
        String[] arr = parentPath.split("/");
        ServerDiscoveryCache.removeAll(arr[2]);
    }
}

3.3 Netty client supports TCP long connections

This part has the most changes, starting with the new sendRequest interface.

The implementation class NettyNetClient

/**
 * @author 2YSP
 * @date 2020/7/25 20:12
 */
public class NettyNetClient implements NetClient {

    private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

    private static ExecutorService threadPool = new ThreadPoolExecutor(4, 10, 200,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("rpcClient-%d").build());

    private EventLoopGroup loopGroup = new NioEventLoopGroup(4);

    /**
     * Cache of connected servers
     * key: service address in the format ip:port
     */
    public static Map<String, SendHandlerV2> connectedServerNodes = new ConcurrentHashMap<>();

    @Override
    public byte[] sendRequest(byte[] data, Service service) throws InterruptedException {
        ....
        return respData;
    }

    @Override
    public RpcResponse sendRequest(RpcRequest rpcRequest, Service service, MessageProtocol messageProtocol) {
        String address = service.getAddress();
        synchronized (address) {
            if (connectedServerNodes.containsKey(address)) {
                SendHandlerV2 handler = connectedServerNodes.get(address);
                logger.info("Use existing connection");
                return handler.sendRequest(rpcRequest);
            }
            String[] addrInfo = address.split(":");
            final String serverAddress = addrInfo[0];
            final String serverPort = addrInfo[1];
            final SendHandlerV2 handler = new SendHandlerV2(messageProtocol, address);
            threadPool.submit(() -> {
                // Configure the client
                Bootstrap b = new Bootstrap();
                b.group(loopGroup).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast(handler);
                            }
                        });
                // Connect to the server and cache the handler once connected
                ChannelFuture channelFuture = b.connect(serverAddress, Integer.parseInt(serverPort));
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        connectedServerNodes.put(address, handler);
                    }
                });
            });
            logger.info("Use new connection...");
            return handler.sendRequest(rpcRequest);
        }
    }
}

SendRequest () is called for each request, and a TCP long connection is created asynchronously with the thread pool and the server. After the connection is successful, SendHandlerV2 is cached in ConcurrentHashMap for easy reuse. If the request address (IP +port) for subsequent requests is present in connectedServerNodes, the connection is not re-established using the Handler in connectedServerNodes.

SendHandlerV2

/**
 * @author 2YSP
 * @date 2020/8/19 20:06
 */
public class SendHandlerV2 extends ChannelInboundHandlerAdapter {

    private static Logger logger = LoggerFactory.getLogger(SendHandlerV2.class);

    /**
     * Maximum time (seconds) to wait for the channel to be established
     */
    static final int CHANNEL_WAIT_TIME = 4;

    /**
     * Maximum time (seconds) to wait for the response
     */
    static final int RESPONSE_WAIT_TIME = 8;

    private volatile Channel channel;

    private String remoteAddress;

    private static Map<String, RpcFuture<RpcResponse>> requestMap = new ConcurrentHashMap<>();

    private MessageProtocol messageProtocol;

    private CountDownLatch latch = new CountDownLatch(1);

    public SendHandlerV2(MessageProtocol messageProtocol, String remoteAddress) {
        this.messageProtocol = messageProtocol;
        this.remoteAddress = remoteAddress;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.channel = ctx.channel();
        latch.countDown();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("Connect to server successfully:{}", ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.debug("Client reads message:{}", msg);
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] resp = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(resp);
        // Release the buffer manually
        ReferenceCountUtil.release(byteBuf);
        RpcResponse response = messageProtocol.unmarshallingResponse(resp);
        RpcFuture<RpcResponse> future = requestMap.get(response.getRequestId());
        future.setResponse(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        logger.error("Exception occurred:{}", cause.getMessage());
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        logger.error("channel inactive with remoteAddress:[{}]", remoteAddress);
        NettyNetClient.connectedServerNodes.remove(remoteAddress);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    public RpcResponse sendRequest(RpcRequest request) {
        RpcResponse response;
        RpcFuture<RpcResponse> future = new RpcFuture<>();
        requestMap.put(request.getRequestId(), future);
        try {
            byte[] data = messageProtocol.marshallingRequest(request);
            ByteBuf reqBuf = Unpooled.buffer(data.length);
            reqBuf.writeBytes(data);
            if (latch.await(CHANNEL_WAIT_TIME, TimeUnit.SECONDS)) {
                channel.writeAndFlush(reqBuf);
                // Wait for the response
                response = future.get(RESPONSE_WAIT_TIME, TimeUnit.SECONDS);
            } else {
                throw new RpcException("establish channel time out");
            }
        } catch (Exception e) {
            throw new RpcException(e.getMessage());
        } finally {
            requestMap.remove(request.getRequestId());
        }
        return response;
    }
}

RpcFuture

package cn.sp.rpc.client.net;

import java.util.concurrent.*;

/**
 * @author 2YSP
 * @date 2020/8/19 22:31
 */
public class RpcFuture<T> implements Future<T> {

    private T response;

    /**
     * Latch used to block until the response arrives
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Start time of the request
     */
    private Long beginTime = System.currentTimeMillis();

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        if (response != null) {
            return true;
        }
        return false;
    }

    /**
     * Get the response, blocking until it is available
     *
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @Override
    public T get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return response;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (countDownLatch.await(timeout, unit)) {
            return response;
        }
        return null;
    }

    public void setResponse(T response) {
        this.response = response;
        countDownLatch.countDown();
    }

    public long getBeginTime() {
        return beginTime;
    }
}

When SendHandlerV2#sendRequest() runs for the first time, the channel may not be established yet, so a CountDownLatch makes the request wait until the channel is ready before sending. Each RpcRequest generates a unique requestId when it is created, and an RpcFuture is cached in requestMap with that requestId as the key before the request is sent. After the response from the server is read (in the channelRead method), it is put into the matching RpcFuture. If the connected server disconnects abnormally, SendHandlerV2#channelInactive() removes the corresponding server node from the connection cache.

Four, stress test

Test environment:

  • Intel® Core™ i5-6300HQ CPU @ 2.30GHz, 4 cores
  • Windows 10 Home (64-bit)
  • 16 GB of memory

  1. Start ZooKeeper locally.
  2. Start one consumer and two providers locally, using the round-robin load balancing algorithm.
  3. Stress test with ab: 10,000 requests sent from 4 concurrent threads.

ab -c 4 -n 10000 http://localhost:8080/test/user?id=1

Test results:

The results show that the 10,000 requests took only about 11 seconds, more than ten times faster than the previous 130+ seconds.