This is part 6 of Java Multithreading:

Java Multithreading – Where do Threads come from?

Java Multithreading – Memory Model

Java Multithreading — Volatile

Java Multithreading — Locking

Java Multithreading — CAS

——————————————————————–

Concurrent programs can be written in much the same way as serial programs.

The difficulty is that concurrent programs are nondeterministic. This makes far more kinds of errors possible than in serial programs, and there are no fixed rules for when they occur. This article gives a brief summary of how to test concurrent programs so as to expose as many of these problems as possible and to understand their performance bottlenecks.

This article mainly includes the following contents:

1. Classification of concurrent tests

2. Correctness testing

3. Safety testing

4. Performance testing

The test process

Concurrent testing and serial testing have parts in common. For example, both need to test correctness in the serial case, which is the foundation for all subsequent testing.

This correctness test is no different from testing a serial program: it verifies that the code produces the expected results when run in a single thread, so we will not go into it here.

For concurrent testing in general, we follow this process.

Classification

Concurrent testing can be broadly divided into two categories: safety testing and liveness testing.

Safety testing can be defined as “nothing bad ever happens”, or it can be understood as consistency. Take the i++ operation as an example: if a single thread loops 20 times, i ends up as 20. If multiple threads perform 20 increments in total and the result is not 20, the result is wrong, and the cause is a thread safety problem.

To test for this kind of problem, we add a “test point”: a check that is itself atomic and does not affect the correctness of the program under test. The test code is then built around that check. How to construct such test points is discussed later.
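The i++ example can be reproduced with a small sketch. The class name CounterRaceDemo and its run method are hypothetical, not part of the original article. It increments a plain int and an AtomicInteger from several threads: the atomic counter always reaches the expected total, while the plain counter may fall short because some updates are lost.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: compares an unsynchronized counter with an
// AtomicInteger when several threads increment both.
class CounterRaceDemo {
    static int unsafeCount;  // plain int: ++ is a non-atomic read-modify-write
    static final AtomicInteger safeCount = new AtomicInteger();

    // Runs nThreads threads, each incrementing both counters perThread times.
    // Returns {unsafe result, atomic result}.
    static int[] run(int nThreads, int perThread) {
        unsafeCount = 0;
        safeCount.set(0);
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[nThreads];
        for (int t = 0; t < nThreads; t++) {
            threads[t] = new Thread(() -> {
                try {
                    start.await();  // hold every thread until all are started
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < perThread; i++) {
                    unsafeCount++;                // updates may be lost
                    safeCount.incrementAndGet();  // atomic increment
                }
            });
            threads[t].start();
        }
        start.countDown();  // release all threads together
        try {
            for (Thread th : threads) {
                th.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] { unsafeCount, safeCount.get() };
    }

    public static void main(String[] args) {
        int[] r = run(4, 100_000);
        System.out.println("expected=400000 unsafe=" + r[0] + " atomic=" + r[1]);
    }
}
```

The unsafe value varies from run to run and can even be correct by chance, which is why a single passing run proves little.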

Liveness testing is defined as “something good will eventually happen”; in other words, the program is guaranteed to make progress and reach its result. No method may block forever, run unacceptably slowly, or deadlock and leave threads waiting indefinitely.
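One common way to check blocking behavior is to start a thread that is expected to block, interrupt it, and verify that it terminates within a timeout. The sketch below does this for take() on an empty ArrayBlockingQueue. The class name, method name, and timeout values are assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical check: take() on an empty queue should block, and the blocked
// thread should terminate promptly once it is interrupted.
class BlockingBehaviorCheck {
    static final long WAIT_MS = 200;   // assumed time for the taker to block
    static final long JOIN_MS = 1000;  // assumed upper bound for it to exit

    static boolean takeBlocksWhenEmpty() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        boolean[] returnedNormally = { false };
        Thread taker = new Thread(() -> {
            try {
                queue.take();                // queue is empty, so this should block
                returnedNormally[0] = true;  // reaching here means take() did not block
            } catch (InterruptedException expected) {
                // expected path: the blocked take() was interrupted
            }
        });
        try {
            taker.start();
            Thread.sleep(WAIT_MS);  // give the taker time to block
            taker.interrupt();
            taker.join(JOIN_MS);    // wait a bounded time for it to exit
            return !taker.isAlive() && !returnedNormally[0];
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("take blocks when empty: " + takeBlocksWhenEmpty());
    }
}
```

The timeouts are a trade-off: too short and the test reports false failures on a loaded machine, too long and the test suite becomes slow.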

Related to liveness testing is performance testing. The main measures are throughput, responsiveness, and scalability.

Throughput: The rate at which a set of concurrent tasks is completed, that is, the number of tasks completed in a given period of time.

Responsiveness: The delay between a request being issued and its completion (also called latency).

Scalability: The improvement in throughput as more resources (CPU, I/O, memory) are added.

Safety testing, as stated earlier, checks that “nothing bad ever happens”; in practice it looks for errors caused by data races. To do this, we need to find the concurrent “test points” in a feature and build additional checks on them. Ideally, these checks require no synchronization of their own.

Let’s use an example to illustrate.

Take ArrayBlockingQueue, a bounded blocking queue commonly used in the producer-consumer pattern. To test it for concurrency problems, the put and take methods are the important ones, and an effective approach is to check that the elements taken out of the queue are the same as the elements put in.

If there is a data safety problem, the values taken from the queue will not match the values put into it. For example, with multiple threads we can compute a checksum of all enqueued elements and a checksum of all dequeued elements; if the two are equal, the test passes.

To exercise as many interleavings as possible, the enqueued values should be generated randomly so that each run is different. To keep the test fair, all threads should also start working at the same time; otherwise the threads that start first may simply run serially.

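import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PutTakeTest {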

    protected static final ExecutorService pool = Executors.newCachedThreadPool();

    // Barrier: lets a group of threads wait for each other, then proceed together

    protected CyclicBarrier barrier;

    protected final ArrayBlockingQueue<Integer> bb ;

    protected final int nTrials , nPairs;

    // Checksum of all values put into the queue

    protected final AtomicInteger putSum = new AtomicInteger(0) ;

    // Checksum of all values taken from the queue

    protected final AtomicInteger takeSum = new AtomicInteger(0) ;

    public static void main(String[] args) throws Exception {

        new PutTakeTest(10, 10, 100000).test(); // capacity 10, 10 producer-consumer pairs, 100000 trials per thread

        pool.shutdown() ;

    }

    public PutTakeTest(int capacity, int npairs, int ntrials) {

        this.bb = new ArrayBlockingQueue<Integer>(capacity);

        this.nTrials = ntrials;

        this.nPairs = npairs;

        this.barrier = new CyclicBarrier(npairs * 2 + 1) ;

    }

    void test() {

        try {

            for (int i = 0; i < nPairs ; i++) {

                pool.execute( new Producer());

                pool.execute(new Consumer()) ;

            }

            barrier.await(); // Wait for all threads to be ready

            barrier.await(); // Wait for all threads to finish

            System.out.println("result, put==take: " + (putSum.get() == takeSum.get()));

        } catch (Exception e) {

            throw new RuntimeException(e) ;

        }

    }

    static int xorShift( int y) {

        y ^= (y << 6);

        y ^= (y >>> 21) ;

        y ^= (y << 7) ;

        return y;

    }

    // Producer

    class Producer implements Runnable {

        public void run() {

            try {

                int seed = (this.hashCode() ^ (int) System.nanoTime()) ;

                int sum = 0;

                barrier.await();

                for (int i = nTrials; i > 0; --i) {

                    bb.put(seed) ;

                    sum += seed;

                    seed = xorShift(seed);

                }

                putSum.getAndAdd(sum) ;

                barrier.await();

            } catch (Exception e) {

                throw new RuntimeException(e);

            }

        }

    }

    // Consumer

    class Consumer implements Runnable {

        public void run() {

            try {

                barrier.await() ;

                int sum = 0;

                for (int i = nTrials; i > 0; --i) {

                    sum += bb.take() ;

                }

                takeSum.getAndAdd(sum) ;

                barrier.await();

            } catch (Exception e) {

                throw new RuntimeException(e);

            }

        }

    }

}

In the program above, putSum and takeSum accumulate the checksums of the values put and taken. A CyclicBarrier makes all threads start executing from the same point at the same time. Each producer thread derives its own seed, so that its input data differs from that of the other threads; the threads then contend for the queue in the following loop.

    for (int i = nTrials; i > 0; --i) {

        bb.put(seed);

        sum += seed;

        seed = xorShift(seed);

    }

If there is a thread safety problem, the values taken will differ from the values put, so putSum and takeSum will almost certainly differ; if the queue behaves correctly, the two sums are equal.

Since most errors in concurrent code are low-probability events, it is necessary to test repeatedly to improve the probability of finding errors.
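Repetition can be automated. The following is a minimal, self-contained sketch, not the article's PutTakeTest: the class and method names are hypothetical, ThreadLocalRandom replaces the xorShift generator, and the start barrier is omitted for brevity. It runs the put/take checksum comparison for several rounds and counts the rounds in which the sums differ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: repeats a put/take checksum test over many rounds.
class RepeatedChecksumTest {

    // One round: `pairs` producers and `pairs` consumers move `trials` values each.
    static boolean oneRound(int capacity, int pairs, int trials) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicLong putSum = new AtomicLong();
        AtomicLong takeSum = new AtomicLong();
        Thread[] threads = new Thread[pairs * 2];
        for (int p = 0; p < pairs; p++) {
            threads[2 * p] = new Thread(() -> {
                long sum = 0;  // thread-local sum, published once at the end
                try {
                    for (int i = 0; i < trials; i++) {
                        int v = ThreadLocalRandom.current().nextInt();
                        queue.put(v);
                        sum += v;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                putSum.addAndGet(sum);
            });
            threads[2 * p + 1] = new Thread(() -> {
                long sum = 0;
                try {
                    for (int i = 0; i < trials; i++) {
                        sum += queue.take();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                takeSum.addAndGet(sum);
            });
        }
        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return putSum.get() == takeSum.get();
    }

    // Returns how many of `rounds` rounds ended with mismatched checksums.
    static int failures(int rounds, int capacity, int pairs, int trials) {
        int failed = 0;
        for (int r = 0; r < rounds; r++) {
            if (!oneRound(capacity, pairs, trials)) {
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println("failed rounds: " + failures(20, 10, 4, 10_000));
    }
}
```

Each thread keeps its sum in a local variable and publishes it once, so the checksum bookkeeping adds almost no synchronization to the code path being tested.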

Performance testing is usually an extension of functional testing. While there may be some overlap between performance and functional testing, their goals are different.

First, a performance test should reflect how the object under test is actually used in the application, and measure its throughput in that scenario. In addition, we tune limits such as the number of threads and the degree of concurrency based on measured values, so that the program runs well on the target system.

We extend the above PutTakeTest to add the following functionality:

1. To improve timing accuracy, time an entire run of operations rather than each operation individually.

A BarrierTimer maintains the run time of a single group. It implements Runnable and serves as the CyclicBarrier's barrier action, so it is called once each time the barrier trips: the first call records the start time and the second call records the end time.

We use it to record the total time for a group. From the total time we can calculate the time per operation, and from that the throughput of each test.

The following is a barrier-based timer.

public class BarrierTimer implements Runnable{

    private boolean started ;

    private long startTime , endTime;

    public synchronized void run() {

        long t = System.nanoTime() ;

        if (!started) {

            started = true;

            startTime = t;

        } else

            endTime = t;

    }

    public synchronized void clear() {

        started = false;

    }

    public synchronized long getTime() {

        return endTime - startTime;

    }

}
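The article does not show how the timer is wired into the test, so the following is only a sketch under assumptions. It passes a timer with the same logic as BarrierTimer to the two-argument CyclicBarrier constructor as the barrier action, then derives time per operation and throughput from the elapsed time. The class name, method names, and the AtomicLong stand-in workload are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: times a group of worker threads with a barrier action.
class BarrierTimingSketch {

    // Same logic as BarrierTimer: first call stores the start, second the end.
    static class Timer implements Runnable {
        private boolean started;
        private long startTime, endTime;

        public synchronized void run() {
            long t = System.nanoTime();
            if (!started) {
                started = true;
                startTime = t;
            } else {
                endTime = t;
            }
        }

        public synchronized long getTime() {
            return endTime - startTime;
        }
    }

    // Runs nThreads workers doing opsPerThread operations each; returns elapsed ns.
    static long elapsedNanos(int nThreads, int opsPerThread) {
        Timer timer = new Timer();
        // The barrier action runs once per trip, before the waiting threads resume.
        CyclicBarrier barrier = new CyclicBarrier(nThreads + 1, timer);
        AtomicLong counter = new AtomicLong();  // stand-in for the real workload
        for (int t = 0; t < nThreads; t++) {
            new Thread(() -> {
                try {
                    barrier.await();  // first trip: timer records the start
                    for (int i = 0; i < opsPerThread; i++) {
                        counter.incrementAndGet();
                    }
                    barrier.await();  // second trip: timer records the end
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            }).start();
        }
        try {
            barrier.await();  // wait until all workers are ready
            barrier.await();  // wait until all workers have finished
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
        return timer.getTime();
    }

    // Throughput in operations per second for `ops` operations in `nanos` ns.
    static double opsPerSecond(long ops, long nanos) {
        return ops * 1e9 / nanos;
    }

    public static void main(String[] args) {
        int nThreads = 4, opsPerThread = 1_000_000;
        long ops = (long) nThreads * opsPerThread;
        long nanos = elapsedNanos(nThreads, opsPerThread);
        System.out.println("ns/op: " + (nanos / ops)
                + ", ops/s: " + opsPerSecond(ops, nanos));
    }
}
```

Because the barrier action runs inside the barrier itself, thread start-up and shutdown time are excluded from the measurement.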

2. Run the performance test with different parameter combinations.

The test is run with different parameter combinations to obtain the throughput under each combination and the scalability across thread counts. In PutTakeTest, we only tested for safety.

Here we test ArrayBlockingQueue with capacities of 1, 10, 100, and 1000, each with 1, 2, 4, 8, 16, 32, 64, and 128 producer-consumer pairs.

    public static void main(String[] args) throws Exception {

        int tpt = 100000 ; // trials per thread

        for (int cap = 1; cap <= 1000; cap *= 10) {

            System.out.println("Capacity: " + cap);

            for (int pairs = 1; pairs <= 128 ; pairs *= 2) {

                TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt);

                System.out.print("Pairs: " + pairs + "\t");

                t.test();

                System.out.print("\t");

                Thread.sleep(1000);

                t.test();

                System.out.println();

                Thread.sleep(1000);

            }

        }

        PutTakeTest.pool.shutdown() ;

    }

Here are the results of our performance test on ArrayBlockingQueue. My hardware environment is:

CPU: i7, 4 cores and 8 threads

Memory: 16 GB

Hard disk: 110 GB SSD

The JDK environment:

java version "1.8.0_45"

Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

The results show the following:

1. When ArrayBlockingQueue has a capacity of 1, no amount of added concurrency significantly increases its throughput, because each thread spends its time blocked, waiting for another thread to make progress.

2. Increasing the capacity to 10, 100, and 1000 greatly improves throughput, especially at 1000, where it reaches about 9 million operations per second.

3. Throughput peaks when the number of threads reaches 16 and then declines, though not dramatically, because with more threads most of the time is spent blocking and unblocking.

Comparison of other blocking queues

Below is a side-by-side comparison of several blocking queues: ArrayBlockingQueue, LinkedBlockingQueue, LinkedBlockingDeque, and PriorityBlockingQueue. The hardware environment is the same as above, and the JDK is still 1.8.

Each queue is given a capacity of 1000, and throughput is measured with 1, 2, 4, 8, 16, 32, 64, and 128 threads.

From the above data, we can see that:

On JDK 1.8, ArrayBlockingQueue performs better than LinkedBlockingQueue, although not by much. Before 1.6, ArrayBlockingQueue performed better than LinkedBlockingQueue.

The throughput of PriorityBlockingQueue declines steadily after reaching a peak of about 2.9 million operations per second. This is because a priority queue has to keep its elements in priority order on every insertion and removal, which costs additional time.
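The ordering work is easy to see in a small sketch (the class and method names are hypothetical). Values are inserted in arbitrary order and come out sorted, which is the extra work the other queues do not perform. Note also that the constructor argument of PriorityBlockingQueue is only an initial capacity: the queue is unbounded, so put never blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo: PriorityBlockingQueue hands elements back in priority
// order (natural ordering here), regardless of insertion order.
class PriorityOrderSketch {

    static List<Integer> drainInOrder(int... values) {
        // 1000 is an initial capacity, not a bound: the queue grows as needed.
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(1000);
        for (int v : values) {
            queue.put(v);  // never blocks on an unbounded queue
        }
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {  // poll() returns the smallest element
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder(5, 1, 4, 2, 3));  // prints [1, 2, 3, 4, 5]
    }
}
```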

The main purpose of the tests above is to measure the constraints on overall throughput when producers and consumers hand data off through a bounded queue's put and take, so many real-world factors are ignored. In addition, because of the JIT's dynamic compilation, frequently executed bytecode is compiled into machine code while the program is running.

Therefore, the tests above need a warm-up phase and more runs, so that the code has already been compiled before the timed runs begin.
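A warm-up phase can be sketched as follows. The class name, method name, and run counts are assumptions for illustration, and this hand-rolled loop is only a rough approximation of what a dedicated benchmark harness such as JMH does. The task runs a number of times untimed, and only the later runs are measured.

```java
// Hypothetical helper: runs a task untimed for warm-up, then averages timed runs.
class WarmupTimingSketch {

    // Returns the average time in nanoseconds over measuredRuns timed runs,
    // after warmupRuns untimed runs.
    static long averageNanos(Runnable task, int warmupRuns, int measuredRuns) {
        for (int i = 0; i < warmupRuns; i++) {
            task.run();  // untimed: gives the JIT a chance to compile hot code
        }
        long total = 0;
        for (int i = 0; i < measuredRuns; i++) {
            long start = System.nanoTime();
            task.run();
            total += System.nanoTime() - start;
        }
        return total / measuredRuns;
    }

    public static void main(String[] args) {
        // Stand-in workload: build a string of 100000 characters.
        Runnable work = () -> {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 100_000; i++) {
                sb.append('x');
            }
            if (sb.length() != 100_000) {
                throw new IllegalStateException("unexpected length");
            }
        };
        System.out.println("avg ns per run: " + averageNanos(work, 20, 10));
    }
}
```

How many warm-up runs are enough depends on the workload and the JVM, so the counts used here are placeholders rather than recommendations.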

Testing the correctness of concurrent programs can be particularly difficult because many failures of concurrent programs are low-probability events and are sensitive to execution timing, load conditions, and other conditions that are difficult to reproduce.

In order to find these errors as much as possible, we need to do more work to analyze and test them. I hope today’s introduction can help you broaden your mind.
