Cache objects in Project Reactor

Although Reactor Netty's use of NIO for non-blocking reads and writes greatly improves a program's response time, no remote read or write can match the speed of a local cache.

Project Reactor also provides cache support through the reactor-extra module. You can use the cache classes after adding the following dependency:

<!-- https://mvnrepository.com/artifact/io.projectreactor.addons/reactor-extra -->
<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.3.4.RELEASE</version>
</dependency>

The module provides two entry classes:

  1. CacheMono
  2. CacheFlux

As the names suggest, CacheMono corresponds to Mono (a stream of [0..1] elements) and CacheFlux corresponds to Flux ([0..N] elements).
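As a minimal sketch of the CacheMono side (assuming reactor-core and reactor-extra are on the classpath; the key and the "Alice" lookup result are hypothetical), the Map-based entry point caches a single Signal per key and populates the map automatically on a miss:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.cache.CacheMono;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

public class CacheMonoExample {

    // CacheMono caches at most one value per key, stored as a single Signal
    static final Map<String, Signal<? extends String>> CACHE = new ConcurrentHashMap<>();
    static final AtomicInteger SOURCE_HITS = new AtomicInteger();

    static Mono<String> cachedName(String key) {
        return CacheMono.<String, String>lookup(CACHE, key)
                // Hypothetical expensive lookup; counts how often the source is actually subscribed
                .onCacheMissResume(Mono.fromCallable(() -> {
                    SOURCE_HITS.incrementAndGet();
                    return "Alice";
                }));
    }

    public static void main(String[] args) {
        System.out.println(cachedName("user:42").block()); // first call populates the cache
        System.out.println(cachedName("user:42").block()); // second call is served from the map
        System.out.println("source hits: " + SOURCE_HITS.get());
    }
}
```

The second subscription never reaches the source, which is the whole point of the cache.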

In fact, CacheFlux is used in the Spring Cloud Gateway source code to cache routing information.

public CachingRouteDefinitionLocator(RouteDefinitionLocator delegate) {
		this.delegate = delegate;
		routeDefinitions = CacheFlux.lookup(cache, "routeDefs", RouteDefinition.class)
				.onCacheMissResume(this.delegate::getRouteDefinitions);
	}

Examples from the official CacheFlux documentation

The documentation shows two usage styles: generic cache entry points and Map endpoints.

Generic cache entry points

	 AtomicReference<Context> storeRef = new AtomicReference<>(Context.empty());
     Flux<Integer> cachedFlux = CacheFlux
                .lookup(k -> Mono.justOrEmpty(storeRef.get().getOrEmpty(k))
                                 .cast(Integer.class)
                                 .flatMap(max -> Flux.range(1, max)
                                                     .materialize()
                                                     .collectList()),
                                key)
                .onCacheMissResume(Flux.range(1, 10))
                .andWriteWith((k, sigs) -> Flux.fromIterable(sigs)
                                               .dematerialize()
                                               .last()
                                               .doOnNext(max -> storeRef.updateAndGet(ctx -> ctx.put(k, max)))
                                               .then());

Map endpoints

    String key = "myCategory";
    LoadingCache<String, Object> graphs = Caffeine
        .newBuilder()
        .maximumSize(10_000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .refreshAfterWrite(1, TimeUnit.MINUTES)
        .build(key -> createExpensiveGraph(key));

    Flux<Integer> cachedMyCategory = CacheFlux
        .lookup(graphs.asMap(), key, Integer.class)
        .onCacheMissResume(repository.findAllByCategory(key));

Let’s take the Map endpoints example.

As shown above, it uses a Caffeine cache, but this could just as well be a Guava cache, a ConcurrentHashMap, or even a plain HashMap — whether a HashMap is safe depends on your concurrency requirements, especially for writes to the cache.

The logic is simple: look the key up in the cache and, on a miss, fall back to the data source given to onCacheMissResume.
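Since the backing store only needs to be a Map, the same pattern works with a plain ConcurrentHashMap. A minimal, hedged sketch (assuming reactor-core and reactor-extra are on the classpath; Flux.range stands in for the repository call):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.cache.CacheFlux;
import reactor.core.publisher.Flux;

public class MapBackedCacheFlux {

    // The raw List value type matches CacheFlux.lookup's Map<KEY, ? super List> signature
    static final Map<String, List> CACHE = new ConcurrentHashMap<>();
    static final AtomicInteger SOURCE_SUBSCRIPTIONS = new AtomicInteger();

    static Flux<Integer> byCategory(String key) {
        return CacheFlux.lookup(CACHE, key, Integer.class)
                // Stand-in for repository.findAllByCategory(key)
                .onCacheMissResume(Flux.range(1, 3)
                        .doOnSubscribe(s -> SOURCE_SUBSCRIPTIONS.incrementAndGet()));
    }

    public static void main(String[] args) {
        System.out.println(byCategory("myCategory").collectList().block()); // miss: hits the source
        System.out.println(byCategory("myCategory").collectList().block()); // hit: served from the map
        System.out.println("source subscriptions: " + SOURCE_SUBSCRIPTIONS.get());
    }
}
```

Running the lookup twice subscribes to the source only once; the second result comes entirely from the map.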

Problems encountered

After defining the LoadingCache, I wanted to update the cache with refresh(key), but something went wrong. Here is the LoadingCache build() definition I used:

.build(new CacheLoader<String, Object>() {
    @Override
    public Object load(String key) throws Exception {
        return getYourTypeValue(key);
    }
})

However, using CacheFlux after refreshing the cache produces an error:

Content of cache for key xxx cannot be cast to List<Signal>

A conclusion up front: the cache cannot be refreshed through the refresh() method.

Why is that? Let’s take a look at how this piece of source code is written.

Source code analysis

public static <KEY, VALUE> FluxCacheBuilderMapMiss<VALUE> lookup(Map<KEY, ? super List> cacheMap, KEY key, Class<VALUE> valueClass) {
	return otherSupplier ->
		Flux.defer(() -> {
			Object fromCache = cacheMap.get(key);
            if (fromCache == null) {
				return otherSupplier.get()
					.materialize()
					.collectList()
					.doOnNext(signals -> cacheMap.put(key, signals))
					.flatMapIterable(Function.identity())
					.dematerialize();
			} else if (fromCache instanceof List) {
				try {
					@SuppressWarnings("unchecked")
					List<Signal<VALUE>> fromCacheSignals = (List<Signal<VALUE>>) fromCache;
					return Flux.fromIterable(fromCacheSignals)
                    	.dematerialize();
				}
				catch (Throwable cause) {
					return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " cannot be cast to List<Signal>", cause));
				}
			} else {
				return Flux.error(new IllegalArgumentException("Content of cache for key " + key + " is not a List"));
			}
		});
}

The logic above is simple: otherSupplier is the argument passed to onCacheMissResume, and Flux.defer, as the name implies, is lazy — the inner Flux is created only when someone subscribes.

First, the code retrieves the cached content for the key from cacheMap; if there is none, it uses otherSupplier to read from the data source.

if (fromCache == null) {
    return otherSupplier.get()
        .materialize()
        .collectList()
        .doOnNext(signals -> cacheMap.put(key, signals))
        .flatMapIterable(Function.identity())
        .dematerialize();
}

There are three details here. The first is the materialize() method, which turns each event into a concrete Signal object. Remember that Flux and Mono operators in Reactor are descriptions of data operations, not the data itself, so we cannot cache the operators: the operators form a pipeline through which data flows to produce the required content, which is consumed via subscribe(). What is actually consumed is the data emitted by the source object on subscription.

public final Flux<Signal<T>> materialize() {
	return onAssembly(new FluxMaterialize<>(this));
}

Through materialize() we convert the data into Signal<T> objects; each Signal carries the data we need to consume and can itself serve as a data source.

Another detail is in the doOnNext method, which takes the Signals object and puts the data back into cacheMap, so we don’t have to explicitly insert the data into the cache in our code.

.doOnNext(signals -> cacheMap.put(key, signals))

The third detail is dematerialize(), the inverse of materialize(): it converts Signal<T> back into T for the next operator or for consumption.

public final <X> Flux<X> dematerialize() {
    @SuppressWarnings("unchecked")
    Flux<Signal<X>> thiz = (Flux<Signal<X>>) this;
    return onAssembly(new FluxDematerialize<>(thiz));
}

So Signal<T> is simply a container that holds the actual data.
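The round trip between values and signals can be seen with reactor-core alone (a hedged sketch, no cache involved):

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

public class MaterializeRoundTrip {

    // materialize() wraps every event, including completion, in a Signal object
    static List<Signal<Integer>> materialized() {
        return Flux.just(1, 2, 3)
                .materialize()
                .collectList()
                .block();
    }

    // dematerialize() unwraps the Signals back into plain values
    static List<Integer> roundTrip(List<Signal<Integer>> signals) {
        return Flux.fromIterable(signals)
                .<Integer>dematerialize()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<Signal<Integer>> signals = materialized();
        System.out.println(signals);            // 3 onNext signals plus 1 onComplete signal
        System.out.println(roundTrip(signals)); // [1, 2, 3]
    }
}
```

The collected list contains four signals — three onNext plus one onComplete — which is exactly the shape CacheFlux stores in the map.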

Next, consider the case where the cache does contain an entry for the key. If the cached value is a List, it is cast to List<Signal<VALUE>> and dematerialize() converts each Signal<T> back into T.

else if (fromCache instanceof List) {
	try {
    	@SuppressWarnings("unchecked")
		List<Signal<VALUE>> fromCacheSignals = (List<Signal<VALUE>>) fromCache;
		return Flux.fromIterable(fromCacheSignals)
        	.dematerialize();
	}

Now we can see why using the cache right after refresh(key) fails: refresh() invokes CacheLoader.load(), which stores the raw value returned by load() into the cache — overwriting the List<Signal<VALUE>> that CacheFlux wrote there. The next lookup then fails its type check. Recall the loader we used:

.build(new CacheLoader<String, Object>() {
    @Override
    public Object load(String key) throws Exception {
        return getYourTypeValue(key);
    }
})
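The failure can be reproduced without Caffeine at all: put a raw value into the backing map, as a stale refresh would, and the lookup rejects it (a sketch assuming reactor-core and reactor-extra on the classpath):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import reactor.cache.CacheFlux;
import reactor.core.publisher.Flux;

public class StaleCacheRepro {

    // Simulates what a stale refresh leaves behind: a raw value in the map
    // instead of the List<Signal> that CacheFlux wrote on the first miss
    static String errorFor(Object staleValue) {
        Map<String, Object> cache = new ConcurrentHashMap<>();
        cache.put("key", staleValue);
        try {
            CacheFlux.lookup(cache, "key", Integer.class)
                    .onCacheMissResume(Flux.just(1))
                    .blockLast();
            return "no error";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Matches the error branch in the lookup source shown above
        System.out.println(errorFor(42));
    }
}
```

The error message comes straight from the Flux.error branches in the lookup source quoted above.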

Conclusion

1. First query: look the key up in cacheMap -> miss -> fetch the data with the source given to onCacheMissResume -> materialize() turns each T into a Signal<T> -> the List<Signal<T>> is written to cacheMap -> dematerialize() turns the Signal<T> back into T -> pass downstream.
2. Subsequent queries: look the key up in cacheMap -> hit -> dematerialize() turns the cached Signal<T> into T -> pass downstream.

How to fix it?

From the source analysis above, the cache holds List<Signal<T>>, so load() must convert the raw T into materialized signals before putting it into the cache.

.build(new CacheLoader<String, Object>() {
    @Override
    public Object load(String key) throws Exception {
        // Wrap the value the way CacheFlux stores it: a list of signals
        return Collections.singletonList(Signal.next(getYourTypeValue(key)));
    }
})

With load() returning the value wrapped as a list of signals, entries refreshed through refresh(key) can be read through CacheFlux again.
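We can verify the fixed shape without Caffeine by seeding a map the way the corrected load() would and confirming that CacheFlux serves the entry without ever touching the miss source (a sketch assuming reactor-core and reactor-extra on the classpath; the value 7 is arbitrary):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.cache.CacheFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

public class FixedLoaderCheck {

    static Integer readThroughCache() {
        Map<String, Object> cache = new ConcurrentHashMap<>();
        // Store what a fixed load() should store: materialized signals, not the raw value
        cache.put("key", Arrays.asList(Signal.next(7), Signal.<Integer>complete()));
        AtomicBoolean sourceUsed = new AtomicBoolean(false);
        Integer result = CacheFlux.lookup(cache, "key", Integer.class)
                .onCacheMissResume(Flux.just(-1).doOnSubscribe(s -> sourceUsed.set(true)))
                .blockLast();
        if (sourceUsed.get()) {
            throw new IllegalStateException("cache was bypassed");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(readThroughCache()); // prints 7
    }
}
```

Including the Signal.complete() element mirrors what materialize().collectList() produces on the first miss, so the cached entry is indistinguishable from one written by CacheFlux itself.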