Redis-py is a library for Python operations. This article includes the following sections:

  • Redis protocol specification
  • Redis – py overview
  • Basic use of Redis-Py

    • RedisCommand
    • Redis connection
    • The connection pool
  • pipeline
  • LuaScript
  • lock

Redis protocol specification

Resp (Redis Serialization Protocol) is the communication Protocol between Redis client and server. Examples of data are as follows:

+OK\r\n
-Error message\r\n
:1000\r\n
$6\r\nfoobar\r\n
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
*3\r\n:1\r\n:2\r\n:3\r\n

The protocol defines five types:

  1. +The prefix denotes a string, followed by string text, with\r\nEnding, usually used for command results
  2. -The prefix represents an exception message, followed by two strings concatenated by a space, with\r\nAt the end
  3. :The prefix denotes an integer, followed by an integer, with\r\nAt the end
  4. $The prefix denotes a fixed length string, followed by the length of the string,\r\nAnd string text to\r\nAt the end
  5. *The prefix denotes an array, followed by the sum of its length\r\n, each element of an array can be composed of one of the four types described above

The protocol also specifies the implementation of NULL, etc. See the link section for details. Here is an example of the request and response for Llen MyList

C: *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n

S: :48293\r\n
  • The client sends the LLEN mylist directive, which is serialized into an array of RESP length 2. The two fixed-length strings are LLEN and mylist.
  • The server responds to the integer 48293, which is the length of the MyList data.

Request-response model is the Request Response model of Redis service, which can be compared with the HTTP protocol pattern. The Redis server responds to the instructions of the client and responds to the client after processing, which can be simply understood as a question and answer. Except for Pipeline, Pub/Sub and Monitor, of course.

Redis-py source code overview

The version of Redis-py used in this paper is 3.5.3, and the file and package information are:

The name of the describe
client Redis API
connection Connections, connection pooling, etc
exceptions Exceptions and errors
lock The realization of the lock
sentinel Extended Sentinel connections
utils tool
_compat Both version adapters pack

Redis-py does not rely on other packages. Although the amount of code is small (around 6000 lines), it still requires some time and foundation to understand 100% of the code. This article introduces the implementation of these basic functions in the source code from the daily use of Redis-Py, which is also the content of Redis-Py’s README.

Basic use of Redis-Py

RedisCommand

Simple use of redis-py:

>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
True
>>> r.get('foo')
b'bar'

Trace the implementation of Redis-py:

# client.py

class Redis(object)
    
     def __init__(self, host='localhost', port=6379,
                 db=0, ..):
        ...
        connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool
        ...
                 
    def set(self, name, value, ex=None, px=None, nx=False, xx=False, keepttl=False)
        ...
        return self.execute_command('SET', *pieces)
    
    def get(self, name):
        return self.execute_command('GET', name)
    
     # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        "Execute a command and return a parsed response"
        conn = self.connection or pool.get_connection(command_name, **options)
        conn.send_command(*args)
        return self.parse_response(conn, command_name, **options)

Note: For ease of understanding, the sample code differs from the actual code, eliminating complex logic, exceptions, and so on

  • Redis first created a connection to the Redis service,
  • Redis wraps all Redis instructions and executes them in command mode.
  • Executing a command is to use the connection created to send the command, then parse and get the response. This is consistent with the behavior of the request-response model on the Redis protocol.

Redis connection

Continue to view connection creation and execution:

# connection.py class Connection(object) def __init__(...) : self.host = host self.port = int(port) self._sock = connect() def connect(): for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = socket.socket(family, socktype, proto) ... # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # connect sock.connect(socket_address) ... return sock def pack_command(self, *args): command = [] args = tuple(args[0].encode().split()) + args[1:] ... buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF)) for arg in imap(self.encoder.encode, args): buff = SYM_EMPTY.join( (buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF)) output.append(buff) output.append(arg) ... return command def send_command(self, *args, **kwargs): command = self.pack_command(args) if isinstance(command, str): command = [command] for item in command: self._sock.sendall(*args, **kwargs)
  • Connection maintains a socket connection
  • The command received by Redis uses pack\_command for serialized packaging of RESP
  • Packets are sent using sockets
