1 Exchanger introduction

Having covered CyclicBarrier, CountDownLatch, and Semaphore, I now cover the last Exchange in the concurrency utility class. The sano1100is a tool class used for collaboration between threads, and the sano1100Is used for data exchange between threads. It provides a synchronization point where two threads can exchange data. The two threads exchange data using the Exchange method. If the first thread executes the Exchange method first, it will wait until the second thread executes the Exchange method. When both threads reach the synchronization point, the two threads can exchange data.

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner’s object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

In the above description, there are several key points:

  • External operations of this type are synchronous;
  • Used to exchange data between pairs of threads;
  • Think of it as a two-way synchronous queue;
  • It can be applied to genetic algorithm, pipeline design and other scenarios. Following the API documentation, this class provides a very concise external interface, a no-argument constructor, and two overloaded generic Exchange methods:

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
Copy the code

2 Exchanger instance


public class ExchangerTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final Exchanger exchanger = new Exchanger();
        executor.execute(new Runnable() {
            String data = "data1";

            @Override
            public void run(a) { doExchangeWork(data, exchanger); }}); executor.execute(new Runnable() {
            String data = "data2";

            @Override
            public void run(a) { doExchangeWork(data, exchanger); }}); executor.shutdown(); }private static void doExchangeWork(String data, Exchanger exchanger) {
        try {
            System.out.println(Thread.currentThread().getName() + "Putting the data together." + data + "Trade it out.");
            Thread.sleep((long) (Math.random() * 1000));

            String exchangeData = (String) exchanger.exchange(data);
            System.out.println(Thread.currentThread().getName() + "Exchange data." + exchangeData);
        } catch(InterruptedException e) { e.printStackTrace(); }}}Copy the code

Pool-1-thread-1 is exchanging data data1 out of the pool. Pool-1-thread-2 is exchanging data data2 out of the pool. Pool-1-thread-2 is exchanging data data1 out of the poolCopy the code

When thread A calls the Exchange () method of an Exchange object, it blocks until thread B also calls the Exchange () method and then exchanges data in A thread-safe manner, after which thread A and THREAD B continue running.

Exchange wait timeout

public class ExchangerTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final Exchanger exchanger = new Exchanger();
        executor.execute(new Runnable() {
            String data = "data1";

            @Override
            public void run(a) { doExchangeWork(data, exchanger); }}); executor.execute(new Runnable() {
            String data = "data2";

            @Override
            public void run(a) {
                try {
                    Thread.sleep((long) (3000));
                } catch(InterruptedException e) { e.printStackTrace(); } doExchangeWork(data, exchanger); }}); executor.shutdown(); }private static void doExchangeWork(String data, Exchanger exchanger) {
        try {
            System.out.println(Thread.currentThread().getName() + "Putting the data together." + data + "Trade it out.");

            // Much less than 3 seconds to throw an exception
            String exchangeData = (String) exchanger.exchange(data,1, TimeUnit.SECONDS);
            System.out.println(Thread.currentThread().getName() + "Exchange data." + exchangeData);
        } catch ( TimeoutException e) {
            e.printStackTrace();
        } catch(InterruptedException e) { e.printStackTrace(); }}}Copy the code

pool-1-thread-1Is the data data1 swapped out Java. Util. Concurrent. TimeoutException at Java. Util. Concurrent. Exchanger. Exchange (Exchanger. Java:626)
    at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
    at ExchangerTest.access$000(ExchangerTest.java:3)
    at ExchangerTest$1.run(ExchangerTest.java:12)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2Is the data data2 swapped out Java. Util. Concurrent. TimeoutException at Java. Util. Concurrent. Exchanger. Exchange (Exchanger. Java:626)
    at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
    at ExchangerTest.access$000(ExchangerTest.java:3)
    at ExchangerTest$2.run(ExchangerTest.java:26)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Copy the code

Actual combat scenario Design a scheduled task, every morning to execute. Start two threads in the scheduled task. One thread is responsible for querying the statistics of the business detail table (xxx_INFO) and placing the statistics in the memory buffer. The other thread is responsible for reading the statistics in the buffer and inserting them into the business statistics table (xxx_statistics). Honey, doesn’t that sound sexy? That’s right! The two threads are exchanging data in batch in memory, which we can use sano11003 to do!

3 Implementation Principle

Sanoer is a tool class used for collaboration between threads. Used for data exchange between threads. It provides a synchronization point at which two threads can exchange data with each other. The two threads exchange data using the Exchange method. If the first thread executes the Exchange method first, it will wait until the second thread executes the Exchange. When both threads reach the synchronization point, the two threads can exchange data and pass the data produced by each thread to the other. Therefore, the use of the exchange() method is important for pairs of threads to use, and when one pair reaches the synchronization point, data is exchanged. Thus the thread objects of the utility class are paired. The San_recovery class provides two methods: String exchange(VX): Used for exchange, which starts the exchange and waits for another thread to call the exchange. String Exchange (V x,long timeout,TimeUnit Unit) : For exchange, start the exchange and wait for another thread to call the exchange, and set the maximum wait time. When the wait time exceeds timeout, the wait is stopped.