Preface

I. Business background

Some service requests are time-consuming and need to be locked to prevent concurrent execution. The same database data must also not be operated on by several requests at the same time, otherwise the business can be adversely affected.

II. Analysis process

Redis is used as a distributed lock. The lock state is maintained centrally in Redis, which solves the problem that the individual JVMs in a cluster cannot share lock information, enforces an order of operations, and protects the correctness of user data.

Outline of the design process

1. Create a @interface annotation and set the parameter flag in the annotation

2. Add AOP pointcuts to scan specific annotations

3. Create an @Aspect class, register it as a bean, and intercept the specific methods

4. Use the ProceedingJoinPoint parameter of the advice method to run logic before and after pjp.proceed()

5. Lock the key before the pointcut and delete the key after the task is executed
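The five steps above can be sketched without Spring or Redis. The following is only an illustration under stated assumptions: it swaps Spring AOP for a JDK dynamic proxy and Redis for an in-memory set, and the names DemoLock, OrderService and LockProxyDemo are hypothetical, not part of the original project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical annotation standing in for the article's RedisLockAnnotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoLock {
    int lockField() default 0; // index of the argument used as the lock key
}

// Hypothetical business interface; a JDK proxy can only intercept interface methods.
interface OrderService {
    @DemoLock
    String process(String orderId);
}

class LockProxyDemo {
    // In-memory stand-in for Redis: a key is "locked" while it is in this set.
    static final Set<String> LOCKS = ConcurrentHashMap.newKeySet();

    static OrderService withLock(OrderService target) {
        return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                (proxy, method, args) -> {
                    DemoLock lock = method.getAnnotation(DemoLock.class);
                    if (lock == null) {
                        return method.invoke(target, args); // not annotated: no locking
                    }
                    String key = String.valueOf(args[lock.lockField()]);
                    if (!LOCKS.add(key)) {                  // lock before the call
                        throw new IllegalStateException("lock already held: " + key);
                    }
                    try {
                        return method.invoke(target, args); // plays the role of pjp.proceed()
                    } catch (InvocationTargetException e) {
                        throw e.getCause();                 // rethrow the business exception
                    } finally {
                        LOCKS.remove(key);                  // delete the key after the call
                    }
                });
    }

    public static void main(String[] args) {
        OrderService service = withLock(id -> "processed " + id);
        System.out.println(service.process("1024")); // processed 1024
        LOCKS.add("1024");                            // simulate another holder of the lock
        try {
            service.process("1024");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());       // lock already held: 1024
        } finally {
            LOCKS.remove("1024");
        }
    }
}
```

In the real project the proxy is replaced by the @Around advice and the set by Redis, but the order of operations (lock, proceed, release) is the same.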

Core steps: lock, unlock and renew

Lock

Use the opsForValue().setIfAbsent method of RedisTemplate to set the key only if it does not already exist, with a random string generated by UUID.randomUUID().toString() as the value.

After obtaining the lock from Redis, set an expiry time on the key so that the lock is released automatically when it expires.

With this design, only the first request that sets the key successfully can proceed with the subsequent data operations; later requests fail because they cannot acquire the lock.
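The "set if absent, with an expiry" semantics can be illustrated with a small in-memory model. This is a sketch, not Redis itself: InMemoryLockStore is a hypothetical class, and the current time is passed in as a parameter so that the behaviour is deterministic. Against a real Redis, recent Spring Data Redis versions also provide a setIfAbsent overload that takes a timeout and a TimeUnit, which sets the value and the expiry in a single command.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryLockStore {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();

    /** Stores the value only if the key is absent or already expired; returns true on success. */
    boolean setIfAbsent(String key, String value, long ttlMillis, long nowMillis) {
        Entry fresh = new Entry(value, nowMillis + ttlMillis);
        // compute() runs atomically per key, so only one caller can install its entry
        Entry winner = store.compute(key, (k, old) ->
                (old == null || old.expiresAtMillis <= nowMillis) ? fresh : old);
        return winner == fresh;
    }

    public static void main(String[] args) {
        InMemoryLockStore redis = new InMemoryLockStore();
        String key = "Business1:1024";
        boolean first = redis.setIfAbsent(key, UUID.randomUUID().toString(), 30_000, 0);
        boolean second = redis.setIfAbsent(key, UUID.randomUUID().toString(), 30_000, 1_000);
        boolean afterExpiry = redis.setIfAbsent(key, UUID.randomUUID().toString(), 30_000, 30_000);
        System.out.println(first + " " + second + " " + afterExpiry); // true false true
    }
}
```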

Timeout problems

The concern is that the method invoked at the pointcut through pjp.proceed() takes too long, so the key in Redis expires and the lock is released prematurely.

For example, thread A obtains the lock first, but proceed takes longer than the lock timeout, so the key expires while the method is still running. Thread B then obtains the Redis lock successfully, and the two threads operate on the same batch of data at the same time, resulting in inaccurate data.
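A related consequence of this scenario is that thread A, when it finally finishes, may delete a key that now belongs to thread B. The random value stored under the key makes it possible to check ownership before deleting. The following is a minimal in-memory sketch of that idea, not part of the original design; OwnerCheckedRelease is a hypothetical class. Against a real Redis the compare and the delete would have to happen atomically, for example in a Lua script.

```java
import java.util.concurrent.ConcurrentHashMap;

class OwnerCheckedRelease {
    // In-memory stand-in for Redis: key -> random value of the current holder.
    static final ConcurrentHashMap<String, String> LOCKS = new ConcurrentHashMap<>();

    /** Deletes the key only if it still holds the caller's value. */
    static boolean release(String key, String ownerValue) {
        return LOCKS.remove(key, ownerValue); // atomic compare-and-remove
    }

    public static void main(String[] args) {
        String key = "Business1:1024";
        LOCKS.put(key, "value-A"); // thread A acquires the lock
        LOCKS.remove(key);         // the key times out while A is still working
        LOCKS.put(key, "value-B"); // thread B acquires the lock
        System.out.println(release(key, "value-A")); // false: A no longer owns the key
        System.out.println(LOCKS.get(key));          // value-B
        System.out.println(release(key, "value-B")); // true
    }
}
```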

Solution: add lock renewal

The lock must not be released until the task is completed:

A scheduled thread pool, ScheduledExecutorService, is maintained. Every 2 s it scans the tasks in the queue and checks whether each key's expiry time is approaching. The condition is: [expiry time] <= [current time] + [expiry interval] / 3. The aspect code later in this article implements the check as [last renewal time] + [lock time] / 3 <= [current time].

    /**
     * Thread pool: one thread per JVM maintains keyAliveTime by running the task periodically
     */
    private static final ScheduledExecutorService SCHEDULER =
            new ScheduledThreadPoolExecutor(1,
                    new BasicThreadFactory.Builder()
                            .namingPattern("redisLock-schedule-pool")
                            .daemon(true)
                            .build());

    static {
        SCHEDULER.scheduleAtFixedRate(() -> {
            // do something to extend time
        }, 0, 2, TimeUnit.SECONDS);
    }
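The renewal condition stated in the formula can be checked with a few numbers. This is a small worked example of the formula as written in the text, with a hypothetical class name RenewalCheck and a lock time of 30 s chosen for illustration.

```java
class RenewalCheck {
    /** True when the key expires within one third of the lock interval from now. */
    static boolean shouldRenew(long expiryMillis, long nowMillis, long intervalMillis) {
        return expiryMillis <= nowMillis + intervalMillis / 3;
    }

    public static void main(String[] args) {
        long interval = 30_000; // lock time of 30 s
        long expiry = interval; // locked at t = 0, so the key expires at t = 30 s
        System.out.println(shouldRenew(expiry, 18_000, interval)); // false: 12 s left
        System.out.println(shouldRenew(expiry, 20_000, interval)); // true: 10 s left
    }
}
```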

III. Design scheme

After the above analysis, my colleagues designed this plan:

With the overall process covered, here are the key steps:

Intercept the @RedisLockAnnotation annotation to get the necessary parameters

Lock operation

Renewal operation

Finish the business operation and release the lock

IV. In practice

I previously wrote up how to use AOP, which can serve as a reference.

Related attribute class configuration

Business property enumeration setting

    public enum RedisLockTypeEnum {
        /**
         * User-defined key prefixes
         */
        ONE("Business1", "Test1"),
        TWO("Business2", "Test2");

        private String code;
        private String desc;

        RedisLockTypeEnum(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }

        public String getCode() {
            return code;
        }

        public String getDesc() {
            return desc;
        }

        public String getUniqueKey(String key) {
            return String.format("%s:%s", this.getCode(), key);
        }
    }

Parameters saved in the task queue

    public class RedisLockDefinitionHolder {
        /** Unique business key */
        private String businessKey;
        /** Lock time, in seconds */
        private Long lockTime;
        /** Time of the last renewal, in milliseconds */
        private Long lastModifyTime;
        /** Thread that holds the lock */
        private Thread currentTread;
        /** Maximum number of renewals */
        private int tryCount;
        /** Number of renewals performed so far */
        private int currentCount;
        /** Renewal period in milliseconds: one third of the lock time */
        private Long modifyPeriod;

        public RedisLockDefinitionHolder(String businessKey, Long lockTime, Long lastModifyTime,
                                         Thread currentTread, int tryCount) {
            this.businessKey = businessKey;
            this.lockTime = lockTime;
            this.lastModifyTime = lastModifyTime;
            this.currentTread = currentTread;
            this.tryCount = tryCount;
            this.modifyPeriod = lockTime * 1000 / 3;
        }

        // Getters and setters are not shown here; the renewal task relies on them
    }

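Define the annotation to intercept

    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    public @interface RedisLockAnnotation {
        /** Index of the method argument used as the lock key; defaults to the first argument (0) */
        int lockFiled() default 0;

        /** Maximum number of renewals before the request is interrupted */
        int tryCount() default 3;

        /** Business type, used as the key prefix */
        RedisLockTypeEnum typeEnum();

        /** Lock time in seconds (annotation members must be primitive, so long rather than Long) */
        long lockTime() default 30;
    }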

The core aspect class, RedisLockAspect.java, is described in three parts, each with a specific role.

Set the pointcut

    /**
     * The path inside @annotation(...) names the specific annotation to intercept
     */
    @Pointcut("@annotation(cn.sevenyuan.demo.aop.lock.RedisLockAnnotation)")
    public void redisLockPC() {
    }

The previous step defined the pointcut we want to intercept. The next step is to run custom logic around it with @Around:

    @Around(value = "redisLockPC()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        // Resolve the intercepted method and read the annotation parameters
        Method method = resolveMethod(pjp);
        RedisLockAnnotation annotation = method.getAnnotation(RedisLockAnnotation.class);
        RedisLockTypeEnum typeEnum = annotation.typeEnum();
        Object[] params = pjp.getArgs();
        String ukString = params[annotation.lockFiled()].toString();
        String businessKey = typeEnum.getUniqueKey(ukString);
        String uniqueValue = UUID.randomUUID().toString();
        Object result = null;
        boolean isSuccess = false;
        try {
            // Lock: only the first request can set the key
            isSuccess = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(businessKey, uniqueValue));
            if (!isSuccess) {
                throw new Exception("You can't do it, because another has got the lock =-=");
            }
            redisTemplate.expire(businessKey, annotation.lockTime(), TimeUnit.SECONDS);
            Thread currentThread = Thread.currentThread();
            // Add the task information to the renewal queue
            holderList.add(new RedisLockDefinitionHolder(businessKey, annotation.lockTime(),
                    System.currentTimeMillis(), currentThread, annotation.tryCount()));
            // Execute the business operation
            result = pjp.proceed();
            // If the thread was interrupted, throw an exception to abort the request
            if (currentThread.isInterrupted()) {
                throw new InterruptedException("You had interrupted =-=");
            }
        } catch (InterruptedException e) {
            log.error("Interrupt exception, rollback transaction", e);
            throw new Exception("Interrupt exception, please send request again");
        } catch (Exception e) {
            log.error("has some error, please check again", e);
        } finally {
            // Release the lock when the request ends, but only if this request acquired it,
            // so that a rejected request does not delete another request's key
            if (isSuccess) {
                redisTemplate.delete(businessKey);
                log.info("release the lock, businessKey is [" + businessKey + "]");
            }
        }
        return result;
    }

The process above can be summarized briefly:

Parse the annotation to get the annotation values and the method's argument values

Lock in Redis and set the timeout

Add the task to the renewal queue so that the lock can be renewed before it expires

Check the thread interrupt flag after the business call

End the request and release the lock in finally

Renewal operation

A ScheduledExecutorService maintains one thread that continuously checks the tasks in the task queue and extends their timeouts:

    // Queue of tasks to scan
    private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue<>();

    /**
     * Thread pool that maintains keyAliveTime
     */
    private static final ScheduledExecutorService SCHEDULER =
            new ScheduledThreadPoolExecutor(1,
                    new BasicThreadFactory.Builder()
                            .namingPattern("redisLock-schedule-pool")
                            .daemon(true)
                            .build());

    {
        // Run the renewal operation every two seconds
        SCHEDULER.scheduleAtFixedRate(() -> {
            // Remember to add try-catch here, otherwise the scheduled task stops running after an error =-=
            Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
            while (iterator.hasNext()) {
                RedisLockDefinitionHolder holder = iterator.next();
                // Remove empty entries
                if (holder == null) {
                    iterator.remove();
                    continue;
                }
                // If the key no longer exists in Redis, the task is finished: remove it
                if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
                    iterator.remove();
                    continue;
                }
                // Renewal count exceeded: interrupt the business thread and remove the task
                if (holder.getCurrentCount() > holder.getTryCount()) {
                    holder.getCurrentTread().interrupt();
                    iterator.remove();
                    continue;
                }
                // Check whether the renewal period has elapsed since the last renewal
                long curTime = System.currentTimeMillis();
                boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
                if (shouldExtend) {
                    holder.setLastModifyTime(curTime);
                    redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
                    log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
                    holder.setCurrentCount(holder.getCurrentCount() + 1);
                }
            }
        }, 0, 2, TimeUnit.SECONDS);
    }

This code implements the part of the design diagram drawn in the dotted box. It keeps a time-consuming request from losing its lock prematurely.
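The reminder about try-catch in the scheduled task matters because scheduleAtFixedRate suppresses all later executions once one execution throws. The sketch below demonstrates this with plain JDK classes; GuardedSchedule and the simulated failure are hypothetical and stand in for an error during a Redis call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedSchedule {
    /** Schedules a task that fails on its first run; returns true if it still reaches three runs. */
    static boolean reachesThreeRuns(boolean guarded) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable renewal = () -> {
            threeRuns.countDown();
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated failure on the first run");
            }
        };
        Runnable scheduled;
        if (guarded) {
            scheduled = () -> {
                try {
                    renewal.run();
                } catch (RuntimeException e) {
                    // log the error and let the next scheduled run proceed
                }
            };
        } else {
            scheduled = renewal;
        }
        scheduler.scheduleAtFixedRate(scheduled, 0, 20, TimeUnit.MILLISECONDS);
        try {
            return threeRuns.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(reachesThreeRuns(true));  // true: the guarded task keeps running
        System.out.println(reachesThreeRuns(false)); // false: runs stop after the first exception
    }
}
```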

Thread#interrupt is used to interrupt the business thread once the number of renewals exceeds the retry count.
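What the interrupt does to a thread that is blocked in Thread#sleep can be seen in a small JDK-only example. InterruptDemo is a hypothetical class written for illustration; the 10 s sleep mirrors the simulated slow request used in the test later in this article.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    /** Returns {sleep was cut short, interrupt flag still set inside the catch block}. */
    static boolean[] interruptSleepingWorker() {
        AtomicBoolean sleepInterrupted = new AtomicBoolean();
        AtomicBoolean flagAfterCatch = new AtomicBoolean();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000); // simulated slow business call
            } catch (InterruptedException e) {
                sleepInterrupted.set(true);
                // Throwing InterruptedException clears the thread's interrupt status
                flagAfterCatch.set(Thread.currentThread().isInterrupted());
            }
        });
        worker.start();
        worker.interrupt(); // what the renewal thread does once the renewal count is exceeded
        try {
            worker.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {sleepInterrupted.get(), flagAfterCatch.get()};
    }

    public static void main(String[] args) {
        boolean[] r = interruptSleepingWorker();
        System.out.println(r[0] + " " + r[1]); // true false
    }
}
```

Because the flag is cleared when the exception is thrown, business code that catches InterruptedException and does not call Thread.currentThread().interrupt() again hides the interrupt from an isInterrupted() check made afterwards.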

However, if you run into requests this slow, it is better to look for the root cause: analyze where the time goes and optimize the service or restructure the work so that the operation is no longer time-consuming.

So remember to log more; it makes problems faster to analyze.

V. Testing

The annotation is applied to an entry method, and a time-consuming request is simulated in the business code with Thread#sleep:

    @GetMapping("/testRedisLock")
    @RedisLockAnnotation(typeEnum = RedisLockTypeEnum.ONE, lockTime = 3)
    public Book testRedisLock(@RequestParam("userId") Long userId) {
        try {
            log.info("before sleep");
            Thread.sleep(10000);
            log.info("after sleep");
        } catch (Exception e) {
            // log error
            log.info("has some error", e);
        }
        return null;
    }

To use it, add the annotation to the method and set the corresponding parameters. The typeEnum value distinguishes between businesses, so each business can be prevented from running concurrently on the same key.

Test results:

    2020-04-04 14:55:50.864  INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : before sleep
    2020-04-04 14:55:52.855  INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 0
    2020-04-04 14:55:54.851  INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 1
    2020-04-04 14:55:56.851  INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 2
    2020-04-04 14:55:58.852  INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 3
    2020-04-04 14:56:00.857  INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : has some error
    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method) [na:1.8.0_221]

This test covers the failure scenario in which the renewal count is exceeded. If you reduce the sleep time, the business call completes normally.

If two requests are sent at the same time, the second one fails, and the log shows the exception thrown by the aspect ("You can't do it, because another has got the lock =-=").

This shows that the lock is in effect and duplicate requests are rejected.

VI. Summary

For time-consuming services and core data, repeated requests must not operate on the same data at the same time, otherwise the data can become incorrect. A distributed lock is therefore used to protect them.

Let’s review the design process again:

1. Create a @interface annotation and set the parameter flag in the annotation

2. Add AOP pointcuts to scan specific annotations

3. Create an @Aspect class, register it as a bean, and intercept the specific methods

4. Use the ProceedingJoinPoint parameter of the advice method to run logic before and after pjp.proceed()

5. Lock the key before the pointcut and delete the key after the task is executed

This article is based on a colleague's code that I reviewed. From it I learned how a distributed lock is implemented in practice, and I followed his design to write a simplified version of the business processing. For the renewal operation, which I had not considered before, a daemon thread periodically checks and extends the timeout, which avoids releasing the lock early.

As a result, three knowledge points were reviewed at the same time:

1. Implementation and common methods of AOP

2. Usage and parameter Description of ScheduledExecutorService

3. Meaning and usage of Thread#interrupt
