The original address: www.xilidou.com/2018/01/22/…

In high concurrency systems, we often encounter such requirements: the system generates a large number of requests, but these requests are not high real-time requirements. We can combine those requests, and when we reach a certain number we submit them all. Maximize the utilization of system IO to improve system throughput performance.

So the request merge framework needs to consider the following two requirements:

  1. Submit data when a certain number of requests have been collected
  2. After a certain period of time if the number of requests did not reach the specified number also commit

Let’s talk about how to implement such a requirement.

Read this article and you will learn:

  • ScheduledThreadPoolExecutor
  • Blocking queue
  • Thread-safe parameters
  • The use of LockSuppor

Design ideas and implementation

So let’s talk a little bit about what’s the idea behind this. I hope you can learn to analyze problems and design some routines of modules.

  1. What underlying data structure is used to hold the requests that need to be merged?
    • Since our system is used in a high concurrency environment, we certainly can’t use ordinaryArrayListTo hold. We can use blocking queues to hold requests that need to be merged.
    • Our data structure needs to provide an add() method externally for submitting data. After the external add data, we need to check whether the number of data in the queue has reached our limit. Quantity reached submit data, not reached continue to wait.
    • The data structure also needs to provide a timeOut() method that is periodically called by an external timer and, if called, directly submits data to the remote.
    • When the condition is met, the thread performs the submission action; when the condition is not met, the thread should pause and wait for the queue to reach the condition of submitting data. So we can consider usingLockSuppor.park()andLockSuppor.unparkTo suspend and activate the action thread.

After the above analysis, we have this data structure:

private static class FlushThread<Item> implements Runnable{

        private final String name;

        // Queue size
        private final int bufferSize;
        // Operation interval
        private int flushInterval;

        // The time of the last submission.
        private volatile long lastFlushTime;
        private volatile Thread writer;

        // A blocking queue holding data
        private final BlockingQueue<Item> queue;

        // How to implement the conditions after they are reached
        private final Processor<Item> processor;

        // constructor
        public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {
            this.name = name;
            this.bufferSize = bufferSize;
            this.flushInterval = flushInterval;
            this.lastFlushTime = System.currentTimeMillis();
            this.processor = processor;

            this.queue = new ArrayBlockingQueue<>(queueSize);

        }

        // The method of submitting data externally
        public boolean add(Item item){
            boolean result = queue.offer(item);
            flushOnDemand();
            return result;
        }

        // The timeout method provided to the external
        public void timeOut(a){
            // More than two commits exceed the time interval
            if(System.currentTimeMillis() - lastFlushTime >= flushInterval){ start(); }}// Unblock the thread
        private void start(a){
            LockSupport.unpark(writer);
        }

        // Whether the current data is greater than the submitted condition
        private void flushOnDemand(a){
            if(queue.size() >= bufferSize){ start(); }}// Execute the method to submit data
        public void flush(a){
            lastFlushTime = System.currentTimeMillis();
            List<Item> temp = new ArrayList<>(bufferSize);
            int size = queue.drainTo(temp,bufferSize);
            if(size > 0) {try {
                    processor.process(temp);
                }catch (Throwable e){
                    log.error("process error",e); }}}// Decide whether to commit according to the size and time interval of the data
        private boolean canFlush(a){
            return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
        }

        @Override
        public void run(a) {
            writer = Thread.currentThread();
            writer.setName(name);

            while(! writer.isInterrupted()){while(! canFlush()){// If the thread is not interrupted and the condition for execution is not met, the thread is blocked
                    LockSupport.park(this); } flush(); }}}Copy the code
  1. How do you implement timed commits?

We usually meet timing relevant requirements, the first thought should be use FlushThread ScheduledThreadPoolExecutor timing to call a timeOut method, if you think of the Thread. The sleep ()… That needs to study hard again, read the source code.

  1. How to further improve system throughput?

The FlushThread we use implements Runnable, so we can consider using a thread pool to hold multiple Flushthreads.

So we have code like this:


public class Flusher<Item> {

    private final FlushThread<Item>[] flushThreads;

    private AtomicInteger index;

    // Prevent multiple threads from executing simultaneously. Add a random number interval
    private static final Random r = new Random();

    private static final int delta = 50;

    private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);

    private static ExecutorService POOL = Executors.newCachedThreadPool();

    public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {

        this.flushThreads = new FlushThread[threads];


        if(threads > 1){
            index = new AtomicInteger();
        }

        for (int i = 0; i < threads; i++) {
            final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
            flushThreads[i] = flushThread;
            POOL.submit(flushThread);
            // Call the timeOut() method periodically.TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS); }}// mod index to ensure that all threads can be added
    public boolean add(Item item){
        int len = flushThreads.length;
        if(len == 1) {return flushThreads[0].add(item);
        }

        int mod = index.incrementAndGet() % len;
        return flushThreads[mod].add(item);

    }

    // As described above
    private static class FlushThread<Item> implements Runnable{
        ...省略
    }
}

Copy the code
  1. Interface oriented programming to improve system expansibility:
public interface Processor<T> {
    void process(List<T> list);
}
Copy the code

use

Let’s write a test method to test:

// The Processor outputs the entire String
public class PrintOutProcessor implements Processor<String>{
    @Override
    public void process(List<String> list) {

        System.out.println("start flush");

        list.forEach(System.out::println);

        System.out.println("end flush"); }}Copy the code

public class Test {

    public static void main(String[] args) throws InterruptedException {

        Flusher<String> stringFlusher = new Flusher<>("test".5.1000.30.1.new PrintOutProcessor());

        int index = 1;
        while (true){
            stringFlusher.add(String.valueOf(index++));
            Thread.sleep(1000); }}}Copy the code

Results of execution:


start flush
1
2
3
end flush
start flush
4
5
6
7
end flush

Copy the code

We found that it did not take five digits to trigger flush. Flush was executed even though the required five numbers were not reached because a timeout commit was triggered.

If we remove Thread.sleep(1000); And look at the results:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush
Copy the code

Every five numbers are submitted. Perfect…

conclusion

A more vivid example to explain some of the specific use of multithreading. Learning multithreading should be more thinking more hands-on, will have a better effect. I hope that after reading this article, you will have a harvest. Welcome to exchange.

Making address: github.com/diaozxin007…

Free hand lift frame series article address:

Hands-free framework – Implementing IoC

Hands-free framework – Implementing Aop