YARN High-Concurrency RPC Network Communication: Architecture Design and Source Code Implementation

1. YARN Architecture Evolution

1. The MapReduce architecture design and implementation before Hadoop 2.x:

Master node: JobTracker manages and schedules global resources

Slave node: TaskTracker manages and schedules resources for a single host

Architecture issues:

  • Single point of failure, low reliability

    There is only one JobTracker; if the host running it crashes, the entire MapReduce cluster collapses

  • Single point of bottleneck, poor scalability

    JobTracker hosts the application-management logic (what YARN later splits out into per-application ApplicationMasters) for every job. With a large number of slave nodes, JobTracker comes under great pressure and cluster stability suffers

  • Resource management and task execution are strongly coupled

    Resource management and task scheduling are both built into MapReduce, so the cluster can only run code written against the MapReduce APIs, and such code cannot run anywhere else

  • Low resource utilization

    Resources are managed as slots, which are split into MapSlots and ReduceSlots that cannot use each other's capacity, so slots may sit idle while tasks starve. For example, with four slots of each kind but six Map tasks and two Reduce tasks, the extra Map tasks cannot get resources even though ReduceSlots are free

  • Multiple distributed computing frameworks are not supported

    Only MapReduce programs are supported; other computing frameworks are not

2. Hadoop architecture changes after the Hadoop 2.x release:

Hadoop YARN architecture advantages:

  • Greatly reduces the resource consumption of the central master

    In the new architecture, each application’s ApplicationMaster runs on a slave node, which greatly reduces the resource consumption that JobTracker used to bear

  • ApplicationMaster in YARN is only a specification

    ApplicationMaster is only a specification, which means any computing framework can use YARN for resource management and scheduling as long as it implements the specification

  • The resource abstraction of Container in YARN is more reasonable than that of Slot

    A Container is an untyped resource specification: any application that fits the specification can run in a Container

  • Integrates ZooKeeper to solve the SPOF problem in the ResourceManager

    Combined with ZooKeeper, high availability (HA) is implemented for the ResourceManager, avoiding a single point of failure

2. High-Concurrency YARN RPC Architecture Design

1. Hadoop RPC concepts

Remote Procedure Call (RPC) is a protocol for requesting services from remote computers over the network without having to understand the underlying network technology. Concretely: given two servers A and B, an application deployed on server A wants to call a function or method provided by an application on server B. Because the two processes do not share an address space, the call cannot be made directly; instead, the call semantics and the call data must be expressed and transferred over the network.

Simply put, the client tells the server the method and parameters it wants to use. The server runs the method and returns the result to the client.
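To make this concrete, here is a toy sketch in plain Java (not Hadoop code): all names are invented, and Java's built-in serialization plus a raw socket stand in for a real RPC wire format. The client-side dynamic proxy serializes the method name and arguments, ships them over the network, and the server dispatches the call by reflection and writes the result back.

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

public class ToyRpc {

    public interface Greeter {
        String getName(String name);
    }

    public static class GreeterImpl implements Greeter {
        public String getName(String name) { return "hello " + name; }
    }

    // Server side: accept one call, dispatch it by reflection, return the result
    public static void serve(int port, Object impl) throws Exception {
        try (ServerSocket listener = new ServerSocket(port);
             Socket socket = listener.accept()) {
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            String methodName = (String) in.readObject();
            Object[] args = (Object[]) in.readObject();
            // Parameter types are hard-coded for brevity; a real framework would ship them too
            Method method = impl.getClass().getMethod(methodName, String.class);
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject(method.invoke(impl, args));
            out.flush();
        }
    }

    // Client side: a dynamic proxy that ships every method call across the wire
    @SuppressWarnings("unchecked")
    public static <T> T getProxy(Class<T> iface, String host, int port) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
                (proxy, method, args) -> {
                    try (Socket socket = new Socket(host, port)) {
                        ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                        out.writeObject(method.getName());
                        out.writeObject(args);
                        out.flush();
                        ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                        return in.readObject();
                    }
                });
    }

    public static void main(String[] args) throws Exception {
        new Thread(() -> {
            try {
                serve(9999, new GreeterImpl());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        Thread.sleep(500); // crude wait for the server socket to come up

        Greeter greeter = getProxy(Greeter.class, "localhost", 9999);
        System.out.println(greeter.getName("bigdata")); // prints: hello bigdata
    }
}

The client never sees a socket or a byte stream; it just calls an interface method. Hadoop RPC follows the same shape, with far more machinery for threading, versioning, and serialization.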

The figure shows a simple analogy. The RPC server is China, and the components of the server represent its various departments. When another country wants to send a message to a specific department in China, it only needs to inform the Chinese ambassador, who delivers the message to the right department on its behalf. The other country does not need to know how the ambassador communicates with the departments inside China; that part is transparent.

As a mature and stable distributed system with complex and powerful functionality, Hadoop inevitably involves a large amount of communication between low-level components. A high-performance, scalable, and maintainable RPC network communication framework is therefore the foundation of the whole Hadoop edifice. Consider the current Hadoop project structure:

  • Hadoop Common: RPC network communication framework, various toolkits, etc
  • HDFS: distributed file system (C/S distributed system with master/slave architecture)
  • Hadoop MapReduce: Distributed computing application programming framework
  • Hadoop YARN: distributed resource scheduling system (C/S master-slave distributed system)

2. Implementation of Hadoop RPC architecture

In Hadoop 1.x, the default Writable-based protocol was used as the RPC protocol; in Hadoop 2.x the RPC framework was rewritten and Protobuf became Hadoop's default RPC communication protocol. In YARN, there is exactly one RPC protocol between any two components that need to communicate with each other. For any RPC protocol, one end is a Client and the other end is a Server, and the Client always proactively connects to the Server; YARN thus uses a pull-based communication model.
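As a sketch of how this looks in code: Hadoop binds a serialization engine to each protocol interface through the Configuration. BussinessProtocol and MyResourceTrackerPB below are the protocol interfaces defined in the two cases later in this article; WritableRpcEngine is the historical Writable-based engine that ProtobufRpcEngine replaced as the default.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.WritableRpcEngine;

public class EngineSelection {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Hadoop 1.x style: the Writable-based engine (the historical default)
        RPC.setProtocolEngine(conf, BussinessProtocol.class, WritableRpcEngine.class);

        // Hadoop 2.x style: the Protobuf engine, used by YARN's own protocols
        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);
    }
}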

The work of a YARN RPC server can be divided into four phases:

  • Phase 1: Server initialization and startup

  • Phase 2: Receive a request and encapsulate it as an RpcCall

  • Phase 3: Process the RpcCall request

  • Phase 4: Return the RPC response to the client

Note that an RpcServer here does not mean a physical server: one physical machine can run multiple RpcServer instances (see the sketch after this list). For example:

  • NameNode (multiple RpcServers)

    • The first RpcServer: provides services to clients
    • The second RpcServer: provides services to DataNodes
  • ResourceManager (likewise multiple RpcServers)
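The following minimal sketch illustrates that point with Hadoop's RPC.Builder: one JVM hosts two independent RPC servers on different ports, the way a NameNode separates client traffic from DataNode traffic. The protocol interfaces, implementations, and ports (ClientProtocol, ClientProtocolImpl, ServiceProtocol, ServiceProtocolImpl, 8020/8021) are invented for illustration; they would be defined in the style of the Writable case below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class TwoRpcServersInOneJvm {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // First RpcServer: serves ordinary clients
        RPC.Server clientRpcServer = new RPC.Builder(conf)
                .setProtocol(ClientProtocol.class)
                .setInstance(new ClientProtocolImpl())
                .setBindAddress("0.0.0.0")
                .setPort(8020)
                .build();

        // Second RpcServer: serves internal daemons such as DataNodes
        RPC.Server serviceRpcServer = new RPC.Builder(conf)
                .setProtocol(ServiceProtocol.class)
                .setInstance(new ServiceProtocolImpl())
                .setBindAddress("0.0.0.0")
                .setPort(8021)
                .build();

        // Both servers live in the same JVM, each with its own port,
        // listener, and handler threads
        clientRpcServer.start();
        serviceRpcServer.start();
    }
}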

3. High-Concurrency YARN RPC Case Studies

The Hadoop RPC framework offers two serialization mechanisms:

  • The Writable interface implementation: simple and easy to understand
  • The Google Protobuf implementation: cross-language, highly scalable, and efficient
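To get a feel for the Writable mechanism, here is a minimal custom Writable. The NodeResource type and its fields are invented for illustration; what matters is that write() and readFields() define the wire format, and the field order in the two methods must match.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class NodeResource implements Writable {

    private String hostname;
    private int cpu;
    private int memory;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(hostname);   // the field order here...
        out.writeInt(cpu);
        out.writeInt(memory);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        hostname = in.readUTF();  // ...must match the order here
        cpu = in.readInt();
        memory = in.readInt();
    }
}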

YARN RPC Writable case implementation

Let’s start with the code structure:

Step 1: Define the RPC protocol

/**
 * Note: an RPC protocol is used to define the service. If the protocol
 * implements the VersionedProtocol interface, a Server and a Client with
 * different versions cannot communicate with each other.
 */
public interface BussinessProtocol {

    void mkdir(String path);
    String getName(String name);

    long versionID = 345043000L;
}

Step 2: Define a service implementation for the BussinessProtocol RPC protocol:

public class BusinessIMPL implements BussinessProtocol {

    @Override
    public void mkdir(String path) {
        System.out.println("Folder created successfully: " + path);
    }

    @Override
    public String getName(String name) {
        System.out.println("Greeted successfully: hello " + name);
        return "bigdata";
    }
}

Step 3: Build an RPC server through Hadoop RPC that provides the BussinessProtocol service externally

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

/**
 * Note: simulate how Hadoop builds an RPC server.
 */
public class MyServer {

    public static void main(String[] args) {
        try {
            /**
             * Note: build an RPC server that exposes the BussinessProtocol
             * service through the BusinessIMPL implementation.
             */
            RPC.Server server = new RPC.Builder(new Configuration())
                    .setProtocol(BussinessProtocol.class)
                    .setInstance(new BusinessIMPL())
                    .setBindAddress("localhost")
                    .setPort(6789)
                    .build();

            // Note: start the RPC Server
            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
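Because the theme of this article is high concurrency, it is worth noting that RPC.Builder can also raise the number of handler threads. Below is a sketch of a more concurrent variant of the server above; the reader-thread-pool configuration key is an assumption that should be verified against your Hadoop version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

public class MyConcurrentServer {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Assumed configuration key: size of the reader thread pool
        conf.setInt("ipc.server.read.threadpool.size", 4);

        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(BussinessProtocol.class)
                .setInstance(new BusinessIMPL())
                .setBindAddress("localhost")
                .setPort(6789)
                .setNumHandlers(10)  // 10 handler threads process RpcCalls in parallel
                .build();
        server.start();
    }
}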

Step 4: Build an RPC client to send RPC requests to the RPC server

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * Note: build the RPC client.
 */
public class MyClient {

    public static void main(String[] args) {
        try {
            /**
             * Note: obtain a proxy for the protocol exposed by the server.
             * Through this proxy, the client can invoke server-side methods
             * to do the actual processing.
             */
            BussinessProtocol proxy = RPC.getProxy(BussinessProtocol.class, BussinessProtocol.versionID,
                    new InetSocketAddress("localhost", 6789), new Configuration());

            // Note: the client triggers the call; the real code executes on the server side
            proxy.mkdir("/home/bigdata/apps");
            String rpcResult = proxy.getName("bigdata");
            System.out.println("Response of the getName RPC request received from the RPC server: " + rpcResult);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Step 5: Test

  • Run the server first
  • Then run the client, initiate an RPC request, and check whether the RPC response is obtained

Server output:

Folder created successfully: /home/bigdata/apps
Greeted successfully: hello bigdata

The client receives the result:

Response of the getName RPC request received from the RPC server: bigdata

YARN RPC Protobuf case implementation

First, let’s look at the project structure:

Step 1: Define the protocol MyResourceTracker

package com.mazh.rpc.protobuf.demo2;

import com.mazh.rpc.protobuf.demo2.proto.MyResourceTrackerMessage.MyRegisterNodeManagerResponseProto;
import com.mazh.rpc.protobuf.demo2.proto.MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto;

/**
 * Note: the protocol's parameter and return types are Java classes generated by Protobuf.
 */
public interface MyResourceTracker {
    MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception;
}

Step 2: Define the Protobuf files. First the service definition, MyResourceTracker.proto:

option java_package = "com.mazh.rpc.protobuf.demo2.proto";
option java_outer_classname = "MyResourceTracker";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

import "MyResourceTrackerMessage.proto";

service MyResourceTrackerService {
  rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto);
}
Then the message definition, MyResourceTrackerMessage.proto:
option java_package="com.mazh.rpc.protobuf.demo2.proto";
option java_outer_classname="MyResourceTrackerMessage";
option java_generic_services=true;
option java_generate_equals_and_hash=true;

message MyRegisterNodeManagerRequestProto{
    required string hostname=1;
    required int32 cpu=2;
    required int32 memory=3;
}
message MyRegisterNodeManagerResponseProto{
    required string flag=1;
}

Step 3: Compile the proto files to generate the Java classes. Installing Protobuf is not covered here. The compilation commands:

protoc --proto_path=./ --java_out=../../../../../../ ./MyResourceTrackerMessage.proto
protoc --proto_path=./ --java_out=../../../../../../ ./MyResourceTracker.proto
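A quick way to sanity-check the generated classes is a serialization round trip using the standard Protobuf Java API (a sketch; the ProtoRoundTrip class name is invented): build a request message, serialize it to bytes, and parse it back. This binary round trip is exactly what the RPC engine performs on the wire.

import com.mazh.rpc.protobuf.demo2.proto.MyResourceTrackerMessage;

public class ProtoRoundTrip {
    public static void main(String[] args) throws Exception {
        MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto request =
                MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto.newBuilder()
                        .setHostname("bigdata02")
                        .setCpu(64)
                        .setMemory(128)
                        .build();

        // Protobuf's compact binary encoding, as sent over the wire
        byte[] wireBytes = request.toByteArray();

        MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto parsed =
                MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto.parseFrom(wireBytes);
        System.out.println(parsed.getHostname()); // bigdata02
    }
}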

Step 4: Implement the MyResourceTracker communication protocol

package com.mazh.rpc.protobuf.demo2;

import com.mazh.rpc.protobuf.demo2.proto.MyResourceTrackerMessage;

/**
 * Note: the concrete implementation of the communication protocol.
 */
public class MyResourceTrackerService implements MyResourceTracker {

    @Override
    public MyResourceTrackerMessage.MyRegisterNodeManagerResponseProto registerNodeManager(
            MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto request) throws Exception {

        // Note: build a response object to return
        MyResourceTrackerMessage.MyRegisterNodeManagerResponseProto.Builder builder =
                MyResourceTrackerMessage.MyRegisterNodeManagerResponseProto.newBuilder();

        // Note: print the registration message
        String hostname = request.getHostname();
        int cpu = request.getCpu();
        int memory = request.getMemory();
        System.out.println("NodeManager registration message: hostname = " + hostname
                + ", cpu = " + cpu + ", memory = " + memory);

        // Note: simply return "true" as the registration flag
        builder.setFlag("true");
        return builder.build();
    }
}

Step 5: Write the proto protocol interface

package com.mazh.rpc.protobuf.demo2;

import com.mazh.rpc.protobuf.demo2.proto.MyResourceTracker;
import org.apache.hadoop.ipc.ProtocolInfo;

/**
 * Note: the proto protocol interface. @ProtocolInfo gives the protocol its
 * name and version on the wire; client and server must agree on both.
 */
@ProtocolInfo(protocolName = "com.mazh.rpc.protobuf.demo2.ResourceTrackerPB", protocolVersion = 1)
public interface MyResourceTrackerPB extends MyResourceTracker.MyResourceTrackerService.BlockingInterface {
}

Step 6: Write the proto protocol interface implementation

package com.mazh.rpc.protobuf.demo2;

import com.google.protobuf.RpcController;
import com.mazh.rpc.protobuf.demo2.proto.MyResourceTrackerMessage;

/**
 * Note: the server-side implementation of the proto protocol interface.
 */
public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {

    private final MyResourceTracker server;

    public MyResourceTrackerServerSidePB(MyResourceTracker server) {
        this.server = server;
    }

    @Override
    public MyResourceTrackerMessage.MyRegisterNodeManagerResponseProto registerNodeManager(
            RpcController controller,
            MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto request) {
        try {
            // Note: delegate to the actual protocol implementation
            return server.registerNodeManager(request);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

Step 7: Write the RPC Server implementation

package com.mazh.rpc.protobuf.demo2.server;

import com.google.protobuf.BlockingService;
import com.mazh.rpc.protobuf.demo2.MyResourceTrackerPB;
import com.mazh.rpc.protobuf.demo2.MyResourceTrackerServerSidePB;
import com.mazh.rpc.protobuf.demo2.MyResourceTrackerService;
import com.mazh.rpc.protobuf.demo2.proto.MyResourceTracker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

/**
 * Note: an RpcServer written against the Hadoop RPC API.
 */
public class ProtobufRpcServer {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String hostname = "localhost";
        int port = 9998;

        // Note: set the RPC engine for this protocol to ProtobufRpcEngine
        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);

        // Note: build the Rpc Server
        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(MyResourceTrackerPB.class)
                .setInstance((BlockingService) MyResourceTracker.MyResourceTrackerService
                        .newReflectiveBlockingService(
                                new MyResourceTrackerServerSidePB(new MyResourceTrackerService())))
                .setBindAddress(hostname)
                .setPort(port)
                .setNumHandlers(1)
                .setVerbose(true)
                .build();

        // Note: start the Rpc Server
        server.start();
    }
}

Step 8: Write the RPC Client implementation

package com.mazh.rpc.protobuf.demo2.client;

import com.google.protobuf.ServiceException;
import com.mazh.rpc.protobuf.demo2.MyResourceTrackerPB;
import com.mazh.rpc.protobuf.demo2.proto.MyResourceTrackerMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * Note: the Rpc Client implementation.
 */
public class ProtobufRpcClient {

    public static void main(String[] args) throws IOException {

        // Note: set the RPC engine to ProtobufRpcEngine
        Configuration conf = new Configuration();
        String hostname = "localhost";
        int port = 9998;
        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);

        // Note: get the proxy
        MyResourceTrackerPB protocolProxy = RPC
                .getProxy(MyResourceTrackerPB.class, 1L, new InetSocketAddress(hostname, port), conf);

        // Note: build the request object
        MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto request =
                MyResourceTrackerMessage.MyRegisterNodeManagerRequestProto.newBuilder()
                        .setHostname("bigdata02")
                        .setCpu(64)
                        .setMemory(128)
                        .build();

        // Note: send the RPC request and get the response
        MyResourceTrackerMessage.MyRegisterNodeManagerResponseProto response = null;
        try {
            response = protocolProxy.registerNodeManager(null, request);
        } catch (ServiceException e) {
            e.printStackTrace();
        }

        // Note: process the response
        String flag = response.getFlag();
        System.out.println("Final registration result: flag = " + flag);
    }
}

Step 9: Test. Server output:

NodeManager registration message: hostname = bigdata02, cpu = 64, memory = 128

Client execution result:

Final registration result: flag = true

This concludes the article, which mainly explained the design of the RPC network communication architecture. Please stay tuned for the follow-up content.

I am still a beginner in this field and on my way to learning it. If there are mistakes in the article above, please point them out.