This is the sixth day of my participation in the August More text Challenge. For details, see: August More Text Challenge

Delay queue premise

  1. Close idle connections regularly: The server has many client connections that need to be closed after a period of idle time.
  2. Periodically clear extra cache: Objects in the cache that have exceeded idle time need to be removed from the cache.
  3. Implementation of task timeout processing: in the network protocol sliding window request response-based interaction, the processing of time-out unresponded requests.
  4. Application to Session timeout management: Processing the timeout of a network reply protocol request.

Pain point scheme mechanism

  • A more violent approach is to use a background thread to traverse all the objects, examining them one by one. This method is simple and easy to use, but there may be performance problems when the number of objects is too large. It is not easy to set the check interval. If the interval is too large, the accuracy will be affected.

  • And it’s not possible to process the timeouts in chronological order. For this scenario, DelayQueue is perfect.

DelayQueue is an interesting class provided in java.util.concurrent. Very clever, very good! However, neither the Java Doc nor the Java SE 5.0 source has a Sample. When I first read ScheduledThreadPoolExecutor source code, find DelayQueue use.

This article will give an introduction to DelayQueue and then enumerate application scenarios. It also provides an implementation of the Delayed interface and Sample code.

  • DelayQueue is a BlockingQueue specialized to Delayed.

  • Delayed extends the Comparable interface by comparing the time value of the delay, and the return value of the Delayed interface’s implementation class getDelay should be a fixed (final) value.

  • DelayQueue is implemented internally using PriorityQueue.

  • DelayQueue = BlockingQueue + PriorityQueue + Delayed

DelayQueue key elements BlockingQueue, PriorityQueue, Delayed. In other words, DelayQueue is a BlockingQueue implemented using a PriorityQueue, whose benchmark is time.

The basic definition is as follows

public interface Comparable<T> {
    public int compareTo(T o);
}

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public class DelayQueue<E extends Delayed> implements BlockingQueue<E> { 
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}
Copy the code

The internal implementation of DelayQueue uses a priority queue. When DelayQueue’s offer method is called, the Delayed object is added to the priority queue Q. As follows:

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        q.offer(e);
        if (first == null || e.compareTo(first) < 0)
            available.signalAll();
        return true;
    } finally{ lock.unlock(); }}Copy the code

The take method of DelayQueue takes the first of the priority queue Q (peek) and performs await processing if the delay threshold is not reached. As follows:

public E take(a) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                available.await();
            } else {
                long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {
                    long tl = available.awaitNanos(delay);
                } else {
                    E x = q.poll();
                    assertx ! =null;
                    if(q.size() ! =0)
                        available.signalAll(); // wake up other takers
                    returnx; }}}}finally{ lock.unlock(); }}Copy the code

Here is Sample, a simple implementation of caching. There are three classes: Pair, DelayItem, and Cache. As follows:

public class Pair<K.V> {
    public K first;
    public V second;
    public Pair(a) {}
    public Pair(K first, V second) {
        this.first = first;
        this.second = second; }}Copy the code

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class DelayItem<T> implements Delayed {

    /** Base of nanosecond timings, to avoid wrapping */
    private static final long NANO_ORIGIN = System.nanoTime();

	/** * Returns nanosecond time offset by origin */
    final static long now(a) {
        return System.nanoTime() - NANO_ORIGIN;
    }
    /** * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied * entries. */
    private static final AtomicLong sequencer = new AtomicLong(0);

    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;

    /** The time the task is enabled to execute in nanoTime units */
    private final long time;

    private final T item;

    public DelayItem(T submit, long timeout) {
        this.time = now() + timeout;
        this.item = submit;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    public T getItem(a) {
        return this.item;
    }

    public long getDelay(TimeUnit unit) {
        long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
        return d;
    }

    public int compareTo(Delayed other) {
        if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof DelayItem) {
            DelayItem x = (DelayItem) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0)?0 : ((d < 0)? -1 : 1); }}Copy the code

The following is the Cache implementation, including the PUT and get methods, and the executable main function.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Cache<K.V> {private static final Logger LOG = Logger.getLogger(Cache.class.getName());
    private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>();
    private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>();
    private Thread daemonThread;
    public Cache(a) {
        Runnable daemonTask = new Runnable() {
            public void run(a) { daemonCheck(); }}; daemonThread =new Thread(daemonTask);
        daemonThread.setDaemon(true);
        daemonThread.setName("Cache Daemon");
        daemonThread.start();
    }

    private void daemonCheck(a) {
        if (LOG.isLoggable(Level.INFO))
            LOG.info("cache service started.");
        for (;;) {
            try {
                DelayItem<Pair<K, V>> delayItem = q.take();
                if(delayItem ! =null) {
                    // Timeout object handling
                    Pair<K, V> pair = delayItem.getItem();
                    cacheObjMap.remove(pair.first, pair.second); // compare and remove}}catch (InterruptedException e) {
                if (LOG.isLoggable(Level.SEVERE))
                    LOG.log(Level.SEVERE, e.getMessage(), e);
                break; }}if (LOG.isLoggable(Level.INFO))
            LOG.info("cache service stopped.");
    }

    // Add the cache object
    public void put(K key, V value, long time, TimeUnit unit) {
        V oldValue = cacheObjMap.put(key, value);
        if(oldValue ! =null)
            q.remove(key);
        long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);
        q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime));
    }

    public V get(K key) {
        return cacheObjMap.get(key);
    }

    // Test the entry function
    public static void main(String[] args) throws Exception {
        Cache<Integer, String> cache = new Cache<Integer, String>();
        cache.put(1."aaaa".3, TimeUnit.SECONDS);
        Thread.sleep(1000 * 2);
        {
            String str = cache.get(1);
            System.out.println(str);
        }
        Thread.sleep(1000 * 2);
        {
            String str = cache.get(1); System.out.println(str); }}}Copy the code

When we run Sample, the result of main is two lines, the first line is aaa, and the second line is NULL.

Delay queue parameter configuration hot refresh

Do not spray the configuration center. The scenarios are different

  • The cache delay queue information is stored in the configuration file, such as the number of caches, latency timeout, event timeout, and so on. The process needs to be restarted for these configuration changes to take effect, and sometimes online applications cannot tolerate such outages.

  • Apache Common Configuration provides us with the ability to detect that the Configuration can take effect in a short time after the file is modified. The specific usage is as follows:

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.log4j.Logger;

public class SystemConfig {
    private static Logger logger = Logger.getLogger(SystemConfig.class);
    private static  PropertiesConfiguration config;
    static {
        try {
            // Instantiate a PropertiesConfiguration
            config = new PropertiesConfiguration("/Users/hzwangxx/ IdeaProjects/app-test/src/main/resources/conf.properties");
            // Set the reload policy to reload when the file is modified (5s by default)
            config.setReloadingStrategy(new FileChangedReloadingStrategy());
        } catch (ConfigurationException e) {
            logger.error("init static block error. ", e); }}public static synchronized String getProperty(String key) {
        return (String) config.getProperty(key);
    }

    public static void main(String[] args) throws InterruptedException {
        for (;;) {
            System.out.println(SystemConfig.getProperty("key"));
            Thread.sleep(2000); }}}Copy the code