Service registration and discovery

Definition: In the microservice era, the same JAR package is deployed on several servers so that they all provide the same service. You can use Nginx to configure load balancing on the server side, or use ZooKeeper to implement load balancing on the client side.

  1. Multiple service instances register themselves
  2. The client retrieves the set of service addresses from the middleware
  3. The client selects one address at random from the set to perform the task
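
The three steps above can be sketched without any middleware at all. The following is a minimal illustration only: `RegistrySketch` and its methods are hypothetical names, and a plain in-memory map stands in for the role that ZooKeeper plays in the rest of this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory stand-in for the registry (ZooKeeper plays this role in the article)
class RegistrySketch {
    private final Map<String, List<String>> services = new ConcurrentHashMap<>();
    private final Random random = new Random();

    // Step 1: each service instance registers its own address
    public void register(String serviceName, String address) {
        services.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>()).add(address);
    }

    // Step 2: the client retrieves a copy of the current address set
    public List<String> lookup(String serviceName) {
        return new ArrayList<>(services.getOrDefault(serviceName, Collections.emptyList()));
    }

    // Step 3: the client picks one address at random (null if nothing is registered)
    public String choose(String serviceName) {
        List<String> addresses = lookup(serviceName);
        if (addresses.isEmpty()) {
            return null;
        }
        return addresses.get(random.nextInt(addresses.size()));
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("products", "127.0.0.1:8080");
        registry.register("products", "127.0.0.1:8081");
        System.out.println("chosen host: " + registry.choose("products"));
    }
}
```

The real implementation below differs in one important way: registrations are ephemeral ZK nodes, so an instance that dies disappears from the set automatically.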

Server code

Use Spring Boot to build a simple web service that connects to the ZK server and registers itself.

ProductController

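import javax.servlet.http.HttpServletRequest;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/product")
public class ProductController {

    // The product name carries the local port, so the caller can see which instance answered
    @RequestMapping("/getProduct/{id}")
    public Object getProduct(HttpServletRequest request, @PathVariable("id") String id) {
        return new Product(id, "name:" + request.getLocalPort());
    }
}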

InitListener


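import java.net.InetAddress;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.context.support.WebApplicationContextUtils;

public class InitListener implements ServletContextListener {

    @Value("${server.port}")
    private int port;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // The listener is created with "new", so ask Spring to inject the @Value field
        WebApplicationContextUtils.getRequiredWebApplicationContext(sce.getServletContext())
                .getAutowireCapableBeanFactory().autowireBean(this);
        try {
            // Register this instance's address and port in ZooKeeper
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            ServiceRegister.register(hostAddress, port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}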

Product

public class Product {
    private String id;

    private String name;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public Product() {
    }
}

ServiceRegister

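import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ServiceRegister {

    private static final String BASE_SERVICES = "/services";
    private static final String SERVICE_NAME = "/products";

    public static void register(String address, int port) {
        try {
            ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (watchedEvent) -> {
            });
            // Create the persistent service node on first use.
            // Note: ZooKeeper does not create parent nodes, so /services must already exist.
            Stat exists = zooKeeper.exists(BASE_SERVICES + SERVICE_NAME, false);
            if (exists == null) {
                zooKeeper.create(BASE_SERVICES + SERVICE_NAME, "".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            // Each instance is an ephemeral sequential child whose data is "host:port";
            // it disappears automatically when the instance's session ends
            String server_path = address + ":" + port;
            zooKeeper.create(BASE_SERVICES + SERVICE_NAME + "/child", server_path.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}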

ProductApp

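import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletListenerRegistrationBean;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ProductApp {

    public static void main(String[] args) {
        SpringApplication.run(ProductApp.class, args);
    }

    // Register the listener so that it runs automatically when the service starts
    @Bean
    public ServletListenerRegistrationBean<InitListener> servletListenerRegistrationBean() {
        ServletListenerRegistrationBean<InitListener> servletListenerRegistrationBean =
                new ServletListenerRegistrationBean<>();
        servletListenerRegistrationBean.setListener(new InitListener());
        return servletListenerRegistrationBean;
    }
}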

application.properties

server.port=8080

Start the application twice with different server.port values, such as 8080 and 8081, to register two service instances at the same time.

Client (consumer) code

OrderController

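import javax.annotation.Resource;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RequestMapping("/order")
@RestController
public class OrderController {

    @Resource
    private RestTemplate restTemplate;

    private LoadBalance loadBalance = new RamdomLoadBalance();

    @RequestMapping("/getOrder/{id}")
    public Object getOrder(@PathVariable("id") String id) {
        // Choose the host once, so that the logged host is the one actually called
        String host = loadBalance.choseServiceHost();
        System.out.println("sowhat1412:" + host);
        Product product = restTemplate.getForObject("http://" + host + "/product/getProduct/1", Product.class);
        return new Order(id, "orderName", product);
    }
}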

InitListener

import java.util.ArrayList;
import java.util.List;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class InitListener implements ServletContextListener {

    private static final String BASE_SERVICES = "/services";
    private static final String SERVICE_NAME = "/products";

    private ZooKeeper zooKeeper;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        try {
            zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (watchedEvent) -> {
                // Refresh the list whenever the children of the service node change
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged
                        && watchedEvent.getPath().equals(BASE_SERVICES + SERVICE_NAME)) {
                    updateServiceList();
                }
            });
            updateServiceList();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void updateServiceList() {
        try {
            // Passing true re-registers the watch, because ZK watches fire only once
            List<String> children = zooKeeper.getChildren(BASE_SERVICES + SERVICE_NAME, true);
            List<String> newServerList = new ArrayList<String>();
            for (String subNode : children) {
                byte[] data = zooKeeper.getData(BASE_SERVICES + SERVICE_NAME + "/" + subNode, false, null);
                String host = new String(data, "utf-8");
                System.out.println("host:" + host);
                newServerList.add(host);
            }
            // Swap in the new list in one assignment
            LoadBalance.SERVICE_LIST = newServerList;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}

Order

public class Order {
    private String id;
    private String name;
    private Product product;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product getProduct() {
        return product;
    }

    public void setProduct(Product product) {
        this.product = product;
    }

    public Order(String id, String name, Product product) {
        this.id = id;
        this.name = name;
        this.product = product;
    }

    public Order() {
    }
}

Product

public class Product {
    private String id;

    private String name;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public Product() {
    }
}

LoadBalance

import java.util.List;

public abstract class LoadBalance {
    // Note that the IP pool is volatile, so a refreshed list is visible to all threads
    public volatile static List<String> SERVICE_LIST;

    public abstract String choseServiceHost();
}

import java.util.Random;
import org.springframework.util.CollectionUtils;

public class RamdomLoadBalance extends LoadBalance {
    @Override
    public String choseServiceHost() {
        String result = "";
        if (!CollectionUtils.isEmpty(SERVICE_LIST)) {
            // Pick a random index into the current service list
            int index = new Random().nextInt(SERVICE_LIST.size());
            result = SERVICE_LIST.get(index);
        }
        return result;
    }
}

Distributed lock

A task may operate on a resource only after it wins the competition for the lock (① competing for the lock). While one task is updating the resource (② holding the lock), no other task can operate on it (③ blocked tasks) until that task finishes the update (④ releasing the lock).

  1. It is needed in a multitasking environment.
  2. All tasks need to write to the same shared resource.
  3. Access to the resource must be mutually exclusive.

Within a single JVM, multithreaded code can solve this with JUC (java.util.concurrent) tools and language features such as synchronized, Lock, or AtomicInteger. These do not work in a cluster, because the tasks run in multiple JVMs.
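
As a small illustration of the single-JVM case, here is a sketch of an order-number generator built on AtomicInteger. The class name `AtomicOrderNumGenerator` and its methods are hypothetical and are not part of the article's code; the point is only that the counter is safe inside one JVM and nowhere else.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-JVM generator; the counter lives in this JVM's memory only
class AtomicOrderNumGenerator {
    private static final AtomicInteger COUNT = new AtomicInteger();

    // incrementAndGet() is atomic, so two threads in this JVM never get the same value
    static int nextSequence() {
        return COUNT.incrementAndGet();
    }

    static String nextNumber() {
        // SimpleDateFormat is not thread-safe, so create one per call
        SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        return sim.format(new Date()) + "-" + nextSequence();
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[50];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> seen.add(nextNumber()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("unique numbers: " + seen.size());
    }
}
```

Two JVMs running this class would each start counting from 1 and hand out the same numbers, which is exactly the gap a distributed lock is meant to close.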

The old way (single JVM)

import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author sowhat
 * @create 2020-06-10
 */
public class OrderService implements Runnable {
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
    private static CountDownLatch countDownLatch = new CountDownLatch(50);
    private static List<String> result = new Vector<>();

    @Override
    public void run() {
        result.add(orderNumGenerator.getNumber_byLock());
        // Count down only after the result is stored, so main() needs no extra sleep
        countDownLatch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            new Thread(new OrderService()).start();
        }
        countDownLatch.await();
        Collections.sort(result);
        result.forEach(s -> System.out.println(s));
    }
}

class OrderNumGenerator {
    public static int count = 0;

    // Not thread-safe: ++count can lose updates under contention
    public String getNumber() {
        SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        return sim.format(new Date()) + "-" + (++count);
    }

    public static Object obj = new Object();

    // Thread-safe within one JVM through a shared monitor
    public String getNumber_syn() {
        synchronized (obj) {
            SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
            return sim.format(new Date()) + "-" + (++count);
        }
    }

    // The lock must be static: every OrderService creates its own generator,
    // and a per-instance lock would not protect the shared static counter
    private static final Lock lock = new ReentrantLock();

    public String getNumber_byLock() {
        lock.lock();
        try {
            SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
            return sim.format(new Date()) + "-" + (++count);
        } finally {
            lock.unlock();
        }
    }
}

ZooKeeper distributed lock based on a node with a fixed name

Core idea: only one ephemeral node with a given name can exist in ZK. Whoever creates the node holds the lock and may operate on the data; everyone else has to wait.

Disadvantage: this may trigger the herd effect. As soon as the first client releases the lock, the other 999 waiting clients all send requests to ZK at once to compete for it.

package cn.sowhat;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

class OrderNumGenerator {
    public static int count = 0;

    public String getNumber() {
        SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        return sim.format(new Date()) + "-" + (++count);
    }
}

public class ZkLock implements Runnable {
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
    private Lock lock = new ZookeeperDistrbuteLock();

    @Override
    public void run() {
        getNumber();
    }

    public void getNumber() {
        try {
            lock.getLock();
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ", generate ID:" + number);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unLock();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            new Thread(new ZkLock()).start();
        }
    }
}

interface Lock {
    public void getLock();

    public void unLock();
}

abstract class AbstrackLock implements Lock {
    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println("Obtain lock resource");
        } else {
            // Wait until the lock node is deleted, then try again
            waitLock();
            getLock();
        }
    }

    public abstract boolean tryLock();

    public abstract void waitLock();
}

abstract class ZookeeperAbstractLock extends AbstrackLock {
    private static final String CONN = "127.0.0.1:2181";
    protected ZkClient zkClient = new ZkClient(CONN);
    protected static final String PATH = "/lock";
    protected static final String PATH2 = "/lock2";
}

class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {

    private CountDownLatch countDownLatch = null;

    @Override
    public boolean tryLock() {
        try {
            // Creating the ephemeral node succeeds for exactly one client: the lock holder
            zkClient.createEphemeral(PATH);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void waitLock() {
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String path) throws Exception {
                // Wake up the waiting thread
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }

            @Override
            public void handleDataChange(String s, Object data) throws Exception {
            }
        };
        // Register the listener on the lock node
        zkClient.subscribeDataChanges(PATH, iZkDataListener);
        if (zkClient.exists(PATH)) {
            // The lock is held by someone else, so wait; iZkDataListener is called when the node is deleted
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Remove the listener before trying to acquire the lock again
        zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
    }

    @Override
    public void getLock() {
        super.getLock();
    }

    @Override
    public void unLock() {
        if (zkClient != null) {
            zkClient.delete(PATH);
            zkClient.close();
        }
    }
}

High-performance distributed lock

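Idea: very simple. Instead of everyone grabbing for the lock at once, each client lines up in ZK and operates on the data only when its turn comes. Each waiting client watches only the node directly ahead of it, which avoids the herd effect.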
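
The queueing rule can be shown in isolation as a pure function over the child node names. This is only a sketch: `LockQueueSketch` and `nodeToWatch` are hypothetical names, and the node names are made-up examples of ZooKeeper's zero-padded sequence suffix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that isolates the "who do I wait for?" decision
class LockQueueSketch {

    // Returns the node this client should watch, or null if it is first in line (holds the lock)
    static String nodeToWatch(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // zero-padded sequence numbers sort correctly as strings
        int index = sorted.indexOf(myNode);
        if (index < 0) {
            throw new IllegalArgumentException("node not in queue: " + myNode);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("lock0000000002", "lock0000000000", "lock0000000001");
        System.out.println(nodeToWatch(children, "lock0000000000")); // prints null: first in line
        System.out.println(nodeToWatch(children, "lock0000000002")); // prints lock0000000001
    }
}
```

Because each client watches only its direct predecessor, deleting one node wakes exactly one waiter rather than all of them.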

package cn.sowhat;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class OrderNumGenerator {
    public static int count = 0;

    public String getNumber() {
        return ++count + "";
    }
}

public class ZkLock implements Runnable {
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
    private Lock lock = new ZookeeperDistrbuteLock2();

    @Override
    public void run() {
        getNumber();
    }

    public void getNumber() {
        try {
            lock.getLock();
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ", generate ID:" + number);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unLock();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            new Thread(new ZkLock()).start();
        }
    }
}

interface Lock {
    public void getLock();

    public void unLock();
}

abstract class AbstrackLock implements Lock {
    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println("Obtain lock resource");
        } else {
            waitLock();
            getLock();
        }
    }

    public abstract boolean tryLock();

    public abstract void waitLock();
}

abstract class ZookeeperAbstractLock extends AbstrackLock {
    private static final String CONN = "127.0.0.1:2181";
    protected ZkClient zkClient = new ZkClient(CONN);
    protected static final String PATH = "/lock";
    protected static final String PATH2 = "/lock2";
}

class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock {

    private CountDownLatch countDownLatch = null;
    private String beforePath;  // the node directly ahead of us in the queue
    private String currentPath; // our own node

    public ZookeeperDistrbuteLock2() {
        if (!this.zkClient.exists(PATH2)) {
            // Create the persistent parent node
            this.zkClient.createPersistent(PATH2);
        }
    }

    @Override
    public boolean tryLock() {
        // If currentPath is empty, this is the first attempt to acquire the lock
        if (currentPath == null || currentPath.length() <= 0) {
            // Create an ephemeral sequential node
            currentPath = this.zkClient.createEphemeralSequential(PATH2 + "/", "lock");
        }
        // Get all child nodes and sort them. Each name is an increasing zero-padded number such as 0000000400
        List<String> children = this.zkClient.getChildren(PATH2);
        Collections.sort(children);
        if (currentPath.equals(PATH2 + '/' + children.get(0))) {
            // Our node is the first of all nodes, so we hold the lock
            return true;
        } else {
            // Strip the "/lock2/" prefix to get our node name, then find the node just before it
            int wz = Collections.binarySearch(children, currentPath.substring(PATH2.length() + 1));
            beforePath = PATH2 + '/' + children.get(wz - 1);
        }
        return false;
    }

    @Override
    public void waitLock() {
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        };
        // Watch for deletion of the previous node only, not of the lock holder's node
        this.zkClient.subscribeDataChanges(beforePath, iZkDataListener);
        if (this.zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
    }

    @Override
    public void getLock() {
        super.getLock();
    }

    @Override
    public void unLock() {
        zkClient.delete(currentPath);
        zkClient.close();
    }
}

Cluster election

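Idea: the same as the distributed lock, in a simplified form. Whichever server manages to create the ephemeral node becomes the master. When that node is deleted, the remaining servers compete to create it again.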
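
The election rule can be sketched without ZooKeeper by letting a single field stand in for the ephemeral node. Everything here is hypothetical (`ElectionSketch`, the method names, and the server ids); it only models "first creator wins, and the node vanishing triggers a new election".

```java
// Hypothetical in-memory model of the election; a String field stands in for the ephemeral node
class ElectionSketch {
    private String masterNode; // null means the node does not exist

    // Like creating the ephemeral node: succeeds only if nobody holds it yet
    synchronized boolean tryBecomeMaster(String serverId) {
        if (masterNode != null) {
            return false;
        }
        masterNode = serverId;
        return true;
    }

    // Like the ephemeral node disappearing when the master's session ends
    synchronized void masterDown(String serverId) {
        if (serverId.equals(masterNode)) {
            masterNode = null;
        }
    }

    synchronized String currentMaster() {
        return masterNode;
    }

    public static void main(String[] args) {
        ElectionSketch election = new ElectionSketch();
        System.out.println(election.tryBecomeMaster("8080")); // true: first server wins
        System.out.println(election.tryBecomeMaster("8081")); // false: a master already exists
        election.masterDown("8080");                          // the master goes away
        System.out.println(election.tryBecomeMaster("8081")); // true: new election succeeds
    }
}
```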

package cn.sowhat.order;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletListenerRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.support.WebApplicationContextUtils;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

/**
 * @author sowhat
 * @create 2020-06-10 18:55
 */
class IsMaster {
    public static boolean isSurvival;
}

@RestController
class IndexController {
    @RequestMapping("/getserverinfo")
    public String getServerInfo() {
        return IsMaster.isSurvival ? "is Master" : "is slave";
    }
}

class InitListener implements ServletContextListener {

    ZkClient zkClient = new ZkClient("127.0.0.1:2181");
    private String path = "/election";

    @Value("${server.port}")
    private String serverPort;

    private void init() {
        System.out.println("Project started and completed");
        createEphemeral();
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("Master node is down. New election.");
                Thread.sleep(5000);
                createEphemeral();
            }
        });
    }

    // Whoever creates the ephemeral node becomes the master; everyone else gets an exception
    private void createEphemeral() {
        try {
            zkClient.createEphemeral(path, serverPort);
            IsMaster.isSurvival = true;
        } catch (Exception e) {
            IsMaster.isSurvival = false;
        }
    }

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        // The listener is created with "new", so ask Spring to inject the @Value field first
        // (the same approach as the InitListener of the product service)
        WebApplicationContextUtils.getRequiredWebApplicationContext(servletContextEvent.getServletContext())
                .getAutowireCapableBeanFactory().autowireBean(this);
        init();
    }

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
    }
}

@SpringBootApplication
public class selectMaster {
    public static void main(String[] args) {
        SpringApplication.run(selectMaster.class, args);
    }

    @Bean
    public ServletListenerRegistrationBean<InitListener> servletListenerRegistrationBean() {
        ServletListenerRegistrationBean<InitListener> servletListenerRegistrationBean =
                new ServletListenerRegistrationBean<>();
        servletListenerRegistrationBean.setListener(new InitListener());
        return servletListenerRegistrationBean;
    }
}

Configuration center

For example, MyBatis accesses MySQL through a URL, and we want to change that URL dynamically so that the application switches from database A to database B. The core of the approach is to store the value in a ZK node and listen for the node's data-change event.
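
A minimal sketch of the application side of this idea follows. `DataSourceUrlHolder`, `onDataChange`, and the JDBC URLs are all hypothetical; the ZK wiring is left out, and the assumption is that a data-change listener on the configuration node would call `onDataChange` with the node's new content.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder for the current database URL; the ZK listener wiring is omitted
class DataSourceUrlHolder {
    private final AtomicReference<String> url;

    DataSourceUrlHolder(String initialUrl) {
        this.url = new AtomicReference<>(initialUrl);
    }

    // Assumed to be called from the ZK data-change callback with the node's new content
    void onDataChange(String newUrl) {
        if (newUrl != null && !newUrl.isEmpty()) {
            url.set(newUrl); // ignore empty updates and keep the last good value
        }
    }

    String currentUrl() {
        return url.get();
    }

    public static void main(String[] args) {
        DataSourceUrlHolder holder = new DataSourceUrlHolder("jdbc:mysql://db-a:3306/shop");
        holder.onDataChange("jdbc:mysql://db-b:3306/shop"); // simulates a change pushed by ZK
        System.out.println(holder.currentUrl()); // prints jdbc:mysql://db-b:3306/shop
    }
}
```

In a real application the data source would also have to be rebuilt or re-pointed when the URL changes; that part depends on the connection pool in use and is not shown.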

ZK Precautions

  1. ZK data and log cleanup

The dataDir and dataLogDir directories can become large over time. Either use the zkCleanup.sh script that ships with ZK to keep only the latest n files (for example, zkCleanup.sh -n 15), or configure the autopurge.snapRetainCount and autopurge.purgeInterval parameters, which work together.

  2. Too many connections

By default, a single client can open at most 60 connections to a ZK server. Adjust the maxClientCnxns parameter to change this per-client limit.

  3. Disk management

Disk I/O performance directly affects the speed of ZooKeeper update operations. To improve ZK write performance, use a dedicated disk and set the JVM heap size carefully.

  4. Cluster size

More machines in a cluster is not always better. A write operation needs acks from more than half of the nodes, so the more nodes the cluster has, the more node failures it can tolerate (more reliable), but the lower its throughput. The number of nodes should be odd.

  5. Node data size

ZK serves reads and writes from memory and sometimes broadcasts messages. Therefore, reading or writing large amounts of data on nodes is not recommended.
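
To make the cluster-size note above concrete, the majority arithmetic can be written out. This is a small sketch with hypothetical names (`QuorumSketch`, `quorum`, `toleratedFailures`), assuming only the "more than half must ack" rule stated in the note.

```java
// Hypothetical helper for the "more than half must ack" rule
class QuorumSketch {

    // Smallest number of nodes that is more than half of the cluster
    static int quorum(int nodes) {
        return nodes / 2 + 1;
    }

    // How many nodes can fail while a quorum is still reachable
    static int toleratedFailures(int nodes) {
        return nodes - quorum(nodes);
    }

    public static void main(String[] args) {
        for (int nodes = 3; nodes <= 6; nodes++) {
            System.out.println(nodes + " nodes: quorum=" + quorum(nodes)
                    + ", tolerated failures=" + toleratedFailures(nodes));
        }
    }
}
```

A cluster of 4 tolerates one failure, the same as a cluster of 3, while every write has to wait for one more ack. That is why odd sizes are the usual recommendation.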
