Simple CacheVerticle design idea

For a simple cache, the first field you need is a Map<String,Any> that holds the cached key-value pairs.

To avoid paying for thread synchronization on every cache operation, we can deploy the cache as a worker verticle, which Vert.x never executes on more than one thread at a time, and expose the cache operations (add, remove and so on) to the outside through the EventBus.

With that in mind, the pseudo code looks like this:

class CacheVerticle : AbstractVerticle() {
    private val cache = mutableMapOf<String, Any>()

    override fun start() {
        val eventbus = vertx.eventBus()
        // One local consumer per cache operation; the handlers are filled in below
        eventbus.localConsumer<Any>(GET_CACHE_ADDRESS) { /* TODO */ }
        eventbus.localConsumer<Any>(ADD_CACHE_ADDRESS) { /* TODO */ }
        eventbus.localConsumer<Any>(REMOVE_CACHE_ADDRESS) { /* TODO */ }
    }
}
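
The verticle still has to be deployed under the worker model mentioned earlier. A minimal sketch of how that might look, assuming Vert.x 3.x or 4.x (where DeploymentOptions.setWorker is available) and assuming the CacheVerticle class from this article is on the classpath:

```kotlin
import io.vertx.core.DeploymentOptions
import io.vertx.core.Vertx

fun main() {
    val vertx = Vertx.vertx()
    // setWorker(true) asks Vert.x to run the verticle on the worker pool.
    // A single worker verticle instance is never executed by two threads
    // at the same time, so the plain map inside needs no locking.
    val options = DeploymentOptions().setWorker(true)
    vertx.deployVerticle(CacheVerticle(), options)
}
```

Only one instance should be deployed this way, since every instance would hold its own private map.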

The next step is to fill in the handler of each localConsumer.

The add method

Because Kotlin has the infix function "to", which reads more naturally when building a Pair, I decided to design the exposed API as:

fun <T> addCache(pair: Pair<String,T>) = Vertx.currentContext().owner().eventBus().send(ADD_CACHE_ADDRESS,pair)

A call is then simply addCache("key" to 12)

Then we can write an EventBus MessageCodec for Pair and send the pair as-is.

Here I had first designed a wrapper class AddWrapper<T>(val addPair: Pair<String,T>), but the wrapper is unnecessary: just write a MessageCodec for Pair and register it with the EventBus instance.

So the handler is implemented like this:

eventbus.localConsumer<Pair<String, Any>>(ADD_CACHE_ADDRESS) {
    // The body is the Pair built by addCache: first = key, second = value
    val add = it.body()
    cache[add.first] = add.second
}

The remove method

Vert.x already ships a MessageCodec for String, so the key can be sent directly.

eventbus.localConsumer<String>(REMOVE_CACHE_ADDRESS) {
    val key = it.body()
    cache.remove(key)
}

The get method

Objects placed in the cache do not necessarily have a corresponding MessageCodec, and it is not practical to write a custom MessageCodec for each of them.

The EventBus uses the result of Object::getClass on the message body as the key to look up the registered codec, and throws an exception if none is found. Both Java lambdas and Kotlin lambdas are essentially instances of generated classes that implement an interface, one class per lambda, so there is no way to register a MessageCodec for lambdas in general.

So my solution is to wrap the payload in an outer object whose contents can be of any type. getClass on the wrapper always returns the same Class, so it is enough to register a MessageCodec for the wrapper class.
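
The class-keyed lookup can be illustrated without Vert.x. The following is only a sketch: registry and codecFor are hypothetical stand-ins for the EventBus codec lookup, not Vert.x APIs. It shows that two lambdas have different runtime classes, while a Pair wrapping either of them always has the class Pair.

```kotlin
// Hypothetical stand-in for a codec registry keyed by the body's runtime class.
val registry = mutableMapOf<Class<*>, String>()

// Looks up a "codec" the same way the article describes: by body.javaClass.
fun codecFor(body: Any): String? = registry[body.javaClass]

fun main() {
    val first: (String?) -> Unit = { println(it) }
    val second: (String?) -> Unit = { println(it) }

    // Each lambda expression is backed by its own generated class.
    println(first.javaClass == second.javaClass)

    // Registering one entry for Pair covers every wrapped payload.
    registry[Pair::class.java] = "pair-codec"
    println(codecFor("key" to first))   // pair-codec
    println(codecFor("key" to second))  // pair-codec

    // A bare lambda has no entry, because its class was never registered.
    println(codecFor(first))            // null
}
```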

My design aims to be thread-safe not only for reads and writes on the cache itself (the map instance) but also for operations on the objects stored in it (such as modifying their properties). So the message body consists of the key plus a handler for the value, which can be passed on the EventBus as a Pair<String,(T?)->Unit>.

This handler runs directly on the thread of the cache verticle, so there is no thread-safety problem even when several callers modify the same cached value concurrently. Simple modifications of cached content are therefore effectively atomic, but a time-consuming handler blocks that thread and degrades the performance of the whole cache.
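
The effect of running every handler on one thread can be simulated with plain JDK executors. This is only a sketch: the single-thread executor is a stand-in for the cache verticle's thread, and Counter and incrementViaSingleThread are hypothetical names introduced for illustration.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Deliberately not thread-safe: a plain mutable field.
class Counter { var value = 0 }

// Many caller threads ship handlers to one "cache thread", which runs them one by one.
fun incrementViaSingleThread(callers: Int, perCaller: Int): Int {
    val cache = mutableMapOf<String, Any>("counter" to Counter())
    val cacheThread = Executors.newSingleThreadExecutor()
    val clients = Executors.newFixedThreadPool(callers)

    repeat(callers) {
        clients.execute {
            repeat(perCaller) {
                // Comparable to getCache(key) { ... }: the handler is sent, not the value.
                val handler: (Any?) -> Unit = { cached -> (cached as Counter).value++ }
                cacheThread.execute { handler(cache["counter"]) }
            }
        }
    }
    // Wait until every caller has submitted its handlers, then drain the cache thread.
    clients.shutdown()
    clients.awaitTermination(10, TimeUnit.SECONDS)
    cacheThread.shutdown()
    cacheThread.awaitTermination(10, TimeUnit.SECONDS)
    return (cache["counter"] as Counter).value
}

fun main() {
    println(incrementViaSingleThread(callers = 8, perCaller = 1_000)) // 8000
}
```

No increment is lost although Counter has no synchronization, because all increments execute sequentially on the same thread. Incrementing the counter directly from the eight caller threads would offer no such guarantee.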

The exposed API is

fun <T> getCache(key: String, handler: (T?) -> Unit) {
    Vertx.currentContext().owner().eventBus().send(GET_CACHE_ADDRESS, key to handler)
}

The handler takes the nullable type T? in order to handle the case where the key does not exist.

eventbus.localConsumer<Pair<String, (Any?) -> Unit>>(GET_CACHE_ADDRESS) {
    val body = it.body()
    // Invoke the caller's handler with the cached value, or null if the key is absent
    body.second(cache[body.first])
}

The full implementation is attached below:

import io.vertx.core.AbstractVerticle
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.eventbus.MessageCodec

class CacheVerticle : AbstractVerticle() {
  companion object {
    private const val GET_CACHE_ADDRESS = "get_cache_address"
    private const val ADD_CACHE_ADDRESS = "add_cache_address"
    private const val REMOVE_CACHE_ADDRESS = "remove_cache_address"

    fun <T> addCache(pair: Pair<String, T>) =
      Vertx.currentContext().owner().eventBus().send(ADD_CACHE_ADDRESS, pair)

    fun <T> getCache(key: String, handler: (T?) -> Unit) {
      Vertx.currentContext().owner().eventBus().send(GET_CACHE_ADDRESS, key to handler)
    }

    fun remove(key: String) {
      Vertx.currentContext().owner().eventBus().send(REMOVE_CACHE_ADDRESS, key)
    }
  }

  private val cache = mutableMapOf<String, Any>()

  override fun start() {
    val eventbus = vertx.eventBus()
    // A single codec for Pair covers both the add and the get messages
    eventbus.registerDefaultCodec(Pair::class.java, UnModifiableObjectCodec(Pair::class.java))

    eventbus.localConsumer<Pair<String, (Any?) -> Unit>>(GET_CACHE_ADDRESS) {
      val body = it.body()
      body.second(cache[body.first])
    }

    eventbus.localConsumer<Pair<String, Any>>(ADD_CACHE_ADDRESS) {
      val add = it.body()
      cache[add.first] = add.second
    }

    eventbus.localConsumer<String>(REMOVE_CACHE_ADDRESS) {
      val key = it.body()
      cache.remove(key)
    }
  }

  // Passes the object through unchanged; the wire methods are left unimplemented
  // because the cache is only reached through local consumers
  class UnModifiableObjectCodec<T>(private val msgClass: Class<T>) : MessageCodec<T, T> {
    override fun encodeToWire(buffer: Buffer?, s: T) {
      TODO("Not yet implemented")
    }

    override fun decodeFromWire(pos: Int, buffer: Buffer?): T {
      TODO("Not yet implemented")
    }

    override fun transform(s: T) = s

    override fun name() = msgClass.name

    override fun systemCodecID(): Byte {
      return -1
    }
  }
}

Usage

class NormalVerticle : AbstractVerticle() {
  override fun start() {
    CacheVerticle.addCache("string" to UUID.randomUUID())
    CacheVerticle.getCache<UUID>("string") { it?.let { println(it) } }
    CacheVerticle.remove("string")
  }
}