Source code for this project:
https://github.com/HashZhang/…

We use Spring Cloud LoadBalancer, the official Spring Cloud recommendation, as our client-side load balancer. In the previous section, we looked at the structure of Spring Cloud LoadBalancer. Next, let’s go over the features we need to implement on top of it:

  1. Instances in different clusters must not call each other. We use the zone entry in the instance metadata map to tell the instances of different clusters apart: only instances whose metadata map has the same zone value may call each other. This can be done by implementing a custom ServiceInstanceListSupplier.
  2. The round-robin load-balancing algorithm must isolate requests from each other. Requests must not share one position, otherwise a failed request may be retried against the very instance that just failed. As we saw in the previous section, in the default RoundRobinLoadBalancer all threads share the same atomic variable, position, which is incremented by 1 on every request. The problem is this: suppose microservice A has two instances, instance 1 and instance 2. When request A arrives, RoundRobinLoadBalancer returns instance 1; when request B arrives, RoundRobinLoadBalancer returns instance 2. If request A then fails and is retried, RoundRobinLoadBalancer returns instance 1 again. This is not what we want.
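The retry problem in the second point can be reproduced with a few lines of plain Java. This is only a minimal sketch: SharedPositionRoundRobin and the instance names are made up for illustration, it is not the actual RoundRobinLoadBalancer, and the position starts at 0 here so that the sequence is easy to follow.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a round-robin balancer whose position
// is shared by every caller. Not a Spring Cloud LoadBalancer class.
final class SharedPositionRoundRobin {

    // One counter for all requests and all threads
    private final AtomicInteger position = new AtomicInteger(0);
    private final List<String> instances;

    SharedPositionRoundRobin(List<String> instances) {
        this.instances = instances;
    }

    // Every call advances the same counter, no matter which request it belongs to
    String choose() {
        int pos = Math.floorMod(position.getAndIncrement(), instances.size());
        return instances.get(pos);
    }

    public static void main(String[] args) {
        SharedPositionRoundRobin lb =
                new SharedPositionRoundRobin(List.of("instance 1", "instance 2"));
        String requestA = lb.choose();      // first call
        String requestB = lb.choose();      // a different request in between
        String requestARetry = lb.choose(); // retry of request A
        System.out.println("request A       -> " + requestA);
        System.out.println("request B       -> " + requestB);
        System.out.println("request A retry -> " + requestARetry);
    }
}
```

With two instances and exactly one other request in between, the retry of request A lands on the same instance as its first attempt.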

We write our own implementation for each of these two features.

Preventing calls between different clusters

Zone configuration in Spring Cloud LoadBalancer

Spring Cloud LoadBalancer defines the LoadBalancerZoneConfig class:

public class LoadBalancerZoneConfig {

    // Identifies the zone in which the current load balancer is located
    private String zone;

    public LoadBalancerZoneConfig(String zone) {
        this.zone = zone;
    }

    public String getZone() {
        return zone;
    }

    public void setZone(String zone) {
        this.zone = zone;
    }

}

If Eureka is not introduced as a dependency, this zone is configured through the spring.cloud.loadbalancer.zone property:

LoadBalancerAutoConfiguration

@Bean
@ConditionalOnMissingBean
public LoadBalancerZoneConfig zoneConfig(Environment environment) {
    return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
}

If the Eureka dependency is introduced and spring.cloud.loadbalancer.zone is not set, the zone configured in the Eureka metadata is written into the LoadBalancerZoneConfig of Spring Cloud LoadBalancer:

EurekaLoadBalancerClientConfiguration

@PostConstruct
public void postprocess() {
    if (!StringUtils.isEmpty(zoneConfig.getZone())) {
        return;
    }
    String zone = getZoneFromEureka();
    if (!StringUtils.isEmpty(zone)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);
        }
        // Set the zone on LoadBalancerZoneConfig
        zoneConfig.setZone(zone);
    }
}

private String getZoneFromEureka() {
    String zone;
    // Whether spring.cloud.loadbalancer.eureka.approximateZoneFromHostname is set to true
    boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname();
    // If it is, approximate the zone from the host name in the Eureka configuration:
    // the zone is everything after the first "." in the host name,
    // for example www.zone1.com gives zone1.com
    if (approximateZoneFromHostname && eurekaConfig != null) {
        return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false));
    }
    else {
        // Otherwise, read the "zone" key from the metadata map
        zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone");
        // If the metadata has no zone, fall back to the availability zones of the region
        if (StringUtils.isEmpty(zone) && clientConfig != null) {
            String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
            // Pick the first one from the regions we want to connect to
            zone = zones != null && zones.length > 0 ? zones[0] : null;
        }
        return zone;
    }
}
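The order in which the zone is resolved can be summarized in plain Java. This is only a sketch of the precedence read from the code above: ZoneResolutionSketch, resolveZone and its parameters are hypothetical names, not Spring Cloud API, and hostnameZone stands for the already extracted host-name approximation (null when that option is switched off).

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper that mirrors the precedence of the zone lookup.
// None of these names exist in Spring Cloud; they are for illustration only.
final class ZoneResolutionSketch {

    static String resolveZone(String propertyZone,
                              String hostnameZone,
                              Map<String, String> eurekaMetadata,
                              List<String> availabilityZones) {
        // 1. An explicitly configured spring.cloud.loadbalancer.zone always wins
        if (propertyZone != null && !propertyZone.isEmpty()) {
            return propertyZone;
        }
        // 2. If host-name approximation is enabled, its result is used as is
        if (hostnameZone != null) {
            return hostnameZone;
        }
        // 3. Otherwise the "zone" key of the Eureka metadata map
        String zone = eurekaMetadata.get("zone");
        // 4. Last resort: the first availability zone of the region
        if ((zone == null || zone.isEmpty()) && !availabilityZones.isEmpty()) {
            zone = availabilityZones.get(0);
        }
        return zone;
    }

    public static void main(String[] args) {
        Map<String, String> metadata = Map.of("zone", "zone2");
        System.out.println(resolveZone("zone1", null, metadata, List.of("zone3")));
        System.out.println(resolveZone(null, null, metadata, List.of("zone3")));
        System.out.println(resolveZone(null, null, Map.of(), List.of("zone3")));
    }
}
```

In practice this means that setting spring.cloud.loadbalancer.zone explicitly is the most predictable option, since nothing from Eureka is consulted in that case.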

Implement SameZoneOnlyServiceInstanceListSupplier

To keep only the instances in the same zone, and never return instances from another zone, we write the following code:

SameZoneOnlyServiceInstanceListSupplier

/**
 * Only returns service instances in the same zone as the current instance.
 * Services in different zones do not call each other.
 */
public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {

    /**
     * Key of the zone entry in the instance metadata map
     */
    private static final String ZONE = "zone";

    /**
     * Zone configuration of the current Spring Cloud LoadBalancer
     */
    private final LoadBalancerZoneConfig zoneConfig;

    private String zone;

    public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) {
        super(delegate);
        this.zoneConfig = zoneConfig;
    }

    @Override
    public Flux<List<ServiceInstance>> get() {
        return getDelegate().get().map(this::filteredByZone);
    }

    private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
        if (zone == null) {
            zone = zoneConfig.getZone();
        }
        if (zone != null) {
            List<ServiceInstance> filteredInstances = new ArrayList<>();
            for (ServiceInstance serviceInstance : serviceInstances) {
                String instanceZone = getZone(serviceInstance);
                if (zone.equalsIgnoreCase(instanceZone)) {
                    filteredInstances.add(serviceInstance);
                }
            }
            if (filteredInstances.size() > 0) {
                return filteredInstances;
            }
        }
        /*
         * @see ZonePreferenceServiceInstanceListSupplier, which returns all instances
         * when there is no instance in the same zone.
         * Here, different zones must not call each other, so we return an empty list.
         */
        return List.of();
    }

    // Read the zone of an instance from its metadata map
    private String getZone(ServiceInstance serviceInstance) {
        Map<String, String> metadata = serviceInstance.getMetadata();
        if (metadata != null) {
            return metadata.get(ZONE);
        }
        return null;
    }
}
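The difference between this strict filter and a zone-preference filter shows up only when no instance matches. The sketch below models it in plain Java under simplifying assumptions: an instance is represented by its metadata map alone, and ZoneFilterSketch, sameZoneOnly and zonePreference are illustrative names rather than Spring Cloud classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model: each instance is just its metadata map.
final class ZoneFilterSketch {

    // Strict variant: an empty result is allowed, other zones are never returned
    static List<Map<String, String>> sameZoneOnly(List<Map<String, String>> instances, String zone) {
        List<Map<String, String>> filtered = new ArrayList<>();
        for (Map<String, String> metadata : instances) {
            if (zone != null && zone.equalsIgnoreCase(metadata.get("zone"))) {
                filtered.add(metadata);
            }
        }
        return filtered;
    }

    // Preference variant: falls back to every instance when nothing matches
    static List<Map<String, String>> zonePreference(List<Map<String, String>> instances, String zone) {
        List<Map<String, String>> filtered = sameZoneOnly(instances, zone);
        return filtered.isEmpty() ? instances : filtered;
    }

    public static void main(String[] args) {
        List<Map<String, String>> instances = List.of(
                Map.of("id", "instance1", "zone", "zone1"),
                Map.of("id", "instance3", "zone", "zone2"));
        System.out.println(sameZoneOnly(instances, "zone1").size());   // matching zone
        System.out.println(sameZoneOnly(instances, "zone3").size());   // no match, strict
        System.out.println(zonePreference(instances, "zone3").size()); // no match, fallback
    }
}
```

Returning an empty list makes a missing zone fail fast with "no servers available" instead of silently routing traffic to another cluster, which is the behavior we want here.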

A load-balancing algorithm that isolates requests from each other

In the previous section, we mentioned that we use spring-cloud-sleuth as the distributed tracing library. We can use the traceId to tell whether two calls belong to the same request.

RoundRobinWithRequestSeparatedPositionLoadBalancer

// Must implement ReactorServiceInstanceLoadBalancer
// rather than ReactorLoadBalancer<ServiceInstance>,
// because the bean is registered as ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final ServiceInstanceListSupplier serviceInstanceListSupplier;

    // A request, including its retries, takes no more than 1 minute.
    // A request that takes longer than 1 minute must be a heavy one.
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            // Random initial value, so that calls do not always start from the first instance
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));

    private final String serviceId;
    private final Tracer tracer;

    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        Span currentSpan = tracer.currentSpan();
        // Fixes the problem of the original algorithm, where concurrent calls
        // could make a retried request hit the same instance again
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                // The order of the list may differ between calls, so sort it before picking
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}
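The core idea, one position per traceId instead of one position for everybody, can be tried out without Spring, Sleuth or Caffeine. The following is a minimal sketch under stated assumptions: PerTraceRoundRobin is a made-up name, a plain ConcurrentHashMap stands in for the expiring cache (entries never expire here), and every position starts at 0 instead of a random value so that the result is deterministic.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a round-robin balancer that keeps
// a separate position for every trace id.
final class PerTraceRoundRobin {

    // traceId -> position; stands in for the expiring cache of the real class
    private final Map<Long, AtomicInteger> positions = new ConcurrentHashMap<>();
    private final List<String> instances;

    PerTraceRoundRobin(List<String> instances) {
        this.instances = instances;
    }

    // Only calls that carry the same trace id advance the same counter
    String choose(long traceId) {
        AtomicInteger position = positions.computeIfAbsent(traceId, id -> new AtomicInteger(0));
        int pos = Math.floorMod(position.getAndIncrement(), instances.size());
        return instances.get(pos);
    }

    public static void main(String[] args) {
        PerTraceRoundRobin lb = new PerTraceRoundRobin(List.of("instance 1", "instance 2"));
        String requestA = lb.choose(1L);      // request A, trace id 1
        String requestB = lb.choose(2L);      // request B, trace id 2
        String requestARetry = lb.choose(1L); // retry of request A, same trace id
        System.out.println("request A       -> " + requestA);
        System.out.println("request B       -> " + requestB);
        System.out.println("request A retry -> " + requestARetry);
    }
}
```

Request B no longer influences where the retry of request A goes: as long as there is more than one instance, a retry within the same trace moves on to the next instance.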

Add the two components above to our custom LoadBalancerClient configuration and enable them

In the previous section, we mentioned that the default load balancer configuration can be set with the @LoadBalancerClients annotation, and that is what we do here. First, register the auto-configuration class in spring.factories:

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration

Then write the autoconfiguration class. It’s as simple as adding a @LoadBalancerClients annotation to set the default configuration class:

LoadBalancerAutoConfiguration

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)
public class LoadBalancerAutoConfiguration {
}

Then write this default configuration class, which wires together the two classes we implemented above:

DefaultLoadBalancerConfiguration

@Configuration(proxyBeanMethods = false)
public class DefaultLoadBalancerConfiguration {

    @Bean
    public ServiceInstanceListSupplier serviceInstanceListSupplier(
            DiscoveryClient discoveryClient,
            Environment env,
            ConfigurableApplicationContext context,
            LoadBalancerZoneConfig zoneConfig
    ) {
        ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                .getBeanProvider(LoadBalancerCacheManager.class);
        return
                // Enable caching of service instances
                new CachingServiceInstanceListSupplier(
                        // Only return service instances in the same zone
                        new SameZoneOnlyServiceInstanceListSupplier(
                                // Discover services through discoveryClient
                                new DiscoveryClientServiceInstanceListSupplier(
                                        discoveryClient, env
                                ),
                                zoneConfig
                        ),
                        cacheManagerProvider.getIfAvailable()
                );
    }

    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
            Environment environment,
            ServiceInstanceListSupplier serviceInstanceListSupplier,
            Tracer tracer
    ) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinWithRequestSeparatedPositionLoadBalancer(
                serviceInstanceListSupplier,
                name,
                tracer
        );
    }
}

With this, we have implemented a custom load balancer and also learned how to use Spring Cloud LoadBalancer. Next, let’s unit test these features. Integration tests will be covered in a separate section later, so don’t worry about them for now.

Unit testing the features above

This round of unit testing also shows how to unit test the custom Spring Cloud infrastructure components we write in general.

The unit tests here mainly test three scenarios:

  1. Only instances in the same zone are returned; instances in other zones are never returned.
  2. For consecutive requests, each request returns a different instance from the previous one.
  3. Within the same request (the same span), a retry returns a different instance, even when it is issued from another thread.

Write the code:

LoadBalancerTest

// SpringRunner also includes MockitoJUnitRunner, so annotations such as @Mock take effect as well
@RunWith(SpringRunner.class)
@SpringBootTest(properties = {LoadBalancerEurekaAutoConfiguration.LOADBALANCER_ZONE + "=zone1"})
public class LoadBalancerTest {

    @EnableAutoConfiguration(exclude = EurekaDiscoveryClientConfiguration.class)
    @Configuration
    public static class App {
        @Bean
        public DiscoveryClient discoveryClient() {
            ServiceInstance zone1Instance1 = Mockito.mock(ServiceInstance.class);
            ServiceInstance zone1Instance2 = Mockito.mock(ServiceInstance.class);
            ServiceInstance zone2Instance3 = Mockito.mock(ServiceInstance.class);
            Map<String, String> zone1 = Map.ofEntries(
                    Map.entry("zone", "zone1")
            );
            Map<String, String> zone2 = Map.ofEntries(
                    Map.entry("zone", "zone2")
            );
            when(zone1Instance1.getMetadata()).thenReturn(zone1);
            when(zone1Instance1.getInstanceId()).thenReturn("instance1");
            when(zone1Instance2.getMetadata()).thenReturn(zone1);
            when(zone1Instance2.getInstanceId()).thenReturn("instance2");
            when(zone2Instance3.getMetadata()).thenReturn(zone2);
            when(zone2Instance3.getInstanceId()).thenReturn("instance3");
            DiscoveryClient mock = Mockito.mock(DiscoveryClient.class);
            Mockito.when(mock.getInstances("testService"))
                    .thenReturn(List.of(zone1Instance1, zone1Instance2, zone2Instance3));
            return mock;
        }
    }

    @Autowired
    private LoadBalancerClientFactory loadBalancerClientFactory;
    @Autowired
    private Tracer tracer;

    @Test
    public void testFilteredByZone() {
        ReactiveLoadBalancer<ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService");
        for (int i = 0; i < 100; i++) {
            ServiceInstance server = Mono.from(testService.choose()).block().getServer();
            // Must be in the same zone as the current instance
            Assert.assertEquals(server.getMetadata().get("zone"), "zone1");
        }
    }

    /**
     * Consecutive requests return different instances
     */
    @Test
    public void testReturnNext() {
        ReactiveLoadBalancer<ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService");
        ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
        ServiceInstance server2 = Mono.from(testService.choose()).block().getServer();
        Assert.assertNotEquals(server1.getInstanceId(), server2.getInstanceId());
    }

    /**
     * With the default implementation, calls from different threads may get the same instance.
     * With our implementation, as long as the span stays the same, the next instance is returned.
     *
     * @throws Exception
     */
    @Test
    public void testSameSpanReturnNext() throws Exception {
        Span span = tracer.nextSpan();
        // Repeat the check 100 times
        for (int i = 0; i < 100; i++) {
            try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                ReactiveLoadBalancer<ServiceInstance> testService = loadBalancerClientFactory.getInstance("testService");
                ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
                AtomicReference<ServiceInstance> server2 = new AtomicReference<>();
                Thread thread = new Thread(() -> {
                    try (Tracer.SpanInScope cleared2 = tracer.withSpanInScope(span)) {
                        server2.set(Mono.from(testService.choose()).block().getServer());
                    }
                });
                thread.start();
                thread.join();
                System.out.println(i);
                Assert.assertNotEquals(server1.getInstanceId(), server2.get().getInstanceId());
            }
        }
    }
}

Run the test, and the test passes.
