This is the 16th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

This series code address: github.com/JoJoTec/spr…

In the previous section, we verified retries through unit tests. In this section, we will verify thread isolation.

  1. Verify that the configuration loads correctly: that is, we configure in Spring (e.gapplication.ymlThe Resilience4j configuration added in) is loaded and applied correctly.
  2. Different threads (pools) are used to invoke different instances of the same microservice.

Verify that the configuration loads correctly

Similar to the validation retry before, we can define a different FeignClient and then check the thread isolation configuration loaded by Resilience4J to verify that the thread isolation configuration is loaded correctly.

And, unlike the retry configuration, we know from the previous series of source code analyses that Spring-Cloud-OpenFeign’s FeignClient is lazily loaded. So the thread isolation that we implemented is lazily loaded and needs to be called before the thread pool is initialized. So here we need to validate the thread pool configuration after making the call.

First, define two FeignClients. The microservices are testService1 and testService2, and the contextId is testService1Client and testService2Client

@FeignClient(name = "testService1", contextId = "testService1Client") public interface TestService1Client { @GetMapping("/anything") HttpBinAnythingResponse  anything(); } @FeignClient(name = "testService2", contextId = "testService2Client") public interface TestService2Client { @GetMapping("/anything") HttpBinAnythingResponse  anything(); }Copy the code

Then, we add the Spring configuration, add an instance to each of the two microservices, and write the unit test classes using SpringExtension:

//SpringExtension also includes Mockito related Extension, So annotations like @mock also work @extendWith (springExtension.class) @springbooTtest (properties = {// The default request retry count is 3 "resilience4j.retry.configs.default.maxAttempts=3", / / all method request retry count testService2Client inside a 2 "resilience4j. Retry. Configs. TestService2Client. MaxAttempts = 2", / / the default thread pool configuration "resilience4j thread pool - bulkhead. The configs. Default. CoreThreadPoolSize = 10", "resilience4j.thread-pool-bulkhead.configs.default.maxThreadPoolSize=10", "resilience4j.thread-pool-bulkhead.configs.default.queueCapacity=1" , / / testService2Client thread pool configuration "resilience4j. Thread pool - - bulkhead. Configs. TestService2Client. CoreThreadPoolSize = 5", "resilience4j.thread-pool-bulkhead.configs.testService2Client.maxThreadPoolSize=5", "resilience4j.thread-pool-bulkhead.configs.testService2Client.queueCapacity=1", }) @Log4j2 public class OpenFeignClientTest { @SpringBootApplication @Configuration public static class App { @Bean Public DiscoveryClient DiscoveryClient () {// Simulate two service instances. ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class); ServiceInstance service2Instance2 = Mockito.spy(ServiceInstance.class); Map<String, String> zone1 = Map.ofEntries( Map.entry("zone", "zone1") ); when(service1Instance1.getMetadata()).thenReturn(zone1); when(service1Instance1.getInstanceId()).thenReturn("service1Instance1"); when(service1Instance1.getHost()).thenReturn("www.httpbin.org"); when(service1Instance1.getPort()).thenReturn(80); when(service2Instance2.getInstanceId()).thenReturn("service1Instance2"); when(service2Instance2.getHost()).thenReturn("httpbin.org"); when(service2Instance2.getPort()).thenReturn(80); DiscoveryClient spy = Mockito.spy(DiscoveryClient.class); Mockito.when(spy.getInstances("testService1")) .thenReturn(List.of(service1Instance1)); Mockito.when(spy.getInstances("testService2")) .thenReturn(List.of(service2Instance2)); return spy; }}}Copy the code

Write test code to verify that the configuration is correct:

