instructions

Java handwritten RPC (01) based socket implementation from scratch

Java hand-written RPC (02) -Netty4 implements both client and server side from scratch

After writing the client and server, how to implement the client and server invocation?

Let’s take a look.

The interface definition

Calculation method

package com.github.houbb.rpc.common.service;

import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;

/** * <p> Computing Services Interface </p> ** <pre> Created: 2018/8/24 PM </pre> * <pre> Project: fake </pre> **@author houbinbin
 * @since0.0.1 * /
public interface Calculator {

    /** * computes addition *@paramRequest Request entry parameter *@returnReturns the result */
    CalculateResponse sum(final CalculateRequest request);

}
Copy the code

pojo

Corresponding parameter object:

  • CalculateRequest
package com.github.houbb.rpc.common.model;

import java.io.Serializable;

/ * * * < p > the request into the parameter < / p > * * < pre > Created: 2018/8/24 5:05 PM < / pre > * < pre > Project: fake < / pre > * *@author houbinbin
 * @since0.0.3 * /
public class CalculateRequest implements Serializable {

    private static final long serialVersionUID = 6420751004355300996L;

    /** ** /
    private int one;

    /** ** /
    private int two;

    public CalculateRequest(a) {}public CalculateRequest(int one, int two) {
        this.one = one;
        this.two = two;
    }

    //getter setter toString

}
Copy the code
  • CalculateResponse
package com.github.houbb.rpc.common.model;

import java.io.Serializable;

/ * * * < p > the request into the parameter < / p > * * < pre > Created: 2018/8/24 5:05 PM < / pre > * < pre > Project: fake < / pre > * *@author houbinbin
 * @since0.0.3 * /
public class CalculateResponse implements Serializable {

    private static final long serialVersionUID = -1972014736222511341L;

    /** * Whether successful */
   private boolean success;

    /** ** the sum of the two */
   private int sum;

    public CalculateResponse(a) {}public CalculateResponse(boolean success, int sum) {
        this.success = success;
        this.sum = sum;
    }

    //getter setter toString
}
Copy the code

The client

The core part of the

RpcClient needs to add the corresponding Handler as follows:

Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = bootstrap.group(workerGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .handler(new ChannelInitializer<Channel>(){
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline()
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        .addLast(new CalculateRequestEncoder())
                        .addLast(new CalculateResponseDecoder())
                        .addLast(new RpcClientHandler());
            }
        })
        .connect(RpcConstant.ADDRESS, port)
        .syncUninterruptibly();
Copy the code

The handler swimlanes in Netty are elegantly designed so that our code can be extended with great flexibility.

Let’s look at the corresponding implementation.

RpcClientHandler

package com.github.houbb.rpc.client.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/** * <p> Client processing class </p> ** <pre> Created: 2019/10/1611:30pm </pre> * <pre> Project: RPC </pre> **@author houbinbin
 * @sinceHundreds * /
public class RpcClientHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        CalculateRequest request = new CalculateRequest(1.2);

        ctx.writeAndFlush(request);
        log.info("[Client] request is :{}", request);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        CalculateResponse response = (CalculateResponse)msg;
        log.info("[Client] response is :{}", response); }}Copy the code

This is easier. In channelActive we call directly, and the input object is fixed here for simplicity.

ChannelRead0 listens to the corresponding results of the server and generates logs.

CalculateRequestEncoder

The request parameter is an object that netty cannot transfer directly, so we convert it to a basic object:

package com.github.houbb.rpc.client.encoder;

import com.github.houbb.rpc.common.model.CalculateRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/ * * *@author binbin.hou
 * @since0.0.3 * /
public class CalculateRequestEncoder extends MessageToByteEncoder<CalculateRequest> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CalculateRequest msg, ByteBuf out) throws Exception {
        int one = msg.getOne();
        inttwo = msg.getTwo(); out.writeInt(one); out.writeInt(two); }}Copy the code

CalculateResponseDecoder

The same is true for the server response.

We need to convert the basic type encapsulation into the object that we need.

package com.github.houbb.rpc.client.decoder;

import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/** * Response parameter decoding *@author binbin.hou
 * @since0.0.3 * /
public class CalculateResponseDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        boolean success = in.readBoolean();
        int sum = in.readInt();

        CalculateResponse response = newCalculateResponse(success, sum); out.add(response); }}Copy the code

The service side

Setup handler class

The processing classes in RpcServer will be tweaked slightly, but everything else will remain the same.

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(workerGroup, bossGroup)
        .channel(NioServerSocketChannel.class)
        // Prints logs
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline()
                        .addLast(new CalculateRequestDecoder())
                        .addLast(new CalculateResponseEncoder())
                        .addLast(newRpcServerHandler()); }})// This parameter affects connections that have not yet been accepted
        .option(ChannelOption.SO_BACKLOG, 128)
        // The server will send an ACK packet to determine if the client is still alive after a period of time when the client does not respond.
        .childOption(ChannelOption.SO_KEEPALIVE, true);
Copy the code

RpcServerHandler

We started with an empty implementation, so let’s add the corresponding implementation.

package com.github.houbb.rpc.server.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/ * * *@author binbin.hou
 * @since0.0.1 * /
public class RpcServerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel {} connected " + id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String id = ctx.channel().id().asLongText();

        CalculateRequest request = (CalculateRequest)msg;
        log.info("[Server] receive channel {} request: {} from ", id, request);

        Calculator calculator = new CalculatorService();
        CalculateResponse response = calculator.sum(request);

        // Write back to the client
        ctx.writeAndFlush(response);
        log.info("[Server] channel {} response {}", id, response); }}Copy the code

After reading the access to the client, we get the CalculateRequest of the calculated entry parameter, and then call the sum method to get the corresponding CalculateResponse and notify the client of the result.

CalculateRequestDecoder

There is a one-to-one correspondence with the client, and we first convert the basic types passed by Netty to CalculateRequest objects.

package com.github.houbb.rpc.server.decoder;

import com.github.houbb.rpc.common.model.CalculateRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/** * Request parameter decoding *@author binbin.hou
 * @since0.0.3 * /
public class CalculateRequestDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int one = in.readInt();
        int two = in.readInt();

        CalculateRequest request = newCalculateRequest(one, two); out.add(request); }}Copy the code

CalculateResponseEncoder

Here, similar to the client, we need to convert the response to a basic type for network transmission.

package com.github.houbb.rpc.server.encoder;

import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/ * * *@author binbin.hou
 * @since0.0.3 * /
public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception {
        boolean success = msg.isSuccess();
        intresult = msg.getSum(); out.writeBoolean(success); out.writeInt(result); }}Copy the code

CalculatorService

The corresponding implementation class of the server.

public class CalculatorService implements Calculator {

    @Override
    public CalculateResponse sum(CalculateRequest request) {
        int sum = request.getOne()+request.getTwo();

        return new CalculateResponse(true, sum); }}Copy the code

test

The service side

Start the server:

new RpcServer().start();
Copy the code

Server startup logs:

[the DEBUG] [the 2021-10-05 11:53:11. 795] [the main] [C.G.H.L.I.C.L ogFactory. SetImplementation] - Logging the initialized using 'class Com. Making. Houbb. Log. Integration. Adaptors. Stdout. StdOutExImpl 'adapter. [INFO] [the 2021-10-05 11:53:11. 807] [Thread - 0] [C.G.H.R.S.C.R pcServer. Run] xml-rpc service to start the service side On October 5, 2021 io.net ty 11:53:13 morning. Handler. Logging. LoggingHandler channelRegistered information: [id: 0 xd399474f] REGISTERED on October 5, 2021 io.net ty 11:53:13 morning. Handler. Logging. LoggingHandler bind information: [id: 0 xd399474f] bind: 0.0.0.0/0.0.0.0:05, 9527 October 2021 io.net ty 11:53:13 morning. Handler. Logging. LoggingHandler channelActive information: [id: 0xd399474f, L: / 0:0:0:0:0:0: ACTIVE 0:0:9 527] [INFO] [11:53:13 2021-10-05. 101] [Thread - 0] [C.G.H.R.S.C.R pcServer. Run] xml-rpc server startup is complete, Listen on port [9527]Copy the code

The client

Start the client:

new RpcClient().start();
Copy the code

The log is as follows:

[the DEBUG] [11:54:12 2021-10-05. 158] [the main] [C.G.H.L.I.C.L ogFactory. SetImplementation] - Logging the initialized using 'class Com. Making. Houbb. Log. Integration. Adaptors. Stdout. StdOutExImpl 'adapter. [INFO] [11:54:12 2021-10-05. 164] [Thread - 0] [C.G.H.R.C.C.R pcClient. Run] xml-rpc service to start the client On October 5, 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler channelRegistered information: [id: 0 x4d75c580] REGISTERED on October 5, 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler connect information: [id: 0 x4d75c580] CONNECT: / 127.0.0.1:05, 9527 October 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler channelActive information: [id: 0x4d75c580, L: / 127.0.0.1:54030 R: / 127.0.0.1:9527] the ACTIVE [INFO] [the 2021-10-05 11:54:13. 403] [Thread - 0] [C.G.H.R.C.C.R pcClient. Run] - RPC service start client completes, listen on port: 9527 October 5, 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler write information: [id: 0x4D75C580, L:/127.0.0.1:54030 -r :/127.0.0.1:9527] WRITE: 0x4D75C580, L:/127.0.0.1:54030 -r :/127.0.0.1:9527 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | 00000000 00 00 00 00 00 00 02 01 |... | + -- -- -- -- -- -- -- - + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + 5, October 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler flush information: [id: 0x4d75c580, L:/ 127.0.0.1:54030-r :/127.0.0.1:9527] FLUSH [INFO] [2021-10-05 11:54:13.450] [nioEventLoopgroup-2-1] [C.G.H.R.C.C.R pcClient. ChannelActive] - [Client] request is: CalculateRequest {one = 1, two = 2} on October 5, 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler channelRead information: [id: 0x4D75C580, L:/127.0.0.1:54030 -r :/127.0.0.1:9527] READ: 0x4D75C580, L:/127.0.0.1:54030 -r :/127.0.0.1:9527] 5B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | 00000000 01 00 00 00 03 |... | + -- -- -- -- -- -- -- - + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + 5, October 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler channelReadComplete information: [id: 0x4d75c580, L:/ 127.0.0.1:54030-r :/127.0.0.1:9527] READ COMPLETE [INFO] [2021-10-05 11:54:13.508] [nioEventLoopgroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :CalculateResponse{success=true, sum=3}Copy the code

As you can see, the corresponding request parameters and response results are printed out.

Of course, the server also has the corresponding new log:

October 5, 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler channelRead information: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ: [id: 0xbc9F5927, L:/127.0.0.1:9527 -r :/127.0.0.1:54030] 2021 io.net ty 11:54:13 morning. Handler. Logging. LoggingHandler channelReadComplete information: [id: 0xd399474f, L:/ 0:0:0:0:0:0:0:0:9:927] READ COMPLETE [INFO] [2021-10-05 11:54:13.432] [nioEventLoopgroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 00e04cffFE360988-00001d34-00000001-2a80D950D8166c0C-bc9F5927 [INFO] [2021-10-05 11:54:13.495] [nioEventLoopgroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 request: CalculateRequest{one=1, Two = 2} the from [INFO] [the 2021-10-05 11:54:13. 505] [nioEventLoopGroup - 2-1] [C.G.H.R.S.H.R pcServerHandler. ChannelRead0] - [Server] channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 response CalculateResponse{success=true, sum=3}Copy the code

summary

In order to facilitate learning, the above source code has been open source:

github.com/houbb/rpc

I hope this article is helpful to you. If you like it, please click to collect and forward a wave.

I am an old horse, looking forward to meeting with you next time.