IdGenerator zooKeeper series

IdGenerator, Multithreading test

Zookeeper series [Distributed lock]

Zookeeper: Master Election


The principle of

See: ZooKeeper implementation IdGenerator (Principle version)

Code implementation

public interface IGenerator {

	/** * Generator abstract method */
	long gen(a);
}
Copy the code
public class IdGenerator implements IGenerator {

	private ZkClient client; // Client hold
	private String nodePath; // In which directory to place the generated sequential nodes

	private ArrayBlockingQueue<String> removeQueue;

	public IdGenerator(ZkClient client, String basePath, ArrayBlockingQueue<String> removeQueue) {
		this.client = client;
		this.nodePath = basePath.concat("/");
		this.removeQueue = removeQueue;

		// When initializing, check to see if there is a base directory
		if(! client.exists(basePath)) { client.createPersistent(basePath,true); }}@Override
	public long gen(a) {
		// Create temporary nodes that disappear automatically when the ZooKeeper session closes
		String fullPath = client.createEphemeralSequential(nodePath, null);
		removeQueue.add(fullPath);// Add to the delete queue (if used formally, client sessions are not easily closed and need to be deleted by asynchronous threads)
		return Long.parseLong(fullPath.substring(fullPath.lastIndexOf("/") + 1));// Digital conversion}}Copy the code
public class TestClient {

	private static final int TOTAL_NUM = 10000;
	private static final int THREAD_NUM = 10;

	public static void main(String[] args) throws InterruptedException {
		// Delete the queue
		ArrayBlockingQueue<String> removeQueue = new ArrayBlockingQueue<>(100000);

		/ / thread pool
		ExecutorService executorService = new ThreadPoolExecutor(10.100.0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(200), new ThreadFactory() {

			private final AtomicInteger threadNum = new AtomicInteger(0);

			@Override
			public Thread newThread(Runnable runnable) {
				return new Thread(runnable, "pool-1-thread-"+ threadNum.incrementAndGet()); }}); CountDownLatch countDownLatch =new CountDownLatch(THREAD_NUM + 1);

		// Start the asynchronous deletion thread
		executorService.execute(new Runnable() {

			private ZkClient deleteClient = new ZkClient("localhost:2181,localhost:2182,localhost:2183".5000.5000.new SerializableSerializer());

			@Override
			public void run(a) {
				while (true) {// Just for demonstration
					String removeNode = removeQueue.poll();
					if (removeNode == null) {
						if (countDownLatch.getCount() == 1) {// Exit the judgment
							break;
						}
						try {
							Thread.sleep(100);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						continue; } deleteClient.delete(removeNode); } countDownLatch.countDown(); }});final long start = System.currentTimeMillis();
		for (int i = 0; i < THREAD_NUM; i++) {
			executorService.execute(new Runnable() {

				@Override
				public void run(a) {
					ZkClient client = new ZkClient("localhost:2181,localhost:2182,localhost:2183".5000.5000.new SerializableSerializer());
					IGenerator generator = new IdGenerator(client, "/mygen", removeQueue);
					for (int i = 0; i < TOTAL_NUM / THREAD_NUM; i++) { // Assign tasks evenly to each thread
						System.out.println(generator.gen());
					}
					client.close();// Close the ZK session
					countDownLatch.countDown();
					if (countDownLatch.getCount() == 1) {// Count time
						System.out.println("generator " + TOTAL_NUM + ", use time:"+ (System.currentTimeMillis() - start)); }}}); } countDownLatch.await();// Close the thread poolexecutorService.shutdown(); }}Copy the code