In the introduction of Netty processors in Nifty (I), one of the processors is the codec processor, which encapsulates the ChannelBuffer received by Netty into thrift specific message types. In this case, it would be ThriftMessage, which contains a ChannelBuffer. And then it goes to NiftyDispatcher for processing. So this article will be divided into two parts, codec and NiftyDispatcher.

First of all, the service implementation class we provide, EchoServiceImpl, provides a method, echo, that is assumed to be called by the client.

1. ThriftFrameCodec

Currently, ThriftFrameCodec has only one implementation class, DefaultThriftFrameCodec, which contains the encoder and decoder

public class DefaultThriftFrameCodec implements ThriftFrameCodec {
    private final ThriftFrameDecoder decoder;
    private final ThriftFrameEncoder encoder;

    public DefaultThriftFrameCodec(int maxFrameSize, TProtocolFactory inputProtocolFactory){
        this.decoder = new DefaultThriftFrameDecoder(maxFrameSize, inputProtocolFactory);
        this.encoder = new DefaultThriftFrameEncoder(maxFrameSize);
    }

    @Override
    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception{
        encoder.handleDownstream(ctx, e);
    }

    @Override
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception{ decoder.handleUpstream(ctx, e); }}Copy the code

The message is decoded using the handleUpstream when it is received and encoded using the encoder when handleDownstream is called. We’ll just focus on the decoder here for now.

When calling decoder.handleupstream, the subsequent call chain is as follows

SimpleChannelUpstreamHandler.handleUpstream -> FrameDecoder.messageReceived -> FrameDecoder.callDecode -> DefaultThriftFrameDecoder.decode
Copy the code

The decode method is as follows:

protected ThriftMessage decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
    // ...
    return new ThriftMessage(messageBuffer, ThriftTransportType.FRAMED);
}
Copy the code

The ThriftMessage contains a buffer of type ChannelBuffer and a transport type TTransportType.

If we again curious from FrameDecoder callDecode continue to look down, in fact, in the call this ThriftMessage after decode, finally will invoke the netty Channels. FireMessageReceived method, Encapsulate the result as a MessageEvent and pass it to the next handler

public static void fireMessageReceived(ChannelHandlerContext ctx, Object message, SocketAddress remoteAddress) {
    ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), message, remoteAddress));
}

// MessageEvent
public class UpstreamMessageEvent implements MessageEvent {
    private final Channel channel;
    private final Object message;
    private final SocketAddress remoteAddress;
}
Copy the code

The next handler receives the event and gets the message to process it. And then the handler, NiftyDispatcher.

2. NiftyDispatcher

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception
{
    if (e.getMessage() instanceof ThriftMessage) {
        ThriftMessage message = (ThriftMessage) e.getMessage();

        if (taskTimeoutMillis > 0) {
            message.setProcessStartTimeMillis(System.currentTimeMillis());
        }
        
        TNiftyTransport messageTransport = new TNiftyTransport(ctx.getChannel(), message);
        TTransportPair transportPair = TTransportPair.fromSingleTransport(messageTransport);
        TProtocolPair protocolPair = duplexProtocolFactory.getProtocolPair(transportPair);

        TProtocol inProtocol = protocolPair.getInputProtocol();

        TProtocol outProtocol = protocolPair.getOutputProtocol();
        outProtocol.setServerSide(true);
        processRequest(ctx, message, messageTransport, inProtocol, outProtocol);
    }
    else{ ctx.sendUpstream(e); }}Copy the code
  1. First, obtain message ThriftMessage, and set the start processing time to the current time.

  2. TNiftyTransport, the transport component, this class is very important. It will store the ChannelBuffer sent by the client internally and build a dynamically expanded ChannelBuffer. Subsequent reads and writes involving data depend on this class.

public TNiftyTransport(Channel channel, ThriftMessage message){
    this(channel, message.getBuffer(), message.getTransportType());
}

public TNiftyTransport(Channel channel, ChannelBuffer in, ThriftTransportType thriftTransportType) {
    this.channel = channel;
    this.in = in;
    this.thriftTransportType = thriftTransportType;
    this.out = ChannelBuffers.dynamicBuffer(DEFAULT_OUTPUT_BUFFER_SIZE);
    this.initialReaderIndex = in.readerIndex();

    buffer = in.array();
    initialBufferPosition = bufferPosition = in.arrayOffset() + in.readerIndex();
    bufferEnd = bufferPosition + in.readableBytes();
    in.readerIndex(in.readerIndex() + in.readableBytes());
}
Copy the code

