Introduction

What is a bounded queue?

BRPC implements a bounded queue as a class template, BoundedQueue. So what is a bounded queue? It is a queue whose capacity is fixed and cannot grow dynamically. That sounds less convenient than a vector, which expands automatically, but the restriction is there for performance. Bounded queues are also commonly used in the producer-consumer pattern. When the queue is full, it usually means the load has exceeded the maximum throughput the queue can handle, so it refuses to add new tasks.

In practice, bounded queues are generally implemented on top of ring buffers, and the two terms are often used interchangeably. A ring buffer is also called a circular buffer or circular queue. The idea is to simulate a circular queue with a fixed-size, contiguous, linear block of memory. For a detailed definition, see Wikipedia: en.wikipedia.org/wiki/Circul…

The definition of a ring buffer is simple, but implementations differ in the details. One approach stores a read pointer and a write pointer that mark the positions of the next pop and the next push. The trouble with this approach is that full and empty are ambiguous: in both cases the two pointers point to the same place, so extra work is needed to tell them apart. For example, one slot can always be left unwritten. Then the buffer is empty when the read and write pointers are equal, and full when the read pointer is one position after the write pointer.
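
The two-pointer scheme with one reserved slot can be sketched as follows. This is a minimal illustration, not BRPC code; the name TwoIndexRing and the slot count of 4 are assumptions made for the example.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical ring that stores read and write indices and always keeps
// one slot unused so that "full" and "empty" look different.
struct TwoIndexRing {
    static constexpr std::size_t kSlots = 4;  // usable capacity is kSlots - 1
    int slots[kSlots];
    std::size_t read = 0;    // position of the next pop
    std::size_t write = 0;   // position of the next push

    // Empty: both indices point to the same slot.
    bool empty() const { return read == write; }
    // Full: the read index is one position after the write index.
    bool full() const { return (write + 1) % kSlots == read; }

    bool push(int v) {
        if (full()) return false;
        slots[write] = v;
        write = (write + 1) % kSlots;
        return true;
    }

    bool pop(int* out) {
        if (empty()) return false;
        *out = slots[read];
        read = (read + 1) % kSlots;
        return true;
    }
};
```

With 4 slots, the fourth push is rejected because one slot is sacrificed to resolve the ambiguity.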

Other implementations do not store the write pointer at all, only the number of elements currently in the queue. On each write, the write position is computed by adding the element count to the read pointer (modulo the capacity). BRPC uses this scheme.
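
A minimal sketch of the count-based scheme, assuming a fixed array of 4 ints. TinyRing and its member names are hypothetical and only illustrate the idea; the real BoundedQueue code follows in the next sections.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical ring that stores a start index and an element count
// instead of separate read and write positions.
struct TinyRing {
    static constexpr std::size_t kCap = 4;
    int slots[kCap];
    std::size_t start = 0;   // index of the oldest element
    std::size_t count = 0;   // number of stored elements

    // No ambiguity: the count alone distinguishes empty from full.
    bool empty() const { return count == 0; }
    bool full() const { return count == kCap; }

    // The write position is derived on demand, not stored.
    std::size_t write_pos() const { return (start + count) % kCap; }

    bool push(int v) {
        if (full()) return false;
        slots[write_pos()] = v;
        ++count;
        return true;
    }

    bool pop(int* out) {
        if (empty()) return false;
        *out = slots[start];
        start = (start + 1) % kCap;
        --count;
        return true;
    }
};
```

Here all 4 slots are usable: when the ring is full, write_pos() equals start, yet full() and empty() still give different answers.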

Bounded queues in BRPC

// A thread-unsafe bounded queue(ring buffer). It can push/pop from both
// sides and is more handy than thread-safe queues in single thread. Use
// boost::lockfree::spsc_queue or boost::lockfree::queue in multi-threaded
// scenarios.

BoundedQueue is not thread safe: it does not support concurrent reads and writes from multiple threads. That is also why its code is so simple. It is basically a “naked” ring buffer implementation that you can read in about 10 minutes. This does not mean BoundedQueue cannot be used in multi-threaded scenarios, but you need to add thread synchronization around it, as RemoteTaskQueue does (covered later).

Take a look at the source code: butil\containers\bounded_queue.h

Template declarations and data members

Let’s take a look at the template declaration and its data members:

template <typename T>
class BoundedQueue {
public:
    ...
private:
    // Since the space is possibly not owned, we disable copying.
    DISALLOW_COPY_AND_ASSIGN(BoundedQueue);
    ...
    uint32_t _count;  // The current number of elements
    uint32_t _cap;    // Capacity
    uint32_t _start;  // Start position
    StorageOwnership _ownership; // Enumeration type: whether the queue owns the storage
    void* _items;     // Data pointer
};

Definition of StorageOwnership:

enum StorageOwnership { OWNS_STORAGE, NOT_OWN_STORAGE };

As the names imply, OWNS_STORAGE means the bounded queue owns the underlying storage, and NOT_OWN_STORAGE means it does not.

The constructor

The constructor has three overloads; two are shown here. The first:

    // You have to pass the memory for storing items at creation.
    // The queue contains at most memsize/sizeof(T) items.
    BoundedQueue(void* mem, size_t memsize, StorageOwnership ownership)
        : _count(0)
        , _cap(memsize / sizeof(T))
        , _start(0)
        , _ownership(ownership)
        , _items(mem) {
        DCHECK(_items);
    };
    

This overload takes the address of the storage (mem), its size in bytes (memsize), and the ownership flag. The second:

    // Construct a queue with the given capacity.
    // The malloc() may fail silently, call initialized() to test validity
    // of the queue.
    explicit BoundedQueue(size_t capacity)
        : _count(0)
        , _cap(capacity)
        , _start(0)
        , _ownership(OWNS_STORAGE)
        , _items(malloc(capacity * sizeof(T))) {
        DCHECK(_items);
    };

Here only the capacity is passed in. The queue owns its storage and allocates it with malloc at construction time. Since malloc can fail, DCHECK does a bit of validation. What happens on failure? As the comment above says, malloc() may fail silently, so call initialized() to test whether the queue is valid.

Push-related functions

push()

    // Push |item| into bottom side of this queue.
    // Returns true on success, false if queue is full.
    bool push(const T& item) {
        if (_count < _cap) {
            new ((T*)_items + _mod(_start + _count, _cap)) T(item);
            ++_count;
            return true;
        }
        return false;
    }

    // Push a default-constructed item into bottom side of this queue
    // Returns address of the item inside this queue
    T* push() {
        if (_count < _cap) {
            return new ((T*)_items + _mod(_start + _count++, _cap)) T();
        }
        return NULL;
    }

If the queue is not full, the element is constructed with placement new. Regular new allocates heap memory for the object itself, so the caller does not choose the address; placement new constructs the object at a memory location the caller specifies. Here the location is the start position plus the number of elements, wrapped around the capacity.
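
To make the placement new step concrete, here is a small standalone sketch. The helper construct_in_slot and the function placement_new_demo are hypothetical names for this example; they are not part of BRPC.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <new>      // declares placement new
#include <string>

// Hypothetical helper: constructs a T in slot `index` of raw storage,
// mirroring `new ((T*)_items + pos) T(item)` in push().
template <typename T>
T* construct_in_slot(void* raw, std::size_t index, const T& value) {
    return new (static_cast<T*>(raw) + index) T(value);
}

// Returns true if the new object lives exactly at the requested slot.
inline bool placement_new_demo() {
    void* raw = std::malloc(2 * sizeof(std::string));  // raw, unconstructed memory
    if (raw == NULL) {
        return false;
    }
    std::string* s = construct_in_slot(raw, 1, std::string("hello"));
    const bool in_place =
        (s == static_cast<std::string*>(raw) + 1) && (*s == "hello");
    typedef std::string Str;
    s->~Str();        // explicit destructor call, the counterpart of placement new
    std::free(raw);   // the storage itself is released separately
    return in_place;
}
```

The address returned by placement new is the address that was passed in, which is why push() can return it directly as the location of the item inside the queue.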

    // This is faster than % in this queue because most |off| are smaller
    // than |cap|. This is probably not true in other place, be careful
    // before you use this trick.
    static uint32_t _mod(uint32_t off, uint32_t cap) {
        while (off >= cap) {
            off -= cap;
        }
        return off;
    }

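This modulo uses a trick: in the calls shown here, off always lies in [0, 2*cap), because _start is less than cap and _count is at most cap. The loop therefore subtracts cap at most once, which is cheaper than a general % operation.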
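
The claim can be checked with a short sketch. The loop below is copied from _mod as a free function; the names mod_by_subtraction and agrees_with_percent are hypothetical and exist only for this example.

```cpp
#include <cassert>
#include <stdint.h>

// Same subtraction loop as BoundedQueue::_mod, as a free function.
inline uint32_t mod_by_subtraction(uint32_t off, uint32_t cap) {
    while (off >= cap) {
        off -= cap;
    }
    return off;
}

// Checks that the loop agrees with % for every off in [0, 2*cap).
inline bool agrees_with_percent(uint32_t cap) {
    for (uint32_t off = 0; off < 2 * cap; ++off) {
        if (mod_by_subtraction(off, cap) != off % cap) {
            return false;
        }
    }
    return true;
}
```

The result is the same as % for any off, but the loop is only cheap when off is close to cap. For a large off and a small cap it would iterate many times, which is what the warning in the source comment is about.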

push_top()

   // Push |item| into top side of this queue
   // Returns true on success, false if queue is full.
   bool push_top(const T& item) {
       if (_count < _cap) {
           _start = _start ? (_start - 1) : (_cap - 1);
           ++_count;
           new ((T*)_items + _start) T(item);
           return true;
       }
       return false;
   }    
   
   // Push a default-constructed item into top side of this queue
   // Returns address of the item inside this queue
   T* push_top() {
       if (_count < _cap) {
           _start = _start ? (_start - 1) : (_cap - 1);
           ++_count;
           return new ((T*)_items + _start) T();
       }
       return NULL;
   }

elim_push()

   // Push |item| into bottom side of this queue. If the queue is full,
   // pop topmost item first.
   void elim_push(const T& item) {
       if (_count < _cap) {
           new ((T*)_items + _mod(_start + _count, _cap)) T(item);
           ++_count;
       } else {
           ((T*)_items)[_start] = item;
           _start = _mod(_start + 1, _cap);
       }
   }
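
The effect of elim_push() is that the queue always holds the most recent items. The sketch below reproduces that behavior for ints on top of a std::vector; the class LastN is a hypothetical stand-in, and it assumes a capacity greater than zero.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper that keeps only the last `cap` values (cap > 0),
// following the same two branches as elim_push().
class LastN {
public:
    explicit LastN(std::size_t cap) : _buf(cap), _start(0), _count(0) {}

    void elim_push(int v) {
        if (_count < _buf.size()) {
            _buf[(_start + _count) % _buf.size()] = v;  // room left: append
            ++_count;
        } else {
            _buf[_start] = v;                           // full: overwrite the oldest
            _start = (_start + 1) % _buf.size();        // next-oldest becomes the top
        }
    }

    // Values ordered from oldest (top) to newest (bottom).
    std::vector<int> snapshot() const {
        std::vector<int> out;
        for (std::size_t i = 0; i < _count; ++i) {
            out.push_back(_buf[(_start + i) % _buf.size()]);
        }
        return out;
    }

private:
    std::vector<int> _buf;
    std::size_t _start;
    std::size_t _count;
};
```

Pushing 1 through 5 into a LastN of capacity 3 leaves 3, 4, 5 in the queue. Note that in the full case the slot is assigned to rather than destroyed and reconstructed, so the element count stays the same.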

Pop-related functions

Pop is the inverse of push, so you can skim this part.

pop()

  // Pop top-most item from this queue
  // Returns true on success, false if queue is empty
  bool pop() {
      if (_count) {
          --_count;
          ((T*)_items + _start)->~T();
          _start = _mod(_start + 1, _cap);
          return true;
      }
      return false;
  }

  // Pop top-most item from this queue and copy into |item|.
  // Returns true on success, false if queue is empty
  bool pop(T* item) {
      if (_count) {
          --_count;
          T* const p = (T*)_items + _start;
          *item = *p;
          p->~T();
          _start = _mod(_start + 1, _cap);
          return true;
      }
      return false;
  }

Note that although pop() and push() are symmetric operations, their parameters differ: the one-argument push() takes const T&, while pop() takes T*. The reason is that pop() has to hand the removed element back to the caller through its argument, and the same applies to pop_bottom(). C++ coding conventions such as Google's call for pointers for output parameters and const references for input parameters.

pop_bottom()

    // Pop bottom-most item from this queue
    // Returns true on success, false if queue is empty
    bool pop_bottom() {
        if (_count) {
            --_count;
            ((T*)_items + _mod(_start + _count, _cap))->~T();
            return true;
        }
        return false;
    }

    // Pop bottom-most item from this queue and copy into |item|.
    // Returns true on success, false if queue is empty
    bool pop_bottom(T* item) {
        if (_count) {
            --_count;
            T* const p = (T*)_items + _mod(_start + _count, _cap);
            *item = *p;
            p->~T();
            return true;
        }
        return false;
    }

elim_push() has no counterpart among the pop functions, which is understandable. When the queue is full and we insist on inserting, we can evict the top element to make room. But when the queue is empty and we insist on removing an element, there is nothing to give back.

Empty the queue

    void clear() {
        for (uint32_t i = 0; i < _count; ++i) {
            ((T*)_items + _mod(_start + i, _cap))->~T();
        }
        _count = 0;
        _start = 0;
    }

The main job here is to iterate over all elements and call each destructor explicitly with ->~T(). This is needed because the objects were constructed with placement new, and ->~T() is the destructor call that must be paired with it, just as new must be paired with delete. Unlike new and delete, neither placement new nor ->~T() allocates or frees heap memory for the object itself. (If T has members that allocate, heap allocation and release still happen, but for those members, not for T itself.)
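
The pairing can be observed with an element type that counts its live instances. This is a sketch under assumptions: Tracked and construct_then_clear are hypothetical names, and the second loop only imitates what clear() does.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <new>

// Hypothetical element type that counts how many instances are alive.
struct Tracked {
    static int& live() { static int n = 0; return n; }
    Tracked() { ++live(); }
    Tracked(const Tracked&) { ++live(); }
    ~Tracked() { --live(); }
};

// Constructs n objects in raw storage, then destroys them one by one.
// Returns the live count observed between the two phases, or -1 if
// the allocation failed.
inline int construct_then_clear(int n) {
    void* raw = std::malloc(n * sizeof(Tracked));
    if (raw == NULL) {
        return -1;
    }
    Tracked* items = static_cast<Tracked*>(raw);
    for (int i = 0; i < n; ++i) {
        new (items + i) Tracked();   // construction only, no allocation
    }
    const int alive = Tracked::live();
    for (int i = 0; i < n; ++i) {
        (items + i)->~Tracked();     // destruction only, no deallocation
    }
    std::free(raw);                  // releasing the storage is a separate step
    return alive;
}
```

If the destructor loop were skipped, the live count would stay above zero after free(), which is the kind of leak clear() prevents for element types that hold resources.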

With that said, let’s look at the destructor.

The destructor

    ~BoundedQueue() {
        clear();
        if (_ownership == OWNS_STORAGE) {
            free(_items);
            _items = NULL;
        }
    }

The destructor first calls clear(), then checks the ownership of the storage. Only if the queue owns the memory does it call free(), which pairs with malloc(). Storage the queue does not own is left alone.

Some view operations

This section covers the functions that inspect a bounded queue's elements and metadata.

top()

    // Get address of top-most item, NULL if queue is empty
    T* top() {
        return _count ? ((T*)_items + _start) : NULL;
    }
    const T* top() const {
        return _count ? ((const T*)_items + _start) : NULL;
    }

    // Randomly access item from top side.
    // top(0) == top(), top(size()-1) == bottom()
    // Returns NULL if |index| is out of range.
    T* top(size_t index) {
        if (index < _count) {
            return (T*)_items + _mod(_start + index, _cap);
        }
        return NULL;   // including _count == 0
    }
    const T* top(size_t index) const {
        if (index < _count) {
            return (const T*)_items + _mod(_start + index, _cap);
        }
        return NULL;   // including _count == 0
    }

bottom()

    // Get address of bottom-most item, NULL if queue is empty
    T* bottom() {
        return _count ? ((T*)_items + _mod(_start + _count - 1, _cap)) : NULL;
    }
    const T* bottom() const {
        return _count ? ((const T*)_items + _mod(_start + _count - 1, _cap)) : NULL;
    }
    
    // Randomly access item from bottom side.
    // bottom(0) == bottom(), bottom(size()-1) == top()
    // Returns NULL if |index| is out of range.
    T* bottom(size_t index) {
        if (index < _count) {
            return (T*)_items + _mod(_start + _count - index - 1, _cap);
        }
        return NULL;  // including _count == 0
    }
    const T* bottom(size_t index) const {
        if (index < _count) {
            return (const T*)_items + _mod(_start + _count - index - 1, _cap);
        }
        return NULL;  // including _count == 0
    }

initialized()

Check whether the queue has been initialized

    // True if the queue was constructed successfully.
    bool initialized() const { return _items != NULL; }

empty() and full()

    bool empty() const { return !_count; }
    bool full() const { return _cap == _count; }

size() and capacity()

    // Number of items
    size_t size() const { return _count; }

    // Maximum number of items that can be in this queue
    size_t capacity() const { return _cap; }

max_capacity()

    // Maximum value of capacity()
    size_t max_capacity() const { return (1UL << (sizeof(_cap) * 8)) - 1; }

The result of this function is fixed at compile time; it always returns the same value. sizeof(_cap) is the number of bytes in _cap, and it is multiplied by 8 because one byte is eight bits. So sizeof(_cap) * 8 is the number of bits in uint32_t, and (1UL << (sizeof(_cap) * 8)) - 1 sets every one of those bits to 1, which is the maximum value uint32_t can represent.
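
The arithmetic can be verified against std::numeric_limits. The helper all_ones_for_bytes below is a hypothetical name, and it deliberately uses a 1ULL literal rather than 1UL: unsigned long is 64 bits wide on typical 64-bit Linux, but where it is only 32 bits wide a shift by 32 would be undefined.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <stdint.h>

// Same formula as max_capacity(), written with a 64-bit literal so that
// shifting by 32 is well defined on every platform. Valid for nbytes < 8.
inline uint64_t all_ones_for_bytes(std::size_t nbytes) {
    return (1ULL << (nbytes * 8)) - 1;
}
```

For a 4-byte field this yields 4294967295, the same value as std::numeric_limits<uint32_t>::max().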

This function is modeled on vector's max_size() interface, just under the name max_capacity(). For a vector, max_size() is the maximum number of elements it can hold, which is useful because a vector can grow dynamically. A bounded queue never grows, so max_capacity() is not really that useful.

Application in BRPC

BoundedQueue is used heavily in BRPC. For example, in the bthread implementation, each pthread that acts as a worker has a TaskGroup, and each TaskGroup has two queues:

class TaskGroup {
    ...
    WorkStealingQueue<bthread_t> _rq;
    RemoteTaskQueue _remote_rq;
    ...
};

RemoteTaskQueue, the type of _remote_rq, stores its tasks in a BoundedQueue.

class RemoteTaskQueue {
public:
    RemoteTaskQueue() {}

    int init(size_t cap) {
        const size_t memsize = sizeof(bthread_t) * cap;
        void* q_mem = malloc(memsize);
        if (q_mem == NULL) {
            return -1;
        }
        butil::BoundedQueue<bthread_t> q(q_mem, memsize, butil::OWNS_STORAGE);
        _tasks.swap(q);
        return 0;
    }

The BoundedQueue constructor does not handle malloc failure very well, so RemoteTaskQueue mallocs a block of memory itself, checks the result, and hands it to BoundedQueue to manage (OWNS_STORAGE).

class RemoteTaskQueue {
// ...
    bool pop(bthread_t* task) {
        if (_tasks.empty()) {
            return false;
        }
        _mutex.lock();
        const bool result = _tasks.pop(task);
        _mutex.unlock();
        return result;
    }

    bool push(bthread_t task) {
        _mutex.lock();
        const bool res = push_locked(task);
        _mutex.unlock();
        return res;
    }

    bool push_locked(bthread_t task) {
        return _tasks.push(task);
    }

You can see that RemoteTaskQueue adds its own locking and unlocking around pushes and pops, for the reason covered at the beginning:

BoundedQueue is not thread-safe!
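
The same wrapping pattern can be sketched with standard library pieces. This is not BRPC code: LockedQueue is a hypothetical class, std::deque with a capacity check stands in for BoundedQueue, and std::mutex stands in for whatever mutex type RemoteTaskQueue uses.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical wrapper: a capacity-limited queue that is not thread safe
// by itself, guarded by a mutex in every public operation.
class LockedQueue {
public:
    explicit LockedQueue(std::size_t cap) : _cap(cap) {}

    bool push(int task) {
        std::lock_guard<std::mutex> guard(_mutex);
        return push_locked(task);
    }

    bool pop(int* task) {
        std::lock_guard<std::mutex> guard(_mutex);
        if (_tasks.empty()) {
            return false;
        }
        *task = _tasks.front();
        _tasks.pop_front();
        return true;
    }

private:
    // Caller must hold _mutex. Rejects the task when the queue is full.
    bool push_locked(int task) {
        if (_tasks.size() >= _cap) {
            return false;
        }
        _tasks.push_back(task);
        return true;
    }

    std::mutex _mutex;
    std::deque<int> _tasks;
    std::size_t _cap;
};
```

Using std::lock_guard instead of explicit lock() and unlock() calls releases the mutex on every return path. The sketch keeps push_locked() private, whereas the RemoteTaskQueue code above leaves the locking of push_locked() to its caller.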

push_locked() takes no lock itself; it expects the user of RemoteTaskQueue to hold the lock around the call. Here is the only place this function is called:

void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
    _remote_rq._mutex.lock();
    while (!_remote_rq.push_locked(tid)) {
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
        LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                                << _remote_rq.capacity();
        ::usleep(1000);
        _remote_rq._mutex.lock();
    }
    ...
}