Today we are going to cook a small dish: an RPC communication framework. With Netty as the main ingredient and the FastJSON serialization library as seasoning, we will build a minimalist multi-threaded RPC service framework.

Let’s tentatively name the RPC framework RPCKids.

Tasting guide

Before handing over the full recipe, let’s have a taste and see whether the dish is good and easy to eat. If readers find it unpalatable, the recipe behind it is not worth much: why bother learning to cook a dish that nobody wants to eat?

In this example, I will use the remote RPC service provided by RPCKids to calculate Fibonacci numbers and exponents. The client sends parameters to the remote service through the RPC client provided by RPCKids, receives the returned results, and then renders them. You can customize any business RPC service using RPCKids.

Fibonacci numbers are simple inputs and outputs, one Integer and one Long. The exponential input has two values, and the output contains the calculation time in nanoseconds in addition to the result. The time is included only to render a complete custom input and output class.

Custom input and output classes for the exponent service

// Input to the exponential RPC
public class ExpRequest {
	private int base;
	private int exp;
    
    // constructor & getter & setter
}

// Output of exponential RPC
public class ExpResponse {

	private long value;
	private long costInNanos;

	// constructor & getter & setter
}

Fibonacci and exponential computation processing

public class FibRequestHandler implements IMessageHandler<Integer> {

	private List<Long> fibs = new ArrayList<>();

	{
		fibs.add(1L); // fib(0) = 1
		fibs.add(1L); // fib(1) = 1
	}

	@Override
	// The handler instance is shared by all business threads and ArrayList is not
	// thread-safe, so access to the cache is serialized with synchronized
	public synchronized void handle(ChannelHandlerContext ctx, String requestId, Integer n) {
		for (int i = fibs.size(); i < n + 1; i++) {
			long value = fibs.get(i - 2) + fibs.get(i - 1);
			fibs.add(value);
		}
		// Output the response
		ctx.writeAndFlush(new MessageOutput(requestId, "fib_res", fibs.get(n)));
	}
}

public class ExpRequestHandler implements IMessageHandler<ExpRequest> {

	@Override
	public void handle(ChannelHandlerContext ctx, String requestId, ExpRequest message) {
		int base = message.getBase();
		int exp = message.getExp();
		long start = System.nanoTime();
		long res = 1;
		for (int i = 0; i < exp; i++) {
			res *= base;
		}
		long cost = System.nanoTime() - start;
		// Outputs the response
		ctx.writeAndFlush(new MessageOutput(requestId, "exp_res", new ExpResponse(res, cost)));
	}
}
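
The handlers above trust their inputs: a negative n makes the Fibonacci handler fail with an IndexOutOfBoundsException, and a large n silently overflows long. As a minimal sketch of parameter checking, the same recurrence can be written with Math.addExact so that overflow is reported instead of wrapping around. The class name CheckedFib and the choice to throw exceptions are assumptions made for illustration; they are not part of RPCKids.

```java
// Hypothetical helper, not part of RPCKids: the article's Fibonacci
// recurrence (fib(0) = fib(1) = 1) with input validation and overflow checks.
final class CheckedFib {

	static long fib(int n) {
		if (n < 0) {
			throw new IllegalArgumentException("n must be >= 0, got " + n);
		}
		long prev = 1; // fib(0)
		long curr = 1; // fib(1)
		for (int i = 2; i <= n; i++) {
			// addExact throws ArithmeticException instead of wrapping around
			long next = Math.addExact(prev, curr);
			prev = curr;
			curr = next;
		}
		return curr;
	}
}
```

With this convention the largest input whose result still fits in a long is n = 91. A handler could catch these exceptions and answer with an error message of its own design.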

Building the RPC Server

The RPC service class listens on the specified IP port, sets the number of IO threads and the number of business computation threads, and registers the Fibonacci service input class and exponential service input class, along with the corresponding computation processor.

public class DemoServer {

	public static void main(String[] args) {
		RPCServer server = new RPCServer("localhost", 8888, 2, 16);
		server.service("fib", Integer.class, new FibRequestHandler())
			  .service("exp", ExpRequest.class, new ExpRequestHandler());
		server.start();
	}
}

Build the RPC client

The RPC client connects to the remote IP and port, registers the service output classes (the RPC response classes), and then invokes the Fibonacci service and the exponent service 20 times each, printing the results.

public class DemoClient {

	private RPCClient client;

	public DemoClient(RPCClient client) {
		this.client = client;
		// Register service return types
		this.client.rpc("fib_res", Long.class).rpc("exp_res", ExpResponse.class);
	}

	public long fib(int n) {
		return (Long) client.send("fib", n);
	}

	public ExpResponse exp(int base, int exp) {
		return (ExpResponse) client.send("exp", new ExpRequest(base, exp));
	}

	public static void main(String[] args) {
		RPCClient client = new RPCClient("localhost", 8888);
		DemoClient demo = new DemoClient(client);
		for (int i = 0; i < 20; i++) {
			System.out.printf("fib(%d) = %d\n", i, demo.fib(i));
		}
		for (int i = 0; i < 20; i++) {
			ExpResponse res = demo.exp(2, i);
			System.out.printf("exp2(%d) = %d cost=%dns\n", i, res.getValue(), res.getCostInNanos());
		}
	}
}

Run

First start the server. Its output is shown below: the log shows a client connecting, sending a series of messages, and finally closing the connection and leaving.

server started @ localhost:8888
connection comes
read a message
read a message
...
connection leaves

Then run the client. It prints a series of computed results.

fib(0) = 1
fib(1) = 1
fib(2) = 2
fib(3) = 3
fib(4) = 5
...
exp2(0) = 1 cost=559ns
exp2(1) = 2 cost=495ns
exp2(2) = 4 cost=524ns
exp2(3) = 8 cost=640ns
exp2(4) = 16 cost=711ns
...


Complaints

I thought this would be a piece of cake, but writing the complete code and the article took nearly a day; writing code turned out to be far more time-consuming than cooking. Because the framework is only meant for teaching, many details have not been polished. If you wanted to turn it into an open source project and aim for perfection, there are at least the following things to consider.

  1. Client connection pooling
  2. Load balancing across multiple server processes
  3. Log output
  4. Parameter validation and exception handling
  5. Protection against client traffic floods
  6. Server-side overload limits
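
As a taste of item 2, here is a minimal round-robin sketch for spreading calls over several server processes. The class name RoundRobin and the address strings are assumptions made for illustration; RPCKids itself contains nothing like it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of RPCKids: hands out server addresses in turn.
final class RoundRobin {

	private final List<String> addresses;
	private final AtomicInteger counter = new AtomicInteger();

	RoundRobin(List<String> addresses) {
		if (addresses.isEmpty()) {
			throw new IllegalArgumentException("no server addresses");
		}
		// Defensive copy so later changes to the caller's list have no effect
		this.addresses = new ArrayList<>(addresses);
	}

	// Thread-safe: each call returns the next address, wrapping around at the end.
	// floorMod keeps the index non-negative even after the counter overflows.
	String next() {
		int i = Math.floorMod(counter.getAndIncrement(), addresses.size());
		return addresses.get(i);
	}
}
```

A client could keep one RPCClient per address and call next() before each request. Health checks and weighting are deliberately left out.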

If gRPC is anything to go by, you would also have to implement streaming responses. If you want to save network traffic, you will need to work on the protocol. I’ll leave all that to the reader.

Follow the public account “code hole” and send “RPC” to get the link to the complete open source code for the recipe above on GitHub. If readers have any questions, the cave owner will answer them one by one.

Let’s move on to how the RPC server and client are actually made.

Server recipe

First define the message input and output formats: the message type, the unique ID of the message, and the JSON-serialized string content of the message. The client uses the unique ID to verify that a response from the server matches the request it sent.

public class MessageInput {
	private String type;
	private String requestId;
	private String payload;

	public MessageInput(String type, String requestId, String payload) {
		this.type = type;
		this.requestId = requestId;
		this.payload = payload;
	}

	public String getType() {
		return type;
	}

	public String getRequestId() {
		return requestId;
	}

	// Since we want to get the object directly, we need to provide the object's type parameter
	public <T> T getPayload(Class<T> clazz) {
		if (payload == null) {
			return null;
		}
		return JSON.parseObject(payload, clazz);
	}
}

public class MessageOutput {

	private String requestId;
	private String type;
	private Object payload;

	public MessageOutput(String requestId, String type, Object payload) {
		this.requestId = requestId;
		this.type = type;
		this.payload = payload;
	}

	public String getType() {
		return this.type;
	}

	public String getRequestId() {
		return requestId;
	}

	public Object getPayload() {
		return payload;
	}
}

The message decoder is implemented with Netty’s ReplayingDecoder. To keep things simple, we don’t use checkpoint to optimize performance. If you’re interested, check out my earlier article on WeChat and add the checkpoint logic yourself.

import java.nio.charset.StandardCharsets;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;

public class MessageDecoder extends ReplayingDecoder<MessageInput> {

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		String requestId = readStr(in);
		String type = readStr(in);
		String content = readStr(in);
		out.add(new MessageInput(type, requestId, content));
	}

	private String readStr(ByteBuf in) {
		// A string is a 4-byte length followed by the byte array, always UTF-8 encoded
		int len = in.readInt();
		if (len < 0 || len > (1 << 20)) {
			throw new DecoderException("invalid string length len=" + len);
		}
		byte[] bytes = new byte[len];
		in.readBytes(bytes);
		return new String(bytes, StandardCharsets.UTF_8);
	}
}
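
To make the wire format concrete: each message is three strings (request ID, type, JSON payload), and each string is a 4-byte big-endian length followed by that many UTF-8 bytes. The sketch below reproduces this framing with plain java.io streams instead of Netty’s ByteBuf. The class name FrameSketch and its two methods are made up for illustration and are not part of RPCKids.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of RPCKids: frames and parses one message
// using length-prefixed UTF-8 strings, the layout MessageDecoder reads.
final class FrameSketch {

	static byte[] encode(String requestId, String type, String json) throws IOException {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bytes);
		for (String s : new String[] { requestId, type, json }) {
			byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
			out.writeInt(utf8.length); // length in bytes, big-endian
			out.write(utf8);
		}
		out.flush();
		return bytes.toByteArray();
	}

	// Returns { requestId, type, json } in wire order;
	// a truncated frame surfaces as an EOFException (a kind of IOException)
	static String[] decode(byte[] frame) throws IOException {
		DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
		String[] fields = new String[3];
		for (int i = 0; i < fields.length; i++) {
			byte[] utf8 = new byte[in.readInt()];
			in.readFully(utf8);
			fields[i] = new String(utf8, StandardCharsets.UTF_8);
		}
		return fields;
	}
}
```

For example, encode("r1", "fib", "10") yields 19 bytes (three 4-byte prefixes plus 2 + 3 + 2 bytes of text), and decode returns the three strings unchanged.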

The message handler interface: every custom service must implement the handle method.

public interface IMessageHandler<T> {

	void handle(ChannelHandlerContext ctx, String requestId, T message);

}

// The default handler is used for message types that have no registered handler
public class DefaultHandler implements IMessageHandler<MessageInput> {

	@Override
	public void handle(ChannelHandlerContext ctx, String requestId, MessageInput input) {
		System.out.println("unrecognized message type=" + input.getType() + " comes");
	}
}

The message type registry and the message handler registry both use static fields and methods. This is purely for convenience; a non-static version would probably be more elegant.

public class MessageRegistry {
	private static Map<String, Class<?>> clazzes = new HashMap<>();

	public static void register(String type, Class<?> clazz) {
		clazzes.put(type, clazz);
	}

	public static Class<?> get(String type) {
		return clazzes.get(type);
	}
}

public class MessageHandlers {

	private static Map<String, IMessageHandler<?>> handlers = new HashMap<>();
	public static DefaultHandler defaultHandler = new DefaultHandler();

	public static void register(String type, IMessageHandler<?> handler) {
		handlers.put(type, handler);
	}

	public static IMessageHandler<?> get(String type) {
		IMessageHandler<?> handler = handlers.get(type);
		return handler;
	}
}
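
For readers curious about the non-static variant, here is one possible shape: a single registry object that keeps each message type’s request class and handler together. This is only a sketch under stated assumptions. ServiceRegistry, Entry, and the simplified Handler interface (which drops the Netty ChannelHandlerContext and returns the result so the example stands alone) are hypothetical and not part of RPCKids.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for IMessageHandler without the Netty context
interface Handler<T> {
	Object handle(String requestId, T message);
}

// Hypothetical instance-based registry, not part of RPCKids
final class ServiceRegistry {

	// Keeps the request class and its handler together so they cannot get out of sync
	static final class Entry<T> {
		final Class<T> requestClass;
		final Handler<T> handler;

		Entry(Class<T> requestClass, Handler<T> handler) {
			this.requestClass = requestClass;
			this.handler = handler;
		}

		// Class.cast checks the payload type at runtime, so no unchecked cast is needed
		Object dispatch(String requestId, Object payload) {
			return handler.handle(requestId, requestClass.cast(payload));
		}
	}

	private final Map<String, Entry<?>> entries = new ConcurrentHashMap<>();

	<T> ServiceRegistry register(String type, Class<T> requestClass, Handler<T> handler) {
		entries.put(type, new Entry<>(requestClass, handler));
		return this;
	}

	// Returns null for unknown types so the caller can fall back to a default handler
	Entry<?> get(String type) {
		return entries.get(type);
	}
}
```

Because the class and the handler are registered in one call with a shared type parameter, the compiler rejects a handler whose input type does not match the registered class. Each server instance can also own its own registry, which static maps do not allow.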

The encoder for response messages is relatively simple.

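import java.nio.charset.StandardCharsets;
import java.util.List;

import com.alibaba.fastjson.JSON;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

@Sharable
public class MessageEncoder extends MessageToMessageEncoder<MessageOutput> {

	@Override
	protected void encode(ChannelHandlerContext ctx, MessageOutput msg, List<Object> out) throws Exception {
		ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer();
		writeStr(buf, msg.getRequestId());
		writeStr(buf, msg.getType());
		writeStr(buf, JSON.toJSONString(msg.getPayload()));
		out.add(buf);
	}

	private void writeStr(ByteBuf buf, String s) {
		// Write the length in bytes, not s.length(): the two differ for non-ASCII text,
		// and the receiver reads exactly this many bytes
		byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
		buf.writeInt(bytes.length);
		buf.writeBytes(bytes);
	}
}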
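
A note on the length prefix: it has to be a byte count, because the receiver reads the prefix and then consumes that many bytes. String.length() counts UTF-16 code units instead. For ASCII text the two numbers agree, but they diverge as soon as a payload contains non-ASCII characters. A small check makes the difference visible (the class name LengthCheck is illustrative only):

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: compares the char count with the UTF-8 byte count of a string
final class LengthCheck {

	static int chars(String s) {
		return s.length();
	}

	static int utf8Bytes(String s) {
		return s.getBytes(StandardCharsets.UTF_8).length;
	}

	public static void main(String[] args) {
		String ascii = "fib_res";
		String chinese = "\u7801\u6d1e"; // two Chinese characters
		// prints "7 chars, 7 bytes"
		System.out.println(chars(ascii) + " chars, " + utf8Bytes(ascii) + " bytes");
		// prints "2 chars, 6 bytes"
		System.out.println(chars(chinese) + " chars, " + utf8Bytes(chinese) + " bytes");
	}
}
```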

Now comes the key step: putting the small modules above together into a complete RPC server framework. This requires some basic knowledge of Netty, since we need to write a Netty event callback class and a server bootstrap class.

@Sharable
public class MessageCollector extends ChannelInboundHandlerAdapter {
    // Business thread pool
	private ThreadPoolExecutor executor;

	public MessageCollector(int workerThreads) {
		// The business queue holds at most 1000 tasks to avoid unbounded accumulation
		// If the worker threads can't keep up, the IO thread will also process the business logic (CallerRunsPolicy)
		BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
		// Name the business thread
		ThreadFactory factory = new ThreadFactory() {

			AtomicInteger seq = new AtomicInteger();

			@Override
			public Thread newThread(Runnable r) {
				Thread t = new Thread(r);
				t.setName("rpc-" + seq.getAndIncrement());
				return t;
			}

		};
		// Threads that have been idle for more than 30 seconds are automatically destroyed
		this.executor = new ThreadPoolExecutor(1, workerThreads, 30, TimeUnit.SECONDS, queue, factory,
				new CallerRunsPolicy());
	}

	public void closeGracefully() {
		// Close gracefully: notify first, wait a while, then force the shutdown
		this.executor.shutdown();
		try {
			this.executor.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			// Restore the interrupt flag and fall through to the forced shutdown
			Thread.currentThread().interrupt();
		}
		this.executor.shutdownNow();
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// A new link is coming from the client
		System.out.println("connection comes");
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		// The client is gone
		System.out.println("connection leaves");
		ctx.close();
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (msg instanceof MessageInput) {
			System.out.println("read a message");
			// Use a business thread pool to process messages
			this.executor.execute(() -> {
				this.handleMessage(ctx, (MessageInput) msg);
			});
		}
	}

	private void handleMessage(ChannelHandlerContext ctx, MessageInput input) {
		// The business logic is here
		Class<?> clazz = MessageRegistry.get(input.getType());
		if (clazz == null) {
			// Unregistered messages are processed with the default handler
			MessageHandlers.defaultHandler.handle(ctx, input.getRequestId(), input);
			return;
		}
		Object o = input.getPayload(clazz);
		// This is the blemish in our little dish: the cast below is ugly,
		// but the chef admits he is not skilled enough to avoid it.
		// Interested readers can work out a cleaner solution themselves
		@SuppressWarnings("unchecked")
		IMessageHandler<Object> handler = (IMessageHandler<Object>) MessageHandlers.get(input.getType());
		if (handler != null) {
			handler.handle(ctx, input.getRequestId(), o);
		} else {
			// Use the default handler
			MessageHandlers.defaultHandler.handle(ctx, input.getRequestId(), input);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// The client machine may have restarted unexpectedly,
		// ReadTimeoutHandler throws an exception when the client connection is idle,
		// there may be a message protocol error or a serialization exception,
		// and so on.
		// Whatever the cause, just close the connection; the client has a reconnection mechanism anyway
		System.out.println("connection error");
		cause.printStackTrace();
		ctx.close();
	}
}

public class RPCServer {

	private String ip;
	private int port;
	private int ioThreads; // The read-write thread used to process the network flow
	private int workerThreads; // Compute thread for business processing

	public RPCServer(String ip, int port, int ioThreads, int workerThreads) {
		this.ip = ip;
		this.port = port;
		this.ioThreads = ioThreads;
		this.workerThreads = workerThreads;
	}

	private ServerBootstrap bootstrap;
	private EventLoopGroup group;
	private MessageCollector collector;
	private Channel serverChannel;

    // A shortcut to register the service
	public RPCServer service(String type, Class<?> reqClass, IMessageHandler<?> handler) {
		MessageRegistry.register(type, reqClass);
		MessageHandlers.register(type, handler);
		return this;
	}

    // Start the RPC service
	public void start() {
		bootstrap = new ServerBootstrap();
		group = new NioEventLoopGroup(ioThreads);
		bootstrap.group(group);
		collector = new MessageCollector(workerThreads);
		MessageEncoder encoder = new MessageEncoder();
		bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ChannelPipeline pipe = ch.pipeline();
				// If there is no request from the client for 60 seconds, the client link is closed
				pipe.addLast(new ReadTimeoutHandler(60));
				// Hang the decoder
				pipe.addLast(new MessageDecoder());
				// Attach the encoder
				pipe.addLast(encoder);
				// Put the business processor last
				pipe.addLast(collector);
			}
		});
		bootstrap.option(ChannelOption.SO_BACKLOG, 100) // Size of the accept queue for client sockets
		         .option(ChannelOption.SO_REUSEADDR, true) // Reuse addr to avoid port conflicts
		         .childOption(ChannelOption.TCP_NODELAY, true) // Turn off small-packet merging so messages go out promptly; this is a per-connection option, hence childOption
		         .childOption(ChannelOption.SO_KEEPALIVE, true); // TCP keepalive probes detect dead peers on connections that have been idle for a long time
		serverChannel = bootstrap.bind(this.ip, this.port).channel();
		System.out.printf("server started @ %s:%d\n", ip, port);
	}

	public void stop() {
		// Close the server socket first
		serverChannel.close();
		// Cut off the message source and stop the IO thread pool
		group.shutdownGracefully();
		// Finally stop the business threads
		collector.closeGracefully();
	}
}
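
The business thread pool in MessageCollector combines a bounded queue with CallerRunsPolicy: once every worker is busy and the queue is full, the thread that submits the task (in RPCKids, a Netty IO thread) runs it itself. The sketch below shows that behavior in miniature with one worker and a queue of one. The class name CallerRunsDemo and the tiny sizes are chosen for illustration only.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;

// Illustrative only: one worker thread, a queue of one, and CallerRunsPolicy
final class CallerRunsDemo {

	// Returns "taskN:threadName" entries in the order the tasks finished
	static List<String> run() throws InterruptedException {
		ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
				new ArrayBlockingQueue<>(1), r -> new Thread(r, "rpc-0"), new CallerRunsPolicy());
		List<String> ranOn = new CopyOnWriteArrayList<>();
		CountDownLatch release = new CountDownLatch(1);

		// Task 1 occupies the only worker until the latch is released
		executor.execute(() -> {
			try {
				release.await();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			ranOn.add("task1:" + Thread.currentThread().getName());
		});
		// Task 2 fits into the queue
		executor.execute(() -> ranOn.add("task2:" + Thread.currentThread().getName()));
		// Task 3 is rejected (worker busy, queue full) and therefore runs on the calling thread
		executor.execute(() -> ranOn.add("task3:" + Thread.currentThread().getName()));

		release.countDown();
		executor.shutdown();
		executor.awaitTermination(10, TimeUnit.SECONDS);
		return ranOn;
	}
}
```

Task 3 finishes first, on the caller’s thread, while tasks 1 and 2 run on rpc-0 afterwards. On a real server this acts as back-pressure: an IO thread that is busy with business logic stops reading from its connections for a while. Note also that ThreadPoolExecutor only starts threads beyond the core size when the queue is already full.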

That is the complete server recipe. There is quite a lot of code, and readers without a Netty background may find it dizzying; if you don’t use the JDK Executors framework often, it can also be a tough read. Readers who need learning materials on these topics can ask me for them.

Client recipe

The server uses NIO, and the client could use NIO as well, but it is not necessary: a synchronous socket implementation works just fine. More importantly, synchronous code is short and easy to understand, so for simplicity the client uses synchronous IO.

Define RPC request objects and response objects, corresponding to the server.

public class RPCRequest {

	private String requestId;
	private String type;
	private Object payload;

	public RPCRequest(String requestId, String type, Object payload) {
		this.requestId = requestId;
		this.type = type;
		this.payload = payload;
	}

	public String getRequestId() {
		return requestId;
	}

	public String getType() {
		return type;
	}

	public Object getPayload() {
		return payload;
	}
}

public class RPCResponse {

	private String requestId;
	private String type;
	private Object payload;

	public RPCResponse(String requestId, String type, Object payload) {
		this.requestId = requestId;
		this.type = type;
		this.payload = payload;
	}

	public String getRequestId() {
		return requestId;
	}

	public void setRequestId(String requestId) {
		this.requestId = requestId;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public Object getPayload() {
		return payload;
	}

	public void setPayload(Object payload) {
		this.payload = payload;
	}
}

Define a client exception that is used to throw RPC errors in a uniform way.

public class RPCException extends RuntimeException {

	private static final long serialVersionUID = 1L;

	public RPCException(String message, Throwable cause) {
		super(message, cause);
	}

	public RPCException(String message) {
		super(message);
	}

	public RPCException(Throwable cause) {
		super(cause);
	}
}

The request ID generator, a simple random UUID.

public class RequestId {

	public static String next() {
		return UUID.randomUUID().toString();
	}
}

Response type registry, corresponding to the server

public class ResponseRegistry {

	private static Map<String, Class<?>> clazzes = new HashMap<>();

	public static void register(String type, Class<?> clazz) {
		clazzes.put(type, clazz);
	}

	public static Class<?> get(String type) {
		return clazzes.get(type);
	}
}

OK, now for the client itself. Connection management, reading and writing messages, and reconnection all live here.

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;

public class RPCClient {

	private String ip;
	private int port;
	private Socket sock;
	private DataInputStream input;
	private OutputStream output;

	public RPCClient(String ip, int port) {
		this.ip = ip;
		this.port = port;
	}

	public void connect() throws IOException {
		SocketAddress addr = new InetSocketAddress(ip, port);
		sock = new Socket();
		sock.connect(addr, 5000); // 5s timeout
		input = new DataInputStream(sock.getInputStream());
		output = sock.getOutputStream();
	}

	public void close() {
		// Close the connection
		try {
			sock.close();
			sock = null;
			input = null;
			output = null;
		} catch (IOException e) {
		}
	}

	public Object send(String type, Object payload) {
		// Ordinary RPC request: send it and wait for the response
		try {
			return this.sendInternal(type, payload, false);
		} catch (IOException e) {
			throw new RPCException(e);
		}
	}

	public RPCClient rpc(String type, Class<?> clazz) {
		// Shortcut for registering an RPC response type
		ResponseRegistry.register(type, clazz);
		return this;
	}

	public void cast(String type, Object payload) {
		// One-way message: send it without waiting for a response
		try {
			this.sendInternal(type, payload, true);
		} catch (IOException e) {
			throw new RPCException(e);
		}
	}

	private Object sendInternal(String type, Object payload, boolean cast) throws IOException {
		if (output == null) {
			connect();
		}
		String requestId = RequestId.next();
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		DataOutputStream buf = new DataOutputStream(bytes);
		writeStr(buf, requestId);
		writeStr(buf, type);
		writeStr(buf, JSON.toJSONString(payload));
		buf.flush();
		byte[] fullLoad = bytes.toByteArray();
		try {
			// Send the request
			output.write(fullLoad);
		} catch (IOException e) {
			// The connection is broken: reconnect and resend once
			close();
			connect();
			output.write(fullLoad);
		}
		if (!cast) {
			// Ordinary RPC request: read the response right away
			String reqId = readStr();
			// Verify that the request ID matches
			if (!requestId.equals(reqId)) {
				close();
				throw new RPCException("request id mismatch");
			}
			String typ = readStr();
			Class<?> clazz = ResponseRegistry.get(typ);
			// Response types must be registered in advance
			if (clazz == null) {
				throw new RPCException("unrecognized rpc response type=" + typ);
			}
			// Deserialize the JSON string
			String payld = readStr();
			Object res = JSON.parseObject(payld, clazz);
			return res;
		}
		return null;
	}

	private String readStr() throws IOException {
		int len = input.readInt();
		byte[] bytes = new byte[len];
		input.readFully(bytes);
		return new String(bytes, StandardCharsets.UTF_8);
	}

	private void writeStr(DataOutputStream out, String s) throws IOException {
		// Write the length in bytes, which is what the server's decoder expects
		byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
		out.writeInt(bytes.length);
		out.write(bytes);
	}
}
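
The reconnection logic in the client boils down to one rule: if the write fails, reconnect and try exactly once more. The sketch below isolates that pattern behind a small interface so it can be exercised without a real socket. Transport, RetryOnce, and FlakyTransport are hypothetical names introduced for illustration, not RPCKids classes.

```java
import java.io.IOException;

// Hypothetical abstraction over a connection, for illustration only
interface Transport {
	void write(byte[] data) throws IOException;

	void reconnect() throws IOException;
}

final class RetryOnce {

	// Writes data; on failure reconnects and retries a single time.
	// A second failure propagates to the caller as an IOException.
	static void write(Transport transport, byte[] data) throws IOException {
		try {
			transport.write(data);
		} catch (IOException first) {
			transport.reconnect();
			transport.write(data);
		}
	}
}

// Test double: fails the first `failures` writes, then succeeds
final class FlakyTransport implements Transport {
	int failures;
	int reconnects;
	int delivered; // total bytes accepted

	FlakyTransport(int failures) {
		this.failures = failures;
	}

	@Override
	public void write(byte[] data) throws IOException {
		if (failures > 0) {
			failures--;
			throw new IOException("simulated broken connection");
		}
		delivered += data.length;
	}

	@Override
	public void reconnect() {
		reconnects++;
	}
}
```

With new FlakyTransport(1) a three-byte write succeeds after one reconnect; with new FlakyTransport(2) the second failure reaches the caller, which matches how send wraps the IOException in an RPCException.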