Mainly some initialization work, such as in, out two ChannelBuffer; Get buffer array elements; Set the start and end positions of the buffer.

TTransportPair contains an inputTransport and an outputTransport. TProtocolPair holds an input and an output protocol, and TTransport.

TTransportPair:

public class TTransportPair {
    private final TTransport inputTransport;
    private final TTransport outputTransport;

    public static TTransportPair fromSingleTransport(final TTransport transport) {
        return newTTransportPair(transport, transport); }}Copy the code

TProtocolPair

public class TProtocolPair {
    private final TProtocol inputProtocol;
    private final TProtocol outputProtocol;
}
Copy the code

TProtocal: the protocol used here is TBinaryProtocal

public abstract class TProtocol {

  /** * Transport */
  protected TTransport trans_;
}
Copy the code

Moving on, you can see it in the method processRequest

processFuture = processorFactory.getProcessor(messageTransport).process(inProtocol, outProtocol, requestContext);
Copy the code

ProcessorFactory. GetProcessor (messageTransport) return is ThriftServiceProcessor, the service processor object is a series of data processing, These are described in Nifty (I), a High-performance RPC Framework based on Netty. So let’s go straight to its process method

public ListenableFuture<Boolean> process(final TProtocol in, TProtocol out, RequestContext requestContext) throws TException{
    TMessage message = in.readMessageBegin();
    String methodName = message.name;
    int sequenceId = message.seqid;
}
Copy the code

TMessage has three attributes, representing the method name, type, and ordinal number. For example, here we get name=”echo”, type Void, number 1

public final class TMessage {
  public final String name;
  public final byte type;
  public final int seqid;
}
Copy the code

A brief introduction to protocols and transport layers

The type of IN is TBinaryProtocal, which inherits from TProtocal and internally holds TTransport (which can be understood as the transport layer). Finally, TTransport is really involved in data reading and writing. In fact, nine times out of ten RPC frameworks have similar protocol interfaces and transmission interfaces. Take a look at what protocol interfaces are defined

/** * Protocol interface definition. * */
public abstract class TProtocol {

  /** * Transport */
  protected TTransport trans_;

  /** * Constructor */
  protected TProtocol(TTransport trans) {
    trans_ = trans;
  }

  /** * Transport accessor */
  public TTransport getTransport(a) {
    return trans_;
  }

  private boolean serverSide;
  private String serviceName;

  // getter, setter


  /** * Reading methods. */

  public abstract TMessage readMessageBegin(a) throws TException;
  public abstract void readMessageEnd(a) throws TException;
  
  public abstract TStruct readStructBegin(a) throws TException;
  public abstract void readStructEnd(a) throws TException;
  
  public abstract TField readFieldBegin(a) throws TException;
  public abstract void readFieldEnd(a) throws TException;

  public abstract TMap readMapBegin(a) throws TException;
  public abstract void readMapEnd(a) throws TException;

  public abstract TList readListBegin(a) throws TException;
  public abstract void readListEnd(a) throws TException;

  public abstract TSet readSetBegin(a) throws TException;
  public abstract void readSetEnd(a) throws TException;

  public abstract boolean readBool(a) throws TException;
  public abstract byte readByte(a) throws TException;
  public abstract short readI16(a) throws TException;
  public abstract int readI32(a) throws TException;
  public abstract long readI64(a) throws TException;
  public abstract double readDouble(a) throws TException;
  public abstract String readString(a) throws TException;
  public abstract ByteBuffer readBinary(a) throws TException;
  
  /** * Writing methods. */
   // ...
}

Copy the code

The main inside is the data read and write methods, write and read methods are not corresponding to the paste.

  • Start and end reading of messages;
  • Start and end reads of structures;
  • Reading of start and end parameters;
  • Start and end collection reads;

Let’s start with the implementation of readMessageBegin

public TMessage readMessageBegin(a) throws TException {
    int size = readI32();
    return new TMessage(readString(), (byte) (size & 0x000000ff), readI32());
}
Copy the code

The method is simplified a bit here for ease of reading. Four bytes are read to identify the type, then a string is read, and four bytes are read to identify the sequence number. So this is actually the protocol of Thrift itself, but we can also define the type of the first two bytes that we read. The readString method also reads 4 bytes as size and then builds the string by reading size bytes from transport’s ChannelBuffer held by the protocol.

