
An important topic when learning HDFS is the Hadoop RPC framework. However, there is very little material that explains the framework by following the communication path from client to server and back, so it is hard for readers to build an intuitive picture of how Hadoop RPC works. That is why I decided to write this article. If you read it carefully and follow the process it describes, you should be able to master the Hadoop RPC framework.

This article introduces the Hadoop RPC framework in detail through flow charts and text. It starts from DFSClient and takes a first look at the Hadoop RPC framework by tracing the propagation path of an RPC request, and then introduces the framework in detail, first as a whole and then part by part.

This article also covers the use of the static inner class and dynamic proxy design patterns.

RPC call link

Whether we program against the HDFS API or execute commands through the HDFS CLI, the work is ultimately carried out by calling the relevant DFSClient methods. The following uses the HDFS API as an example:

Before using the HDFS API, first obtain a FileSystem object, then call methods on that FileSystem object:

  // (1) Obtain a FileSystem object before using the HDFS API.
  public static FileSystem getFileSystem(String coreSitePath, String hdfsSitePath) throws IOException {
    Configuration conf = new Configuration();
    conf.addResource(new File(coreSitePath).toURI().toURL());
    conf.addResource(new File(hdfsSitePath).toURI().toURL());
    conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
    conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    conf.setBoolean("fs.hdfs.impl.disable.cache".true);
    conf.setClassLoader(BaseUtil.class.getClassLoader());
    return FileSystem.get(conf);
  }

// ② Call a method in the FileSystem object.
FileSystem fileSystem = getFileSystem(coreSitePath, hdfsSitePath);
ContentSummary contentSummary = fileSystem.getContentSummary(new Path(hdfsFilePath));

The actual type of the FileSystem in this code is DistributedFileSystem. The DistributedFileSystem class contains a DFSClient field named dfs, as shown in the figure below:

When a FileSystem method is called, the corresponding DFSClient method is called, as shown below:
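
A minimal sketch of this delegation, assuming the Hadoop 3.1 sources (the real method also resolves symlinks and updates statistics, which is omitted here):

// Minimal sketch: DistributedFileSystem forwards its public API calls to the dfs field.
public class DistributedFileSystem extends FileSystem {

  DFSClient dfs;  // the DFSClient field mentioned above

  @Override
  public ContentSummary getContentSummary(Path f) throws IOException {
    // The FileSystem-level call is forwarded to DFSClient,
    // which issues the actual RPC to the NameNode.
    return dfs.getContentSummary(getPathName(fixRelativePart(f)));
  }
}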

Let's start with the DFSClient class and follow the journey of an RPC request, as shown in the figure below. The large light blue rectangles represent classes, the small dark blue rectangles represent fields within those classes, and each arrow points to the actual type of the field.

Described in words: DFSClient#namenode (actual type: ClientNamenodeProtocolTranslatorPB)

|

ClientNamenodeProtocolTranslatorPB#rpcProxy (actual type: a ClientNamenodeProtocolPB proxy object generated by a JDK dynamic proxy)

|

The proxy object's invoke method (in the Invoker class of the dynamic proxy) establishes a Socket connection with the RPC Server, and the method call is then delegated to ClientNamenodeProtocolServerSideTranslatorPB on the server side

|

ClientNamenodeProtocolServerSideTranslatorPB#server (actual type: NameNodeRpcServer or RouterRpcServer)

Put this way, it is already quite easy to understand!

This chain actually involves three questions: (1) How is the namenode member variable of DFSClient initialized to a ClientNamenodeProtocolTranslatorPB?

(2) How is the rpcProxy member variable of ClientNamenodeProtocolTranslatorPB assigned a dynamic proxy object, and how does that proxy object establish a Socket connection with ClientNamenodeProtocolServerSideTranslatorPB through its invoke method?

(3) How is the server member variable of ClientNamenodeProtocolServerSideTranslatorPB initialized to a NameNodeRpcServer or RouterRpcServer? And how does the Server dispatch RPC requests?

Let’s answer these three questions in order:

First, question (1): how is the namenode member variable of DFSClient initialized to a ClientNamenodeProtocolTranslatorPB?

By debugging the HDFS client, we find that it ultimately calls the org.apache.hadoop.hdfs.NameNodeProxiesClient#createNonHAProxyWithClientProtocol method.

The call stack is as follows:

Pay attention to the NameNodeProxiesClient#createNonHAProxyWithClientProtocol method at the top of the call stack:

OK, that’s the first question.

Now for the second question: (2) how is the rpcProxy member variable of ClientNamenodeProtocolTranslatorPB assigned a dynamic proxy object, and how does that proxy object establish a Socket connection with ClientNamenodeProtocolServerSideTranslatorPB through its invoke method?

In the figure above, NameNodeProxiesClient#createNonHAProxyWithClientProtocol calls RPC.getProtocolProxy to obtain a proxy variable of type ClientNamenodeProtocolPB. This object becomes the rpcProxy member of ClientNamenodeProtocolTranslatorPB, because it is passed in through the constructor.
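
To make that relationship concrete, here is a minimal sketch of ClientNamenodeProtocolTranslatorPB, based on the Hadoop 3.1 sources (only one method is shown and error handling is trimmed):

// Minimal sketch: the translator stores the dynamic proxy and converts every
// ClientProtocol call into a protobuf request that is forwarded to rpcProxy.
public class ClientNamenodeProtocolTranslatorPB implements ClientProtocol, Closeable {

  private final ClientNamenodeProtocolPB rpcProxy;  // the JDK dynamic proxy object

  public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
    rpcProxy = proxy;  // passed in by createNonHAProxyWithClientProtocol
  }

  @Override
  public ContentSummary getContentSummary(String path) throws IOException {
    GetContentSummaryRequestProto req =
        GetContentSummaryRequestProto.newBuilder().setPath(path).build();
    try {
      // rpcProxy is a proxy, so this call ends up in the Invoker#invoke method shown below.
      return PBHelperClient.convert(
          rpcProxy.getContentSummary(null, req).getSummary());
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }
}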

Let's look at the ClientNamenodeProtocolPB type. It is an interface that extends the Protocol Buffer service interface generated from the .proto file.

We also see that the proxy is generated like this:

ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
        fallbackToSimpleAuth).getProxy();

Tracing into the RPC.getProtocolProxy method:

   public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout,
                                RetryPolicy connectionRetryPolicy,
                                AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
        fallbackToSimpleAuth);
  }

The getProxy() method is defined by the RpcEngine interface and is responsible for creating the proxy. In practice, Hadoop uses Protocol Buffers as its serialization framework, so let's look at the implementation in ProtobufRpcEngine, as shown below:
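
The core of ProtobufRpcEngine#getProxy is roughly the following (a simplified sketch of the Hadoop 3.1 code): it wraps an Invoker, which is an InvocationHandler, in a JDK dynamic proxy for the protocol interface.

  @Override
  @SuppressWarnings("unchecked")
  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {

    // The Invoker implements java.lang.reflect.InvocationHandler and holds an
    // org.apache.hadoop.ipc.Client for the actual network communication.
    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);

    // Every call on the returned proxy is routed to invoker.invoke(...).
    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
  }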

OK, so we now know that ClientNamenodeProtocolTranslatorPB#rpcProxy is in fact the proxy object generated in ProtobufRpcEngine#getProxy (which answers the first half of question (2)). Next, let's see how the proxy object's invoke method establishes a Socket connection with the Server.

We go directly to the invoke method of the ProtobufRpcEngine#Invoker inner class. (This is how dynamic proxies work: the place to look is always the invoke method.)

The invoke method is quite long, so I have kept only the main framework code, ignoring details that do not affect our understanding of the Hadoop RPC framework (such as parameter validation). The code is below; the core is the single call to client.call(...), which ends up invoking the call method of the Client class.

    @Override
    public Message invoke(Object proxy, final Method method, Object[] args)
        throws ServiceException {
      // Construct the RPC request header
      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
      // RPC request parameters
      final Message theRequest = (Message) args[1];
      final RpcWritable.Buffer val;
      try {
        // Core method!
        val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);

      } catch (Throwable e) {        
        throw new ServiceException(e);
      } finally {
        if (traceScope != null) traceScope.close();
      }
      // Asynchronous mode? We can ignore that
      if (Client.isAsynchronousMode()) {
        final AsyncGet<RpcWritable.Buffer, IOException> arr
            = Client.getAsyncRpcResponse();
        final AsyncGet<Message, Exception> asyncGet
            = new AsyncGet<Message, Exception>() {
          @Override
          public Message get(long timeout, TimeUnit unit) throws Exception {
            return getReturnMessage(method, arr.get(timeout, unit));
          }

          @Override
          public boolean isDone() {
            return arr.isDone();
          }
        };
        ASYNC_RETURN_MESSAGE.set(asyncGet);
        return null;
      } else {
        // Construct the Message object that is returned, based on the server response
        return getReturnMessage(method, val);
      }
    }

After further tracing, the overall process of setting up the Socket is: Client#call -> Client#getConnection -> Client#Connection#setupIOstreams -> Client#Connection#setupConnection -> NetUtils.connect. The entire call link is shown in the figure below:
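
A minimal sketch of the last step, Client#Connection#setupConnection, assuming the Hadoop 3.1 sources (the retry loop and SASL negotiation are omitted):

    // Minimal sketch: this is where the TCP connection to the NameNode RPC address
    // is actually opened. 'server' is the InetSocketAddress of the RPC endpoint.
    private synchronized void setupConnection() throws IOException {
      this.socket = socketFactory.createSocket();  // SocketFactory taken from the Configuration
      this.socket.setTcpNoDelay(tcpNoDelay);
      this.socket.setKeepAlive(true);

      // Establish the connection; connectionTimeout is the configured connect timeout.
      NetUtils.connect(this.socket, server, connectionTimeout);

      if (rpcTimeout > 0) {
        this.socket.setSoTimeout(rpcTimeout);
      }
    }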

OK, with the first two questions answered, let's move on to the third question: (3) how is the server member variable of ClientNamenodeProtocolServerSideTranslatorPB initialized to a NameNodeRpcServer or RouterRpcServer?

To review the second step: through a JDK dynamic proxy, Hadoop obtains a proxy object of type ClientNamenodeProtocolPB, rpcProxy, and delegates method calls to it. The proxy object connects to the NameNode RPC address (IP or hostname plus port) that we configured in hdfs-site.xml, sends the RPC request, and receives the response.

Therefore, the NameNode RPC Server must be listening on that port. Let's take a look. In the constructor of NameNodeRpcServer, a ClientNamenodeProtocolServerSideTranslatorPB object is created. The class name contains "ServerSide", meaning it is the PB translator on the Server side, corresponding to the translator PB class on the client side.

OK, next we will start from the NameNode class and work our way down to NameNodeRpcServer and ClientNamenodeProtocolServerSideTranslatorPB.

First, an overall flow chart is given:

When the NameNode is initialized, a NameNodeRpcServer object, rpcServer, is constructed based on the various configuration items, and rpcServer is then started when the NameNode starts its services. As shown in the figure below:
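
A simplified sketch of the relevant parts of the NameNode class, based on the Hadoop 3.1 sources (the many other services started here are omitted):

  // Simplified sketch: the NameNode builds its RPC server during initialize()
  // and starts it in startCommonServices().
  protected void initialize(Configuration conf) throws IOException {
    // ...
    rpcServer = createRpcServer(conf);   // rpcServer is a NameNodeRpcServer
    // ...
    startCommonServices(conf);
  }

  protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException {
    return new NameNodeRpcServer(conf, this);
  }

  private void startCommonServices(Configuration conf) throws IOException {
    // ...
    rpcServer.start();                   // starts the underlying org.apache.hadoop.ipc.Server
    // ...
  }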

So let's focus on the start method. The object in NameNodeRpcServer that is responsible for handling RPC requests is of type RPC.Server, which extends the org.apache.hadoop.ipc.Server class. As the following code shows, the start method therefore jumps into the start method of org.apache.hadoop.ipc.Server:

// NameNodeRpcServer.java
protected final RPC.Server clientRpcServer;

// RPC.java
public abstract static class Server extends org.apache.hadoop.ipc.Server {
  // ...
}

The start method of the Server class looks like this:
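
Its body is essentially the following (a sketch of org.apache.hadoop.ipc.Server#start in the Hadoop 3.1 sources):

  // Starts the responder, listener and handler threads.
  public synchronized void start() {
    responder.start();                    // writes responses back to clients
    listener.start();                     // accepts connections and reads requests
    handlers = new Handler[handlerCount];

    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);       // each Handler consumes the callQueue
      handlers[i].start();
    }
  }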

There are three main classes involved: Responder, Listener, and Handler. All three are essentially thread classes.

The Listener listens for requests from clients, has its Reader threads deserialize them, and places them into the callQueue, where the Handlers pick them up for processing. The Responder sends the responses back to the clients.

Now only one last question remains: how does the Server class, through the Handler, eventually delegate the RPC call to ClientNamenodeProtocolServerSideTranslatorPB?

Handler is a thread class whose run method keeps taking Call objects from the callQueue and then executes call.run() to process the RPC request, which in turn invokes the call method to obtain the return value, as shown below:
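
A simplified sketch of the Handler's run loop, based on the Hadoop 3.1 sources (metrics, tracing and most exception handling are trimmed):

    // Each Handler thread repeatedly takes a Call from the shared callQueue and runs it.
    @Override
    public void run() {
      while (running) {
        try {
          final Call call = callQueue.take();  // blocks until an RPC request is queued
          CurCall.set(call);
          // call.run() invokes Server#call, which for protobuf RPC is
          // ProtobufRpcEngine.Server.ProtoBufRpcInvoker#call (traced below).
          call.run();
        } catch (InterruptedException e) {
          // the server is shutting down
        } finally {
          CurCall.set(null);
        }
      }
    }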

Tracing into that call method, i.e. ProtobufRpcEngine.Server.ProtoBufRpcInvoker#call (only the key part is shown; the try/catch and other unrelated code are omitted):

      public Writable call(RPC.Server server, String connectionProtocolName,
          Writable writableRequest, long receiveTime) throws Exception {

        // Fetch the protocol implementation from the registered protocols
        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
                              declaringClassProtoName, clientVersion);
        // Retrieve the BlockingService object
        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
        MethodDescriptor methodDescriptor = service.getDescriptorForType()
            .findMethodByName(methodName);

        // Call the method on the BlockingService object, which was built around the
        // ClientNamenodeProtocolServerSideTranslatorPB object constructed in NameNodeRpcServer.
        Message result = service.callBlockingMethod(methodDescriptor, null, param);
        return RpcWritable.wrap(result);
      }

The first argument is of type RPC.Server, which corresponds to the three RPC.Server variables in NameNodeRpcServer shown earlier:

Next, let's see where the BlockingService object that service refers to comes from (this is where ClientNamenodeProtocolServerSideTranslatorPB comes in!).

The NameNodeRpcServer constructor contains the following code:

public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {

    ClientNamenodeProtocolServerSideTranslatorPB 
       clientProtocolServerTranslator = 
         new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);
    
    // Remember that setInstance is passed in the BlockingService variable above. We're going to use it next.
    clientRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(rpcAddr.getPort())
        .setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();

}

During the construction of the RPC Server, the setInstance method is passed the BlockingService object shown above. When build() is called, the instance is checked and, if it is not null, registered with the newly created Server. So the protocolImpl we fetched in the call method above is exactly this BlockingService, looked up from the map it was registered in. And that is how everything links back to ClientNamenodeProtocolServerSideTranslatorPB!
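
A sketch of the registration and lookup, simplified from the Hadoop 3.1 sources (exception handling is omitted):

  // Registration: performed while the RPC.Server is being built.
  // protocolImpl here is the clientNNPbService BlockingService from above.
  void registerProtocolAndImpl(RPC.RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl) {
    String protocolName = RPC.getProtocolName(protocolClass);
    long version = RPC.getProtocolVersion(protocolClass);
    getProtocolImplMap(rpcKind).put(
        new ProtoNameVer(protocolName, version),
        new ProtoClassProtoImpl(protocolClass, protocolImpl));
  }

  // Lookup: performed in ProtoBufRpcInvoker#call for every incoming request.
  static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, String protoName, long clientVersion) {
    ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
    return server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
  }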

As shown in the diagram below:

At this point, the whole flow of the Hadoop RPC framework has been covered.

Of course, there are many details not covered here, such as the retry mechanism, failover, exception handling, asynchronous RPC requests, thread safety, and so on.

Reference

Hadoop 3.1 source code