1. A brief description of the consistent hash algorithm

  • I won’t go into the origin of consistent hashing here; many good articles on it are easy to find online. This article focuses on implementing the consistent hash algorithm and building an environment to test it in practice.
  • Before we start, let’s sort out the idea of the algorithm:
  • The consistent hash algorithm places the hash value of each server on a hash ring, which divides the ring into segments. It then computes the hash value of each incoming request to find the segment, and therefore the server, that the request belongs to. This avoids the problem with traditional hash-modulo schemes, where adding or removing a machine forces almost every key to be remapped.
  • However, if there are only a few servers, their hash values may fall close together, so the ring is divided unevenly and one server may be overloaded. To solve this load balancing problem, we introduce virtual nodes: each server is assigned a number of virtual nodes, and the ring is divided by the hash values of those nodes. This also lets us allocate nodes according to machine performance, more for powerful machines and fewer for weak ones, to achieve load balance.
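To make the remapping problem concrete, here is a minimal sketch of the traditional scheme that consistent hashing replaces. It assumes integer keys standing in for request hashes and a plain `key % serverCount` rule; the class and method names are made up for illustration.

```java
// Sketch: how many keys change servers under plain modulo hashing
// when the cluster grows from 3 to 4 machines.
class ModuloRemapSketch {

    /** Counts keys in [0, keyCount) whose server index changes when the server count changes. */
    static int countMoved(int keyCount, int oldServers, int newServers) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            if (key % oldServers != key % newServers) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        int keyCount = 12_000;
        int moved = countMoved(keyCount, 3, 4);
        // Only keys with key % 12 in {0, 1, 2} keep their server, so 75% move.
        System.out.println(moved + " of " + keyCount + " keys moved");
    }
}
```

With 12,000 keys, 9,000 of them land on a different server after one machine is added. On a hash ring, by contrast, only the keys in the segments next to the new server's nodes need to move.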

2. Implement a consistent hash algorithm

  • With the overall idea in place, we can start thinking about implementation details.
  1. Hash algorithm selection
  • The FNV algorithm, which produces 32-bit integer hashes, is selected. Because the hash function may produce negative numbers, the absolute value is taken.
  2. The policy by which a request finds its server on the hash ring
  • The strategy is: the request is handled by the nearest node whose hash value is greater than or equal to its own. For example, given a ring [0, 5, 7, 10] and a request with hash value 6, the request should be processed by the server with hash value 7. If the request's hash is greater than every node on the ring, the first node is taken. So a request with hash value 11 is assigned to the node at 0.
  3. The organization of the hash ring
  • Initially, I thought about using a sequential storage structure, but in consistent hashing the most frequent operation is finding the nearest number in the set that is larger than the target. A linear scan of a sequential structure takes O(N) time, while a balanced tree takes O(logN).
  • But there are two sides to the coin. The cost of a tree structure is slower initialization and slower insertion or deletion of nodes at runtime. In reality, however, a server rarely changes after it is registered, and the frequency of machine additions, outages, repairs, and so on is trivial compared to node queries. So this case uses a tree structure for storage.
  • The first structure that comes to mind that fits these requirements and provides ordered storage is a red-black tree, and Java provides a red-black tree implementation in TreeMap.
  4. Mapping between virtual nodes and real nodes
  • Determining which real node a virtual node corresponds to is also a problem. In theory, a table should be maintained to record the mapping between real and virtual nodes. For this demo, simple string processing is used instead. For example, if the server 192.168.0.1:8888 is assigned 1000 virtual nodes, its virtual node names range from 192.168.0.1:8888@1 to 192.168.0.1:8888@1000. With this naming, we only need to crop the string to find the real node from the virtual node.
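The lookup rule and the name-cropping trick from the list above can be sketched in a few lines. This is only an illustration: the class and method names are hypothetical, and it uses `TreeMap.ceilingEntry` as a compact alternative to the `tailMap` approach.

```java
import java.util.Map;
import java.util.TreeMap;

class RingLookupSketch {

    /** Returns the ring position that serves the hash: the first position >= hash, wrapping to the smallest. */
    static int locate(TreeMap<Integer, String> ring, int hash) {
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash);
        return entry != null ? entry.getKey() : ring.firstKey();
    }

    /** Recovers the real server name from a virtual node name such as "192.168.0.1:8888@37". */
    static String realNodeOf(String virtualNodeName) {
        return virtualNodeName.split("@")[0];
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        for (int position : new int[] {0, 5, 7, 10}) {
            ring.put(position, "node-" + position);
        }
        System.out.println(locate(ring, 6));   // 7
        System.out.println(locate(ring, 11));  // 0 (wraps around the ring)
        System.out.println(realNodeOf("192.168.0.1:8888@1000")); // 192.168.0.1:8888
    }
}
```

Both `ceilingEntry` and `tailMap(...).firstKey()` run in O(logN) on a TreeMap; `ceilingEntry` simply avoids creating the intermediate map view.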

  • Now that the plan is settled, let’s write the code.

import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashTest {
    /** List of servers. Three servers provide the service; virtual nodes are allocated according to performance. */
    public static String[] servers = {
            "192.168.0.1#100", // Server 1: performance index 100, gets 1000 virtual nodes
            "192.168.0.2#100", // Server 2: performance index 100, gets 1000 virtual nodes
            "192.168.0.3#30"   // Server 3: performance index 30, gets 300 virtual nodes
    };
    /** List of real servers. Because additions and deletions are more frequent than traversal, a linked list is more cost-effective. */
    private static List<String> realNodes = new LinkedList<>();
    /** Virtual nodes, keyed by hash value */
    private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();

    static {
        for (String s : servers) {
            // Add the server to the real server list
            realNodes.add(s);
            String[] strs = s.split("#");
            // The server name, omitting the port number
            String name = strs[0];
            // Assign virtual nodes to each real server based on its performance and put them in the virtual node map
            int virtualNodeNum = Integer.parseInt(strs[1]) * 10;
            for (int i = 1; i <= virtualNodeNum; i++) {
                virtualNodes.put(FVNHash(name + "@" + i), name + "@" + i);
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new RequestProcess()).start();
    }

    static class RequestProcess implements Runnable {
        @Override
        public void run() {
            String client = null;
            while (true) {
                // Simulate an incoming request from a random client address
                client = getN() + "." + getN() + "." + getN() + "." + getN() + ":" + (1000 + (int) (Math.random() * 9000));
                // Determine which server will handle the request
                System.out.println(client + " request will be processed by " + getServer(client));
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static String getServer(String client) {
        // Calculate the hash value of the client request
        int hash = FVNHash(client);
        // Get the part of the ring whose keys are greater than or equal to the hash value
        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
        // Take the first virtual node in that part. If there is none, wrap around to the first node on the ring.
        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
        // Get the name of the real node from the virtual node
        String virtualNodeName = virtualNodes.get(targetKey);
        String realNodeName = virtualNodeName.split("@")[0];
        return realNodeName;
    }

    public static int getN() {
        return (int) (Math.random() * 128);
    }

    public static int FVNHash(String data) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < data.length(); i++)
            hash = (hash ^ data.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that one value to 0
        return hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
    }
}

/* Run result fragment
55.1.13.47:6240 request will be processed by 192.168.0.1
5.49.56.126:1105 request will be processed by 192.168.0.1
90.41.8.88:6884 request will be processed by 192.168.0.2
26.107.104.81:2989 request will be processed by 192.168.0.2
114.66.6.56:8233 request will be processed by 192.168.0.1
123.74.52.94:5523 request will be processed by 192.168.0.1
104.59.60.2:7502 request will be processed by 192.168.0.2
4.94.30.79:1299 request will be processed by 192.168.0.1
10.44.37.73:9332 request will be processed by 192.168.0.2
115.93.93.82:6333 request will be processed by 192.168.0.2
15.24.97.66:9177 request will be processed by 192.168.0.2
100.39.98.10:1023 request will be processed by 192.168.0.2
61.118.87.26:5108 request will be processed by 192.168.0.2
17.79.104.35:3901 request will be processed by 192.168.0.1
95.36.5.25:8020 request will be processed by 192.168.0.2
126.74.56.71:7792 request will be processed by 192.168.0.2
14.63.56.45:8275 request will be processed by 192.168.0.1
58.53.44.71:2089 request will be processed by 192.168.0.3
80.64.57.43:6144 request will be processed by 192.168.0.2
46.65.4.18:7649 request will be processed by 192.168.0.2
57.35.27.62:9607 request will be processed by 192.168.0.2
81.114.72.3:3444 request will be processed by 192.168.0.1
38.18.61.26:6295 request will be processed by 192.168.0.2
71.75.18.82:9686 request will be processed by 192.168.0.2
26.11.98.111:3781 request will be processed by 192.168.0.1
62.86.23.37:8570 request will be processed by 192.168.0.3
*/

 

  • The test above shows that the better-performing servers 1 and 2 handle most of the requests, and only a small share falls to the weaker server 3. Basic load balancing has been achieved.
  • Next, we’ll build a more realistic server cluster with ZooKeeper to see whether the consistent hash algorithm still balances the load when servers go online or offline.
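Before moving to a real cluster, the offline case can be checked in memory. The sketch below removes one server's virtual nodes from a ring and counts keys that changed servers even though their original server is still present; on a consistent hash ring that count should be zero. The `hash` function here is a stand-in chosen for brevity, not the article's FVNHash, and all class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class RemovalSketch {

    /** Stand-in hash for this sketch only: scatters String.hashCode() and clears the sign bit. */
    static int hash(String s) {
        int h = s.hashCode() * 0x9E3779B1;
        h ^= h >>> 16;
        return h & 0x7FFFFFFF;
    }

    static void addServer(TreeMap<Integer, String> ring, String server, int virtualNodes) {
        for (int i = 1; i <= virtualNodes; i++) {
            ring.put(hash(server + "@" + i), server + "@" + i);
        }
    }

    static void removeServer(TreeMap<Integer, String> ring, String server) {
        String prefix = server + "@";
        ring.values().removeIf(v -> v.startsWith(prefix));
    }

    static String route(TreeMap<Integer, String> ring, String key) {
        Map.Entry<Integer, String> e = ring.ceilingEntry(hash(key));
        if (e == null) {
            e = ring.firstEntry(); // wrap around the ring
        }
        return e.getValue().split("@")[0];
    }

    /** Counts keys that moved although their original server was not the one removed. */
    static int countUnexpectedMoves(int keyCount) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        addServer(ring, "192.168.0.1", 1000);
        addServer(ring, "192.168.0.2", 1000);
        addServer(ring, "192.168.0.3", 300);
        String[] before = new String[keyCount];
        for (int i = 0; i < keyCount; i++) {
            before[i] = route(ring, "client-" + i);
        }
        removeServer(ring, "192.168.0.2");
        int unexpected = 0;
        for (int i = 0; i < keyCount; i++) {
            String after = route(ring, "client-" + i);
            if (!before[i].equals("192.168.0.2") && !after.equals(before[i])) {
                unexpected++;
            }
        }
        return unexpected;
    }

    public static void main(String[] args) {
        System.out.println("Unexpected moves: " + countUnexpectedMoves(5000));
    }
}
```

Only the requests that were served by the removed server are redistributed; everything else keeps its server, which is the property the ZooKeeper experiment below exercises with real processes.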

 

3. Build the environment with ZooKeeper

Environment introduction

  • A cluster of servers is simulated by starting multiple Java virtual machines, each of which provides the same interface for consumers to call.
  • At the same time, a consumer thread continuously sends requests to the server cluster, and these requests are load-balanced across the servers by the consistent hash algorithm.
  • To simulate this scenario, the client must maintain a list of servers so that it can use the consistent hash algorithm to choose which server to send each request to (in reality, the consistent hash algorithm might be implemented on a front-end server, which the client accesses first and which then routes to the back-end server cluster).
  • But our focus is to simulate servers going down and coming back online, to see whether the consistent hash algorithm still balances the load. So the client must be able to sense changes on the server side and adjust its server list dynamically.
  • To do this, we introduce ZooKeeper. ZooKeeper’s data consistency algorithm keeps its data timely and accurate, so the client can obtain the real-time server status through ZooKeeper.
  • The operation is as follows: each server in the cluster first connects to ZooKeeper and registers its interface service there as an ephemeral node (the registration node). The client connects to ZooKeeper and adds the registered nodes (servers) to its server list.
  • If a server goes down, its node disappears from ZooKeeper. When the client detects that the server nodes have changed, it adjusts its server list and removes the server that went down. The client therefore stops sending requests to that server, and the load is shared by the remaining servers.
  • When a server reconnects to the cluster, the client’s server list is updated and the hash ring changes accordingly to keep the load balanced.
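The client-side reaction to a membership change can be sketched without ZooKeeper: build a fresh ring from the new server list and publish it with a single assignment, so request threads never see a half-built ring. This is a minimal illustration under stated assumptions. `RingHolderSketch`, `onServerListChanged`, and the stand-in `hash` function are hypothetical names, and the ring stores the server URL directly as the value rather than a `url@i` string.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RingHolderSketch {
    // volatile so that request threads always see the most recently published ring
    private volatile TreeMap<Integer, String> ring = new TreeMap<>();

    /** Stand-in hash for this sketch only: scatters String.hashCode() and clears the sign bit. */
    static int hash(String s) {
        int h = s.hashCode() * 0x9E3779B1;
        h ^= h >>> 16;
        return h & 0x7FFFFFFF;
    }

    /** Called whenever the registry reports a new server list: build a fresh ring, then swap it in. */
    void onServerListChanged(List<String> urls, int virtualNodesPerServer) {
        TreeMap<Integer, String> fresh = new TreeMap<>();
        for (String url : urls) {
            for (int i = 1; i <= virtualNodesPerServer; i++) {
                fresh.put(hash(url + "@" + i), url);
            }
        }
        ring = fresh; // single assignment publishes the complete ring
    }

    /** Routes a key on the current ring snapshot; returns null when no server is registered. */
    String route(String key) {
        TreeMap<Integer, String> snapshot = ring;
        if (snapshot.isEmpty()) {
            return null;
        }
        Map.Entry<Integer, String> e = snapshot.ceilingEntry(hash(key));
        return (e != null ? e : snapshot.firstEntry()).getValue();
    }
}
```

Reading the field once into `snapshot` means a lookup works on one consistent ring even if the server list changes while the request is being routed.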

Specific operations:

I. Set up the ZooKeeper cluster environment:

  1. Create 3 ZooKeeper services to form a cluster. In each service's data folder, add a myid file; the ids are 1, 2, and 3 respectively.
  2. Make a copy of the configuration file for each service and configure its ZooKeeper port. In this case, the three ZooKeeper services listen on ports 2181, 2182, and 2183 respectively.
  3. Start the ZooKeeper cluster.

Since ZooKeeper is not the focus of this case, I won’t go into the details.

 

II. Create a server cluster to provide RPC remote invocation services

  1. First create a server project (using Maven) and add the ZooKeeper dependency.
  2. Create a constants interface to store the ZooKeeper connection information.
public interface Constant {
    // Addresses of the ZooKeeper cluster
    String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
    // Timeout for connecting to ZooKeeper, in milliseconds
    int ZK_TIME_OUT = 5000;
    // The address in ZooKeeper where servers register their remote services, that is, this node holds the interfaces provided by each server
    String ZK_REGISTRY = "/provider";
    // Ephemeral node under which the service URL is registered in the ZooKeeper cluster
    String ZK_RMI = ZK_REGISTRY + "/rmi";
}

3. Encapsulate the ZooKeeper operations and publish the remote service interface so that it can be invoked. In this case, the remote service is published using the RMI package that ships with Java.

import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ServiceProvider {

    private CountDownLatch latch = new CountDownLatch(1);

    /** Connect to the ZooKeeper cluster */
    public ZooKeeper connectToZK() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    // Once connected, wake up the waiting thread
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await(); // The current thread waits until the connection is established
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }

    /**
     * Create a znode.
     * @param zk  the ZooKeeper client
     * @param url the data written to the node
     */
    public void createNode(ZooKeeper zk, String url) {
        try {
            // Convert the data to be written into a byte array
            byte[] data = url.getBytes();
            zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Publish the RMI service */
    public String publishService(Remote remote, String host, int port) {
        String url = null;
        try {
            LocateRegistry.createRegistry(port);
            url = "rmi://" + host + ":" + port + "/rmiService";
            Naming.bind(url, remote);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return url;
    }

    /** Publish the RMI service and register the service URL in the ZooKeeper cluster */
    public void publish(Remote remote, String host, int port) {
        // Call publishService to get the URL of the service
        String url = publishService(remote, host, port);
        if (null != url) {
            ZooKeeper zk = connectToZK(); // Connect to ZooKeeper
            if (null != zk) {
                createNode(zk, url);
            }
        }
    }
}

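4. Define the remote service. The service provides one simple method: the client sends a string, and the server prefixes it with Hello and returns it.

// UserService
import java.rmi.Remote;
import java.rmi.RemoteException;

public interface UserService extends Remote {
    public String helloRmi(String name) throws RemoteException;
}

// UserServiceImpl
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

// Extending UnicastRemoteObject exports the object so that it can be bound and called remotely;
// the super() call in the constructor is what may throw RemoteException.
public class UserServiceImpl extends UnicastRemoteObject implements UserService {

    public UserServiceImpl() throws RemoteException {
        super();
    }

    @Override
    public String helloRmi(String name) throws RemoteException {
        return "Hello " + name + "!";
    }
}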

5. Change the port number and start multiple JVMs to simulate a server cluster. For demonstration purposes, ports 7777, 8888, and 9999 are used to start three server processes, one of which is later shut down and restarted to simulate server downtime and reconnection.

public static void main(String[] args) throws RemoteException {
    // Create a tool class object
    ServiceProvider sp = new ServiceProvider();
    // Create the remote service object
    UserService userService = new UserServiceImpl();
    // Publish the service
    sp.publish(userService, "localhost", 9999);
}

 

III. Write the client program (using the consistent hash algorithm to achieve load balancing)

  1. Encapsulate the client interface.
import java.io.IOException;
import java.rmi.ConnectException;
import java.rmi.Naming;
import java.rmi.Remote;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ServiceConsumer {
    /** List of servers that provide the remote service. Only the URL of each remote service is recorded. */
    private volatile List<String> urls = new LinkedList<>();
    /** Virtual nodes of the remote services, keyed by hash value */
    private static volatile TreeMap<Integer, String> virtualNodes = new TreeMap<>();

    public ServiceConsumer() {
        ZooKeeper zk = connectToZK(); // The client connects to ZooKeeper
        if (null != zk) {
            // Once connected, watch for node changes (server changes)
            watchNode(zk);
        }
    }

    private void watchNode(final ZooKeeper zk) {
        try {
            // Watch the children of the /provider node for changes (servers going online or offline)
            List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    // If the server nodes change, fetch them again
                    if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                        System.out.println("Server side changed: an old server may be down or a new server may have joined the cluster...");
                        watchNode(zk);
                    }
                }
            });
            // Save the data of each server node, that is, the access URL of each remote service
            List<String> dataList = new LinkedList<>();
            TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
            for (String nodeStr : nodeList) {
                byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
                // The URL to put into the server list
                String url = new String(data);
                // Assign virtual nodes to each server. To keep the simulation simple, the server on port 9999
                // is assumed to have poor performance and gets only 300 virtual nodes; the others get 1000.
                if (url.contains("9999")) {
                    for (int i = 1; i <= 300; i++) {
                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
                    }
                } else {
                    for (int i = 1; i <= 1000; i++) {
                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
                    }
                }
                dataList.add(url);
            }
            // Publish the new server list and the new ring
            urls = dataList;
            virtualNodes = newVirtualNodesList;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Obtain the remote service object by URL */
    @SuppressWarnings("unchecked")
    public <T> T lookUpService(String url) {
        T remote = null;
        try {
            remote = (T) Naming.lookup(url);
        } catch (Exception e) {
            // If the connection fails, the server is most likely down.
            // In that case, use the first URL in the server list to retrieve the remote object.
            if (e instanceof ConnectException) {
                if (urls.size() != 0) {
                    url = urls.get(0);
                    return lookUpService(url);
                }
            }
        }
        return remote;
    }

    /** Use the consistent hash algorithm to select a URL and return the remote service object */
    public <T extends Remote> T lookUp() {
        T service = null;
        // Calculate the hash of a random value to simulate a request
        int hash = FVNHash(Math.random() * 10000 + "");
        // Get the part of the ring whose keys are greater than or equal to the hash value
        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
        // Take the first virtual node in that part. If there is none, wrap around to the first node on the ring.
        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
        // Obtain the server URL from the virtual node
        String virtualNodeName = virtualNodes.get(targetKey);
        String url = virtualNodeName.split("@")[0];
        // Obtain the remote service object from the server URL
        service = lookUpService(url);
        System.out.print("The address providing this service is: " + url + ", returned result: ");
        return service;
    }

    private CountDownLatch latch = new CountDownLatch(1);

    public ZooKeeper connectToZK() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    // Once the ZK cluster is connected, wake up the waiting thread
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await(); // The current thread waits until the connection is established
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return zk;
    }

    public static int FVNHash(String data) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < data.length(); i++)
            hash = (hash ^ data.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that one value to 0
        return hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
    }
}

2. Start the client and test it

public static void main(String[] args) {
    ServiceConsumer sc = new ServiceConsumer(); // Create a tool class object
    while (true) {
        // Obtain the RMI remote service object
        UserService userService = sc.lookUp();
        try {
            // Call the remote method
            String result = userService.helloRmi("Charcoal roasted oysters");
            System.out.println(result);
            Thread.sleep(100);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. Once the client is running, it prints continuously to the console… The data is analyzed below.

IV. Perform statistical analysis on the server call data

  • To review the simulation: three servers were started on ports 7777, 8888, and 9999. The servers on ports 7777 and 8888 were each given a performance index of 1000 (1000 virtual nodes), and the server on port 9999 was given 300.
  • While the client was running, I manually shut down the server on port 8888, and the client printed the server change message as expected. In theory, no requests should reach the server on port 8888 during that time. When I restarted the server on port 8888, the client printed the server change message again, and requests reached the server on port 8888 normally.
  • In this section, the number of requests handled by each server is counted to check whether the load is balanced.
  • The test program is as follows:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class DataStatistics {
    private static float ReqToPort7777 = 0;
    private static float ReqToPort8888 = 0;
    private static float ReqToPort9999 = 0;

    public static void main(String[] args) {
        // try-with-resources closes the reader automatically
        try (BufferedReader br = new BufferedReader(new FileReader("C://test.txt"))) {
            String line;
            while (null != (line = br.readLine())) {
                if (line.contains("7777")) {
                    ReqToPort7777++;
                } else if (line.contains("8888")) {
                    ReqToPort8888++;
                } else if (line.contains("9999")) {
                    ReqToPort9999++;
                } else {
                    // Any other line is the client's server change message
                    print(false);
                }
            }
            print(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void print(boolean isEnd) {
        if (!isEnd) {
            System.out.println("------------- Server cluster changes -------------");
        } else {
            System.out.println("------------- Last count -------------");
        }
        System.out.println("From the last server change until now:");
        float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
        System.out.println("The number of visits to the server on port 7777 is: " + ReqToPort7777 + ", share: " + (ReqToPort7777 / total));
        System.out.println("The number of visits to the server on port 8888 is: " + ReqToPort8888 + ", share: " + (ReqToPort8888 / total));
        System.out.println("The number of visits to the server on port 9999 is: " + ReqToPort9999 + ", share: " + (ReqToPort9999 / total));
        // Reset the counters for the next interval
        ReqToPort7777 = 0;
        ReqToPort8888 = 0;
        ReqToPort9999 = 0;
    }
}

/* Below is the output
------------- Server cluster changes -------------
From the last server change until now:
The number of visits to the server on port 7777 is: 198.0, share: 0.4419643
The number of visits to the server on port 8888 is: 184.0, share: 0.4107143
The number of visits to the server on port 9999 is: 66.0, share: 0.14732143
------------- Server cluster changes -------------
From the last server change until now:
The number of visits to the server on port 7777 is: 510.0, share: 0.7589286
The number of visits to the server on port 8888 is: 1.0, share: 0.0014880953
The number of visits to the server on port 9999 is: 161.0, share: 0.23958333
------------- Last count -------------
From the last server change until now:
The number of visits to the server on port 7777 is: 410.0, share: 0.43248945
The number of visits to the server on port 8888 is: 398.0, share: 0.41983122
The number of visits to the server on port 9999 is: 140.0, share: 0.14767933
*/

 

V. Results

  • As the test data shows, both before and after the outage of the server on port 8888, the number of requests each server received was roughly proportional to its performance index. The load balancing effect of the consistent hash algorithm is thus verified.
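As a quick sanity check on "proportional", the expected shares can be computed from the virtual node counts alone. The sketch below is an illustrative helper, not part of the original test program; it assumes each server's expected share is simply its virtual node count divided by the total.

```java
import java.util.Arrays;

class ExpectedShareSketch {

    /** Each server's expected share of requests: its virtual node count divided by the total. */
    static double[] expectedShares(int... virtualNodeCounts) {
        int total = 0;
        for (int count : virtualNodeCounts) {
            total += count;
        }
        double[] shares = new double[virtualNodeCounts.length];
        for (int i = 0; i < shares.length; i++) {
            shares[i] = (double) virtualNodeCounts[i] / total;
        }
        return shares;
    }

    public static void main(String[] args) {
        // All three servers up: ports 7777, 8888, 9999
        System.out.println(Arrays.toString(expectedShares(1000, 1000, 300)));
        // Port 8888 down: only 7777 and 9999 remain
        System.out.println(Arrays.toString(expectedShares(1000, 300)));
    }
}
```

With all three servers up, the expected shares are about 0.435, 0.435, and 0.130, against measured shares of 0.442, 0.411, and 0.147 in the first interval. With port 8888 down, the expected shares are about 0.769 and 0.231, against measured shares of 0.759 and 0.240.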

 

4. Further thoughts

  • When I first saw the consistent hash algorithm, I was impressed by its unusual way of thinking. But besides balancing load across back-end servers, consistent hashing has a feature that other load balancing algorithms may not have.
  • This feature comes from the hash function: for a hash function, a fixed input always produces a fixed output. In other words, the same request is always routed to the same server. This is great, because we can combine consistent hashing with caching to improve back-end server performance.
  • For example, suppose a server cluster in a distributed system provides a method for querying user information, and each request carries the user's uid. We can use the hash function (as the demo code above shows, this is easy to do) to route requests with the same uid to one specific server. That server can then cache the user information for that uid, which reduces operations on the database or other middleware and improves the efficiency of the system.
  • Of course, if you use this strategy, you may have to handle cache updates and similar operations, but it is a good strategy that can be applied flexibly in appropriate situations.
  • These thoughts were inspired by the description of the four load balancing strategies in the Dubbo framework.
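The uid routing idea above can be sketched as follows. This is a toy, single-process illustration under stated assumptions: the per-server caches are plain in-memory maps, the "database load" is simulated with a counter, the `hash` function is a stand-in rather than the article's FVNHash, and all class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class UidAffinitySketch {
    private final TreeMap<Integer, String> ring = new TreeMap<>();
    private final Map<String, Map<String, String>> cacheByServer = new HashMap<>();
    int databaseLoads = 0; // how many times the simulated database was queried

    /** Assumes at least one server name is passed in. */
    UidAffinitySketch(String... servers) {
        for (String server : servers) {
            cacheByServer.put(server, new HashMap<>());
            for (int i = 1; i <= 100; i++) {
                ring.put(hash(server + "@" + i), server);
            }
        }
    }

    /** Stand-in hash for this sketch only: scatters String.hashCode() and clears the sign bit. */
    static int hash(String s) {
        int h = s.hashCode() * 0x9E3779B1;
        h ^= h >>> 16;
        return h & 0x7FFFFFFF;
    }

    /** The same uid always hashes to the same ring position, hence the same server. */
    String serverFor(String uid) {
        Map.Entry<Integer, String> e = ring.ceilingEntry(hash(uid));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** Looks up the user on the server chosen by the ring, loading from the "database" only on a cache miss. */
    String getUser(String uid) {
        Map<String, String> cache = cacheByServer.get(serverFor(uid));
        String user = cache.get(uid);
        if (user == null) {
            databaseLoads++;                      // simulated database query
            user = "user-record-for-" + uid;
            cache.put(uid, user);
        }
        return user;
    }

    public static void main(String[] args) {
        UidAffinitySketch cluster = new UidAffinitySketch("server-a", "server-b", "server-c");
        for (int i = 0; i < 3; i++) {
            cluster.getUser("uid-42");
        }
        System.out.println("Database loads: " + cluster.databaseLoads); // 1
    }
}
```

Three requests for the same uid cause a single load, because every request reaches the server that already holds the cached record. With random or round-robin routing, each server could end up loading and caching the same record separately.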