@test public void testConfigureThreadPool() {// Prevent the impact of circuit breakers circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset); / / call the two FeignClient ensure corresponding NamedContext initialized testService1Client. Anything (); testService2Client.anything(); // Verify the actual configuration of thread isolation, Meet our fill in the configuration of the ThreadPoolBulkhead ThreadPoolBulkhead = threadPoolBulkheadRegistry. GetAllBulkheads () asJava () .stream().filter(t -> t.getName().contains("service1Instance1")).findFirst().get(); Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getCoreThreadPoolSize(), 10); Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getMaxThreadPoolSize(), 10); threadPoolBulkhead = threadPoolBulkheadRegistry.getAllBulkheads().asJava() .stream().filter(t -> t.getName().contains("service1Instance2")).findFirst().get(); Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getCoreThreadPoolSize(), 5); Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getMaxThreadPoolSize(), 5); }Copy the code

Different threads (pools) are used to invoke different instances of the same microservice.

We need to ensure that the thread pool for the execution of the last call (that is, the sending of the HTTP request) must be the thread pool in the corresponding ThreadPoolBulkHead. This requires us to implement the ApacheHttpClient with the annotation @enableaspectjautoproxy (proxyTargetClass = true) :

//SpringExtension also includes Mockito related Extension, So annotations like @mock also work @extendWith (springExtension.class) @springbooTtest (properties = {// The default request retry count is 3 "resilience4j.retry.configs.default.maxAttempts=3", / / all method request retry count testService2Client inside a 2 "resilience4j. Retry. Configs. TestService2Client. MaxAttempts = 2", / / the default thread pool configuration "resilience4j thread pool - bulkhead. The configs. Default. CoreThreadPoolSize = 10", "resilience4j.thread-pool-bulkhead.configs.default.maxThreadPoolSize=10", "resilience4j.thread-pool-bulkhead.configs.default.queueCapacity=1" , / / testService2Client thread pool configuration "resilience4j. Thread pool - - bulkhead. Configs. TestService2Client. CoreThreadPoolSize = 5", "resilience4j.thread-pool-bulkhead.configs.testService2Client.maxThreadPoolSize=5", "resilience4j.thread-pool-bulkhead.configs.testService2Client.queueCapacity=1", }) @Log4j2 public class OpenFeignClientTest { @SpringBootApplication @Configuration @EnableAspectJAutoProxy(proxyTargetClass = true) public static class App { @Bean public DiscoveryClient DiscoveryClient () {ServiceInstance service1Instance1 = mockito.spy (serviceinstance.class); ServiceInstance service2Instance2 = Mockito.spy(ServiceInstance.class); Map<String, String> zone1 = Map.ofEntries( Map.entry("zone", "zone1") ); when(service1Instance1.getMetadata()).thenReturn(zone1); when(service1Instance1.getInstanceId()).thenReturn("service1Instance1"); when(service1Instance1.getHost()).thenReturn("www.httpbin.org"); when(service1Instance1.getPort()).thenReturn(80); when(service2Instance2.getInstanceId()).thenReturn("service1Instance2"); when(service2Instance2.getHost()).thenReturn("httpbin.org"); when(service2Instance2.getPort()).thenReturn(80); DiscoveryClient spy = Mockito.spy(DiscoveryClient.class); Mockito.when(spy.getInstances("testService1")) .thenReturn(List.of(service1Instance1)); Mockito.when(spy.getInstances("testService2")) .thenReturn(List.of(service2Instance2)); return spy; }}}Copy the code

ApacheHttpClient’s execute method is used to retrieve the actual thread pool responsible for the HTTP call and place the thread in the request Header:

@aspect public static class ApacheHttpClientAop {// In the last step of the ApacheHttpClient @Pointcut("execution(*) com.github.jojotech.spring.cloud.webmvc.feign.ApacheHttpClient.execute(..) )") public void annotationPointcut() { } @Around("annotationPointcut()") public Object around(ProceedingJoinPoint pjp) Throws Throwable {// Set Header, do not pass the Feign RequestInterceptor, Request Request = (Request) ppj.getargs ()[0]; Field headers = ReflectionUtils.findField(Request.class, "headers"); ReflectionUtils.makeAccessible(headers); Map<String, Collection<String>> map = (Map<String, Collection<String>>) ReflectionUtils.getField(headers, request); HashMap<String, Collection<String>> stringCollectionHashMap = new HashMap<>(map); stringCollectionHashMap.put(THREAD_ID_HEADER, List.of(String.valueOf(Thread.currentThread().getName()))); ReflectionUtils.setField(headers, request, stringCollectionHashMap); return pjp.proceed(); }}Copy the code

In this way, we can get the specific bearing the name of the thread of the request, can be seen from the name of what he has in the thread pool (format for “bulkhead – thread isolation name – n”, for example, bulkhead-testService1Client:www.httpbin.org: 80-1), Let’s see if different instances are called from different thread pools:

@ Test public void testDifferentThreadPoolForDifferentInstance () throws InterruptedException {/ / prevent the circuit breaker circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset); Set<String> threadIds = Sets.newConcurrentHashSet(); Thread[] threads = new Thread[100]; For (int I = 0; i < 100; i++) { threads[i] = new Thread(() -> { Span span = tracer.nextSpan(); try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) { HttpBinAnythingResponse response = testService1Client.anything(); Header String threadId = response.getheaders ().get(THREAD_ID_HEADER); threadIds.add(threadId); }}); threads[i].start(); } for (int i = 0; i < 100; i++) { threads[i].join(); } / / confirm instance testService1Client:httpbin.org: 80 thread pool threads exist Assertions. AssertTrue (threadIds. Stream (). AnyMatch (s - > s.contains("testService1Client:httpbin.org:80"))); / / confirm instance testService1Client:httpbin.org: 80 thread pool threads exist Assertions. AssertTrue (threadIds. Stream (). AnyMatch (s - > s.contains("testService1Client:www.httpbin.org:80"))); }Copy the code

Thus, we have successfully verified the thread pool isolation of the instance call.

Wechat search “my programming meow” public account, a daily brush, easy to improve skills, won a variety of offers