Recently, for work, I needed to put together a Node.js Redis client component, and for now I chose ioredis as the project to fork. The reason is that ioredis has been unable to connect to the server when used with Twemproxy; for details, see the issue: github.com/luin/ioredi… I set out to modify the source code to fix this, but once the change was done the unit tests showed that things were not as simple as replacing info with ping. So I had to familiarize myself with the source code and then adjust the logic accordingly.

Ioredis project structure

The source code lives in the lib folder, and the project is written entirely in TypeScript. The files directly under lib provide general-purpose capabilities such as commands, pipelines, and data transfer.

.
├── DataHandler.ts           # data processing
├── …
├── cluster                  # implementation of Redis Cluster mode
│   ├── ClusterOptions.ts
│   ├── ClusterSubscriber.ts
│   ├── ConnectionPool.ts
│   ├── DelayQueue.ts
│   ├── index.ts
│   └── util.ts
├── command.ts               # concrete implementation of a command
├── commander.ts             # command dispatcher
├── connectors               # network connection related
│   ├── AbstractConnector.ts
│   ├── SentinelConnector
│   ├── StandaloneConnector.ts
│   └── index.ts
├── errors                   # error related
│   ├── ClusterAllFailedError.ts
│   ├── MaxRetriesPerRequestError.ts
│   └── index.ts
├── index.ts                 # entry file
├── pipeline.ts              # pipeline logic
├── promiseContainer.ts      # a wrapper for Promise
├── redis                    # Redis instance implementation
│   ├── RedisOptions.ts
│   ├── event_handler.ts
│   └── index.ts
├── script.ts
├── …
├── types.ts
└── utils                    # utility functions
    └── …

The redis and cluster folders contain the concrete client implementations: redis is the standalone client and cluster is the Redis Cluster counterpart. That is why the README shows two classes that can be instantiated (www.npmjs.com/package/ior…):

new Redis
new Redis.Cluster

Let's start with the most commonly used one, Redis. This note focuses on Redis and follows the README step by step to work through the logic.

const Redis = require("ioredis");
const redis = new Redis();

redis.set("foo", "bar");
redis.get("foo", function (err, result) {
  if (err) {
    console.error(err);
  } else {
    console.log(result);
  }
});

The basic usage is to instantiate a Redis object and then call Redis commands on it. If you are not familiar with the Redis commands, take a look at the official site: redis.io/commands

The entry code is in redis/index.ts. Although ioredis is written in TypeScript, the constructor is implemented in the old ES5 style and inherits from two classes, EventEmitter and Commander. The first is the native Node.js events class; the second is provided by ioredis itself and implemented in commander.ts.

Redis instantiation

What Redis does is:

  • Establish and maintain the network connection to the Redis server
  • Health checks
  • Maintain a queue so that requests are not lost when an exception occurs and can be retried

If you go back to the Redis constructor, you will see an assignment to this.connector. Leaving aside the custom connector and Sentinel and looking only at the last and most common case, StandaloneConnector, this is what establishes the connection to the Redis server. In lib/connectors/StandaloneConnector.ts you will find that the call it ends up making is net.createConnection. This ties in with RESP, discussed later in this note: the client performs its operations using the most basic Redis communication protocol over that socket.

After the parameters are initialized, connect is called to establish the actual connection to the Redis server.

The net module's createConnection only establishes a network connection; it does not guarantee that the other end is the Redis service we expect. The socket client is created here: github.com/luin/ioredi… The main job of the connect method is to establish the link to the Redis server. Once the connection is established, the connectHandler method in event_handler.ts is called. It mainly does two things:

  1. It checks whether the Redis server is actually available. This is the pitfall mentioned at the beginning. The concrete implementation is in Redis.prototype._readyCheck: ioredis uses the info command as a probe, but this causes problems behind a twemproxy cluster, because twemproxy disables some commands, info among them. As a result the Redis client always assumes the service is unavailable.
  2. It adds a listener for the data event on the socket client, which is used to receive returned data later. The main logic is in DataHandler.ts, covered later.

The readyCheck logic lives in redis/index.ts and redis/event_handler.ts:

Redis.prototype._readyCheck = function (callback) {
  const _this = this;
  this.info(function (err, res) {
    if (err) {
      return callback(err);
    }
    if (typeof res !== "string") {
      return callback(null, res);
    }

    const info: { [key: string]: any } = {};

    const lines = res.split("\r\n");
    for (let i = 0; i < lines.length; ++i) {
      const [fieldName, ...fieldValueParts] = lines[i].split(":");
      const fieldValue = fieldValueParts.join(":");
      if (fieldValue) {
        info[fieldName] = fieldValue;
      }
    }

    if (!info.loading || info.loading === "0") {
      callback(null, info);
    } else {
      const loadingEtaMs = (info.loading_eta_seconds || 1) * 1000;
      const retryTime =
        _this.options.maxLoadingRetryTime &&
        _this.options.maxLoadingRetryTime < loadingEtaMs
          ? _this.options.maxLoadingRetryTime
          : loadingEtaMs;
      debug("Redis server still loading, trying again in " + retryTime + "ms");
      setTimeout(function () {
        _this._readyCheck(callback);
      }, retryTime);
    }
  });
};
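To make the ready check easier to follow in isolation, here is a minimal standalone sketch of the same parsing and retry-delay decision. The helper names parseInfo and nextCheckDelayMs are hypothetical and introduced only for illustration, and the sample INFO text is made up; ioredis keeps this logic inline in _readyCheck.

```typescript
// Parse the "field:value" lines of an INFO response into an object.
// Section headers such as "# Persistence" have no value and are skipped.
function parseInfo(res: string): Record<string, string> {
  const info: Record<string, string> = {};
  for (const line of res.split("\r\n")) {
    const [fieldName, ...rest] = line.split(":");
    const fieldValue = rest.join(":");
    if (fieldValue) {
      info[fieldName] = fieldValue;
    }
  }
  return info;
}

// Returns 0 when the server is ready, otherwise the delay in milliseconds
// before the next check, capped by maxLoadingRetryTime when that is set.
function nextCheckDelayMs(
  info: Record<string, string>,
  maxLoadingRetryTime?: number
): number {
  if (!info.loading || info.loading === "0") {
    return 0;
  }
  const loadingEtaMs = (Number(info.loading_eta_seconds) || 1) * 1000;
  return maxLoadingRetryTime && maxLoadingRetryTime < loadingEtaMs
    ? maxLoadingRetryTime
    : loadingEtaMs;
}

// Made-up sample: the server reports that it is still loading its dataset.
const sample = "# Persistence\r\nloading:1\r\nloading_eta_seconds:5\r\n";
const info = parseInfo(sample);
console.log(info.loading, nextCheckDelayMs(info, 2000));
```

A proxy that rejects info never produces a response like this, so the probe fails before any of this parsing runs, which is the Twemproxy problem described above.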

If a command is called before Redis is available, ioredis does not throw an error telling you that the connection has not been established. Instead, the command is stored in a queue of its own and sent out in order once the server is available. That is roughly what Redis does during instantiation. Next we look at the logic of executing Redis commands.

Commander

The role of Commander is to implement the various Redis client commands, which it does by iterating over the command list from www.npmjs.com/package/red… It also handles the client's ready state, for example by temporarily storing commands issued before the client is ready. It is more like an abstract class, because both Redis and Redis Cluster inherit from it and override some APIs to get their work done.

commands.forEach(function (commandName) {
  Commander.prototype[commandName] = generateFunction(commandName, "utf8");
  Commander.prototype[commandName + "Buffer"] = generateFunction(
    commandName,
    null
  );
});

function generateFunction(_encoding: string);
function generateFunction(_commandName: string | void, _encoding: string);
function generateFunction(_commandName?: string, _encoding?: string) {
  if (typeof _encoding === "undefined") {
    _encoding = _commandName;
    _commandName = null;
  }

  return function (...args) {
    const commandName = _commandName || args.shift();
    let callback = args[args.length - 1];

    if (typeof callback === "function") {
      args.pop();
    } else {
      callback = undefined;
    }

    const options = {
      errorStack: this.options.showFriendlyErrorStack
        ? new Error().stack
        : undefined,
      keyPrefix: this.options.keyPrefix,
      replyEncoding: _encoding,
    };

    if (this.options.dropBufferSupport && !_encoding) {
      return asCallback(
        PromiseContainer.get().reject(new Error(DROP_BUFFER_SUPPORT_ERROR)),
        callback
      );
    }

    // No auto pipeline, use regular command sending
    if (!shouldUseAutoPipelining(this, commandName)) {
      return this.sendCommand(
        new Command(commandName, args, options, callback)
      );
    }

    // Create a new pipeline and make sure it's scheduled
    return executeWithAutoPipelining(this, commandName, args, callback);
  };
}

Every command is registered together with a Buffer variant (commandName + "Buffer"). As the implementation of generateFunction shows, the main difference between the two is the replyEncoding passed into the Command instance: "utf8" for the plain method and null for the Buffer one. A Command object is the concrete implementation of a command, so we need to look at Command first.
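The registration pattern can be sketched on its own as follows. This is a simplified illustration, not the ioredis implementation: buildApi, Sender and SentCommand are hypothetical names, the command list is hard-coded rather than taken from redis-commands, and the generated functions only record what would be sent.

```typescript
type ReplyEncoding = "utf8" | null;

interface SentCommand {
  name: string;
  args: string[];
  replyEncoding: ReplyEncoding;
}

type Sender = (...args: string[]) => SentCommand;

// Builds one function per command; the encoding is captured in the closure.
function generateFunction(name: string, replyEncoding: ReplyEncoding): Sender {
  return (...args) => ({ name, args, replyEncoding });
}

// Registers every command twice: a string variant and a Buffer variant.
function buildApi(commandNames: string[]): Record<string, Sender> {
  const api: Record<string, Sender> = {};
  for (const name of commandNames) {
    api[name] = generateFunction(name, "utf8");
    api[name + "Buffer"] = generateFunction(name, null);
  }
  return api;
}

const api = buildApi(["get", "set"]);
console.log(Object.keys(api));
console.log(api.getBuffer("foo"));
```

Generating the methods from a list keeps the client in step with the command table: supporting a new command means adding a name, not writing a method.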

Command

Command is responsible for processing arguments, processing return values, generating the actual data sent for the command, and firing the callback.

Instantiation

When a Command is instantiated, apart from some property assignments it calls an initPromise method, which creates a Promise object internally. Two important things happen there: the arguments are transformed, and the handling of the return value is set up.

private initPromise() {
  const Promise = getPromise();
  const promise = new Promise((resolve, reject) => {
    if (!this.transformed) {
      this.transformed = true;
      const transformer = Command._transformer.argument[this.name];
      if (transformer) {
        this.args = transformer(this.args);
      }
      this.stringifyArguments();
    }

    this.resolve = this._convertValue(resolve);
    if (this.errorStack) {
      this.reject = (err) => {
        reject(optimizeErrorStack(err, this.errorStack, __dirname));
      };
    } else {
      this.reject = reject;
    }
  });

  this.promise = asCallback(promise, this.callback);
}

Special processing of parameters and return values

If you search command.ts, you will find that Command._transformer.argument is populated through the setArgumentTransformer method. The code only calls setArgumentTransformer in a few places: for hash set commands such as hmset, and for the mset commands.

Command.setArgumentTransformer("hmset", function (args) {
  if (args.length === 2) {
    if (typeof Map !== "undefined" && args[1] instanceof Map) {
      return [args[0]].concat(convertMapToArray(args[1]));
    }
    if (typeof args[1] === "object" && args[1] !== null) {
      return [args[0]].concat(convertObjectToArray(args[1]));
    }
  }
  return args;
});

If you have used the Redis hash set operation, you know that multiple field/value pairs are simply appended one after another:

> HMSET key field value [field value ...]

Used this way from JS, you would have to pass in an array and maintain the order of fields and values in it yourself. Passing arguments positionally like this is uncomfortable for anyone used to working with objects in JS, so ioredis provides argument conversion logic that turns an Object (or a Map) into a one-dimensional array:

export function convertObjectToArray(obj) {
  const result = [];
  const keys = Object.keys(obj);

  for (let i = 0, l = keys.length; i < l; i++) {
    result.push(keys[i], obj[keys[i]]);
  }
  return result;
}

export function convertMapToArray<K, V>(map: Map<K, V>): Array<K | V> {
  const result = [];
  let pos = 0;
  map.forEach(function (value, key) {
    result[pos] = key;
    result[pos + 1] = value;
    pos += 2;
  });
  return result;
}

If you look closely at Command._transformer, you will see that it also has a reply property. Its logic shows up mainly in _convertValue: when the return value is received, the registered custom function is called first to process it. At present, the only reply transformer used in the code is the one for hgetall. In Redis, both hmget and hgetall return their data as an array, and for hgetall ioredis assembles that array into an Object of key/value pairs to make it easier to work with.

Command.setReplyTransformer("hgetall", function (result) {
  if (Array.isArray(result)) {
    const obj = {};
    for (let i = 0; i < result.length; i += 2) {
      obj[result[i]] = result[i + 1];
    }
    return obj;
  }
  return result;
});
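Putting the two transformer kinds side by side, a minimal standalone sketch of such a registry might look like this. The two lookup tables and the applyArgumentTransformer and applyReplyTransformer helpers are hypothetical names for illustration, and Map handling is left out to keep it short; it only mirrors the idea behind setArgumentTransformer and setReplyTransformer.

```typescript
type ArgTransformer = (args: unknown[]) => unknown[];
type ReplyTransformer = (reply: unknown) => unknown;

// Hypothetical registries keyed by command name.
const argumentTransformers: Record<string, ArgTransformer> = {};
const replyTransformers: Record<string, ReplyTransformer> = {};

// hmset(key, { field: value }) becomes hmset(key, field, value, ...).
argumentTransformers["hmset"] = (args) => {
  const [key, fields] = args;
  if (args.length === 2 && typeof fields === "object" && fields !== null) {
    const record = fields as Record<string, unknown>;
    const flat: unknown[] = [key];
    for (const field of Object.keys(record)) {
      flat.push(field, record[field]);
    }
    return flat;
  }
  return args;
};

// hgetall replies [field, value, field, value, ...] become an object.
replyTransformers["hgetall"] = (reply) => {
  if (!Array.isArray(reply)) {
    return reply;
  }
  const obj: Record<string, unknown> = {};
  for (let i = 0; i < reply.length; i += 2) {
    obj[String(reply[i])] = reply[i + 1];
  }
  return obj;
};

// Commands without a registered transformer pass through unchanged.
function applyArgumentTransformer(name: string, args: unknown[]): unknown[] {
  const transformer = argumentTransformers[name];
  return transformer ? transformer(args) : args;
}

function applyReplyTransformer(name: string, reply: unknown): unknown {
  const transformer = replyTransformers[name];
  return transformer ? transformer(reply) : reply;
}

console.log(applyArgumentTransformer("hmset", ["user:1", { name: "foo", age: 3 }]));
console.log(applyReplyTransformer("hgetall", ["name", "foo", "age", "3"]));
```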

Setting the key prefix

If you look at the Command instantiation process, you’ll also see a call to _iterateKeys that does two things:

  1. Extract all keys from the parameter
  2. Optionally add a prefix to the key

Internally this function uses two APIs from redis-commands, exists and getKeyIndexes, to get the indexes of all keys in the argument array. Because the function does two things, when you first see it used in the constructor and then read its implementation, the return of this.keys at the end is confusing. It becomes clear once you see that Command also provides a getKeys API.

If keyPrefix is set, the constructor triggers _iterateKeys to adjust the key names and stores the result in this.keys. When getKeys is called and keyPrefix was not set, the same logic runs with the default identity transform, which simply extracts all the keys and returns them. If keyPrefix was set earlier, this.keys is returned directly and the logic is not repeated.

// the logic inside the constructor
if (options.keyPrefix) {
  this._iterateKeys((key) => options.keyPrefix + key);
}

// The location of the other call
public getKeys(): Array<string | Buffer> {
  return this._iterateKeys();
}

private _iterateKeys(
  transform: Function = (key) => key
): Array<string | Buffer> {
  if (typeof this.keys === "undefined") {
    this.keys = [];
    if (commands.exists(this.name)) {
      const keyIndexes = commands.getKeyIndexes(this.name, this.args);
      for (const index of keyIndexes) {
        this.args[index] = transform(this.args[index]);
        this.keys.push(this.args[index] as string | Buffer);
      }
    }
  }
  return this.keys;
}
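The caching behaviour is easier to see in a small standalone sketch. Everything here is an illustration under stated assumptions: MiniCommand is a hypothetical class, and keyIndexes is a hard-coded stand-in for the getKeyIndexes lookup from redis-commands that knows only three commands.

```typescript
// Assumption: a tiny stand-in for redis-commands' getKeyIndexes.
function keyIndexes(name: string, args: string[]): number[] {
  switch (name) {
    case "get":
    case "set":
      return [0];
    case "mget":
      return args.map((_, i) => i);
    default:
      return [];
  }
}

class MiniCommand {
  private keys?: string[];

  constructor(
    public name: string,
    public args: string[],
    keyPrefix?: string
  ) {
    if (keyPrefix) {
      // First (and only) pass: rewrite the keys and cache them.
      this.iterateKeys((key) => keyPrefix + key);
    }
  }

  getKeys(): string[] {
    // Runs with the identity transform unless the keys are already cached.
    return this.iterateKeys();
  }

  private iterateKeys(
    transform: (key: string) => string = (key) => key
  ): string[] {
    if (this.keys === undefined) {
      this.keys = [];
      for (const index of keyIndexes(this.name, this.args)) {
        this.args[index] = transform(this.args[index]);
        this.keys.push(this.args[index]);
      }
    }
    return this.keys;
  }
}

const cmd = new MiniCommand("set", ["foo", "bar"], "app:");
console.log(cmd.args, cmd.getKeys());
```

Because the result is cached on the first pass, calling getKeys after the constructor has applied a prefix never prefixes the keys a second time.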

Generating the data to send

Most of the time you use Redis by calling commands through client code, and occasionally you operate on it directly from the command line with redis-cli. Under the hood, though, Redis uses a protocol called RESP (Redis Serialization Protocol) for transmission. If Redis is running on the local machine, this is easy to demonstrate:

> echo -e '*1\r\n$4\r\nPING\r\n' | nc 127.0.0.1 6379
+PONG

We get back the string +PONG. This kind of exchange is the format most clients use to talk to the Redis server.

P.S. RESP provides a human-readable version for interaction, but with lower performance.

To give an example of how to write a set and get command:

# indicates a comment

# SET hello world
# number of arguments
*3
# length of the first argument (the set command)
$3
# value of the first argument (set)
SET
# length of the second argument (key: hello)
$5
# value of the second argument (key: hello)
hello
# length of the third argument (value)
$5
# the value itself
world

# GET hello
# number of arguments
*2
# length of the first argument (the get command)
$3
# value of the first argument (get)
GET
# length of the second argument (key: hello)
$5
# value of the second argument (key: hello)
hello

The return value of set is an unsurprising +OK, while the return value of get has two lines: the first, $5, gives the length of the value, and the second is the value itself, world. The toWritable function of Command implements exactly this encoding. It is too long to paste here: github.com/luin/ioredi…
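As a rough idea of what such an encoder does, here is a minimal sketch for string arguments only. encodeCommand is a hypothetical helper written for this note, not the ioredis toWritable; the linked source remains the reference for the real behaviour.

```typescript
// Encodes a command as a RESP array of bulk strings:
// "*<count>\r\n" followed by "$<byte length>\r\n<value>\r\n" per part.
function encodeCommand(name: string, args: string[]): string {
  const parts = [name, ...args];
  let out = `*${parts.length}\r\n`;
  for (const part of parts) {
    // The length prefix counts UTF-8 bytes, not characters.
    const byteLength = new TextEncoder().encode(part).length;
    out += `$${byteLength}\r\n${part}\r\n`;
  }
  return out;
}

// JSON.stringify makes the \r\n separators visible when printed.
console.log(JSON.stringify(encodeCommand("SET", ["hello", "world"])));
console.log(JSON.stringify(encodeCommand("GET", ["hello"])));
```

Counting bytes rather than characters matters as soon as a key or value contains non-ASCII text; a wrong length prefix would desynchronize the server's reading of the stream.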

That covers the main logic of Command. Back in Commander, we saw that every command call ends by executing this.sendCommand, while the actual dispatching is done in the concrete implementations such as Redis and Redis Cluster. So we can return to Redis and look at its implementation.

Redis sends commands

The sendCommand implementation first checks the Redis status and handles the wait and end states. It then checks whether the current state allows commands to be sent:

let writable =
  this.status === "ready" ||
  (!stream &&
    this.status === "connect" &&
    commands.exists(command.name) &&
    commands.hasFlag(command.name, "loading"));
if (!this.stream) {
  writable = false;
} else if (!this.stream.writable) {
  writable = false;
} else if (this.stream._writableState && this.stream._writableState.ended) {
  writable = false;
}

The code is fairly clear. This is also where the info problem comes back in: the fix replaced info with ping, and at first it got stuck right here. Debugging showed that the ping command does not have the loading flag, so ping commands were being put into offlineQueue. For this case we added extra logic for ping to make sure writable is true.

Then, if writable is true, the actual command is sent through stream, the socket connection established earlier: write is called with the return value of Command#toWritable as the data, that is, the RESP-based serialization described above. Some information is also pushed onto commandQueue, which is an instance of the same type as offlineQueue, explained below.

this.commandQueue.push({
  command: command, // Command instance
  stream: stream, // Socket client (I could not find where it is used, not sure why it is passed)
  select: this.condition.select, // This is not used either
});

The queue type comes from another open source module, denque: www.npmjs.com/package/den…

If writable is false, the command is placed in offlineQueue.
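The routing between the two queues can be sketched like this. MiniClient is a hypothetical, heavily simplified model: plain arrays stand in for denque, a written array stands in for stream.write, and the writable check is reduced to the ready status, so the loading-flag case is not modelled.

```typescript
type Status = "connecting" | "ready";

class MiniClient {
  status: Status = "connecting";
  written: string[] = []; // stands in for stream.write
  commandQueue: string[] = []; // sent, waiting for a reply
  offlineQueue: string[] = []; // not sent yet

  sendCommand(name: string): void {
    if (this.status === "ready") {
      this.written.push(name);
      this.commandQueue.push(name);
    } else {
      this.offlineQueue.push(name);
    }
  }

  // Called once the ready check passes: flush in the original order.
  markReady(): void {
    this.status = "ready";
    while (this.offlineQueue.length > 0) {
      const name = this.offlineQueue.shift() as string;
      this.sendCommand(name);
    }
  }
}

const client = new MiniClient();
client.sendCommand("set");
client.sendCommand("get");
console.log(client.written, client.offlineQueue);
client.markReady();
client.sendCommand("del");
console.log(client.written, client.offlineQueue);
```

In this model a probe command sent before the client is ready would sit in offlineQueue forever, which is the situation the ping replacement ran into until it was allowed through.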

By this point the command.promise object has been created and its resolve and reject are held on the command, to be used when the data comes back. Once the command has been sent, the next step is to wait for the reply, which is the job of the DataHandler instance created in connectHandler during Redis instantiation.

DataHandler

DataHandler is written in an unusual way for a class: it is invoked with new, but the return value is never kept. Its constructor does two things. One is to instantiate a RedisParser object; the other is to listen for the data event via redis.stream.on("data"), where stream is the socket client created when Redis was instantiated. When the data event fires, RedisParser's execute is called to do the parsing. RedisParser is another open source module; if you are interested, see www.npmjs.com/package/red… For now we can take it that after execute is called, the parser invokes the returnReply callback passed in at instantiation with the parsed response. With that response in hand, DataHandler takes the object that was pushed onto commandQueue earlier. The queue is consumed with shift, which removes its first element each time. We then call the resolve method of that element's command property, which in turn triggers the callback passed in when the Redis command was called.

A bit of background on Redis is needed here. The whole logical chain looks roughly like this:

  1. The user runs a command
  2. Redis instantiates a Command and puts it in a queue
  3. When a response arrives, the data is parsed, the first element of the queue is taken, and the corresponding callback is invoked

Many Redis requests may be in flight at the same time, yet there is no need to work out which command a response belongs to when data arrives. Redis itself executes commands on a single thread and processes them in the order it receives them, and since ioredis sends everything over the same socket, the order in which commands reach the remote end does not change either. So we can safely handle the data in the simplest way possible, push + shift.

This is also why operations on some large keys slow down the whole Redis service (when nothing like sharding is in place).
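The push + shift matching described above can be sketched in a few lines. ReplyMatcher is a hypothetical class for illustration: it uses plain callbacks and string replies, and leaves out the socket, the parser and error replies.

```typescript
type ReplyCallback = (reply: string) => void;

class ReplyMatcher {
  private commandQueue: { name: string; callback: ReplyCallback }[] = [];

  // Called when a command is written to the socket.
  send(name: string, callback: ReplyCallback): void {
    this.commandQueue.push({ name, callback });
  }

  // Called for every parsed reply; the oldest pending command owns it.
  onReply(reply: string): void {
    const item = this.commandQueue.shift();
    if (item) {
      item.callback(reply);
    }
  }
}

const log: string[] = [];
const matcher = new ReplyMatcher();
matcher.send("SET foo bar", (reply) => log.push(`SET -> ${reply}`));
matcher.send("GET foo", (reply) => log.push(`GET -> ${reply}`));

// Replies arrive in the same order the commands were sent.
matcher.onReply("OK");
matcher.onReply("bar");
console.log(log);
```

No request identifier is needed anywhere: the position in the queue is the correlation, which only works because both the connection and the server preserve ordering.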

Summary

So far we have walked through the logic of the Redis client in normal mode, from creation, to sending commands, to receiving return values. A later note will cover Redis Cluster and how the processing differs in cluster mode.

References

  • ioredis
  • redis commands
  • Node.js | net
  • Why Redis is single-threaded and why is Redis so fast!
  • Redis is single-threaded, then how does it do concurrent I/O?