Today’s article shares some knowledge about concurrent programming in Java. Although there are many articles on this topic, this one takes a more practical angle: it summarizes the common concurrency tools and techniques in Java so that we can get a feel for their effects from the user’s perspective, paving the way for a deeper understanding of Java’s concurrency mechanisms.

Java Multithreading Overview

Using multithreading in Java is an important way to improve a program’s concurrent responsiveness, but it is also a double-edged sword: used improperly, it can easily introduce errors into the program that are hard to spot. This is because: 1) thread scheduling is handled by the operating system and carries a degree of randomness; 2) Java’s shared-memory model readily produces thread-safety problems in multithreaded environments; 3) poorly designed encapsulation can easily let an object reference escape before the object is safely published.

Therefore, to wield the sword of multithreading, you need a deep understanding of the Java memory model and thread-safety issues. That said, thanks to Java’s rich ecosystem, most scenarios that would require hand-rolled concurrency handling are shielded from us by frameworks and components. This is the main reason many Java developers have little awareness of concurrent programming.

First, let’s look at the core problem of multithreaded programming from the perspective of the Java memory model, as shown in the figure below:

As shown in the figure above, the parts of the Java memory model most frequently used by application code are heap memory and stack memory. Heap memory mainly holds objects and arrays, such as instances created with new; stack memory mainly stores local variables, operands, method return addresses, and other information needed to run a method.

Heap memory is shared between threads: an object produced by instantiating a class, along with the member variables defined on it, can be accessed by multiple threads. This sharing shows up when multiple threads call a method of the same object instance at the same time: each thread copies the object’s member variables into its own stack as separate working copies, rather than directly modifying the member variables in heap memory. After a thread’s operation completes, the modified value is synchronized back to the variable’s main-memory address in the heap, where it becomes visible to other threads.

While this process may seem straightforward, it takes at least six atomic steps to complete inside the JVM, as shown in the following figure:

As shown in the figure above, a thread changes a member variable of an object in heap memory in roughly six steps (leaving aside any locking of the shared variable):

1. Read: read the variable to be operated on from the heap;
2. Load: copy the read variable into the thread’s stack memory;
3. Use: pass the variable’s value from stack memory to the execution engine;
4. Assign: assign the result from the execution engine back to the variable in stack memory;
5. Store: transfer the changed variable’s value from stack memory to main memory;
6. Write: update the variable’s value in main memory, making the new value visible to all threads.

Thus, each thread can operate on the same shared variable in parallel by following these steps. As you can imagine, without any synchronization measures, the value of this shared variable can become erratic in a multi-threaded environment, making it difficult to get the final correct result. This is the so-called thread safety issue, and it’s one of the most important issues to focus on when using multithreaded programming!
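To make the thread-safety problem concrete, here is a minimal sketch (the class and field names are my own, not from the article) that runs the same increments both on a plain field and through an AtomicInteger. The atomic counter always lands on the expected total, while the plain field may lose updates when the read/load/use/assign/store/write cycles of two threads interleave.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedCounterDemo {
    static int plainCount = 0;                                   // unsafe: updates can be lost
    static final AtomicInteger safeCount = new AtomicInteger();  // safe: CAS-based increments

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    plainCount++;                  // read-modify-write: not atomic
                    safeCount.incrementAndGet();   // atomic increment
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        // safeCount is always 10000; plainCount may be smaller when updates interleave
        System.out.println("atomic=" + safeCount.get() + " plain=" + plainCount);
    }
}
```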

Use of thread pools

In real-world scenarios, multithreading is rarely used raw. Threads are precious system resources: creating and destroying them consumes resources, and creating threads without limit can exhaust system resources entirely. Therefore, to reuse threads and cap how many are created, thread pools are generally used. Take Tomcat, the most widely used server for Java web services: it uses a thread pool to process network requests in parallel, as the following source excerpt shows:

```java
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                             SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = null;
        if (processorCache != null) {
            sc = processorCache.pop();
        }
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // Thread execution is managed by the thread pool
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
```

The code above shows how Tomcat uses a thread pool to process network requests concurrently. Tomcat is used as the example here mainly because most web services built on Spring Boot and Spring MVC run in a Tomcat container, and the complexity of threads and thread pools is hidden inside middleware and frameworks. That is why many developers, despite having written a lot of Java code, may rarely create extra threads in business development. The purpose of this example is to raise awareness of concurrent programming.

The main way to use thread pools in Java is through the Executor framework, which is part of the java.util.concurrent (JUC) package and provides a flexible thread pool implementation for Java programs. Its logical hierarchy is shown in the figure below:

As shown, using the Executor framework you can create a thread pool either by customizing the configuration of ThreadPoolExecutor (or extending it) directly, or by calling the Executors factory methods newSingleThreadExecutor(), newFixedThreadPool(), and newCachedThreadPool().
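As a quick sketch of those three factory methods (the class name is my own; the pool calls are standard JDK API), each pool runs one trivial task and is then shut down:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorsFactoryDemo {
    static final AtomicInteger executed = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService single = Executors.newSingleThreadExecutor(); // one worker, unbounded queue
        ExecutorService fixed  = Executors.newFixedThreadPool(4);     // 4 fixed workers, unbounded queue
        ExecutorService cached = Executors.newCachedThreadPool();     // grows on demand, reuses idle threads

        for (ExecutorService pool : new ExecutorService[]{single, fixed, cached}) {
            pool.execute(() -> executed.incrementAndGet());
            pool.shutdown();                          // no new tasks; queued ones still run
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        System.out.println("tasks executed: " + executed.get());
    }
}
```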

In addition, you can create a thread pool with periodic, scheduled capabilities by customizing the configuration of ScheduledThreadPoolExecutor, e.g. to run a task 10 seconds from now, or once every minute. As with ThreadPoolExecutor, if you don’t want to customize the configuration, you can call the Executors factory methods newScheduledThreadPool() and newSingleThreadScheduledExecutor() to create, respectively, a scheduled pool that can scale its threads automatically and one that allows only a single thread.
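A minimal sketch of both scheduling styles, with the 10-second and per-minute intervals mentioned above scaled down so the demo finishes quickly (the class name is my own):

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledPoolDemo {
    static final AtomicInteger ticks = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(2);

        // one-shot task after a 100 ms delay (the "run after 10 s" case, scaled down)
        ScheduledFuture<?> once = scheduler.schedule(
                () -> System.out.println("delayed task ran"), 100, TimeUnit.MILLISECONDS);

        // repeating task every 50 ms (the "once per interval" case, scaled down)
        scheduler.scheduleAtFixedRate(ticks::incrementAndGet, 0, 50, TimeUnit.MILLISECONDS);

        once.get();                 // wait for the one-shot task to finish
        Thread.sleep(200);          // let the periodic task fire a few times
        scheduler.shutdownNow();    // cancel the periodic task
        System.out.println("periodic ticks: " + ticks.get());
    }
}
```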

ForkJoinPool is a thread pool implementation introduced with the fork/join framework in JDK 1.7. It is a mechanism that splits a large task into multiple subtask queues assigned to different threads, and its key feature is work stealing: a thread that has finished the tasks in its own queue can steal tasks from other threads’ queues to execute, maximizing thread efficiency.
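A minimal fork/join sketch of that split-and-steal pattern (the class name and threshold are my own choices): a summation task splits itself until chunks are small enough to compute directly, forks the left half so idle workers can steal it, and joins the partial results.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    static long lastResult;                         // captured for inspection after main()

    private final long[] data;
    private final int from, to;

    public SumTask(long[] data, int from, int to) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {               // small enough: sum directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                                // queue the left half; idle workers may steal it
        return right.compute() + left.join();       // compute the right half, then combine
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        lastResult = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println("sum=" + lastResult);    // 10000 * 10001 / 2 = 50005000
    }
}
```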

In practice, although Executors makes it convenient to create single-thread, fixed-size, or auto-scaling thread pools, it is generally recommended to customize the configuration directly through ThreadPoolExecutor or ScheduledThreadPoolExecutor. The pools created by the Executors factory methods use unbounded work queues or allow an effectively unlimited number of threads by default, so under load they can easily cause an OOM.

Let’s take a practical example to show how to customize a business thread pool using ThreadPoolExecutor:

1) Configure a thread pool class

```java
/**
 * Thread pool configured as a singleton.
 */
public final class SingleBlockPoolExecutor {

    /**
     * Custom thread pool configuration: core thread count, maximum thread count,
     * keep-alive time, work queue type, thread factory, rejection handler.
     */
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            30, 100, 5, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(100),
            new BlockThreadFactory(),
            new BlockRejectedExecutionHandler());

    public ThreadPoolExecutor getPool() {
        return pool;
    }

    private SingleBlockPoolExecutor() {
    }

    /**
     * Custom thread factory that gives pool threads readable names.
     */
    public static class BlockThreadFactory implements ThreadFactory {

        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = SingleBlockPoolExecutor.class.getSimpleName()
                    + "-" + count.addAndGet(1);
            t.setName(threadName);
            return t;
        }
    }

    /**
     * Custom rejection handler for the thread pool.
     */
    public static class BlockRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // Put the rejected task back into the blocking queue (blocks if the queue is full)
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Holds the singleton instance in a static inner class (lazy and thread-safe).
     */
    private static class Holder {
        private static SingleBlockPoolExecutor instance = new SingleBlockPoolExecutor();
    }

    /**
     * Returns the singleton instance.
     */
    public static SingleBlockPoolExecutor getInstance() {
        return Holder.instance;
    }

    /**
     * Shuts the pool down.
     */
    public void destroy() {
        if (pool != null) {
            pool.shutdownNow();
        }
    }
}
```

As shown in the code above, a thread pool is configured using the singleton pattern. When configuring ThreadPoolExecutor, you need to set the core parameters: core thread count, maximum thread count, keep-alive time, work queue type, thread factory class, and rejection handler class.
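The custom handler above blocks and puts the rejected task back into the queue; the JDK also ships four built-in policies (AbortPolicy, CallerRunsPolicy, DiscardPolicy, DiscardOldestPolicy). A small sketch (class name mine) showing how a saturated pool hands the overflow task to the submitting thread under CallerRunsPolicy:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    static volatile boolean callerRan = false;

    public static void main(String[] args) throws InterruptedException {
        final Thread caller = Thread.currentThread();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),                 // room for one waiting task
                new ThreadPoolExecutor.CallerRunsPolicy());  // overflow runs on the submitter

        for (int i = 0; i < 3; i++) {
            // task 0 occupies the single worker, task 1 fills the queue,
            // task 2 is rejected and therefore executed by the caller itself
            pool.execute(() -> {
                if (Thread.currentThread() == caller) {
                    callerRan = true;                        // the overflow task ran here
                }
                try {
                    Thread.sleep(100);                       // keep the worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("caller ran an overflow task: " + callerRan);
    }
}
```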

2) Define the system global thread pool management class

```java
public class AsyncManager {

    /**
     * Public thread pool for task processing.
     */
    public static final ExecutorService service =
            SingleBlockPoolExecutor.getInstance().getPool();
}
```

In an application, besides the thread pools defined by frameworks, a custom thread pool is easier to manage and use if it is registered in a global management class. As shown above, a static variable is initialized with the thread pool we defined earlier.

3) Used in business

```java
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

    @Override
    public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
        // 1. Core business logic: create the order
        log.info("Order created");
        // 2. Submit to the thread pool to handle non-core logic asynchronously,
        //    e.g. writing analytics/log events
        AsyncManager.service.execute(() -> {
            System.out.println("Thread->" + Thread.currentThread().getName()
                    + ", executing asynchronous log-processing task");
        });
        return CreateOrderBO.builder().result(true).build();
    }
}
```

As shown in the code above, when part of the business logic needs to be processed asynchronously, the corresponding thread pool can be obtained through the thread pool management class and the task submitted to it for execution.

FutureTask implements asynchronous result return

When threads are implemented with Thread or Runnable, it is generally not possible to return the thread’s processing result. If the calling thread needs to obtain the result after asynchronous processing completes, it can do so through the FutureTask framework. Example code is as follows:

```java
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

    @Override
    public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
        // Use Future to receive asynchronous execution results
        // Define a list of Future objects that receive the threads' results
        List<Future<Integer>> results = Collections.synchronizedList(new ArrayList<>());
        // Implement the Callable interface to define the thread's logic
        results.add(AsyncManager.service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int a = 1, b = 2;
                System.out.println("Callable interface executing");
                return a + b;
            }
        }));
        // Get the asynchronous results (with a timeout)
        for (Future<Integer> future : results) {
            try {
                System.out.println("a+b=" + future.get(200, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
        return CreateOrderBO.builder().result(true).build();
    }
}
```

As in the code above, if you want a thread to return an execution result, you can define the thread logic by implementing the Callable interface and receive the result through a Future. In practice, however, you need to think through the business logic for the case where the thread has not yet finished executing.
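The example above receives its result via the Future returned by submit(); FutureTask itself can also be used directly, since it implements both Runnable and Future. A minimal sketch (class name mine):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    static Integer result;   // captured for inspection after main()

    public static void main(String[] args) throws Exception {
        // Wrap a Callable in a FutureTask
        FutureTask<Integer> task = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() {
                int a = 1, b = 2;
                return a + b;
            }
        });
        new Thread(task).start();   // a plain Thread can run it; a pool could as well
        result = task.get();        // blocks until the result is ready
        System.out.println("a+b=" + result);   // a+b=3
    }
}
```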

CountDownLatch implements parallel thread synchronization

In concurrent programming, a complex piece of business logic can be split across multiple threads to speed it up; but if you need to wait for all of those threads to finish before further logic can run, you can use CountDownLatch to implement synchronous aggregation of multiple threads. The logic is illustrated below:

In principle, CountDownLatch internally creates and maintains a volatile integer counter. Calling countDown() attempts to decrement the counter by 1; calling await() makes the current thread check whether the counter is 0. If it is not 0, the current thread blocks until some thread sets the counter to 0, at which point the threads waiting in await() are woken up to continue.
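That countDown()/await() handshake can be sketched minimally (class name mine): the main thread blocks until all three workers have counted down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {
    static final AtomicInteger completed = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                completed.incrementAndGet();   // the worker's "business logic"
                latch.countDown();             // decrement the counter
            }).start();
        }
        latch.await();                         // block until the counter hits 0
        System.out.println("all workers done: " + completed.get());   // 3
    }
}
```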

Examples of common code use are as follows:

1) Create a thread-handling class that performs specific business logic

```java
public class DataDealTask implements Runnable {

    private List<Integer> list;
    private CountDownLatch latch;

    public DataDealTask(List<Integer> list, CountDownLatch latch) {
        this.list = list;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println("Thread->" + Thread.currentThread().getName()
                    + ", processing data count->" + list.size());
        } finally {
            // Count down whether the processing succeeded or failed
            latch.countDown();
        }
    }
}
```

The thread-handling class receives a CountDownLatch object, in addition to the data to process, when it is instantiated. Note that after executing the thread logic, countDown() must be called whether the processing succeeds or fails.

2) Specific use methods

```java
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

    @Override
    public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
        // CountDownLatch usage example
        // Generate test data
        Integer[] array = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 101, 102};
        List<Integer> list = new ArrayList<>();
        Arrays.asList(array).stream().map(o -> list.add(o)).collect(Collectors.toList());
        // Split the data into groups for parallel processing
        Map<String, List<?>> entityMap = this.groupListByAvg(list, 6);
        CountDownLatch latch = new CountDownLatch(entityMap.size());
        Iterator<Entry<String, List<?>>> it = entityMap.entrySet().iterator();
        while (it.hasNext()) {
            DataDealTask dataDealTask = new DataDealTask(
                    (List<Integer>) it.next().getValue(), latch);
            AsyncManager.service.submit(dataDealTask);
        }
        try {
            // Wait for the batch-processing threads to finish
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return CreateOrderBO.builder().result(true).build();
    }
}
```

As shown in the code above, when the business logic has a large amount of data to process, it can be split into groups and processed in parallel, waiting for all threads to finish before returning synchronously to the caller. This scenario is a good fit for CountDownLatch!

CyclicBarrier enables periodic synchronization of threads

The main function of CountDownLatch is one-shot thread synchronization. In real business scenarios, however, a task may be phased, for example phase 1 -> phase 2 -> phase 3 -> phase 4 -> phase 5. When such a phased task is processed concurrently, a barrier needs to be set at each stage: only when all threads have reached a given stage point can the task advance to the next stage, as illustrated below:

For the above scenario, CyclicBarrier is the right tool. CyclicBarrier is implemented with a ReentrantLock-based mutex and maintains an internal count. While count is not 0, each thread that reaches the synchronization point blocks in await() and decrements count by one; when count reaches 0, all threads blocked in await() are woken up to continue, and the barrier enters its next generation with count reset to the parties value set in new CyclicBarrier(parties), so it can be reused.
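The reset-and-reuse behaviour (the "cyclic" part) can be sketched minimally (class name mine): one barrier with two parties gates two phases in a row, and its barrier action runs once per completed phase.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierPhasesDemo {
    static final AtomicInteger phasesCompleted = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        // Barrier action runs once each time both parties arrive
        CyclicBarrier barrier = new CyclicBarrier(2, phasesCompleted::incrementAndGet);

        Runnable worker = () -> {
            try {
                barrier.await();   // end of phase 1: wait for the other party
                barrier.await();   // end of phase 2: same barrier object, reused
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("phases completed: " + phasesCompleted.get());   // 2
    }
}
```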

For example, suppose we calculate the salaries of employees in a department and require that every employee’s salary has been calculated before the aggregation step runs. Example code:

```java
@Slf4j
@Service
public class SalaryStatisticServiceImpl implements SalaryStatisticService {

    /**
     * Simulated department data store.
     */
    public static Map<String, List<EmployeeSalaryInfo>> employeeMap =
            Collections.synchronizedMap(new HashMap<>());

    static {
        EmployeeSalaryInfo employeeA = new EmployeeSalaryInfo();
        employeeA.setEmployeeNo("100");
        employeeA.setBaseSalaryAmount(10000);
        employeeA.setSubsidyAmount(3000);
        EmployeeSalaryInfo employeeB = new EmployeeSalaryInfo();
        employeeB.setEmployeeNo("101");
        employeeB.setBaseSalaryAmount(30000);
        employeeB.setSubsidyAmount(3000);
        List<EmployeeSalaryInfo> list = new ArrayList<>();
        list.add(employeeA);
        list.add(employeeB);
        employeeMap.put("10", list);
    }

    @Override
    public StatisticReportBO statisticReport(StatisticReportDTO statisticReportDTO) {
        // Query all employees of the department (simulated)
        List<EmployeeSalaryInfo> employeeSalaryInfos =
                employeeMap.get(statisticReportDTO.getDepartmentNo());
        if (employeeSalaryInfos == null) {
            log.info("No employee information exists for this department");
            return StatisticReportBO.builder().build();
        }
        // Thread-safe accumulator
        AtomicInteger totalSalary = new AtomicInteger();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(employeeSalaryInfos.size(),
                new Runnable() {
                    // Execution order->B (barrier action; does not block the main thread)
                    @Override
                    public void run() {
                        log.info("Sum of the two employees' calculated salaries->"
                                + totalSalary.get() + ", execution order->B");
                    }
                });
        // Execution order->A
        for (EmployeeSalaryInfo e : employeeSalaryInfos) {
            AsyncManager.service.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    int totalAmount = e.getSubsidyAmount() + e.getBaseSalaryAmount();
                    log.info("Calculated employee " + e.getEmployeeNo()
                            + "'s salary->" + totalAmount + ", execution order->A");
                    totalSalary.addAndGet(totalAmount);
                    try {
                        // Wait for the other threads at the barrier
                        cyclicBarrier.await();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    } catch (BrokenBarrierException ex) {
                        ex.printStackTrace();
                    }
                    return totalAmount;
                }
            });
        }
        // Execution order->A/B (may run before or after the barrier action, so the
        // totalSalary value is not guaranteed here; CyclicBarrier suits repeatable
        // parallel computations that do not need a synchronous return)
        // Build the response
        StatisticReportBO statisticReportBO = StatisticReportBO.builder()
                .employeeCount(employeeSalaryInfos.size())
                .departmentNo(statisticReportDTO.getDepartmentNo())
                .salaryTotalAmount(totalSalary.get()).build();
        log.info("Encapsulated interface response parameters, execution order->A/B");
        return statisticReportBO;
    }

    @Data
    public static class EmployeeSalaryInfo {

        /** Employee id */
        private String employeeNo;

        /** Base salary */
        private Integer baseSalaryAmount;

        /** Subsidy amount */
        private Integer subsidyAmount;
    }
}
```

The result of the above code is as follows:

```
[kPoolExecutor-1] c.w.c.s.impl.SalaryStatisticServiceImpl : Calculated employee 100's salary->13000, execution order->A
[kPoolExecutor-2] c.w.c.s.impl.SalaryStatisticServiceImpl : Calculated employee 101's salary->33000, execution order->A
[kPoolExecutor-2] c.w.c.s.impl.SalaryStatisticServiceImpl : Sum of the two employees' calculated salaries->46000, execution order->B
[nio-8080-exec-2] c.w.c.s.impl.SalaryStatisticServiceImpl : Encapsulated interface response parameters, execution order->A/B
```

As the results show, the threads controlled by CyclicBarrier wait for each other to finish, but the CyclicBarrier does not block the main thread, so the code that builds the response parameters may execute before or after the CyclicBarrier’s summary action. Take care when using it!

Semaphore limits the number of threads that can access a resource

Semaphore limits the number of threads that can access a shared resource at the same time. Take a parking lot as an example; the code is as follows:

```java
@Service
@Slf4j
public class ParkServiceImpl implements ParkService {

    /**
     * Simulate a parking lot with 2 spaces.
     */
    private static Semaphore semaphore = new Semaphore(2);

    @Override
    public AccessParkBO accessPark(AccessParkDTO accessParkDTO) {
        AsyncManager.service.execute(() -> {
            if (semaphore.availablePermits() == 0) {
                log.info(Thread.currentThread().getName() + ", plate number->"
                        + accessParkDTO.getCarNo()
                        + ", no spaces available, please wait patiently");
            } else {
                try {
                    // Acquire a permit to enter the parking lot
                    semaphore.acquire();
                    log.info(Thread.currentThread().getName() + ", plate number->"
                            + accessParkDTO.getCarNo() + ", entered the parking lot");
                    // Simulate a 30-second stay
                    Thread.sleep(30000);
                    // Release the permit on leaving
                    semaphore.release();
                    log.info(Thread.currentThread().getName() + ", plate number->"
                            + accessParkDTO.getCarNo() + ", exited the parking lot");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        return AccessParkBO.builder().carNo(accessParkDTO.getCarNo())
                .currentPositionCount(semaphore.availablePermits())
                .isPermitAccess(semaphore.availablePermits() > 0 ? true : false)
                .build();
    }
}
```

The code above simulates a parking lot with 2 spaces, where each car stays for 30 seconds after entering, and then simulates 3 parking requests arriving in parallel. The execution results are:

```
[kPoolExecutor-1] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-1, plate number->10, entered the parking lot, execution order->A
[kPoolExecutor-2] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-2, plate number->20, entered the parking lot, execution order->A
[kPoolExecutor-3] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-3, plate number->30, no spaces available, please wait patiently, execution order->B
[kPoolExecutor-1] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-1, plate number->10, exited the parking lot
[kPoolExecutor-2] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-2, plate number->20, exited the parking lot
[kPoolExecutor-4] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-4, plate number->30, entered the parking lot
```

As you can see, the Semaphore limits the number of threads allowed in to 2, so the third request is rejected; the fourth request is admitted only after one of the first two requests releases its permit via release()!
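Besides the blocking acquire() used above, Semaphore also offers tryAcquire(), which fails fast instead of queuing when no permit is free, which is closer to the "lot is full, come back later" behaviour. A minimal sketch (class name mine) with 2 permits:

```java
import java.util.concurrent.Semaphore;

public class TryAcquireDemo {
    static boolean car1, car2, car3, car4;   // captured for inspection after main()

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(2);   // 2 parking spaces

        car1 = permits.tryAcquire();   // true: takes permit 1
        car2 = permits.tryAcquire();   // true: takes permit 2
        car3 = permits.tryAcquire();   // false: lot full, fails immediately instead of blocking

        permits.release();             // one car leaves
        car4 = permits.tryAcquire();   // true: the freed space is taken

        System.out.println(car1 + " " + car2 + " " + car3 + " " + car4);   // true true false true
    }
}
```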

Afterword

At the application level, this article summarized the basic JVM memory model and the atomic steps by which threads operate on shared memory. It also focused on thread pools, FutureTask, CountDownLatch, CyclicBarrier, and Semaphore, which are commonly used in Java concurrent programming.

One last thing

Welcome to follow my official account [Calm as Code], where plenty of Java articles and learning materials will be posted and kept up to date.

If you found this well written, give it a like and a follow! Follow so you don’t get lost; updates will keep coming!