# connection.py class Connection(object) def __init__(... ,parser_class=PythonParser,...) ; self._parser = parser_class(socket_read_size=socket_read_size) def read_response(self): response = self._parser.read_response() return response class PythonParser(BaseParser): "Plain Python parsing class" def __init__(self, socket_read_size): self.socket_read_size = socket_read_size ... self._sock = connection._sock self._buffer = SocketBuffer(self._sock, self.socket_read_size, connection.socket_timeout) self.encoder = connection.encoder def read_response(self): raw = self._buffer.readline() byte, response = raw[:1], raw[1:] # server returned an error if byte == b'-': response = nativestr(response) ... # single value elif byte == b'+': pass # int value elif byte == b':': response = long(response) # bulk response elif byte == b'$': length = int(response) response = self._buffer.read(length) # multi-bulk response elif byte == b'*': length = int(response) response = [self.read_response() for i in xrange(length)] if isinstance(response, bytes): response = self.encoder.decode(response) return response
  • Connection creates a Parser to read and parse the service response
  • The default PythonParser uses SocketBuffer to read socket data
  • Read_response implements the parsing process of the RESP protocol. For each row of data\r\n, the first character is the response type, and the rest of the data content, if it is multi-bulk, needs to loop through multiple rows. It is recommended to read the protocol and send the request in detail.

PythonParser is an implementation of Pure Python, and if you want to be more efficient, you can install additional Hiredis parsers, which provide a C-based HireDisparser.

The connection pool

Redis-py uses connection pool to improve execution efficiency. The main methods include three steps: create connection pool, obtain valid connection execution command from connection pool, and release connection after completion. The statement is as follows:

# redis.py
connection_pool = ConnectionPool(**kwargs)
pool.get_connection(command_name, **options)

try:
    conn.send_command(*args)
    ...
finally:
    ...
    pool.release(conn)

Connection pooling must be carefully released. You can either use a try/finally or use a context decorator, the former being used here

The specific implementation of connection pooling:

# connection.py class ConnectionPool(object): def __init__(...) : self._available_connections = [] self._in_use_connections = set() def make_connection(self): "Create a new connection" return self.connection_class(**self.connection_kwargs) def get_connection(self, command_name, *keys, **options) try: connection = self._available_connections.pop() except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) ... connection.connect() return connection def release(self, connection): "Releases the connection back to the pool" try: self._in_use_connections.remove(connection) except KeyError: # Gracefully fail when a connection is returned to this pool # that the pool doesn't actually own pass self._available_connections.append(connection)
  • Within the connection pool, all connections are managed using an array of available connections and a collection of in-use connections
  • When retrieving a connection, it is preferred to retrieve it from the available connection array. A new connection is created when no connections are available
  • All acquired connections are added to the in-use connection. If the current connection is not connected, the connection is established first
  • When a connection is released, it is removed from the collection of in-use connections and added to an array of available connections, waiting for reuse

Here, we have basically straightened out a Redis instruction execution process:

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('foo', 'bar')

pipeline

Redis also supports the Pipeline mode, which allows you to batch some commands and get all the results:

>>> r = redis.Redis(...)
>>> pipe = r.pipeline()
>>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
[True, True, 6]

Pipeline inherits from Redis with some extensions

class Pipeline(Redis) def __init__(...) : self.command_stack = [] def execute_command(self, *args, **kwargs): self.command_stack.append((args, options)) return self def execute(self, raise_on_error=True): "Execute all the commands in the current pipeline" stack = self.command_stack execute = self._execute_pipeline execute(conn, stack, raise_on_error) def _execute_pipeline(self, connection, commands, raise_on_error): # build up all commands into a single request to increase network perf all_cmds = connection.pack_commands([args for args, _ in commands]) connection.send_packed_command(all_cmds) response = [] for args, options in commands: response.append( self.parse_response(connection, args[0], **options)) return response
  • Pipeline supports chained syntax by using a stack to temporarily store batch-sent commands and return them to itself
  • Execute is used to send the command
  • Send instructions and then get the service response, packaged as an array unified return