More details about readI32 and readString are covered here, but can be skipped if you’re not interested.

private byte[] i32rd = new byte[4];

public int readI32(a) throws TException {
    byte[] buf = i32rd;
    int off = 0;

    if (trans_.getBytesRemainingInBuffer() >= 4) {
        buf = trans_.getBuffer();
        off = trans_.getBufferPosition();
        trans_.consumeBuffer(4);
    } else {
        readAll(i32rd, 0.4);
    }
    return
            ((buf[off] & 0xff) < <24) |
                    ((buf[off + 1] & 0xff) < <16) |
                    ((buf[off + 2] & 0xff) < <8) |
                    ((buf[off + 3] & 0xff));
}
Copy the code

The trans_ here, as mentioned earlier, is TNiftyTransport, provided by the nifty package.

  • trans_.getBytesRemainingInBuffer()Bufferend-bufferposition represents the number of remaining bytes of channelBuffer that are held internally. If there are four bytes, read four bytes, otherwise read all;
  • buf = trans_.getBuffer()Getbuffer (); getBuffer ();
  • off = trans_.getBufferPosition();Is to get the current read position of the buffer inside transport, i.e. BufferPosition;
  • trans_.consumeBuffer(4);Transport internal buffer consumes 4 bytes, i.e. BufferPosition += 4;
  • Regarding the return value, notice that the hexadecimal 0xFF is the binary 11111111, and the end result is to concatenate four bytes together to form an int value

About the readByte readShort, readLong are similar.

ReadString:

public String readString(a) throws TException {
    int size = readI32();

    checkStringReadLength(size);

    if (trans_.getBytesRemainingInBuffer() >= size) {
        String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
        trans_.consumeBuffer(size);
        return s;
    }

    return readStringBody(size);
}
Copy the code

We first read the four-byte size, which indicates how many bytes we need to read to construct the string. After obtaining buffer and position, read size bytes from position position of buffer to construct string; Finally, you need to move position before returning string.

Go back to

TMessage message = in.readMessageBegin();
String methodName = message.name;
int sequenceId = message.seqid;
Copy the code

So now you know the method name and the sequence number, for example, “echo”,1. As we move on,

ThriftMethodProcessor method = methods.get(methodName);
ListenableFuture<Boolean> processResult = method.process(in, out, sequenceId, context);
Copy the code

Get method processor ThriftMethodProcessor from the Methods mapping table based on the method name. Notice that we are currently in the service processor (ThriftServiceProcessor), which holds multiple method processors, ThriftMethodProcessors, each corresponding to a method of the Thrift object.

During the initialization of ThriftServiceProcessor, all methods in the class are read to construct the method processor and set to Methods =Map[String, ThriftMethodProcessor], where key is the method name.

Once you get the method handler ThriftMethodProcessor, call the Process method to process it. So, let’s look at how we do data processing. Is it narrowing down by this point? From class level to method level.

public ListenableFuture<Boolean> process(TProtocol in, final TProtocol out, final int sequenceId, final ContextChain contextChain) throws Exception {
    // read args
    Object[] args = readArguments(in);
    in.readMessageEnd();

    // invoke method
    finalListenableFuture<? > invokeFuture = invokeMethod(args); }Copy the code
  • Read parameters from the protocol, this is similar to the previous read method name and sequence number;
  • Indicates that the message has been read. This is an empty implementation that doesn’t do anything.
  • Pass in the obtained parameters and call the method to get the result.

Let’s focus on getting parameters and method calls.

Gets the method parameter value readArguments

private final Method method;
private finalMap<Short, ThriftCodec<? >> parameterCodecs;private final Map<Short, Short> thriftParameterIdToJavaArgumentListPositionMap;


private Object[] readArguments(TProtocol in) throws Exception {
    int numArgs = method.getParameterTypes().length;
    Object[] args = new Object[numArgs];
    TProtocolReader reader = new TProtocolReader(in);
    
    reader.readStructBegin();
    while (reader.nextField()) {
        shortfieldId = reader.getFieldId(); ThriftCodec<? > codec = parameterCodecs.get(fieldId); args[thriftParameterIdToJavaArgumentListPositionMap.get(fieldId)] = reader.readField(codec); } reader.readStructEnd();return args;
}
Copy the code

