The business scenario

A recent project needed to fetch the values of a whole batch of keys. Redis commands (not only GET) block the client until the reply arrives, so fetching the values one by one in a loop takes a long time even on an intranet, because every call pays a full network round trip. That is why the Redis pipeline came to mind.

Introduction to pipelining

Non-pipeline: the client sends one request and the Redis server returns one response; the client blocks while it waits for that response.

Pipeline: Redis pipelining allows the client to send multiple requests to the server in sequence without waiting for each reply, and to read the replies afterwards.

Stand-alone version

The stand-alone version is relatively simple, so here is the code directly:

// Change to a real redis instance
List<Object> results;
// try-with-resources closes the connection even if a command fails
try (Jedis jedis = new Jedis()) {
    // Get the pipe
    Pipeline p = jedis.pipelined();
    // Queue the commands; nothing is read back until the sync call below
    for (int i = 0; i < 10000; i++) {
        p.get(String.valueOf(i));
    }
    // Get the results, in the same order the commands were queued
    results = p.syncAndReturnAll();
}

Copy the code

Cluster version

Because JedisCluster itself does not support pipelines, we need to write a wrapper around JedisCluster.

Again, straight to the code:


import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;

import java.io.Closeable;
import java.lang.reflect.Field;
import java.util.*;
import java.util.function.BiConsumer;



@Slf4j
public class JedisClusterPipeline extends PipelineBase implements Closeable {


    /** * Get JedisClusterInfoCache */
    private JedisSlotBasedConnectionHandler connectionHandler;
    /** * get the connection */ based on the hash value
    private JedisClusterInfoCache clusterInfoCache;

    / * * * can also go to inherit JedisCluster and JedisSlotBasedConnectionHandler to provide access interface * JedisCluster inheritance in BinaryJedisCluster * In BinaryJedisCluster, connectionHandler protected, So you need to reflection and * * * JedisClusterInfoCache properties in JedisClusterConnectionHandler, but this class is an abstract class, * but it does have an implementation class JedisSlotBasedConnectionHandler * /
    private static final Field FIELD_CONNECTION_HANDLER;
    private static final Field FIELD_CACHE;
    static {
        FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
        FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
    }


    /** * Stores the Client */ corresponding to each command in order
    private Queue<Client> clients = new LinkedList<>();
    /** * Used to cache connections * jedis cache */ used during a pipeline process
    private Map<JedisPool, Jedis> jedisMap = new HashMap<>();
    /** * Whether there is data in the cache */
    private boolean hasDataInBuf = false;

    /** * Close () must be called to close the pipeline The pipelineXX method in this class does not use close(), but it is best to call close() * in finally@param
     * @return* /
    public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
        JedisClusterPipeline pipeline = new JedisClusterPipeline();
        pipeline.setJedisCluster(jedisCluster);
        return pipeline;
    }


    public JedisClusterPipeline(a) {}public void setJedisCluster(JedisCluster jedis) {
        connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
        clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
    }

    /** * Refresh the cluster information and call * when the cluster information changes@param
     * @return* /
    public void refreshCluster(a) {
        connectionHandler.renewSlotCache();
    }

    /** * Read all data synchronously. In contrast to syncAndReturnAll(), sync() just doesn't deserialize the data */
    public void sync(a) {
        innerSync(null);
    }

    /** * synchronously reads all data and returns a list ** in command order@returnReturn all data */ in command order
    public List<Object> syncAndReturnAll(a) {
        List<Object> responseList = new ArrayList<>();

        innerSync(responseList);

        return responseList;
    }


    @Override
    public void close(a) {
        clean();

        clients.clear();

        for (Jedis jedis : jedisMap.values()) {
            if (hasDataInBuf) {
                flushCachedData(jedis);
            }

            jedis.close();
        }

        jedisMap.clear();

        hasDataInBuf = false;
    }

    private void flushCachedData(Jedis jedis) {
        try {
            jedis.getClient().getAll();
        } catch (RuntimeException ex) {
        }
    }

    @Override
    protected Client getClient(String key) {
        byte[] bKey = SafeEncoder.encode(key);

        return getClient(bKey);
    }

    @Override
    protected Client getClient(byte[] key) {
        Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));

        Client client = jedis.getClient();
        clients.add(client);

        return client;
    }

    private Jedis getJedis(int slot) {
        JedisPool pool = clusterInfoCache.getSlotPool(slot);

        // Get Jedis from the cache according to the pool
        Jedis jedis = jedisMap.get(pool);
        if (null == jedis) {
            jedis = pool.getResource();
            jedisMap.put(pool, jedis);
        }

        hasDataInBuf = true;
        return jedis;
    }



    public static void pipelineSetEx(String[] keys, String[] values, int[] exps,JedisCluster jedisCluster) {
        operate(new Command() {
            @Override
            public List execute(a) {
                JedisClusterPipeline p = pipelined(jedisCluster);
                for (int i = 0, len = keys.length; i < len; i++) {
                    p.setex(keys[i], exps[i], values[i]);
                }
                returnp.syncAndReturnAll(); }}); }public static List<Map<String, String>> pipelineHgetAll(String[] keys,JedisCluster jedisCluster) {
        return operate(new Command() {
            @Override
            public List execute(a) {
                JedisClusterPipeline p = pipelined(jedisCluster);
                for (int i = 0, len = keys.length; i < len; i++) {
                    p.hgetAll(keys[i]);
                }
                returnp.syncAndReturnAll(); }}); }public static List<Boolean> pipelineSismember(String[] keys, String members,JedisCluster jedisCluster) {
        return operate(new Command() {
            @Override
            public List execute(a) {
                JedisClusterPipeline p = pipelined(jedisCluster);
                for (int i = 0, len = keys.length; i < len; i++) {
                    p.sismember(keys[i], members);
                }
                returnp.syncAndReturnAll(); }}); }public static <O> List pipeline(BiConsumer<O, JedisClusterPipeline> function, O obj,JedisCluster jedisCluster) {
        return operate(new Command() {
            @Override
            public List execute(a) {
                JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jedisCluster);
                function.accept(obj, jcp);
                returnjcp.syncAndReturnAll(); }}); }private void innerSync(List<Object> formatted) {
        HashSet<Client> clientSet = new HashSet<>();

        try {
            for (Client client : clients) {
                // There is no need to parse the result data when sync() is called, but if you do not call get, the application does not know that a JedisMovedDataException has occurred, so you need to call get() to trigger the error.
                // In fact, if the data attribute of Response can be obtained directly, it can save the time of parsing the data. However, it does not provide a corresponding method, so to obtain the data attribute, you need to use reflection, do not want to reflect, so it is left
                Object data = generateResponse(client.getOne()).get();
                if (null! = formatted) { formatted.add(data); }// If the size is the same, all clients have been added, so no need to call the add method
                if(clientSet.size() ! = jedisMap.size()) { clientSet.add(client); }}}catch (JedisRedirectionException jre) {
            if (jre instanceof JedisMovedDataException) {
                // if MOVED redirection occurred, rebuilds cluster's slot cache,
                // recommended by Redis cluster specification
                refreshCluster();
            }

            throw jre;
        } finally {
            if(clientSet.size() ! = jedisMap.size()) {// All clients that have not yet executed should be flush to prevent subsequent commands from being contaminated after being put back into the pool
                for (Jedis jedis : jedisMap.values()) {
                    if (clientSet.contains(jedis.getClient())) {
                        continue;
                    }

                    flushCachedData(jedis);
                }
            }

            hasDataInBuf = false; close(); }}private static Field getField(Class
        cls, String fieldName) {
        try {
            Field field = cls.getDeclaredField(fieldName);
            field.setAccessible(true);

            return field;
        } catch (NoSuchFieldException | SecurityException e) {
            throw new RuntimeException("cannot find or access field '" + fieldName + "' from "+ cls.getName(), e); }}@SuppressWarnings({"unchecked" })
    private static <T> T getValue(Object obj, Field field) {
        try {
            return (T)field.get(obj);
        } catch (IllegalArgumentException | IllegalAccessException e) {
            log.error("get value fail", e);

            throw newRuntimeException(e); }}private static <T> T operate(Command command) {
        try  {
            return command.execute();
        } catch (Exception e) {
            log.error("redis operate error");
            throw newRuntimeException(e); }}interface Command {
        /** * Execute the ** command@param <T>
         * @return* /
        <T> T execute(a); }}Copy the code

Usage demo

    public Object testPipelineOperate(a) {
        // String[] keys = {"dylan1","dylan2"};
        // String[] values = {"dylan1-v1","dylan2-v2"};
        // int[] exps = {100,200};
        // JedisClusterPipeline.pipelineSetEx(keys, values, exps, jedisCluster);
        long start = System.currentTimeMillis();

        List<String> keyList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keyList.add(i + "");
        }
        // List
      
        pipeline = JedisClusterPipeline.pipeline(this::getValue, keyList, jedisCluster);
      
        // List
      
        pipeline = JedisClusterPipeline.pipeline(this::getHashValue, keyList, jedisCluster);
      
        String[] keys = {"dylan-test1"."dylan-test2"};

        List<Map<String, String>> all = JedisClusterPipeline.pipelineHgetAll(keys, jedisCluster);
        long end = System.currentTimeMillis();
        System.out.println("testPipelineOperate cost:" + (end-start));

        return Response.success(all);
    }

Copy the code