For all sample code, see/download at

Github.com/Wasabi1234/…





If a program runs on a single-core processor, its threads are alternately swapped in and out of the CPU. The threads all "exist" at the same time, and each is in some state during execution. If the program runs on a multi-core processor, each thread can be assigned to its own core and truly run simultaneously. Systems are designed this way so they can handle many requests in parallel.

  • Concurrency: multiple threads operating on the same resource; the concern is thread safety and the correct use of that resource
  • High concurrency: the service can process many requests simultaneously; the concern is improving application performance

#2 CPU

##2.1 CPU multi-level cache

  • Why a CPU cache? The CPU clock is far faster than main memory can keep up with, so during its clock cycles the CPU often has to wait for main memory, wasting resources. The cache exists to alleviate the speed mismatch between the CPU and memory.
  • Meaning of CPU cache
    1. Temporal locality: if some data is accessed, it is likely to be accessed again in the near future
    2. Spatial locality: if a piece of data is accessed, adjacent data is likely to be accessed soon as well

##2.2 Cache coherence (MESI)

The MESI protocol keeps shared data consistent across the caches of multiple CPUs. Each cache line is in one of four states:
  • M-modified: the cache line is cached only in this CPU's cache and has been modified, so it is inconsistent with main memory; it must be written back to main memory at some future point before other CPUs can read that address. Once written back, the line's state changes to E
  • E-exclusive: the cache line is cached only in this CPU's cache and has not been modified, so it is consistent with main memory. An exclusive line changes to the S state when another CPU reads it, and to the M state when it is modified
  • S-shared: the cache line may be cached by multiple CPUs and is consistent with main memory
  • I-invalid: the cache line is invalid

  • Out-of-order execution optimization

    The processor executes instructions in an order that differs from the original code order in order to run faster

    Advantages and risks of concurrency



#3 Project preparation

##3.1 Project initialization







##3.2 Concurrent simulation – JMeter stress testing











##3.3 Concurrent simulation – code

###CountDownLatch



###Semaphore



Both of these are usually used together with thread pools

Let’s do the concurrent simulation

```java
package com.mmall.concurrency;

import com.mmall.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 * @date 18/4/1
 */
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {

    /** Total number of requests */
    public static int clientTotal = 5000;

    /** Number of threads executing concurrently */
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        // Define the thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // The semaphore limits concurrent execution to threadTotal threads
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    // acquire a permit
                    semaphore.acquire();
                    add();
                    // release the permit
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        count++;
    }
}
```

The result varies between runs, so the class is not thread-safe.

#4 Thread safety

##4.1 Definition of thread safety

When multiple threads access a class, it behaves correctly no matter how the runtime environment schedules or interleaves the execution of those threads, and without any additional synchronization or coordination in the calling code. Such a class is called thread-safe.

##4.2 Atomicity

###4.2.1 The atomic package

  • AtomicXXX: relies on CAS (Unsafe.compareAndSwapInt) to guarantee that only one thread operates on the value at a time
```java
package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class AtomicExample2 {

    /** Total number of requests */
    public static int clientTotal = 5000;

    /** Number of threads executing concurrently */
    public static int threadTotal = 200;

    /** AtomicLong makes the increment atomic */
    public static AtomicLong count = new AtomicLong(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}
```
```java
package com.mmall.concurrency.example.atomic;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicReference;

/**
 * @author shishusheng
 * @date 18/4/3
 */
@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2); // succeeds, count = 2
        count.compareAndSet(0, 1); // fails
        count.compareAndSet(1, 3); // fails
        count.compareAndSet(2, 4); // succeeds, count = 4
        count.compareAndSet(3, 5); // fails
        log.info("count:{}", count.get()); // 4
    }
}
```
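Plain CAS, as used above, cannot detect an A→B→A change: the value looks untouched even though it was modified twice in between. AtomicStampedReference (mentioned below) pairs the value with a version stamp to catch this. A minimal sketch, with an illustrative class name not taken from the original project:

```java
import java.util.concurrent.atomic.AtomicStampedReference;

public class AbaExample {
    public static void main(String[] args) {
        // value 100 with initial stamp (version) 0
        AtomicStampedReference<Integer> count = new AtomicStampedReference<>(100, 0);
        int stamp = count.getStamp();

        // another thread performs A -> B -> A; each CAS also bumps the stamp
        count.compareAndSet(100, 101, count.getStamp(), count.getStamp() + 1);
        count.compareAndSet(101, 100, count.getStamp(), count.getStamp() + 1);

        // a plain value comparison would succeed, but the stale stamp exposes the change
        boolean swapped = count.compareAndSet(100, 102, stamp, stamp + 1);
        System.out.println(swapped);              // false
        System.out.println(count.getReference()); // 100
    }
}
```

Note that AtomicStampedReference compares references with `==`; small autoboxed Integers come from the Integer cache, so the comparisons here behave as intended.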

  • AtomicReference,AtomicReferenceFieldUpdater

  • AtomicBoolean

  • AtomicStampedReference: solves the CAS ABA problem by pairing the value with a version stamp

###4.2.2 Synchronized: relies on the JVM

  • Modifying a block: the code enclosed in curly braces; the lock applies to the object named in parentheses

  • Modifying a method: the entire method; the lock applies to the calling object

  • Modifying a static method: the entire static method; the lock applies to all instances of the class
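The forms listed above can be sketched in one class (a minimal illustration; the class name is not from the original project):

```java
public class SynchronizedForms {

    private int n = 0;
    private static int s = 0;

    // synchronized block: locks the object named in parentheses (here, this)
    public void addBlock() {
        synchronized (this) {
            n++;
        }
    }

    // synchronized instance method: locks the calling object, same as synchronized (this)
    public synchronized void addMethod() {
        n++;
    }

    // synchronized static method: locks the Class object, shared by all instances
    public static synchronized void addStatic() {
        s++;
    }

    public int getN() {
        return n;
    }

    public static int getS() {
        return s;
    }
}
```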

```java
package com.mmall.concurrency.example.count;

import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class CountExample3 {

    /** Total number of requests */
    public static int clientTotal = 5000;

    /** Number of threads executing concurrently */
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private synchronized static void add() {
        count++;
    }
}
```

With synchronized on add(), the counting method now behaves correctly

  • Modifying a class: the code in `synchronized (SomeClass.class)` parentheses; applies to all objects of that class. Note that synchronized is not inherited: when a subclass overrides a synchronized method of its parent, the override is not synchronized unless declared so.

Lock: relies on special CPU instructions; implemented in code

###4.2.3 Comparison

  • Synchronized: non-interruptible lock; suited to light contention; good readability
  • Lock: interruptible lock with diverse synchronization options; maintains performance under heavy contention
  • Atomic: maintains performance under heavy contention, better than Lock, but can only synchronize a single value

##4.3 Visibility

Visibility means that changes one thread makes to a shared variable in main memory can be observed by other threads in a timely manner. Shared variables become invisible when:
  • Thread execution interleaves
  • Reordering combines with thread interleaving
  • The updated value of a shared variable is not synchronized in time between working memory and main memory

###4.3.2 Visibility with synchronized
  • A thread must flush the latest value of a shared variable to main memory before it can unlock
  • When a thread locks, it clears the working-memory copies of shared variables, so using them requires re-reading the latest values from main memory (locking and unlocking must use the same lock)

###4.3.3 Volatile visibility

Volatile achieves visibility by inserting memory barriers and forbidding reordering optimizations:

  • A write to a volatile variable is followed by a store barrier instruction, which flushes the shared variable's value from local working memory to main memory
  • A read of a volatile variable is preceded by a load barrier instruction, which reads the shared variable afresh from main memory





  • Typical volatile usage: a completion flag

```java
volatile boolean inited = false;

// thread 1:
context = loadContext();
inited = true;

// thread 2:
while (!inited) {
    sleep();
}
doSomethingWithConfig(context);
```

##4.4 Ordering

A thread observing the order in which instructions execute in other threads usually sees them out of order, because of instruction reordering

The JMM allows the compiler and processor to reorder instructions. Reordering cannot change the result of single-threaded execution, but it can affect the correctness of multithreaded concurrent execution

###4.4.1 Happens-before rules

#5 Publish objects







##5.1 Safe publication of objects







```java
package com.mmall.concurrency.example.singleton;

import com.mmall.concurrency.annoations.NotThreadSafe;

/**
 * Lazy singleton with double-checked locking (still unsafe)
 *
 * @author shishusheng
 */
@NotThreadSafe
public class SingletonExample4 {

    /** Private constructor */
    private SingletonExample4() {
    }

    // new SingletonExample4() is really three steps:
    // 1. memory = allocate()   allocate memory for the object
    // 2. ctorInstance()        initialize the object
    // 3. instance = memory     point instance at the allocated memory
    //
    // JVM and CPU optimizations may reorder this to 1 -> 3 -> 2,
    // so another thread can see a non-null but uninitialized instance

    /** Singleton instance */
    private static SingletonExample4 instance = null;

    /**
     * Static factory method
     */
    public static SingletonExample4 getInstance() {
        // double-check mechanism
        if (instance == null) {
            synchronized (SingletonExample4.class) {
                if (instance == null) {
                    instance = new SingletonExample4();
                }
            }
        }
        return instance;
    }
}
```
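The usual fix for this reordering hazard is to declare the field volatile, which forbids the 1 → 3 → 2 reordering. A sketch (the class name follows the course's numbering convention but is an assumption):

```java
public class SingletonExample5 {

    private SingletonExample5() {
    }

    // volatile forbids reordering "assign reference" before "initialize object",
    // so no thread can observe a non-null but half-constructed instance
    private volatile static SingletonExample5 instance = null;

    public static SingletonExample5 getInstance() {
        if (instance == null) {
            synchronized (SingletonExample5.class) {
                if (instance == null) {
                    instance = new SingletonExample5();
                }
            }
        }
        return instance;
    }
}
```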





#7 AQS

##7.1 Introduction

  • A FIFO queue built from Node objects provides the basic framework for constructing locks and other synchronizers
  • An int value represents the synchronization state
  • It is used by inheritance
  • Subclasses manage the state by implementing the methods that acquire and release it
  • It can build both exclusive-lock and shared-lock (exclusive, shared) synchronization components

###CountDownLatch
```java
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author shishusheng
 */
@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
```
```java
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author shishusheng
 */
@Slf4j
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        // wait at most 10 ms instead of indefinitely
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }
}
```

##Semaphore usage
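No example survived in this section, so here is a hedged sketch of the basic blocking acquire/release pattern (the class name is illustrative; a tryAcquire variant appears later as SemaphoreExample3). With 3 permits, at most 3 of the 20 tasks run `test()` at the same time:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreBasicExample {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        // 3 permits: at most 3 threads inside test() at once
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire();   // blocks until a permit is available
                    test(threadNum);
                    semaphore.release();   // return the permit
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        System.out.println(threadNum);
        Thread.sleep(1000);
    }
}
```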







##CyclicBarrier

```java
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author shishusheng
 */
@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
```

```java
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author shishusheng
 */
@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}
```

```java
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author shishusheng
 */
@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    // tryAcquire returns immediately; tasks that get no permit are skipped
                    if (semaphore.tryAcquire()) {
                        test(threadNum);
                        // release the permit
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
```

#9 Thread pools

##9.1 newCachedThreadPool
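No code survived in this section; a minimal sketch (class name illustrative). A cached pool creates a thread per task when none is idle, reuses idle threads, and reclaims threads that stay idle for 60 seconds:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            // each task may run on a fresh or a reused thread
            executorService.execute(() -> System.out.println("task:" + index));
        }
        executorService.shutdown();
    }
}
```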



##9.2 newFixedThreadPool
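A fixed pool keeps a constant number of worker threads and queues the excess tasks (sketch; class name illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample2 {
    public static void main(String[] args) {
        // exactly 3 worker threads; remaining tasks wait in an unbounded queue
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> System.out.println("task:" + index));
        }
        executorService.shutdown();
    }
}
```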



##9.3 newSingleThreadExecutor

You can see that the tasks execute sequentially
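The remark above refers to the executor's output: with a single worker thread, tasks always run in submission order. A sketch (class name illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample3 {
    public static void main(String[] args) {
        // one worker thread, so tasks run strictly in the order they were submitted
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> System.out.println("task:" + index));
        }
        executorService.shutdown();
        // prints task:0 through task:9, always in order
    }
}
```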



##9.4 newScheduledThreadPool
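A scheduled pool runs tasks after a delay or periodically (sketch; class name illustrative):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample4 {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        // run once after a 1 second delay
        executorService.schedule(() -> System.out.println("delayed task"), 1, TimeUnit.SECONDS);
        // run every 3 seconds, starting after 1 second
        executorService.scheduleAtFixedRate(() -> System.out.println("periodic task"),
                1, 3, TimeUnit.SECONDS);
        // let it run briefly, then stop (a real service would keep the pool alive)
        Thread.sleep(5000);
        executorService.shutdown();
    }
}
```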





#10 Deadlocks
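No example survived in this section; the classic demonstration is two threads taking two locks in opposite order, producing a circular wait. A hedged sketch (class name illustrative); the threads are marked daemon and the main thread only reports their state, so the demo terminates instead of hanging:

```java
public class DeadLockDemo {

    private static final Object lockA = new Object();
    private static final Object lockB = new Object();

    public static void main(String[] args) throws Exception {
        // thread 1 takes A then B; thread 2 takes B then A: circular wait
        Thread t1 = new Thread(() -> {
            synchronized (lockA) {
                pause();
                synchronized (lockB) {
                    System.out.println("t1 got both locks");
                }
            }
        });
        Thread t2 = new Thread(() -> {
            synchronized (lockB) {
                pause();
                synchronized (lockA) {
                    System.out.println("t2 got both locks");
                }
            }
        });
        // daemon threads so the JVM can still exit despite the deadlock
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();
        Thread.sleep(500);
        // both threads end up stuck waiting for each other's monitor
        System.out.println("t1: " + t1.getState() + ", t2: " + t2.getState());
    }

    private static void pause() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
        }
    }
}
```

Acquiring the locks in a single global order in both threads removes the circular wait and hence the deadlock.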



#11 Capacity expansion for high concurrency

##11.1 Capacity expansion

##11.2 Capacity expansion – database

#12 High-concurrency caching

##12.1 Caching

1 Cache Features

2 Factors affecting cache hit ratio

3 Cache classification and application scenarios

##12.2 High-concurrency caching – features, scenarios, and components

1 Guava Cache

2 Cache – Memcached





3 Cache – Redis

##12.3 Using Redis

  • The configuration class



  • Service class







##12.4 High-concurrency scenarios and practice

Cache consistency

Cache concurrency issues

Cache penetration problem

Cache avalanche problem

Cache high-concurrency practice – intraday stock chart

#13 High-concurrency message queues

##13.1 Business case



Sending SMS messages is encapsulated as messages on a queue. If too many SMS requests arrive and the queue fills up, the sending frequency must be controlled.

By encapsulating events as messages and placing them on a queue, the business is decoupled and made asynchronous, ensuring that as long as the SMS service itself is healthy, the messages will eventually be delivered to users.

##13.2 Characteristics of message queues

  • Why do you need a message queue

  • advantages



The queue

kafka

rabbitmq

Reference

Coding.imooc.com/class/195.h…