This ThriftServiceProcessor contains Method metadata. This ThriftServiceProcessor initializes this ThriftServiceProcessor at the same time.

  • Get the number of parameters in the method numArgs, create numArgs element Object array;
  • Build the TProtocolReader and loop to see if there are any arguments
  • Gets the parameter ID, the codec from the cached codec
  • Read the parameter values using the obtained codec and Reader and set them into an array

The following is also more detailed, if not interested can be skipped.

Construct TProtocalReader as follows. The class will hold the protocol object passed in internally. CurrentField represents the method parameter Field that is currently being read

public class TProtocolReader{
    private final TProtocol protocol;
    private TField currentField;

    public TProtocolReader(TProtocol protocol){
        this.protocol = protocol;
    }

    public void readStructBegin(a) throws TException {
        protocol.readStructBegin();
        currentField = null;
    }
    
    public boolean nextField(a) throws TException {
        currentField = protocol.readFieldBegin();
        return currentField.type != TType.STOP;
    }
}
Copy the code

NextFiled returns false if field.type== ttype. STOP is read in the protocol. ReadFieldBegin reads a byte and determines whether the type is STOP to set the ID and TField

public TField readFieldBegin(a) throws TException {
    byte type = readByte();
    short id = type == TType.STOP ? 0 : readI16();
    return new TField("", type, id);
}
Copy the code

This is set on the client side about when to stop reading parameters, so we’re not going to worry about that here.

If there are still parameters to read, get the fieldId, and then get the codec for the parameters from the codec.

public short getFieldId(a){
    return currentField.id;
}
Copy the code

This ThriftCodecManager ThriftCodecManager is used to build codec map (parameterCodecs), where key is parameter ID and value is parameter. Same thriftParameterIdToJavaArgumentListPositionMap id is the key parameter, the value for the parameter position (0 – (n – 1), n number) for the method parameter.

And finally, how do I read the parameter values

ThriftMethodProcessor:

args[thriftParameterIdToJavaArgumentListPositionMap.get(fieldId)] = reader.readField(codec);
Copy the code

TProtocalReader:

public Object readField(ThriftCodec
        codec) throws Exception{
    currentField = null;
    Object fieldValue = codec.read(protocol);
    protocol.readFieldEnd(); / / empty implementation
    return fieldValue;
}
Copy the code

If the parameter is String, the resulting codec is StringThriftCodec. Again, protocal=TNiftyProtocal. Based on this codec.read(protocol)

public class StringThriftCodec implements ThriftCodec<String> {

    @Override
    public String read(TProtocol protocol) throws Exception{
        return protocol.readString();
    }

    @Override
    public void write(String value, TProtocol protocol) throws Exception { protocol.writeString(value); }}Copy the code

Protocal.readstring () is actually called, so it’s easy to understand how to implement read and write methods in other codecs. Codecs are a layer of forwarders.

The readArguments method iterates through the argument list args and sets the defaultValue if the position is null. The defaultValue is set using Guava’s defaultValue(argumentClass)

Method invocation and response results

Here on the process of reading method parameter values is introduced, then look at the method call.

privateListenableFuture<? > invokeMethod(Object[] args) { Object response = method.invoke(service, args);return Futures.immediateFuture(response);
}
Copy the code

The first is to get the result through the reflection call, because the service implementation class Service and the parameter args are now available. Again, use the Guava package, which is used to build ListenableFuture.

Come back to continue to see ThriftMethodProcessor. The process method

finalListenableFuture<? > invokeFuture = invokeMethod(args);final SettableFuture<Boolean> resultFuture = SettableFuture.create();

Futures.addCallback(invokeFuture, new FutureCallback<Object>() {
    @Override
    public void onSuccess(Object result) {
        
        writeResponse(out,
                      sequenceId,
                      TMessageType.REPLY,
                      "success",
                      (short) 0,
                      successCodec,
                      result);

        re sultFuture.set(true);
    }

    @Override
    public void onFailure(Throwable t){
        
    }
}, Runnable::run);

return resultFuture;
Copy the code

Getting the result of the method call from invokeMethod(args), the problem now is how to respond to the execution result to the client. There are two main steps:

  1. Write the response result to the ChannelBuffer, namely the outbuffer in TNiftyTransport;
  2. The ChannelBuffer that has been written is sent to the client through the Netty API.

