I am kangarooking, not "my brother's brother". I'm a little internet snail who sincerely shares experience and practical technical content, and I hope my articles help you. Follow me and make progress every day ❗ ❗ ❗

This installment is packed with practical content. If you are passing by, don't miss it.

Small talk first

Recently, the epidemic situation in Shenzhen has become more and more serious, and some buildings near our company have been sealed off. Many people are isolated inside their companies. Reading their chat logs is sad: they queue up to wash their hair every day, start working as soon as they wake up, and go to sleep right after work. Our company is fine: seeing the situation, it arranged for all staff to work remotely. Today is my fourth day of remote work, and it is really sweet. I can sleep right up until work starts, eat on time at noon, lie down for a nap straight after lunch (super comfortable), and basically clock off on time (ha ha ha). There are more and more online meetings, though. I actually think a programmer's job suits remote work: all of our work happens on the Internet, and online communication is very convenient now. I wish all programmers could work remotely. I don't know when that will happen.

I hope the epidemic will turn around and we will win!

This article involves knowledge points:

  • Thread pools
  • Closing the thread pool
  • Choosing the number of threads for different types of tasks
  • Redis pipeline
  • Functional programming

Background

On this day I was on duty as usual, taking a paid bathroom break and happily watching a game streamer's slick plays, when WeChat messages suddenly started flashing at the top of my phone.

The task in question is a scheduled job that synchronizes database data to Redis. Because the amount of data is large, it queries the data page by page and then synchronizes each page.

This is the old code to be optimized. The logic is a while (true) loop: pageNum is incremented after each query, and the method returns as soon as a query comes back with an empty list. The saveOrUpdateToRedis method processes and transforms one user's data and then writes it to Redis with hmset.

        long taskStart = System.currentTimeMillis();
        int pageSize = 1000;
        int pageNum = 1;
        while (true) {
            long start = System.currentTimeMillis();
            logger.info("Start processing page " + pageNum + " data...");
            PageDto page = PageUtil.setPage(pageNum, pageSize);

            long cxStart = System.currentTimeMillis();
            List<UserDto> userDtoList = userDao.selectUser(page.getBeginNum(), page.getEndNum());
            long cxEnd = System.currentTimeMillis();
            logger.info("Query user time: " + (cxEnd - cxStart) + " ms");

            if (CollectionUtils.isEmpty(userDtoList)) {
                logger.info("Page size {}: batch {} returned no data, finishing.", pageSize, pageNum);
                long taskEnd = System.currentTimeMillis();
                logger.info("Task completed, total time: {} ms", taskEnd - taskStart);
                return;
            }
            logger.info("Batch {} data is being executed", pageNum);
            for (UserDto user : userDtoList) {
                String xxx = user.getXXX();
                if (!StringUtils.isEmpty(xxx)) {
                    // Save to Redis
                    saveOrUpdateToRedis(user, xxx, xxxx);
                }
            }
            long end = System.currentTimeMillis();
            logger.info("Batch {} completed, time: {} ms", pageNum, (end - start));
            pageNum++;
        }

    private void saveOrUpdateToRedis(xxx,xxx,xxx) {
        // ... a bunch of messy data processing and transformation details are omitted
        // Finally store the hash structure in Redis
        redisUtils.hmset(xxx,xxx,time);
    }

The logic is simple, but this scheduled task takes about an hour to run in the production environment. One factor, of course, is the large amount of data (on the order of millions of records), but we'll start with the code.

Requirements: keep the change small, improve execution efficiency significantly, keep the job stable, and keep the impact small.

The journey

First, analyze

To optimize how fast a piece of code runs, I first need to know which factors mainly affect its efficiency. Then I can optimize in a targeted way: deal with the biggest cost first, and look at the details afterwards.

Then, just do it

With the first step decided, I went straight to the production logs of this scheduled task and analyzed them. The most time-consuming part was the paging SQL query, which took 3 seconds on average, while everything else took milliseconds. So this called for SQL optimization, and I got started. Then I connected to the database and was dumbfounded: I am familiar with MySQL, but how do you tune Oracle SQL? My first guess was deep paging, but according to the logs the queries at the start and at the end of the run took about the same time. I wandered around Baidu for a long while, wondering whether an index was missing or whether the paging itself was inefficient, then checked the SQL execution plan. I tossed and turned for more than half an hour, repeatedly modifying the SQL statement and testing its efficiency. In the end, my optimized SQL was about as fast as the original, and I concluded that the SQL was hard to optimize (or maybe I was just not good enough).

Dead end, so change the optimization angle

Since the SQL route was blocked, I looked for another angle. The old task ran on a single thread. If I use a thread pool and make it multithreaded, won't it take off immediately?

Code porter

I was too lazy to type the code myself, so I opened the source of Shenyu, an open-source framework I had been reading, copied a thread pool from it, and tweaked it a little (I don't write code, I just move code around, ha ha ha). To set some thread attributes and make the threads traceable, I brought its ThreadFactory along too.

Then I wondered whether to keep the thread pool in a member variable or create it inside the method. I decided to create it inside the method, because the pool is only needed while the synchronization task runs, and when the task ends the pool is shut down and its resources are released.

A ThreadFactory lets you define the thread group, thread name, daemon flag, thread priority, and so on.

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

public final class XxlThreadFactory implements ThreadFactory {

    private static final AtomicLong THREAD_NUMBER = new AtomicLong(1);

    private static final ThreadGroup THREAD_GROUP = new ThreadGroup("xxl");

    private final boolean daemon;

    private final String namePrefix;

    private final int priority;

    private XxlThreadFactory(final String namePrefix, final boolean daemon, final int priority) {
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.priority = priority;
    }

    /**
     * create custom thread factory.
     *
     * @param namePrefix prefix
     * @param daemon     daemon
     * @return {@linkplain ThreadFactory}
     */
    public static ThreadFactory create(final String namePrefix, final boolean daemon) {
        return create(namePrefix, daemon, Thread.NORM_PRIORITY);
    }

    /**
     * create custom thread factory.
     *
     * @param namePrefix prefix
     * @param daemon     daemon
     * @param priority   priority
     * @return {@linkplain ThreadFactory}
     */
    public static ThreadFactory create(final String namePrefix, final boolean daemon, final int priority) {
        return new XxlThreadFactory(namePrefix, daemon, priority);
    }

    @Override
    public Thread newThread(final Runnable runnable) {
        Thread thread = new Thread(THREAD_GROUP, runnable,
                THREAD_GROUP.getName() + "-" + namePrefix + "-" + THREAD_NUMBER.getAndIncrement());
        thread.setDaemon(daemon);
        thread.setPriority(priority);

        return thread;
    }
}
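To see what such a factory buys you, here is a minimal, self-contained sketch. It is not the Shenyu code: NamedThreadFactoryDemo and its named helper are hypothetical names, and the factory only sets a name prefix and the daemon flag. It plugs the factory into a small fixed pool and reports the name of the worker thread that ran a task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class NamedThreadFactoryDemo {

    /** Hypothetical, stripped-down factory: name prefix plus a running number, and a daemon flag. */
    static ThreadFactory named(String prefix, boolean daemon) {
        AtomicLong counter = new AtomicLong(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(daemon);
            return thread;
        };
    }

    /** Runs one task on a pool built with the factory and returns the worker thread's name. */
    static String workerName(String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(2, named(prefix, false));
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not finish", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The first thread created by the factory runs the first task.
        System.out.println(workerName("refreshCollectJob")); // refreshCollectJob-1
    }
}
```

With named threads, a log line or a thread dump tells you immediately which job a thread belongs to.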

Thread pool: I set the core thread count and the maximum thread count to the same value. The number of threads comes either from a passed-in parameter or from the number of CPU cores. Because the synchronization task is IO-intensive, when no custom thread count is passed in, the pool uses roughly twice the number of CPU cores (of course, the best value should ultimately be found by testing). The blocking queue size is set to 600 based on the total number of tasks. The rejection policy is custom: when the queue is full, the main thread enters a loop that sleeps for one second at a time and checks whether the queue has dropped below half full; once it has, the main thread leaves the loop and continues submitting tasks.

I also learned how to get the number of CPU cores from the source code of ConcurrentHashMap. That is the benefit of reading source code: if nothing else, it makes you a better code porter 0.0
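The sizing rule can be tried out on its own. The sketch below is an illustration rather than the production code: PoolSizeRule and poolSize are hypothetical names, and the bounds are copied from the author's getThreadPool method (a caller-supplied value is accepted only when it is positive and below 3 * cores + 3).

```java
class PoolSizeRule {

    /**
     * Use the requested size when it is positive and below 3 * cpus + 3,
     * otherwise derive the size from the CPU core count (roughly 2x, IO-bound work).
     */
    static int poolSize(int requested, int cpus) {
        if (requested > 0 && requested < cpus * 3 + 3) {
            return requested;
        }
        return cpus <= 2 ? cpus * 2 + 1 : cpus * 2 - 1;
    }

    public static void main(String[] args) {
        // The same call ConcurrentHashMap uses for its NCPU constant.
        int ncpu = Runtime.getRuntime().availableProcessors();
        System.out.println("cpus=" + ncpu + ", default pool size=" + poolSize(0, ncpu));
        System.out.println(poolSize(0, 8));   // 15
        System.out.println(poolSize(10, 8));  // 10
        System.out.println(poolSize(100, 8)); // 15, because 100 is not below 3 * 8 + 3 = 27
    }
}
```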

    // Declarations assumed by this method (they are not shown in the original snippet)
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    private static final int QUEUE_SIZE = 600;
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);

    private ExecutorService getThreadPool(String param) {
        int poolSizeParam = 0;
        if (org.apache.commons.lang3.StringUtils.isNotBlank(param) && param.matches("[0-9]+")) {
            poolSizeParam = Integer.parseInt(param);
        }
        // Core and maximum thread count: the passed-in value when it is in range,
        // otherwise a value derived from the number of CPU cores of the current system
        int defaultSize = NCPU <= 2 ? NCPU * 2 + 1 : NCPU * 2 - 1;
        int poolSize = (poolSizeParam > 0 && poolSizeParam < NCPU * 3 + 3) ? poolSizeParam : defaultSize;
        ThreadPoolExecutor jobThreadPool = new ThreadPoolExecutor(poolSize, poolSize,
                10L, TimeUnit.SECONDS, queue,
                XxlThreadFactory.create("refreshCollectJob", false),
                // When the queue is full, the submitting (main) thread waits in a loop
                // and leaves the loop only once the queue is less than half full
                (r, e) -> {
                    if (!e.isShutdown()) {
                        for (; ; ) {
                            ThreadUtils.sleep(1000);
                            log.info("Queue full, main thread stopped submitting tasks!!");
                            if (queue.size() < (QUEUE_SIZE / 2)) {
                                log.info("Blocking queue is below half full, continue submitting tasks to the thread pool!!");
                                break;
                            }
                        }
                        // Re-submit the rejected task; without this it would be silently dropped
                        e.execute(r);
                    }
                });
        log.info("Thread pool initialization completed, system CPU = {}, core threads and maximum threads = {}, blocking queue length = {}", NCPU, poolSize, QUEUE_SIZE);
        return jobThreadPool;
    }
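The "make the submitter wait" rejection policy can be exercised in isolation. The following is a minimal sketch under my own assumptions, not the author's production code: BackPressureDemo and runAll are hypothetical names, the pool has two threads, and the polling interval is 5 ms instead of one second. One detail matters: a RejectedExecutionHandler that simply returns discards the rejected task, so the sketch hands the task back to the pool after waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackPressureDemo {

    /** Submits {@code tasks} short jobs through a tiny pool and returns how many actually ran. */
    static int runAll(int tasks, int queueSize) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueSize);
        int threshold = Math.max(1, queueSize / 2);
        // Runs on the submitting thread: wait until the queue is below the threshold, then retry.
        RejectedExecutionHandler waitThenRetry = (task, pool) -> {
            if (pool.isShutdown()) {
                return;
            }
            while (queue.size() >= threshold) {
                sleepQuietly(5);
            }
            pool.execute(task); // re-submit, otherwise the rejected task would be lost
        };
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(2, 2, 10L, TimeUnit.SECONDS, queue, waitThenRetry);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                sleepQuietly(2); // pretend to do some work
                done.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 50 tasks against a queue of 4: the submitter is throttled, but no task is dropped.
        System.out.println(runAll(50, 4)); // 50
    }
}
```

This only works safely when a single thread submits tasks, as in this job; with several submitters, blocking on the queue directly (for example with put) would be a sturdier choice.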

Submit a task

Submitted tasks refer to tasks submitted to a thread pool for execution.

With the thread pool done, the next step is to submit tasks to it. My first plan was this: the main thread pages through the database in a loop and submits one task for every page it reads, and each task converts that page's data and writes it to Redis in a loop. Then I realized that in production reading one page takes up to 3 seconds, while a task needs only about 20 ms to run. A pool thread would finish a freshly submitted task in 20 ms and then sit idle for 3 seconds until the next task arrived. Supply would never keep up with demand, so there would be basically no gain in efficiency.

The optimized approach: move the paging query into the task as well. Each task, running on a pool thread, queries its own page based on the current pageNum and pageSize, processes and converts the data, and writes it to Redis. The main thread submits tasks in a for loop whose iteration count is the total number of pages.
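A rough back-of-the-envelope model makes the difference between the two designs concrete. The 3 s query time and 20 ms processing time come from the text; the page count (1200) and thread count (16) are hypothetical values chosen for illustration, and the model is idealized (no database contention, no scheduling overhead).

```java
class ThroughputEstimate {

    /** Old code: one thread does everything, so each page costs query + processing. */
    static long serialMillis(int pages, long queryMs, long processMs) {
        return pages * (queryMs + processMs);
    }

    /** First idea: the main thread queries, the pool only processes. */
    static long queryOnMainThreadMillis(int pages, long queryMs, long processMs) {
        // Processing page i overlaps with the query for page i + 1,
        // so only the last page's processing adds to the total.
        return pages * queryMs + processMs;
    }

    /** Final design: query and processing both run inside the pooled task. */
    static long queryInsideTaskMillis(int pages, long queryMs, long processMs, int threads) {
        long rounds = (pages + threads - 1) / threads; // ceil(pages / threads)
        return rounds * (queryMs + processMs);
    }

    public static void main(String[] args) {
        int pages = 1200;   // hypothetical page count
        int threads = 16;   // hypothetical pool size
        System.out.println(serialMillis(pages, 3000, 20));                   // 3624000 (about an hour)
        System.out.println(queryOnMainThreadMillis(pages, 3000, 20));        // 3600020 (almost no gain)
        System.out.println(queryInsideTaskMillis(pages, 3000, 20, threads)); // 226500 (under 4 minutes)
    }
}
```

In practice the database will not scale perfectly with concurrent queries, so the last number is an optimistic bound rather than a prediction.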

The optimized code looks like this:

    // param: optional thread-count override (assumed to be the job's argument; the original snippet omits it)
    public void kangarooking(String param) {
        int pageSize = 1000;
        // Get the thread pool
        ExecutorService executor = getThreadPool(param);
        // Next page number to be processed
        AtomicInteger pageNumAtomic = new AtomicInteger(1);
        // Number of failed synchronization tasks
        AtomicInteger failCount = new AtomicInteger(0);
        // Query the total number of records
        int totalCount = userDao.selectUserCount();
        // Calculate the total number of tasks (i.e. the total number of pages)
        int taskNum = (totalCount + pageSize - 1) / pageSize;
        logger.info("Total records = {}, total tasks to submit to the thread pool = {}", totalCount, taskNum);
        long taskStart = System.currentTimeMillis();
        for (int i = 0; i < taskNum; i++) {
            // Use the thread pool to run multiple tasks in parallel
            executor.execute(() -> {
                // Each task claims its own page number
                int pageNum = pageNumAtomic.getAndIncrement();
                String threadName = Thread.currentThread().getName();
                try {
                    long start = System.currentTimeMillis();
                    logger.info("Thread: {} start processing data on page {}...", threadName, pageNum);
                    PageDto page = PageUtil.setPage(pageNum, pageSize);
                    List<UserDto> userDtoList = userDao.selectUser(page.getBeginNum(), page.getEndNum());
                    if (CollectionUtils.isEmpty(userDtoList)) {
                        logger.info("Page size {}: batch {} returned no data, finishing.", pageSize, pageNum);
                        return;
                    }
                    long mid = System.currentTimeMillis();
                    logger.info("Thread: {} batch {} is executing, query time = {} ms", threadName, pageNum, mid - start);
                    // Use a Redis pipeline to send multiple commands at once and reduce IO time
                    redisUtils.hmsetPipeline(() -> {
                        Map<String, Map<String, Object>> resultMap = new HashMap<>();
                        for (UserDto user : userDtoList) {
                            String xxx = user.getXXX();
                            if (!StringUtils.isEmpty(xxx)) {
                                // Assemble the map data
                                convertMap(resultMap, user, xxx, xxxx);
                            }
                        }
                        return resultMap;
                    }, REDIS_EXPIRES);
                    long end = System.currentTimeMillis();
                    logger.info("Thread: {} batch {} is refreshed, pipeline time = {} ms", threadName, pageNum, end - mid);
                } catch (Exception e) {
                    logger.error("Thread: {} failed to refresh batch {}", threadName, pageNum, e);
                    failCount.incrementAndGet();
                }
            });
        }
        executor.shutdown();
        try {
            logger.info("All tasks submitted, waiting for the threads to execute the remaining tasks...");
            // The main thread has left the loop and waits for all tasks to complete
            executor.awaitTermination(60, TimeUnit.MINUTES);
            long taskEnd = System.currentTimeMillis();
            logger.info("All tasks completed, the thread pool exits. Total number of tasks: {} Number of successful tasks: {} Number of failed tasks: {} Total time: {} ms", taskNum, taskNum - failCount.get(), failCount.get(), (taskEnd - taskStart));
        } catch (InterruptedException e) {
            logger.error("Thread pool exit failed", e);
            // Restore the interrupt flag for callers
            Thread.currentThread().interrupt();
        }
    }

The advantage of this design is that submitting a task no longer waits on a database paging query, because the paging query has moved into the block of code that the pool threads execute. Since tasks are submitted from a for loop, the total number of tasks has to be known up front, which is why the total page count is queried beforehand. The effect is that the main thread can submit all the tasks within about a second, and the pool then hands them out to its threads. Multiple pages are queried and processed at the same time, which greatly improves processing efficiency.
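The pattern of "submit one task per page, let each task claim its page number from an AtomicInteger" can be checked without a database or Redis. In this sketch everything is a stand-in and all names are hypothetical: an in-memory list plays the table, selectPage plays the paging query, and a concurrent set plays Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PagedSyncDemo {

    /** Builds a fake table containing the ids 0..n-1. */
    static List<Integer> ids(int n) {
        List<Integer> table = new ArrayList<>(n);
        for (int id = 0; id < n; id++) {
            table.add(id);
        }
        return table;
    }

    /** Stand-in for the paging query: returns the rows on the given 1-based page. */
    static List<Integer> selectPage(List<Integer> table, int pageNum, int pageSize) {
        int from = Math.min((pageNum - 1) * pageSize, table.size());
        int to = Math.min(from + pageSize, table.size());
        return table.subList(from, to);
    }

    /** Copies every row into a concurrent set, one pooled task per page. */
    static Set<Integer> sync(List<Integer> table, int pageSize, int threads) {
        Set<Integer> target = ConcurrentHashMap.newKeySet(); // stand-in for Redis
        int taskNum = (table.size() + pageSize - 1) / pageSize; // total number of pages
        AtomicInteger nextPage = new AtomicInteger(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < taskNum; i++) {
            pool.execute(() -> {
                int pageNum = nextPage.getAndIncrement(); // each task claims its own page
                target.addAll(selectPage(table, pageNum, pageSize));
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return target;
    }

    public static void main(String[] args) {
        // 10,500 rows with a page size of 1000 gives 11 tasks; every row ends up in the target.
        System.out.println(sync(ids(10_500), 1000, 4).size()); // 10500
    }
}
```

Because getAndIncrement is atomic, no two tasks can claim the same page, and because exactly taskNum tasks are submitted, every page is claimed once.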

Redis pipeline

You have probably noticed that the code above calls a method named hmsetPipeline().

Redis is based on a request/response protocol. Normally the client sends a command and waits for the reply, and Redis responds after it has received and processed the command. If a large number of commands have to be executed, each one has to wait for the reply to the previous one before it is sent. This not only adds up RTT (Round Trip Time) for every command, it also makes frequent system IO calls and sends many network requests.

This is exactly how the old code worked: it executed hmset in a loop and paid for one request and one response every time.

Pipelines exist to improve this. A pipeline lets the client send multiple commands at once without waiting for the result of the previous command, which is similar in spirit to the batching of TCP's Nagle algorithm (the behavior that the TCP_NODELAY option turns off). It reduces not only RTT but also the number of IO calls (each of which involves a switch between user mode and kernel mode).

Therefore, I chose to use a Redis pipeline to package the commands from the loop and send them to Redis together. In this way, multiple RTTs are reduced to one RTT, which improves efficiency and saves resources.

The following is the concrete implementation of the hmsetPipeline method. RedisTemplate provides a pipeline API, so I call that API directly here (you can read the source; it is essentially a wrapper around the Jedis API).
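The saving is easy to estimate with a toy cost model. This is only a sketch: PipelineCostModel is a hypothetical name, and the numbers in main (1000 commands, 1 ms round trip, 10 µs of server work per command) are made-up illustration values, not measurements from this job.

```java
class PipelineCostModel {

    /** One round trip per command: the client waits for each reply before sending the next. */
    static long oneByOneMicros(int commands, long rttMicros, long serverMicrosPerCommand) {
        return commands * (rttMicros + serverMicrosPerCommand);
    }

    /** Pipelined: all commands share a single round trip; the server-side work is unchanged. */
    static long pipelinedMicros(int commands, long rttMicros, long serverMicrosPerCommand) {
        return rttMicros + commands * serverMicrosPerCommand;
    }

    public static void main(String[] args) {
        int commands = 1000;     // e.g. one page of users (hypothetical)
        long rttMicros = 1000;   // 1 ms network round trip (hypothetical)
        long serverMicros = 10;  // server work per command (hypothetical)
        System.out.println(oneByOneMicros(commands, rttMicros, serverMicros));  // 1010000
        System.out.println(pipelinedMicros(commands, rttMicros, serverMicros)); // 11000
    }
}
```

In this model the network round trip dominates the one-by-one case, which is why batching helps so much when each command is cheap for the server.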

    /**
     * Set Map<K, Map<K, V>> data through a Redis pipeline.
     *
     * @param supplier functional-style supplier of the data
     * @param time     expiration time of this batch of keys, in seconds
     */
    public void hmsetPipeline(Supplier<Map<K, V>> supplier, long time) {
        redisTemplate.executePipelined(new SessionCallback<Integer>() {
            @Override
            public <K, V> Integer execute(RedisOperations<K, V> operations) throws DataAccessException {
                @SuppressWarnings("unchecked")
                Map<K, V> kMapMap = (Map<K, V>) supplier.get();
                for (Map.Entry<K, V> stringMapEntry : kMapMap.entrySet()) {
                    K key = stringMapEntry.getKey();
                    V value = stringMapEntry.getValue();
                    operations.opsForHash().putAll(key, (Map<?, ?>) value);
                    if (time > 0) {
                        operations.expire(key, time, TimeUnit.SECONDS);
                    }
                }
                return null;
            }
        });
    }

Functional programming

The Supplier<Map<K, V>> supplier parameter above is an example of functional programming.

Java 8 introduced functional programming, which greatly improves coding efficiency. Let's first be clear about the concept of a functional interface: it is an interface that has exactly one abstract method, and it is usually marked with the @FunctionalInterface annotation. Functional interfaces are the foundation of Java's support for functional programming.

The @FunctionalInterface annotation is not required. Any interface that has exactly one abstract method is a functional interface.

Common functional interfaces include Supplier, Function, Runnable, Comparable... You can also define your own functional interface, as long as it has only one abstract method as described above.
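A few of the JDK's functional interfaces in action, each implemented with a lambda or a method reference. This is a small illustrative sketch; the class name and the sample values are my own.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

class CommonFunctionalInterfaces {

    static String demo() {
        // Supplier: no input, returns a value
        Supplier<String> supplier = () -> "kangarooking";
        // Function: one input, one output
        Function<String, Integer> length = String::length;
        // Comparator: two inputs, returns their ordering
        Comparator<String> byLength = (a, b) -> Integer.compare(a.length(), b.length());
        // Runnable: no input, no output
        StringBuilder log = new StringBuilder();
        Runnable runnable = () -> log.append("ran;");

        runnable.run();
        List<String> names = Arrays.asList("ccc", "a", "bb");
        names.sort(byLength);
        return log + supplier.get() + ":" + length.apply(supplier.get()) + ":" + names;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // ran;kangarooking:12:[a, bb, ccc]
    }
}
```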

To understand how to use a functional interface, start from the anonymous inner class form of the Supplier interface, and then write the same thing in functional style:

    this.redisURISupplier = () -> {
        AtomicReference<RedisURI> uriFieldReference = new AtomicReference<>();
        RedisURI uri = uriFieldReference.get();
        if (uri != null) {
            return uri;
        }
        uri = RedisURI.class.cast(new DirectFieldAccessor(client).getPropertyValue("redisURI"));
        return uriFieldReference.compareAndSet(null, uri) ? uri : uriFieldReference.get();
    };

This is equivalent to creating an implementation class of Supplier and assigning an instance of it to redisURISupplier. The code inside the {} braces is the concrete implementation of the interface's only abstract method, get(). (Because there is exactly one abstract method, the compiler knows that the code inside the braces implements it. If the interface had more than one abstract method, compilation would fail, because the compiler could not tell which method you meant to implement.)
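For comparison, here is one Supplier written both ways in a self-contained sketch. The class name and the returned string are placeholders of my own, not taken from any framework.

```java
import java.util.function.Supplier;

class SupplierTwoWays {

    /** Anonymous inner class: the get() method is spelled out explicitly. */
    static Supplier<String> asAnonymousClass() {
        return new Supplier<String>() {
            @Override
            public String get() {
                return "redis://localhost:6379"; // placeholder value
            }
        };
    }

    /** Lambda: the body is the implementation of the single abstract method get(). */
    static Supplier<String> asLambda() {
        return () -> "redis://localhost:6379"; // placeholder value
    }

    public static void main(String[] args) {
        // Both objects behave the same when get() is called.
        System.out.println(asAnonymousClass().get().equals(asLambda().get())); // true
    }
}
```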

Custom implementation

Let’s implement a functional interface ourselves:

/**
 * The {@code @FunctionalInterface} annotation here simply checks and marks
 * that the current interface is a functional interface.
 *
 * @param <T>
 */
@FunctionalInterface
public interface MyFunction<T> {
    T test1(String name, Integer age);
}

If an interface annotated with @FunctionalInterface declares two abstract methods, compilation fails.

The functional style is easier to understand when you compare it with the use of an anonymous inner class.

public class FunctionClient {
    public static String get(MyFunction<String> function, String name, Integer age) {
        // the XXXX logic is omitted
        return function.test1(name, age);
    }

    public static void main(String[] args) {
        // Here (name, age) corresponds to the parameters of the overridden test1 method
        get((name, age) -> {
            System.out.println("I'm functional style, I'm an implementation class of the MyFunction interface.");
            System.out.println("Here is the overriding implementation of MyFunction.test1");
            return "Return value";
        }, "kangarooking", 24);

        /********************************************/

        get(new MyFunction<String>() {
            @Override
            public String test1(String name, Integer age) {
                System.out.println("I'm an anonymous inner class, I'm also an implementation of the MyFunction interface.");
                System.out.println("Here is the overriding implementation of MyFunction.test1");
                return "Return value";
            }
        }, "kangarooking", 20);

        /************* if anyone still does not understand, see the following ***************/

        MyFunction<String> myFunction = new MyFunction<String>() {
            @Override
            public String test1(String name, Integer age) {
                System.out.println("I'm an anonymous inner class, I'm also an implementation of the MyFunction interface.");
                System.out.println("Here is the overriding implementation of MyFunction.test1");
                return "Return value";
            }
        };
        get(myFunction, "kangarooking", 20);
    }
}

One more tip. When you read a framework's code, or look up the implementation classes of an interface at your company, you may have seen entries like these in the list of implementations.

An entry beginning with "Anonymous" is an anonymous inner class implementation, and an entry shown as ()->{} is a functional (lambda) implementation. If you don't believe me, find a functional interface and try it.

Shutting down the thread pool

When I had finished the optimization points above, I wanted to measure how long the whole thread pool took to run, and to shut the pool down once all the threads had finished their work. The thread pool shutdown material I had once memorized for interviews came straight to mind, and I opened Baidu to review it again. Ah, so that's how it works, got it, carry on. After thinking for a while, I added this code to the optimized code:

    executor.shutdown();
    try {
        logger.info("All tasks submitted, waiting for thread to execute remaining tasks...");
        // The main thread stops the loop and waits for all threads to complete
        executor.awaitTermination(60, TimeUnit.MINUTES);
        long taskEnd = System.currentTimeMillis();
        logger.info("All tasks completed, the thread pool exits. Total number of tasks: {} Number of successful tasks: {} Number of failed tasks: {} Total time: {} ms", taskNum, taskNum - failCount.get(), failCount.get(), (taskEnd - taskStart));
    } catch (InterruptedException e) {
        logger.error("Thread pool exit failed", e);
    }

executor.shutdown() does not close the thread pool immediately: it stops accepting new tasks and lets the tasks that were already submitted finish before the pool terminates. executor.awaitTermination(60, TimeUnit.MINUTES) blocks the main thread until all tasks have completed, so that the execution time of the whole job can be measured. The 60 here means 60 minutes: if the pool has not terminated by then, the main thread stops waiting and carries on. For the difference between executor.shutdown() and executor.shutdownNow(), look them up yourself, or wait for me to write an article about it someday.
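The behavior of shutdown(), awaitTermination() and shutdownNow() can be observed in a small, self-contained sketch. ShutdownDemo and drain are hypothetical names; each task just sleeps for 20 ms. Note that awaitTermination returns a boolean saying whether the pool terminated before the timeout, which the sketch uses to decide whether to fall back to shutdownNow().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {

    /** Runs {@code tasks} jobs and returns true when all of them finished within the timeout. */
    static boolean drain(int tasks, long timeoutMs, AtomicInteger done) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // pretend to do some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        pool.shutdown(); // returns immediately; already submitted tasks still run
        try {
            boolean finished = pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            if (!finished) {
                pool.shutdownNow(); // give up: interrupt the workers and drop what is still queued
            }
            return finished;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger done = new AtomicInteger();
        // Generous timeout: all 10 tasks finish and the method reports success.
        System.out.println(drain(10, 5_000, done) + " " + done.get()); // true 10
        // 1 ms timeout for about a second of work: the wait gives up and reports false.
        System.out.println(drain(100, 1, new AtomicInteger())); // false
    }
}
```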

Results

At this point the synchronization task is more or less optimized. Let's look at a rough flow chart:

Now compare the efficiency before and after. The old task took 16 seconds.

The optimized task took 800 milliseconds.

With a small amount of data and few tasks, this is about 20 times faster than the original. I estimate that it will be 30 to 60 times faster than the original when running in production. I'll report back later with the actual numbers.

If you have any questions, see you in the comments section of the WeChat official account "Mr. Kangaroo's Inn". If you think my sharing is helpful, or that I have some skill, please support me as a budding writer. Like 👍, follow ❤️, and share 👥