Implementing Redis from scratch in Java (1): how to implement a fixed-size cache?

Implementing Redis from scratch in Java (2): the principle behind Redis expire

Implementing Redis from scratch in Java (3): how to keep in-memory data across restarts?

Implementing Redis from scratch in Java (4): adding listeners

Implementing Redis from scratch in Java (5): another way of thinking about the expiration strategy

In the previous articles we implemented several Redis features in a simple way. Part 3 of this series, on keeping in-memory data across restarts, implemented a persistence mode similar to Redis RDB. This article implements a mode similar to Redis AOF.

Redis AOF basics

Redis AOF persistence in detail

Some personal understanding of AOF

Why choose AOF?

AOF mode performs particularly well. How well?

If you have used Kafka, you will recognize the technique: Kafka also relies on sequential writes.

Appending to the end of a file sequentially avoids random file I/O, and the resulting performance can approach that of memory.
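To make "sequential append" concrete, here is a minimal sketch using the standard `java.nio.file.Files` API. The class name `AppendOnlyLog` and the line format are assumptions made for illustration; they are not part of the cache project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: every write goes to the end of the file, never to the middle. */
class AppendOnlyLog {

    private final Path path;

    AppendOnlyLog(Path path) {
        this.path = path;
    }

    /** Appends one line. CREATE makes the file on first use, APPEND writes at the end. */
    void append(String line) {
        try {
            Files.write(path, Collections.singletonList(line), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the log back in the order it was written. */
    List<String> readAll() {
        try {
            return Files.readAllLines(path, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("demo", ".aof");
        AppendOnlyLog log = new AppendOnlyLog(file);
        log.append("put 1 1");
        log.append("remove 1");
        System.out.println(log.readAll());
        Files.delete(file);
    }
}
```

Because existing lines are never rewritten, the order of lines in the file is the order in which the operations happened, which is what makes replaying the file later possible.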

AOF is also more timely than RDB.

RDB mode persists the entire cache contents each time. That is a time-consuming operation, so it is usually run only once every few minutes.

AOF mode records only the instructions that modify content, appending them to a file in order. This makes persistence much more timely: the delay can be brought down to the level of seconds.

Throughput of AOF

AOF mode can persist on every single operation, but doing so significantly reduces throughput.

The most common way to improve throughput is batching, as Kafka also does. For example, we can persist once per second, collecting all operations from that second in a buffer.

This is a trade-off: the art of balancing timeliness against throughput.

In real business scenarios a window of 1s is generally acceptable, so this approach is widely accepted in the industry.
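The batching idea can be sketched as follows. This is an illustration under assumptions, not the project's code: `BatchingBuffer` and its `sink` callback are hypothetical names, and the sink stands in for whatever actually writes the batch to disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical batching buffer: callers add cheaply, a timer flushes in bulk. */
class BatchingBuffer {

    private final Object lock = new Object();
    private final Consumer<List<String>> sink;
    private List<String> pending = new ArrayList<>();

    BatchingBuffer(Consumer<List<String>> sink) {
        this.sink = sink;
    }

    /** Called on every write operation; only touches memory. */
    void add(String command) {
        synchronized (lock) {
            pending.add(command);
        }
    }

    /** Swaps the buffer out under the lock, then hands the whole batch to the sink. */
    int flush() {
        List<String> batch;
        synchronized (lock) {
            if (pending.isEmpty()) {
                return 0;
            }
            batch = pending;
            pending = new ArrayList<>();
        }
        sink.accept(batch);
        return batch.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BatchingBuffer buffer = new BatchingBuffer(
                batch -> System.out.println("flushing " + batch.size() + " commands"));
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // Flush once per second: roughly the last second of operations is at risk on a crash.
        timer.scheduleAtFixedRate(buffer::flush, 1, 1, TimeUnit.SECONDS);

        for (int i = 0; i < 5; i++) {
            buffer.add("put k" + i + " v" + i);
        }
        Thread.sleep(1500);
        timer.shutdown();
    }
}
```

Swapping the list under a lock, instead of clearing a shared list after writing, is one way to avoid losing commands that arrive while a flush is in progress.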

Asynchronous + multithreading of AOF

Kafka's producer API, for example, is built around asynchronous sends plus callbacks.

Asynchrony plus multithreading can indeed improve the performance of operations.

Of course, before Redis 6, Redis handled commands on a single thread. So why was its performance still so good?

Multithreading has a cost: thread context switches take time, and keeping shared state safe under concurrency requires locking, both of which reduce performance.

So we need to weigh whether the benefit of going asynchronous justifies that cost.

AOF flushing to disk

Both our AOF and RDB modes ultimately rely on the operating system's file system for persistence.

For developers this may be a single API call, but actually getting the data onto disk does not necessarily happen in one step.

File systems also use buffer-like mechanisms to improve throughput. This starts to look like a set of Russian nesting dolls.

Good designs tend to resemble one another: CPU caches, for example, are layered into L1, L2 and so on, and the idea is the same.

Many of Alibaba's open source technologies further optimize how data is flushed to disk by the operating system. We will study this in more depth in the future.
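The gap between "the write call returned" and "the data is on disk" can be seen in the JDK itself. The sketch below is an illustration only; `DurableAppend` and `appendLine` are hypothetical names. `FileChannel.force(true)` asks the operating system to push its buffered data and metadata to the storage device, which is the same choice Redis exposes through its `appendfsync` setting (`always`, `everysec`, `no`).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper showing an append with an optional explicit flush to the device. */
class DurableAppend {

    /**
     * Appends one line and, if requested, forces the OS buffers to disk.
     * Returns the file size in bytes after the write.
     */
    static long appendLine(Path path, String line, boolean forceToDisk) {
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ByteBuffer bytes = ByteBuffer.wrap((line + "\n").getBytes(StandardCharsets.UTF_8));
            while (bytes.hasRemaining()) {
                channel.write(bytes);
            }
            if (forceToDisk) {
                // Without this call the data may still sit in the OS page cache.
                channel.force(true);
            }
            return channel.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("demo", ".aof");
        System.out.println(appendLine(file, "put 1 1", true));
        Files.delete(file);
    }
}
```

Forcing on every write is the safest and slowest option; forcing once per batch is the usual compromise.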

The drawback of AOF

Nothing is perfect, and there is no silver bullet.

AOF is all well and good, but there is one drawback when compared to RDB: instructions

Java implementation


The interface is the same one used for RDB.

/**
 * @author binbin.hou
 * @since 0.0.7
 * @param <K> key
 * @param <V> value
 */
public interface ICachePersist<K, V> {

    /**
     * Persist the cache contents
     * @param cache the cache
     * @since 0.0.7
     */
    void persist(final ICache<K, V> cache);

}

Annotation definition

To stay consistent with features such as time statistics and refresh, we use an annotation attribute to mark which operations should be appended to the file, rather than hard-coding this. That makes later extension and adjustment easier.

/**
 * @author binbin.hou
 * @since 0.0.5
 */
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheInterceptor {

    /**
     * Whether to append the operation to the AOF file. Defaults to false.
     * Intended for operations that change the cache contents, excluding query operations.
     * Includes delete, add, expire and other operations.
     * @return whether to enable AOF
     * @since 0.0.10
     */
    boolean aof() default false;

}

We added the aof attribute to the existing @CacheInterceptor annotation. It specifies whether AOF is enabled for an operation.

Methods that enable AOF

We specify this attribute on methods that make changes to the data:

Expiration operations

Similar to Spring's transaction interceptor, we use a proxy class to call expireAt.

The expire method itself does not need AOF interception, because it delegates to expireAt, which is recorded.

/**
 * Set the expiration time
 * @param key key
 * @param timeInMills time to live in milliseconds
 * @return this
 */
@Override
@CacheInterceptor
public ICache<K, V> expire(K key, long timeInMills) {
    long expireTime = System.currentTimeMillis() + timeInMills;

    // Call through the proxy so that the interceptors on expireAt take effect
    Cache<K, V> cachePoxy = (Cache<K, V>) CacheProxy.getProxy(this);
    return cachePoxy.expireAt(key, expireTime);
}

/**
 * Expire at the given time
 * @param key key
 * @param timeInMills timestamp in milliseconds
 * @return this
 */
@Override
@CacheInterceptor(aof = true)
public ICache<K, V> expireAt(K key, long timeInMills) {
    this.expire.expire(key, timeInMills);
    return this;
}

Modifying operations

@Override
@CacheInterceptor(aof = true)
public V put(K key, V value) {
    //1. Try to evict
    CacheEvictContext<K,V> context = new CacheEvictContext<>();
    context.key(key).size(sizeLimit).cache(this);
    boolean evictResult = evict.evict(context);
    if(evictResult) {
        // Run the eviction listeners
        ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(value).type(CacheRemoveType.EVICT.code());
        for(ICacheRemoveListener<K,V> listener : this.removeListeners) {
            listener.listen(removeListenerContext);
        }
    }

    //2. Check the size limit again after eviction
    if(isSizeLimit()) {
        throw new CacheRuntimeException("This queue is full.");
    }

    //3. Execute add
    return map.put(key, value);
}

@Override
@CacheInterceptor(aof = true)
public V remove(Object key) {
    return map.remove(key);
}

@Override
@CacheInterceptor(aof = true)
public void putAll(Map<? extends K, ? extends V> m) {
    map.putAll(m);
}

@Override
@CacheInterceptor(refresh = true, aof = true)
public void clear() {
    map.clear();
}

AOF persistence interception implementation

Persistence object definition

public class PersistAofEntry {

    /**
     * Parameter information
     */
    private Object[] params;

    /**
     * Method name
     */
    private String methodName;

    //getter & setter & toString
}

All we need here is the name of the method and the parameter objects.

For now a simple implementation is enough.

Persistence interceptor

We define an interceptor that, when the persistence class configured in the cache is CachePersistAof, puts the information about the operation into the buffer list held by CachePersistAof.

public class CacheInterceptorAof<K,V> implements ICacheInterceptor<K, V> {

    private static final Log log = LogFactory.getLog(CacheInterceptorAof.class);

    @Override
    public void before(ICacheInterceptorContext<K,V> context) {
    }

    @Override
    public void after(ICacheInterceptorContext<K,V> context) {
        // The persistence class
        ICache<K,V> cache = context.cache();
        ICachePersist<K,V> persist = cache.persist();

        if(persist instanceof CachePersistAof) {
            CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist;

            String methodName = context.method().getName();
            PersistAofEntry aofEntry = PersistAofEntry.newInstance();
            aofEntry.setMethodName(methodName);
            aofEntry.setParams(context.params());

            String json = JSON.toJSONString(aofEntry);

            // Append directly
            log.debug("AOF starts appending file contents: {}", json);
            cachePersistAof.append(json);
            log.debug("AOF completes appending file contents: {}", json);
        }
    }

}

Interceptor call

This interceptor is called when the annotation's aof attribute is true.

To avoid waste, the call is made only if the persistence class is in AOF mode.

//3. AOF append
final ICachePersist cachePersist = cache.persist();
if(cacheInterceptor.aof() && (cachePersist instanceof CachePersistAof)) {
    if(before) {
        persistInterceptors.before(interceptorContext);
    } else {
        persistInterceptors.after(interceptorContext);
    }
}
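The mechanism as a whole (an annotation marks the mutating methods, and a proxy records a call only when the annotation says so) can be shown in a self-contained sketch using the JDK's `java.lang.reflect.Proxy`. All names here (`AofProxyDemo`, `Logged`, `Store`, `MapStore`) are hypothetical stand-ins for the project's `@CacheInterceptor`, `ICache` and `Cache`, and the recorded format is simplified to a plain string instead of JSON.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AofProxyDemo {

    /** Hypothetical stand-in for @CacheInterceptor with only the aof attribute. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Logged {
        boolean aof() default false;
    }

    interface Store {
        String put(String key, String value);
        String get(String key);
    }

    static class MapStore implements Store {
        private final Map<String, String> map = new HashMap<>();

        @Override
        @Logged(aof = true)   // mutating operation: record it
        public String put(String key, String value) {
            return map.put(key, value);
        }

        @Override
        @Logged               // query: aof defaults to false, so it is not recorded
        public String get(String key) {
            return map.get(key);
        }
    }

    /** Wraps the target so that annotated calls are appended to the buffer after they run. */
    static Store proxy(Store target, List<String> buffer) {
        InvocationHandler handler = (proxyObj, method, args) -> {
            Object result = method.invoke(target, args);
            // The annotation lives on the implementation, so look the method up there.
            Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
            Logged logged = impl.getAnnotation(Logged.class);
            if (logged != null && logged.aof()) {
                buffer.add(method.getName() + Arrays.toString(args));
            }
            return result;
        };
        return (Store) Proxy.newProxyInstance(
                Store.class.getClassLoader(), new Class<?>[] {Store.class}, handler);
    }

    public static void main(String[] args) {
        List<String> buffer = new ArrayList<>();
        Store store = proxy(new MapStore(), buffer);
        store.put("1", "1");
        store.get("1");
        System.out.println(buffer);
    }
}
```

Only the `put` call ends up in the buffer; the `get` call passes through the proxy without being recorded.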

AOF persistence implementation

The AOF mode here is simply another implementation alongside the earlier RDB persistence class; both implement the same interface.


Here we define the timing uniformly on the persistence interface, so that RDB and AOF can each trigger their scheduled task at a different interval.

public interface ICachePersist<K, V> {

    /**
     * Persist the cache contents
     * @param cache the cache
     * @since 0.0.7
     */
    void persist(final ICache<K, V> cache);

    /**
     * Initial delay
     * @return delay
     */
    long delay();

    /**
     * Interval between runs
     * @return period
     * @since 0.0.10
     */
    long period();

    /**
     * Time unit
     * @return time unit
     * @since 0.0.10
     */
    TimeUnit timeUnit();
}

Persistence class implementation

We keep a buffer list to which the interceptor appends entries directly, in order.

Persisting is also fairly simple: after the buffer has been appended to the file, the buffer list is cleared.

/**
 * Cache persistence: AOF persistence mode
 */
public class CachePersistAof<K,V> extends CachePersistAdaptor<K,V> {

    private static final Log log = LogFactory.getLog(CachePersistAof.class);

    /**
     * Buffer list
     */
    private final List<String> bufferList = new ArrayList<>();

    /**
     * Database path
     */
    private final String dbPath;

    public CachePersistAof(String dbPath) {
        this.dbPath = dbPath;
    }

    /**
     * Persist
     * @param cache cache
     */
    @Override
    public void persist(ICache<K, V> cache) {
        log.info("Start AOF persistence to file");
        // 1. Create the file
        if(!FileUtil.exists(dbPath)) {
            FileUtil.createFile(dbPath);
        }
        // 2. Append the buffered entries to the file
        FileUtil.append(dbPath, bufferList);

        // 3. Clear the buffer list
        bufferList.clear();
        log.info("Complete AOF persistence to file");
    }

    @Override
    public long delay() {
        return 1;
    }

    @Override
    public long period() {
        return 1;
    }

    @Override
    public TimeUnit timeUnit() {
        return TimeUnit.SECONDS;
    }

    /**
     * Add file content to the buffer list
     * @param json json information
     * @since 0.0.10
     */
    public void append(final String json) {
        if(StringUtil.isNotEmpty(json)) {
            bufferList.add(json);
        }
    }

}

Persistence tests

The test code

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .persist(CachePersists.<String, String>aof("1.aof"))
        .build();
cache.put("1", "1");
cache.expire("1", 10);

The test log

Expire actually calls expireAt.

[DEBUG] [2020-10-02 12:20:41.979] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF starts appending file contents: {"methodName":"put","params":["1","1"]}
[DEBUG] [2020-10-02 12:20:41.980] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF completes appending file contents: {"methodName":"put","params":["1","1"]}
[DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF starts appending file contents: {"methodName":"expireAt","params":["1",1601612441990]}
[DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF completes appending file contents: {"methodName":"expireAt","params":["1",1601612441990]}
[DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF starts appending file contents: {"methodName":"remove","params":["2"]}
[DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF completes appending file contents: {"methodName":"remove","params":["2"]}
[DEBUG] [2020-10-02 12:20:42.088] [pool-1-thread-1] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1, value: 1, type: expire
[INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - Start persisting cache information
[INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - Start AOF persistence to file
[INFO] [2020-10-02 12:20:42.798] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - Complete AOF persistence to file
[INFO] [2020-10-02 12:20:42.799] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - Complete persisting cache information

The file content

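The contents of the 1.aof file are as follows, matching the entries appended in the log above:

{"methodName":"put","params":["1","1"]}
{"methodName":"expireAt","params":["1",1601612441990]}
{"methodName":"remove","params":["2"]}

Each operation is simply stored in the file, one per line.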

AOF loading implementation


Loading in AOF mode is similar to loading in RDB mode.

We need to restore the previous cache contents from the contents of the file.

Implementation idea: iterate over the file contents and invoke the original methods via reflection.

Code implementation

Parse the file

@Override
public void load(ICache<K, V> cache) {
    List<String> lines = FileUtil.readAllLines(dbPath);
    log.info("[load] starts processing path: {}", dbPath);
    if(CollectionUtil.isEmpty(lines)) {
        log.info("[load] path: {} file content is empty, returning directly", dbPath);
        return;
    }

    for(String line : lines) {
        if(StringUtil.isEmpty(line)) {
            continue;
        }

        // Execute
        // Simple types work, but deserializing complex types will fail
        PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class);

        final String methodName = entry.getMethodName();
        final Object[] objects = entry.getParams();

        final Method method = METHOD_MAP.get(methodName);
        // Invoke via reflection
        ReflectMethodUtil.invoke(cache, method, objects);
    }
}

Preloading of method mappings

The set of methods invoked by reflection is fixed, so to improve performance we do some preprocessing.

/**
 * Method cache
 *
 * For now this is kept simple: methods are looked up by name only,
 * without introducing parameter types, which would add complexity.
 * @since 0.0.10
 */
private static final Map<String, Method> METHOD_MAP = new HashMap<>();

static {
    Method[] methods = Cache.class.getMethods();

    for(Method method : methods){
        CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class);

        if(cacheInterceptor != null) {
            if(cacheInterceptor.aof()) {
                String methodName = method.getName();

                METHOD_MAP.put(methodName, method);
            }
        }
    }
}
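For readers who want to try the replay idea without the project's utility classes, here is a simplified, self-contained sketch. It makes several assumptions that differ from the code above: the log lines are tab-separated (`method<TAB>arg1<TAB>arg2`) rather than JSON, all parameters are strings, and `AofReplayDemo` and `TinyCache` are hypothetical names. Keeping parameters as strings sidesteps the type problem noted in the comment above, where deserialized parameters may not match the method's declared parameter types.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AofReplayDemo {

    /** Hypothetical target with String-only parameters to keep parsing trivial. */
    public static class TinyCache {
        private final Map<String, String> map = new HashMap<>();

        public void put(String key, String value) { map.put(key, value); }
        public void remove(String key) { map.remove(key); }
        public String get(String key) { return map.get(key); }
        public int size() { return map.size(); }
    }

    /** Method lookup table built once, keyed by method name only. */
    private static final Map<String, Method> METHOD_MAP = new HashMap<>();

    static {
        for (Method method : TinyCache.class.getDeclaredMethods()) {
            String name = method.getName();
            if (name.equals("put") || name.equals("remove")) {
                METHOD_MAP.put(name, method);
            }
        }
    }

    /** Re-applies every logged operation, in file order, to an empty cache. */
    static void replay(TinyCache cache, List<String> lines) {
        for (String line : lines) {
            if (line.isEmpty()) {
                continue;
            }
            String[] parts = line.split("\t");
            Method method = METHOD_MAP.get(parts[0]);
            if (method == null) {
                throw new IllegalArgumentException("Unknown method in log: " + parts[0]);
            }
            Object[] params = Arrays.copyOfRange(parts, 1, parts.length);
            try {
                method.invoke(cache, params);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Replay failed for line: " + line, e);
            }
        }
    }

    public static void main(String[] args) {
        TinyCache cache = new TinyCache();
        replay(cache, Arrays.asList("put\t1\t1", "put\t2\t2", "remove\t2"));
        System.out.println(cache.size());
    }
}
```

Failing loudly on an unknown method name is a deliberate choice in this sketch: silently skipping a line would restore a state that differs from the one that was persisted.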


The file content

  • default.aof


ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .load(CacheLoads.<String, String>aof("default.aof"))
        .build();

Assert.assertEquals(1, cache.size());

This loads the default.aof file directly into the cache.


Redis's file persistence is actually much richer than this.

For example, it supports using RDB and AOF together.

AOF files can grow very large, and Redis solves this problem by periodically compacting (rewriting) the commands.

You can think of AOF as an operation log. What we really care about is the final state: no matter how many steps were taken along the way, only the final value matters.
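The "only the final state matters" observation is what makes compaction possible. The sketch below illustrates the idea on a simplified, space-separated log (`put key value` and `remove key`); `AofCompactDemo` and the line format are assumptions for illustration. Redis itself performs AOF rewriting (the `BGREWRITEAOF` command) from its in-memory dataset rather than by parsing the old file, but the result is the same in spirit: a shorter log that produces the same final state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AofCompactDemo {

    /** Folds the log into its final state, then emits one put per surviving key. */
    static List<String> compact(List<String> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (String line : log) {
            String[] parts = line.split(" ");
            if (parts[0].equals("put")) {
                state.put(parts[1], parts[2]);
            } else if (parts[0].equals("remove")) {
                state.remove(parts[1]);
            }
        }
        List<String> compacted = new ArrayList<>();
        for (Map.Entry<String, String> entry : state.entrySet()) {
            compacted.add("put " + entry.getKey() + " " + entry.getValue());
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "put a 1", "put b 1", "put a 2", "remove b", "put c 9");
        // Five operations collapse to the two that determine the final state.
        System.out.println(compact(log));
    }
}
```

Replaying the compacted log yields the same key-value pairs as replaying the original, with far fewer lines to read at load time.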

This article mainly explains the ideas. Because of space limitations, not all of the implementation is shown.

Open source address:
