This is the 8th day of my participation in Gwen Challenge

1. Introduction

We have already introduced CyclicBarrier, CountDownLatch and Semaphore for concurrent programming; the last one left is Exchanger. Exchanger is a utility class for exchanging data between cooperating threads. It provides a synchronization point at which a pair of threads can exchange data with each other. The two threads exchange data through the exchange method: if the first thread calls exchange first, it waits until the second thread also calls exchange. When both threads have reached the synchronization point, they swap data, each passing the data it produced to the other.

2. Simple applications

Looking at the Exchanger source code, we find that it exposes only three public members:

  1. Exchanger(): the no-argument constructor
  2. exchange(V): exchanges data with the partner thread, waiting as long as necessary
  3. exchange(V, long, TimeUnit): exchanges data, but waits no longer than the given timeout
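
The timed variant is easiest to understand when no partner ever arrives. The following is a minimal sketch, not taken from the original article: the class name TimedExchangeDemo, the helper exchangeOrFallback and the fallback string are made up for illustration. It relies only on the documented behavior that the timed exchange throws TimeoutException once the wait elapses.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedExchangeDemo {
    /**
     * Offers a value and waits at most timeoutMillis for a partner.
     * Returns the partner's value, or the fallback if nobody arrived in time.
     * (Hypothetical helper, for illustration only.)
     */
    static String exchangeOrFallback(Exchanger<String> exchanger, String value,
                                     long timeoutMillis, String fallback) {
        try {
            return exchanger.exchange(value, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;                     // no partner before the deadline
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
            return fallback;
        }
    }

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        // No second thread ever calls exchange, so this call times out.
        System.out.println(exchangeOrFallback(exchanger, "data A", 100, "no partner"));
    }
}
```

Returning a fallback is only one way to handle the timeout; whether to retry or give up depends on the application.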

The source code is short, but the design behind it is relatively complex. CountDownLatch (directly) and CyclicBarrier (indirectly, through ReentrantLock) build on AbstractQueuedSynchronizer and its state field to detect their “tipping point”. How does Exchanger detect its own “tipping point”?

2.1 Example Demonstration

Create two threads that exchange data with each other.

import java.util.concurrent.Exchanger;

public class TestJava {
	private static final Exchanger<String> exgr = new Exchanger<String>();

	public static void main(String[] args) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					String A = "data A"; // data entered by A
					String B = exgr.exchange(A); // blocks until the other thread arrives
					System.out.println(Thread.currentThread().getName() + ": A input is: "
							+ A + ", B input is: " + B);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt(); // restore the interrupt flag
				}
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					String B = "data B"; // data entered by B
					String A = exgr.exchange(B);
					System.out.println(Thread.currentThread().getName() + ": A input is: "
							+ A + ", B input is: " + B);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt(); // restore the interrupt flag
				}
			}
		}).start();
	}
}

Running results (the order of the two lines may vary):

Thread-0: A input is: data A, B input is: data B
Thread-1: A input is: data A, B input is: data B

Both threads call the exchange method. The first caller waits; when the second thread calls exchange, the data is swapped and both calls return.

3. Source code analysis

Exchanger is a synchronizer used to exchange data between pairs of threads. It is mainly used for pairwise exchange, for example crossover in genetic algorithms and communication between pipeline stages.
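
To make the pipeline use case concrete, here is a small sketch in which a producer hands a full buffer to a consumer and receives an empty one in return. It is an illustration only: the class name BufferSwapDemo and the method runOnce are invented for this example, and a real pipeline would swap buffers repeatedly in a loop rather than once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

class BufferSwapDemo {
    /** Runs one producer/consumer hand-off and returns the sum the consumer computed. */
    static int runOnce(int itemCount) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        int[] sum = new int[1]; // written by the consumer, read after join()

        Thread producer = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            for (int i = 1; i <= itemCount; i++) {
                buffer.add(i);
            }
            try {
                exchanger.exchange(buffer); // hand over the full buffer, get an empty one back
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                // Offer an empty buffer and receive the full one.
                List<Integer> full = exchanger.exchange(new ArrayList<>());
                for (int v : full) {
                    sum[0] += v;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(runOnce(10)); // prints 55
    }
}
```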

Internal structure:

private static final class Node extends AtomicReference<Object> {
    /** The data to exchange, provided by the thread that created the node. */
    public final Object item;
    /** The waiting thread to wake up. */
    public volatile Thread waiter;

    /**
     * Creates node with given item and empty hole.
     * @param item the item
     */
    public Node(Object item) {
        this.item = item;
    }
}

/**
 * A Slot is the place where a pair of threads exchange data.
 * It is padded out to a cache line to avoid false sharing. The padding
 * wastes some space, but slots are created on demand, so this is
 * generally acceptable.
 */
private static final class Slot extends AtomicReference<Object> {
    // Improve likelihood of isolation on <= 64 byte cache lines
    long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}

/**
 * Slot array. Elements are created lazily, only when needed.
 * Declared volatile so that double-checked locking is safe.
 */
private volatile Slot[] arena = new Slot[CAPACITY];

/** The capacity of the arena (the Slot array), chosen to limit contention. */
private static final int CAPACITY = 32;

/**
 * The highest slot index in use. It is incremented when a thread sees
 * repeated CAS failures, and decremented when a spin-wait elapses.
 */
private final AtomicInteger max = new AtomicInteger();

The no-argument constructor creates an Exchanger instance. The internal structure is clear. First, it holds a Slot array with a default size of 32 to spread out contention, similar to the strategy used by ConcurrentHashMap. Second, data is exchanged in slots, which are padded to a cache line to avoid false sharing. Finally, each thread taking part in an exchange is represented internally by a Node.

False sharing: suppose two independent fields a and b of a class are adjacent in memory (such as the head and tail pointers of a FIFO queue). They will usually be loaded into the same CPU cache line. Under concurrency, if one thread modifies a, the whole cache line (including b) is invalidated, and another thread that wants b has to reload it from memory. Although a and b look independent, they interfere with each other, and performance can suffer considerably.

Key technical point 1: cache line padding

In the code above, Slot is essentially an AtomicReference. The fields q0, q1, ..., qe that it declares are never used. So why add them? So that different slots do not land in the same CPU cache line. When the CPU reads data from memory, it does not read byte by byte; it reads a block at a time, and that block is a cache line, usually 64 bytes. Making each Slot at least 64 bytes means that modifying one Slot does not invalidate the cached copy of another Slot, which improves performance.
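
The same padding idea can be sketched outside the JDK. The class below is hypothetical (PaddedSlotSketch and PaddedLong are names invented for this example) and it only shows the shape of the technique: the JVM is free to lay out fields as it sees fit, so manual padding like this makes isolation more likely rather than guaranteed, and the example checks correctness only, not speed.

```java
class PaddedSlotSketch {
    /** Hypothetical padded holder; p0..p6 are never read, they only take up space. */
    static final class PaddedLong {
        volatile long value;
        long p0, p1, p2, p3, p4, p5, p6; // padding, in the spirit of Slot's q0..qe
    }

    /** Two threads each increment their own counter; returns both final values. */
    static long[] runCounters(int iterations) {
        PaddedLong a = new PaddedLong();
        PaddedLong b = new PaddedLong();
        // Each counter has exactly one writer, so value++ is safe here.
        Thread t1 = new Thread(() -> { for (int i = 0; i < iterations; i++) a.value++; });
        Thread t2 = new Thread(() -> { for (int i = 0; i < iterations; i++) b.value++; });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] { a.value, b.value };
    }

    public static void main(String[] args) {
        long[] totals = runCounters(1_000_000);
        System.out.println(totals[0] + " " + totals[1]); // prints 1000000 1000000
    }
}
```

Removing the padding fields does not change the result, only (potentially) the running time, which is exactly why false sharing is easy to overlook.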

From the earlier example, we know that the core of the Exchanger class is the exchange method.

/**
 * Waits for another thread to call exchange and reach the "tipping point"
 * (unless the current thread is interrupted), then swaps data with it.
 * If another thread is already waiting, it is woken up and receives the
 * current thread's data, and the current thread returns immediately with
 * the waiting thread's data. Otherwise the current thread blocks until:
 *   1. another thread calls exchange, or
 *   2. the current thread is interrupted.
 *
 * @param x the data to exchange
 * @return the data provided by the other thread
 * @throws InterruptedException if the current thread was
 *         interrupted while waiting
 */
public V exchange(V x) throws InterruptedException {
    if (!Thread.interrupted()) {
        Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
        if (v == NULL_ITEM)
            return null;
        if (v != CANCEL)
            return (V)v;
        Thread.interrupted(); // Clear interrupt status on IE throw
    }
    throw new InterruptedException();
}

If the current thread is not interrupted, the doExchange method is called for data exchange.
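
The interrupt check at the top of exchange can be observed from the outside. The sketch below is an illustration, not part of the JDK: the class InterruptedExchangeDemo and the method failsFastWhenInterrupted are invented names. It sets the interrupt flag before calling exchange and relies on the documented contract that InterruptedException is thrown and the interrupt status is cleared.

```java
import java.util.concurrent.Exchanger;

class InterruptedExchangeDemo {
    /**
     * Returns true if exchange() threw InterruptedException right away
     * and left the interrupt flag cleared.
     */
    static boolean failsFastWhenInterrupted() {
        Exchanger<String> exchanger = new Exchanger<>();
        Thread.currentThread().interrupt();   // set the flag before calling exchange
        try {
            exchanger.exchange("data");
            return false;                     // not expected: no partner exists
        } catch (InterruptedException e) {
            // The flag was cleared when the exception was thrown.
            return !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(failsFastWhenInterrupted()); // prints true
    }
}
```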

private Object doExchange(Object item, boolean timed, long nanos) {
    Node me = new Node(item);                 // Create a Node by storing the current data
    int index = hashIndex();                  // Index position of the current Slot
    int fails = 0;                            // Number of CAS operation failures

    for (;;) {
        Object y;                             // Contents of the current slot
        Slot slot = arena[index];
        // Lazy initialization: if the slot is null, create one at this index
        if (slot == null)
            createSlot(index);                // Continue the loop to reread
        else if ((y = slot.get()) != null &&  // Slot is occupied: try to claim the waiting node by
                 slot.compareAndSet(y, null)) { // CASing the slot back to null
            Node you = (Node)y;               // Hand our item to the waiter if its hole is still empty
            if (you.compareAndSet(null, item)) {
                LockSupport.unpark(you.waiter);
                return you.item;
            }                                 // Else cancelled; continue
        }
        else if (y == null &&                 // Slot is empty: try to occupy it with our node
                 slot.compareAndSet(null, me)) {
            // Slot 0 is the final stop: block there until a partner arrives
            if (index == 0)
                return timed ?
                    awaitNanos(me, slot, nanos) :
                    await(me, slot);
            Object v = spinWait(me, slot);    // At index != 0, only spin for a while
            if (v != CANCEL)                  // Not cancelled: the exchange succeeded
                return v;
            me = new Node(item);              // Throw away cancelled node
            int m = max.get();
            if (m > (index >>>= 1))           // Halve the index, moving toward slot 0
                max.compareAndSet(m, m - 1);  // Maybe shrink table
        }
        else if (++fails > 1) {               // Allow 2 fails on 1st slot
            int m = max.get();
            if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                index = m + 1;                // Grow on 3rd failed slot
            else if (--index < 0)
                index = m;                    // Circularly traverse
        }
    }
}

So the idea behind exchange is:

  1. Hash the current thread's id to work out which slot index to start at;
  2. With luck, that slot is already occupied (it holds a node), which means someone is waiting to exchange, so exchange with them;
  3. If the slot is empty (it holds no node), occupy it and wait for a partner. If nobody comes, cancel the node in the current slot, halve the index and look for a partner there;
  4. If the thread reaches position 0 without having exchanged, it blocks and waits there. All other threads also keep moving until they reach position 0.

So slot 0 is the “end point” of an exchange: a thread that cannot find a partner anywhere else ends up waiting at 0.
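
The halving step (index >>>= 1 in doExchange) can be traced in isolation. The sketch below is a simplification with invented names (SlotWalkSketch, indexPath): it assumes every spin-wait is cancelled and ignores the contention branch that can move a thread to a different slot, so it shows only the path a thread would take toward slot 0.

```java
import java.util.ArrayList;
import java.util.List;

class SlotWalkSketch {
    /** Lists the slot indices visited when every spin-wait ends in cancellation. */
    static List<Integer> indexPath(int start) {
        List<Integer> path = new ArrayList<>();
        int index = start;
        path.add(index);
        while (index != 0) {
            index >>>= 1;      // same halving step as in doExchange
            path.add(index);
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println(indexPath(13)); // prints [13, 6, 3, 1, 0]
    }
}
```

Because the index is halved each time, a thread that starts at any of the 32 slots reaches slot 0 in at most five steps.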

That completes the walk through the whole logic.

References

  • Blog.csdn.net/chunlongyu/…
  • Brokendreams.iteye.com/blog/225395…