LuaScript

Redis uses the Lua script to handle transactions as follows:

>>> r = redis.Redis()
>>> lua = """
... local value = redis.call('GET', KEYS[1])
... value = tonumber(value)
... return value * ARGV[1]"""
>>> multiply = r.register_script(lua)
>>> r.set('foo', 2)
>>> multiply(keys=['foo'], args=[5])
10
  • The Lua script defines two arrays, KEYS and ARGV, to accept arguments. The first value of KEY (the Lua array starts at 1) is the name of the KEY, and the first value of ARGV is a multiple
  • Scripts need to be registered
  • In redis-py, parameters are passed to the script and the result is executed

The implementation principle of the script:

# client.py
class Redis(object):

    def register_script(self, script):
        return Script(self, script
        
    def script_load(self, script):
        "Load a Lua ``script`` into the script cache. Returns the SHA."
        return self.execute_command('SCRIPT LOAD', script)
    
    def evalsha(self, sha, numkeys, *keys_and_args):
        return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)

class Script(object):
    "An executable Lua script object returned by ``register_script``"

    def __init__(self, registered_client, script):
        self.registered_client = registered_client
        self.script = script
        # Precalculate and store the SHA1 hex digest of the script.
        ...
        self.sha = hashlib.sha1(script).hexdigest()

    def __call__(self, keys=[], args=[], client=None):
        "Execute the script, passing any required ``args``"
        args = tuple(keys) + tuple(args)
        # make sure the Redis server knows about the script
        ...
        try:
            return client.evalsha(self.sha, len(keys), *args)
        except NoScriptError:
            # Maybe the client is pointed to a differnet server than the client
            # that created this instance?
            # Overwrite the sha just in case there was a discrepancy.
            self.sha = client.script_load(self.script)
            return client.evalsha(self.sha, len(keys), *args
  • The Lua script passesscript loadLoads into the Redis service and gets a SHA value that can be reused to avoid loading the same script multiple times
  • throughevalshaExecute the script

lock

Redis-py also provides an implementation of a global lock that can be synchronized across processes:

try:
    with r.lock('my-lock-key', blocking_timeout=5) as lock:
        # code you want executed only after the lock has been acquired
except LockError:
    # the lock wasn't acquired

Here’s the actual implementation:

# lock.py
class Lock(object):

    LUA_RELEASE_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        redis.call('del', KEYS[1])
        return 1
    ""
    
    def __init__(...):
        ...
        self.redis = redis
        self.name = name
        self.local = threading.local() if self.thread_local else dummy()
        self.local.token = None
        cls = self.__class__
        cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
  
    def __enter__(self):
        # force blocking, as otherwise the user would have to check whether
        # the lock was actually acquired or not.
        if self.acquire(blocking=True):
            return self
        raise LockError("Unable to acquire lock within the time specified")

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
        
    def acquire(self, blocking=None, blocking_timeout=None, token=None):
        ...
        token = uuid.uuid1().hex.encode()
        self.redis.set(self.name, token, nx=True, px=timeout)
        ...
        self.local.token = token
        ...
    
    def release(self):
        expected_token = self.local.token
        self.local.token = None
        self.lua_release(keys=[self.name],
                                     args=[expected_token],
                                     client=self.redis)
  • LUA\_RELEASE_SCRIPT uses the LUA script to handle the transaction of removing tokens
  • Lock uses thread variables to store token values, ensuring that multithread concurrency is normal
  • __enter__ and __exit__ are decorator syntax guarantees that they can be legally retrieved and released
  • When applying for a lock, obtain a temporary token, and then set it to the Redis service. This token has a life cycle and can be automatically released after a timeout.
  • Cleanup thread local variables and variables in the Redis service upon release

TODO

Publish/Subscibe, Monitor,Sentinel and Transaction are not part of the main task in the source code, so it will be described later.

Refer to the link

  • https://redis.io/topics/protocol
  • https://github.com/andymccurd…
  • https://pypi.org/project/hire…