Click "like" and make it a habit. Search WeChat for [dime technology] for more original technical articles. This article is included in GitHub: org_Hejianhui/JavaStudy.

Preface

  • Zookeeper features and node description
  • Using the Zookeeper client and working principles of clustering

The first two articles covered Zookeeper's features, node model, client usage, and clustering. Because Zookeeper is a foundational component of many distributed systems, a natural question is: what are Zookeeper's usage scenarios? The basic ones are easy to list, but the topic can be probed quite deeply. This article focuses on Zookeeper's four main usage scenarios:

  1. Distributed Cluster Management
  2. Distributed Registry
  3. Distributed JOB
  4. Distributed Lock

Distributed Cluster Management

Requirements for distributed cluster management

  1. Actively query the online service nodes
  2. View the resource usage of each service node
  3. Receive a notification when a service goes offline
  4. Raise an alarm when service resources (CPU, memory, disk) exceed thresholds

Architecture design

The node structure

  1. niuh-manger // root node
  2. service0000001 // service node 1
  3. service0000002 // service node 2
  4. service000000n // service node N

Service status information

  1. ip
  2. cpu
  3. memory
  4. disk

Function implementation

Data generation and reporting

  1. Create a temporary (ephemeral) sequential node.
  2. Periodically update the node's status information.

Active query

  1. Query ZooKeeper in real time to obtain the status information of every cluster node, as sketched below.
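
A minimal sketch of the query side, assuming the /niuh-manger node layout created by the Agent shown below (the class and node names are illustrative, not the author's actual zookeeper-web code):

package com.niuh.web;

import org.I0Itec.zkclient.ZkClient;
import java.util.List;

public class ClusterQuery {
    private final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000, 10000);
    private static final String rootPath = "/niuh-manger";

    // Read each service node's JSON status on demand
    public void printClusterState() {
        List<String> children = zkClient.getChildren(rootPath);
        for (String child : children) {
            Object status = zkClient.readData(rootPath + "/" + child);
            System.out.println(child + " -> " + status);
        }
    }
}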

Passive notification

  1. Watch for changes to the root node's children, and raise an alarm when hardware resources such as CPU exceed the warning threshold (see the sketch below).
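
A sketch of the notification side under the same assumptions; the 50% CPU threshold and the service0000001 node name are illustrative:

package com.niuh.web;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.niuh.os.OsBean;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class ClusterMonitor {
    private final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000, 10000);
    private static final String rootPath = "/niuh-manger";

    public void watch() {
        // Fired when a service node joins or leaves (service offline notification)
        zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) ->
                System.out.println("Online nodes changed: " + currentChilds));

        // Watch one service node's status data for threshold breaches
        zkClient.subscribeDataChanges(rootPath + "/service0000001", new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                OsBean bean = new ObjectMapper().readValue(data.toString(), OsBean.class);
                if (bean.cpu > 50) { // illustrative threshold
                    System.out.println("CPU alarm... " + bean.cpu);
                }
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Service offline: " + dataPath);
            }
        });
    }
}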

Key sample code

package com.niuh.os;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.I0Itec.zkclient.ZkClient;

import java.lang.instrument.Instrumentation;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class Agent {
    private static Agent ourInstance = new Agent();
    private String server = "127.0.0.1:2181";
    private ZkClient zkClient;
    private static final String rootPath = "/niuh-manger";
    private static final String servicePath = rootPath + "/service";
    private String nodePath; // current node path, e.g. /niuh-manger/service0000001
    private Thread stateThread;

    public static Agent getInstance() {
        return ourInstance;
    }

    private Agent() {
    }

    // JavaAgent entry point for data monitoring
    public static void premain(String args, Instrumentation instrumentation) {
        Agent.getInstance().init();
    }

    public void init() {
        zkClient = new ZkClient(server, 5000, 10000);
        System.out.println("zk connection successful " + server);
        // Create the root node
        buildRoot();
        // Create the temporary node
        createServerNode();
        // Periodically refresh the node's status information
        stateThread = new Thread(() -> {
            while (true) {
                updateServerNode();
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "zk_stateThread");
        stateThread.setDaemon(true);
        stateThread.start();
    }

    // Write data to the current temporary node
    public void updateServerNode() {
        zkClient.writeData(nodePath, getOsInfo());
    }

    // Generate the service node
    public void createServerNode() {
        nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
        System.out.println("create node: " + nodePath);
    }

    // Collect the current service status as JSON
    public String getOsInfo() {
        OsBean bean = new OsBean();
        bean.lastUpdateTime = System.currentTimeMillis();
        bean.ip = getLocalIp();
        bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
        MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        bean.usedMemorySize = memoryUsage.getUsed() / 1024 / 1024;
        bean.usableMemorySize = memoryUsage.getMax() / 1024 / 1024;
        bean.pid = ManagementFactory.getRuntimeMXBean().getName();
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsString(bean);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static String getLocalIp() {
        InetAddress addr;
        try {
            addr = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        return addr.getHostAddress();
    }

    public void buildRoot() {
        if (!zkClient.exists(rootPath)) {
            zkClient.createPersistent(rootPath);
        }
    }
}
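
The OsBean and CPUMonitorCalc classes referenced by the Agent are not included in the article. A minimal sketch of what they might look like — the field names are inferred from the Agent code and console output, and the CPU probe via com.sun.management.OperatingSystemMXBean is just one possible implementation, not necessarily the author's:

package com.niuh.os;

import java.lang.management.ManagementFactory;

// Plain status bean serialized to JSON by the Agent (fields inferred from usage)
public class OsBean {
    public String ip;
    public double cpu;
    public long usedMemorySize;
    public long usableMemorySize;
    public String pid;
    public long lastUpdateTime;

    @Override
    public String toString() {
        return "OsBean{ip='" + ip + "', cpu=" + cpu +
                ", usedMemorySize=" + usedMemorySize +
                ", usableMemorySize=" + usableMemorySize +
                ", pid='" + pid + "', lastUpdateTime=" + lastUpdateTime + "}";
    }
}

// One possible CPU probe; the original class is not shown in the article
class CPUMonitorCalc {
    private static final CPUMonitorCalc instance = new CPUMonitorCalc();
    private final com.sun.management.OperatingSystemMXBean osBean =
            (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    public static CPUMonitorCalc getInstance() {
        return instance;
    }

    public double getProcessCpu() {
        // getProcessCpuLoad() returns 0.0-1.0 (or a negative value if unavailable)
        return osBean.getProcessCpuLoad() * 100;
    }
}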

Implementation effect

Startup parameter settings

Run the test case:

package com.niuh.test;

import com.niuh.os.Agent;
import org.junit.Ignore;
import org.junit.Test;

public class AgentTest {

    @Test
    @Ignore
    public void initTest() {
        Agent.premain(null, null);
        runCPU(2); // drive CPU usage up (roughly 20%)
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Spin up busy-loop threads to generate CPU load
    private void runCPU(int count) {
        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                while (true) {
                    long bac = 1000000;
                    bac = bac >> 1;
                }
            }).start();
        }
    }
}

Console output:

CPU alarm... 22.55120088850181
CPU alarm... 46.06592086097357
CPU alarm... 47.87206766163349
CPU alarm... 49.49176420213768
CPU alarm... 48.967942479969004
CPU alarm... 49.193921607021565
CPU alarm... 48.806604284784676
CPU alarm... 48.63229912951865
CPU alarm... 49.34509647972038
CPU alarm... 47.07551108884401
CPU alarm... 49.18489236134496
CPU alarm... 49.903007346777066
CPU alarm... 49.28868795953268
// After the test case is stopped:
Service offline: OsBean{ip='192.168.43.11', cpu=49.28868795953268, usedMemorySize=56, usableMemorySize=3641, pid='47192@hejianhui', lastUpdateTime=1602056208842}

This demo is not suitable for production use. It involves the zookeeper-agent and zookeeper-web components; the source code is on GitHub: github.com/Niuh-Frame/…

Distributed Registry

In a monolithic service, a single service is typically invoked by many clients, and it is enough to configure the one service node's address on each client. After moving to a distributed architecture there are far more service nodes — a large online service can have thousands — and so many nodes cannot be configured manually on each client. An intermediate service is needed whose job is to help clients discover service nodes; this is the "service discovery" often referred to in technical books.

 

A complete registry covers the following features:

  • Service registration: a provider submits the service it offers to the registry when it comes online.
  • Service deregistration: a provider notifies the registry when it goes offline.
  • Service subscription: consumers dynamically receive service-change messages in real time.
  • Reliability: the registry itself runs as a cluster and stores data redundantly, avoiding single points of failure and data loss.
  • Fault tolerance: when a provider goes down (crash, power failure, etc.), the registry detects it and notifies clients of the provider's status.

Dubbo’s use of Zookeeper

Dubbo, Alibaba's well-known open source project, is a Java-based RPC framework. Its registry, an essential component, can be implemented on top of several third-party systems, but Zookeeper is the officially recommended registry service.

Dubbo Zookeeper registry storage structure

Node description

Category | Node type       | Description
Root     | persistent node | Root node name; "dubbo" by default
Service  | persistent node | Service name: the fully qualified service interface name
type     | persistent node | One of: providers, consumers, configurators, routers
URL      | temporary node  | URL node; contains the service provider's IP, port, and configuration

Process description

  1. When a service provider starts, it writes its own URL under /dubbo/com.foo.BarService/providers.
  2. When a service consumer starts, it subscribes to the provider URLs under /dubbo/com.foo.BarService/providers and writes its own URL under /dubbo/com.foo.BarService/consumers.
  3. When the monitoring center starts, it subscribes to all provider and consumer URLs under /dubbo/com.foo.BarService.

Sample Demo

Server code

package com.niuh.zk.dubbo;

import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.alibaba.dubbo.config.ServiceConfig;

import java.io.IOException;

public class Server {
    public void openServer(int port) {
        // Build the application
        ApplicationConfig config = new ApplicationConfig();
        config.setName("simple-app");

        // Communication protocol
        ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port);
        protocolConfig.setThreads(200);

        ServiceConfig<UserService> serviceConfig = new ServiceConfig();
        serviceConfig.setApplication(config);
        serviceConfig.setProtocol(protocolConfig);
        serviceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        serviceConfig.setInterface(UserService.class);
        UserServiceImpl ref = new UserServiceImpl();
        serviceConfig.setRef(ref);
        // Start a business by offering services
        serviceConfig.export();
        System.out.println("Service started! Port:"+serviceConfig.getExportedUrls().get(0).getPort());
        ref.setPort(serviceConfig.getExportedUrls().get(0).getPort());
    }

    public static void main(String[] args) throws IOException {
        new Server().openServer(-1);
        System.in.read();
    }
}
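
The UserService interface, UserServiceImpl, and UserVo referenced above are not shown in the article. A minimal sketch consistent with the server and client code (each type would live in its own file; the payload returned by getUser is illustrative):

package com.niuh.zk.dubbo;

import java.io.Serializable;

// --- UserService.java: the service contract shared by provider and consumer ---
public interface UserService {
    UserVo getUser(int id);
}

// --- UserVo.java: transferred over the wire, so it must be Serializable ---
class UserVo implements Serializable {
    private int id;
    private String name;

    public void setId(int id) { this.id = id; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() { return "UserVo{id=" + id + ", name='" + name + "'}"; }
}

// --- UserServiceImpl.java: provider-side implementation ---
class UserServiceImpl implements UserService {
    private int port;

    public void setPort(int port) { this.port = port; }

    @Override
    public UserVo getUser(int id) {
        UserVo vo = new UserVo();
        vo.setId(id);
        vo.setName("user-" + id + " from port " + port); // illustrative payload
        return vo;
    }
}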

Client code

package com.niuh.zk.dubbo;

import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
import com.alibaba.dubbo.config.RegistryConfig;

import java.io.IOException;

public class Client {
    UserService service;

    // URL Specifies the invocation address of the remote service
    public UserService buildService(String url) {
        ApplicationConfig config = new ApplicationConfig("young-app");
        // Build a reference object
        ReferenceConfig<UserService> referenceConfig = new ReferenceConfig<UserService>();
        referenceConfig.setApplication(config);
        referenceConfig.setInterface(UserService.class);
        // referenceConfig.setUrl(url);
        referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        referenceConfig.setTimeout(5000);
        // Obtain a transparent remote reference
        this.service = referenceConfig.get();
        return service;
    }

    static int i = 0;

    public static void main(String[] args) throws IOException {
        Client client1 = new Client();
        client1.buildService("");
        String cmd;
        while (!(cmd = read()).equals("exit")) {
            UserVo u = client1.service.getUser(Integer.parseInt(cmd));
            System.out.println(u);
        }
    }

    private static String read() throws IOException {
        byte[] b = new byte[1024];
        int size = System.in.read(b);
        return new String(b, 0, size).trim();
    }
}

Query the content actually stored in ZK:

/dubbo
/dubbo/com.niuh.zk.dubbo.UserService
/dubbo/com.niuh.zk.dubbo.UserService/configurators
/dubbo/com.niuh.zk.dubbo.UserService/routers

/dubbo/com.niuh.zk.dubbo.UserService/providers
/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo://192.168.43.11:20880/com.niuh.zk.dubbo.UserService?anyhost=true&application=simple-app&dubbo=2.6.2&generic=false&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=48302&side=provider&threads=200&timestamp=1602057895881

/dubbo/com.niuh.zk.dubbo.UserService/consumers
/dubbo/com.niuh.zk.dubbo.UserService/consumers/consumer://192.168.43.11/com.niuh.zk.dubbo.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=49036&side=consumer&timeout=5000&timestamp=1602075359549

The example demo involves the zookeeper-dubbo component. The source code is on GitHub: github.com/Niuh-Frame/…

Distributed JOB

Distributed JOB requirements

  1. Among multiple service nodes, only one master node runs the JOB task.
  2. When the master node fails, a new master is elected automatically and JOB execution continues.

Architecture design

The node structure

  1. niuh-master
  2. server0001:master
  3. server0002:slave
  4. server000n:slave

The election process

Service startup:

  1. Create a server child node under niuh-master with the value slave.
  2. Get all child nodes under niuh-master.
  3. Check whether a master node exists.
  4. If none exists, set yourself as the master node.

The child node deletion event is triggered:

  1. Get all child nodes under niuh-master.
  2. Check whether a master node exists.
  3. If not, the node with the smallest sequence number sets itself as master.

Sample Demo

package com.niuh.zookeeper.master;

import org.I0Itec.zkclient.ZkClient;

import java.util.Map;
import java.util.stream.Collectors;

public class MasterResolve {
    private String server = "127.0.0.1:2181";
    private ZkClient zkClient;
    private static final String rootPath = "/niuh-master";
    private static final String servicePath = rootPath + "/service";
    private String nodePath;
    private volatile boolean master = false;
    private static MasterResolve resolve;

    private MasterResolve() {
        zkClient = new ZkClient(server, 2000, 5000);
        buildRoot();
        createServerNode();
    }

    public static MasterResolve getInstance() {
        if (resolve == null) {
            resolve = new MasterResolve();
        }
        return resolve;
    }


    // Build the root node
    public void buildRoot() {
        if (!zkClient.exists(rootPath)) {
            zkClient.createPersistent(rootPath);
        }
    }

    // Create a server node
    public void createServerNode() {
        nodePath = zkClient.createEphemeralSequential(servicePath, "slave");
        System.out.println("Create service node :" + nodePath);
        initMaster();
        initListener();
    }



    private void initMaster() {
        boolean existMaster = zkClient.getChildren(rootPath)
                .stream()
                .map(p -> rootPath + "/" + p)
                .map(p -> zkClient.readData(p))
                .anyMatch(d -> "master".equals(d));
        if (!existMaster) {
            doElection();
            System.out.println("Currently elected master");
        }
    }

    private void initListener() {
        zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> {
            doElection(); // Run the election when children change
        });
    }

    // Run the election
    public void doElection() {
        Map<String, Object> childData = zkClient.getChildren(rootPath)
                .stream()
                .map(p -> rootPath + "/" + p)
                .collect(Collectors.toMap(p -> p, p -> zkClient.readData(p)));
        if (childData.containsValue("master")) {
            return;
        }

        childData.keySet().stream().sorted().findFirst().ifPresent(p -> {
            if (p.equals(nodePath)) { // The node with the smallest sequence number becomes master
                zkClient.writeData(nodePath, "master");
                master = true;
                System.out.println("Currently elected master: " + nodePath);
            }
        });
    }

    public static boolean isMaster() {
        return getInstance().master;
    }
}
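
The demo does not show how the election result is consumed. As a hypothetical sketch, a job loop could poll MasterResolve.isMaster() so that the task runs only on the elected master (JobRunner, runJob, and the 10-second interval are illustrative):

public class JobRunner implements Runnable {
    @Override
    public void run() {
        while (true) {
            if (MasterResolve.isMaster()) {
                runJob(); // placeholder for the actual JOB logic
            }
            try {
                Thread.sleep(10_000); // fixed interval, illustrative
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private void runJob() {
        System.out.println("Executing JOB on master node");
    }
}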

The example demo involves the zookeeper-master component. The source code is on GitHub.

Distributed Lock

Basic concepts of locking

The concept of a lock is familiar in development: locks arbitrate contention between multiple threads or processes and allocate ownership of a resource in an orderly way. In a single application, locking can be implemented with synchronized or ReentrantLock. In a distributed system, however, synchronized alone is not enough; a third-party component is required. A simple approach is to use a relational database's row-level locks for mutual exclusion between processes, but in large distributed systems the performance bottleneck is usually concentrated in database operations. To improve performance, components such as Redis and Zookeeper are used to implement distributed locks instead.

Shared lock: also known as a read lock. When one party holds a shared lock, other parties can acquire shared locks too, but only reading is allowed; no one can acquire a write lock until all shared locks are released.

Exclusive lock: also known as a write lock. The holder of an exclusive lock can both read and write the data; no other party can acquire any lock until it is released.

Acquiring the lock

Take a bank account as an example: the account information can be read concurrently, but the account data must not be modified while it is being read. Assume the account ID is 888.

Read-lock acquisition process

  1. Create a temporary sequential read-lock node for the resource ID, e.g. /lock/888.R0000000002.
  2. Get all child nodes under /lock and check whether the smallest node is a read lock; if so, the lock is acquired.
  3. If the smallest node is not a read lock, block and wait, subscribing to child-node changes under /lock (see the sketch after this list).
  4. When the child-node change event fires, go back to step 2.
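
A sketch of the admission check from step 2, assuming lock nodes are named <resource>.R<seq> for reads and <resource>.W<seq> for writes: a read lock can safely be granted as long as no write-lock node sorts before it, which generalizes the "smallest node is a read lock" test (class and path names are illustrative):

import org.I0Itec.zkclient.ZkClient;
import java.util.Collections;
import java.util.List;

public class ReadLockCheck {
    // Returns true if the read-lock node myNodeName may proceed:
    // no write-lock node ("*.W*") sorts before it.
    public static boolean canRead(ZkClient zkClient, String myNodeName) {
        List<String> children = zkClient.getChildren("/lock");
        Collections.sort(children);
        for (String child : children) {
            if (child.equals(myNodeName)) {
                return true;  // only read locks (or nothing) ahead of us
            }
            if (child.contains(".W")) {
                return false; // a writer is queued before us
            }
        }
        return false; // our node was not found; the caller should re-create it
    }
}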

The data structure

Write-lock acquisition process

  1. Create a temporary sequential write-lock node for the resource ID, e.g. /lock/888.W0000000002.
  2. Get all child nodes under /lock and check whether the smallest node is your own; if so, the lock is acquired.
  3. If the smallest node is not your own, block and wait, subscribing to child-node changes under /lock.
  4. When the child-node change event fires, go back to step 2.

Release the lock

When finished, delete the temporary node explicitly. If the holder goes down while holding the lock, the node is deleted automatically once its session expires.

The herd effect

While waiting, every waiting node watches the lock node. Once the lock node changes, all waiters are notified at once and all of them check the lock's children at the same time; if the waiting queue is long, this puts heavy pressure on Zookeeper.

To improve this, use chained listening: each waiter watches only the node immediately in front of it and is notified only when that node releases the lock, forming a linked list of listeners. The tryActiveLock method in the demo below implements exactly this by subscribing only to the previous node.

Sample Demo

package com.niuh.zookeeper.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;
import java.util.stream.Collectors;

public class ZookeeperLock {
    private String server = "127.0.0.1:2181";
    private ZkClient zkClient;
    private static final String rootPath = "/niuh-lock1";

    public ZookeeperLock() {
        zkClient = new ZkClient(server, 5000, 20000);
        buildRoot();
    }

    // Build the root node
    public void buildRoot() {
        if (!zkClient.exists(rootPath)) {
            zkClient.createPersistent(rootPath);
        }
    }
    // Acquire a lock
    public Lock lock(String lockId, long timeout) {
        // Create a temporary sequential node
        Lock lockNode = createLockNode(lockId);
        lockNode = tryActiveLock(lockNode); // Try to activate the lock
        if (!lockNode.isActive()) {
            try {
                synchronized (lockNode) {
                    lockNode.wait(timeout); // Block the current thread
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        if (!lockNode.isActive()) {
            throw new RuntimeException("lock timeout");
        }
        return lockNode;
    }

    // Release the lock
    public void unlock(Lock lock) {
        if (lock.isActive()) {
            zkClient.delete(lock.getPath());
        }
    }

    // Try to activate the lock
    private Lock tryActiveLock(Lock lockNode) {

        // Get all the children below the root node
        List<String> list = zkClient.getChildren(rootPath)
                .stream()
                .sorted()
                .map(p -> rootPath + "/" + p)
                .collect(Collectors.toList());

        // Determine whether the current node is the smallest
        String firstNodePath = list.get(0);
        // If the smallest node is the current node, the lock becomes active
        if (firstNodePath.equals(lockNode.getPath())) {
            lockNode.setActive(true);
        } else {
            String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
            zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }

                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    // Event handling and the session heartbeat share one thread: pausing too long
                    // here (e.g. at a breakpoint) lets the session expire, so this node is deleted
                    // and the locking logic breaks.
                    System.out.println("Node Deletion :" + dataPath);
                     Lock lock = tryActiveLock(lockNode);
                    synchronized (lockNode) {
                        if (lock.isActive()) {
                            lockNode.notify(); // Wake up the waiting thread
                        }
                    }
                    zkClient.unsubscribeDataChanges(upNodePath, this);
                }
            });
        }
        return lockNode;
    }


    public Lock createLockNode(String lockId) {
        String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
        return new Lock(lockId, nodePath);
    }
}
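
The Lock bean used by ZookeeperLock is not shown in the article. A minimal sketch consistent with the calls above (getPath, isActive, setActive), followed by a hypothetical usage example:

package com.niuh.zookeeper.lock;

// Sketch of the Lock bean referenced by ZookeeperLock
public class Lock {
    private String lockId;
    private String path;
    private boolean active;

    public Lock(String lockId, String path) {
        this.lockId = lockId;
        this.path = path;
    }

    public String getPath() { return path; }
    public boolean isActive() { return active; }
    public void setActive(boolean active) { this.active = active; }
}

// Hypothetical usage: serialize access to account 888 across processes
class LockDemo {
    public static void main(String[] args) {
        ZookeeperLock zkLock = new ZookeeperLock();
        Lock lock = zkLock.lock("888", 10000); // wait up to 10s for the lock
        try {
            // critical section: operate on the shared resource here
            System.out.println("acquired " + lock.getPath());
        } finally {
            zkLock.unlock(lock);
        }
    }
}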

The example demo involves the zookeeper-lock component. The source code is on GitHub: github.com/Niuh-Frame/…

Some images are from the Internet; copyright belongs to the original authors, and they will be removed upon request.