Let’s take a look at the first step, writeResponse, which has very little content, as follows:

private <T> void writeResponse(TProtocol out,
                                   int sequenceId,
                                   byte responseType,
                                   String responseFieldName,
                                   short responseFieldId,
                                   ThriftCodec<T> responseCodec,
                                   T result) throws Exception {

    out.writeMessageBegin(new TMessage(name, responseType, sequenceId));

    TProtocolWriter writer = new TProtocolWriter(out);
    writer.writeStructBegin(resultStructName);
    writer.writeField(responseFieldName, (short) responseFieldId, responseCodec, result);
    writer.writeStructEnd();

    out.writeMessageEnd();
    out.getTransport().flush();
}
Copy the code

This corresponds to the previous various read methods

  • The readMessageBegin method reads a TMessage first, so the TMessage name is the method name. So here’s “echo”;
public void writeMessageBegin(TMessage message) throws TException {
    int version = VERSION_1 | message.type;
    writeI32(version);
    writeString(message.name);
    writeI32(message.seqid);
}
Copy the code

ReadMessageBegin (name, type, and sequence number); Here the data is written to an empty Protocal.channelbuffer.

  • ReadStructBegin in front, writeStructBegin in this case, but this is the empty implementation;
  • Since multiple parameters are read, we need to use the while loop, but only one result is returned, so we only need to use writeField once.
public <T> void writeField(String name, short id, ThriftCodec<T> codec, T value) throws Exception{
    protocol.writeFieldBegin(new TField(name, codec.getType().getProtocolType().getType(), id));
    codec.write(value, protocol);
    protocol.writeFieldEnd();
}
Copy the code

WriteFieldBegin, which writes the type and ID to the buffer. Don’t mention codec.write. For example, if the return type is String, codec is StringThriftCodec, which we’ve already covered; WriteFieldEnd is an empty implementation.

Writer.writestructend () : writeFieldStop (); writeFieldStop () : writeFieldStop (); As follows:

public void writeFieldStop(a) throws TException {
    writeByte(TType.STOP);
}
Copy the code

About the out. WriteMessageEnd (); Is a null implementation;

Out.gettransport ().flush(); In TNiftyTransport it’s an empty implementation because it’s done in NiftyDispatcher to keep the order, which is what I’m going to talk about in step 2 of responding to the data.

Back to NiftyDispather. The processRequest, there is

ThriftMessage response = message.getMessageFactory().create(messageTransport.getOutputBuffer());

writeResponse(ctx, response, requestSequenceId, DispatcherContext.isResponseOrderingRequired(ctx));
Copy the code

The response contains the outbuffer in TNiftyTransport, and the response data has been written in before.

public Factory getMessageFactory(a){
    return new Factory() {
        @Override
        public ThriftMessage create(ChannelBuffer messageBuffer)
        {
            return newThriftMessage(messageBuffer, getTransportType()); }}; }Copy the code

Then look at the writeResponse method, all the way down to the code below

private void writeResponseInOrder(ChannelHandlerContext ctx, ThriftMessage response, int responseSequenceId) {
    // Ensure responses to requests are written in the same order the requests were received.
    synchronized (responseMap) {
        int currentResponseId = lastResponseWrittenId.get() + 1;
        if(responseSequenceId ! = currentResponseId) { responseMap.put(responseSequenceId, response); }else {
            do {
                Channels.write(ctx.getChannel(), response);
                lastResponseWrittenId.incrementAndGet();
                ++currentResponseId;
                response = responseMap.remove(currentResponseId);
            } while (null! = response); }}}Copy the code

If you don’t need to ensure that the response order is the same as the request order, you can simply use the following code

Channels.write(ctx.getChannel(), response);
Copy the code

If order is required, the currentResponseId! =responseSequenceId) is saved when a currentResponseId is received. =responseSequenceId when the request is written again. For example, say lastResponseWrittenId=1. When a response with serial number 4 or 5 is received, save it to the map. When a response with serial number 2 is received, write it out and look for the response with serial number =3 in the map. There is no 3 here, so the loop is closed. When a subsequent response with serial number 3 is received, the responses with serial number 3, 4 and 5 are written out in sequence.

Write (channel,message) is a method provided by Netty to write out a message. Eventually, the result is passed to the codec DefaultThriftFrameCodec, which is encoded and converted to the format specified by thrift.