1. What is a message queue

A message queue is a means of communication between processes, or between different threads of the same process.

2. Why use message queues

Message queuing is a technique for exchanging information between distributed applications. A queue can reside in memory or on disk, and it stores messages until an application reads them. With message queues, applications can run independently: a sender does not need to know where the receiver is, and it does not have to wait for the receiver to accept a message before continuing.
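The decoupling described above can be sketched with PHP's built-in SplQueue. This is only an in-memory stand-in, assuming a single script plays both roles; the function names produce and consume are hypothetical, and a real message queue would sit between separate processes.

```php
<?php
// Hypothetical in-memory stand-in for a message queue.
$queue = new SplQueue();

// Producer: hands messages to the queue and returns immediately.
function produce(SplQueue $queue, array $messages): void
{
    foreach ($messages as $message) {
        $queue->enqueue($message);
    }
}

// Consumer: reads whatever is waiting, in arrival order.
function consume(SplQueue $queue): array
{
    $received = [];
    while (!$queue->isEmpty()) {
        $received[] = $queue->dequeue();
    }
    return $received;
}

produce($queue, ['order:1', 'order:2']);
// The producer is free to do other work here; the messages stay queued.
$received = consume($queue);
print_r($received);
```

The producer never calls the consumer directly; the queue is the only thing they share.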

3. When are message queues used

The first thing to understand is the difference between a message queue and a remote procedure call (RPC). When readers ask me about message queues, I often find that what they actually need is RPC, not a message queue.

A message queue can be used synchronously or asynchronously. Usually, message queues are used asynchronously and remote procedure calls synchronously.

What is the difference between MQ and RPC? MQ typically carries messages with no fixed protocol: the message format is defined by the user, and delivery is store-and-forward. RPC, on the other hand, usually uses a dedicated protocol, and the call returns a result to the caller.
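As a rough illustration of that contrast, the sketch below runs both styles in one script. The function addRemote is a hypothetical stand-in for a remote service, and the JSON message layout is an assumed user-defined format, not a standard.

```php
<?php
// Hypothetical service function standing in for a remote endpoint.
function addRemote(int $a, int $b): int
{
    return $a + $b;
}

// RPC style: the caller blocks until the result comes back.
$rpcResult = addRemote(2, 3);

// MQ style: the caller stores a user-defined message and moves on.
// Nothing is returned to the sender at this point.
$queue = new SplQueue();
$queue->enqueue(json_encode(['op' => 'add', 'args' => [2, 3]]));

// Later, a worker takes the stored message and forwards it to the handler.
$message = json_decode($queue->dequeue(), true);
$mqResult = addRemote(...$message['args']);
```

In the RPC case the result is available on the next line; in the MQ case it exists only after a worker has picked the message up.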

4. When to use message queues

For synchronous requirements, a remote procedure call (RPC) is the better fit.

For asynchronous requirements, a message queue is the better fit.

Today, many message queue products also support RPC, and many RPC systems can also make asynchronous calls.

Message queues are used to fulfill the following requirements: store-and-forward, distributed transactions, publish-subscribe, content-based routing, and point-to-point connections.
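Two of these requirements, publish-subscribe and content-based routing, can be sketched together. The TinyBroker class below is hypothetical and in-memory only; it assumes PHP 7.4 or later for typed properties and arrow functions.

```php
<?php
// Hypothetical minimal broker: each subscriber registers a filter
// (content-based routing) and receives matching messages in its own inbox.
class TinyBroker
{
    private array $subscribers = [];

    public function subscribe(string $name, callable $filter): void
    {
        $this->subscribers[$name] = ['filter' => $filter, 'inbox' => new SplQueue()];
    }

    public function publish(array $message): void
    {
        foreach ($this->subscribers as $subscriber) {
            // Deliver a copy of the message to every subscriber whose filter accepts it.
            if (($subscriber['filter'])($message)) {
                $subscriber['inbox']->enqueue($message);
            }
        }
    }

    public function inbox(string $name): SplQueue
    {
        return $this->subscribers[$name]['inbox'];
    }
}

$broker = new TinyBroker();
$broker->subscribe('billing', fn(array $m) => $m['type'] === 'order');
$broker->subscribe('audit', fn(array $m) => true);

$broker->publish(['type' => 'order', 'id' => 1]);
$broker->publish(['type' => 'login', 'id' => 2]);

$billingCount = count($broker->inbox('billing'));
$auditCount = count($broker->inbox('audit'));
```

Here the billing subscriber only sees order messages, while the audit subscriber sees everything that is published.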

Here is an example of a message queue in use: a PHP class backed by shared memory. It relies on the shmop and System V semaphore extensions.


      

/**
 * Created by PhpStorm.
 * User: Lin
 * Date: 2017/6/9
 * Time: 11:19
 * Implement PHP shared memory message queue
 */
class ShmQueue
{
    private $maxQSize = 0;// Maximum queue length
    private $front = 0;// queue head pointer
    private $rear = 0; // end of queue pointer
    private $blockSize = 256;  // Block size (byte)
    private $memSize = 1280;  // Maximum shared memory (byte)
    private $shmId = 0;// The shared memory segment can be manipulated according to this ID
    private $filePtr = APP_PATH.'public/shmq.ptr';
    private $semId = 0;
    public function __construct()
    {
        $shmkey = ftok(__FILE__, 't'); // Generate a System V IPC key
        $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize); // Create or open the shared memory segment
        $this->maxQSize = $this->memSize / $this->blockSize;
        // Request a semaphore
        $this->semId = sem_get($shmkey, 1);
        sem_acquire($this->semId); // Enter the critical section
        $this->init();
    }
    private function init()
    {
        if ( file_exists($this->filePtr) ){
            $contents = file_get_contents($this->filePtr);
            $data = explode('|', $contents);
            if (isset($data[0]) && isset($data[1])) {
                $this->front = (int)$data[0];
                $this->rear  = (int)$data[1];
            }
        }
    }
    public function getLength()
    {
        return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;
    }
    public function enQueue( $value )
    {
        if ( $this->ptrInc($this->rear) == $this->front ){ // queue is full
            return false;
        }
        //echo $this->front;
        $data = $this->encode($value);
        shmop_write($this->shmId, $data, $this->rear);
        $this->rear = $this->ptrInc($this->rear);
        return  $this->decode($data);
    }
    public function deQueue()
    {
        if ( $this->front == $this->rear ){ // queue is empty
            throw new Exception("queue is empty!");
        }
        $value = shmop_read($this->shmId, $this->front, $this->blockSize-1);
        $this->front = $this->ptrInc($this->front);
        return $this->decode($value);
    }
    private function ptrInc( $ptr )
    {
        return ($ptr + $this->blockSize) % ($this->memSize);
    }
    private function encode( $value )
    {
        $data = serialize($value)."__eof";
        //echo '';
        //echo strlen($data);
        //echo '';
       // echo $this->blockSize -1;
       // echo '';

        if ( strlen($data) > $this->blockSize -1) {
            throw new Exception(strlen($data)." exceeds the block size!");
        }
        return $data;
    }
    public function exist($value) // Check whether $value is at the head of the queue
    {
        $data = shmop_read($this->shmId, $this->front, $this->blockSize-1);
        if ($value == $this->decode($data)) {
            return 1;
        }
        return 0;
    }
    private function decode( $value )
    {
        //return $value;
        $data = explode("__eof", $value);
        return unserialize($data[0]);
    }
    public function __destruct()
    {
        // Save the queue head and tail pointers
        $data = $this->front . '|' . $this->rear;
        file_put_contents($this->filePtr, $data);
        sem_release($this->semId); // Release the semaphore
    }
}

How to call

$shmq = new ShmQueue();
// Enqueue:
$data = 125;
$shmq->enQueue($data);
// Dequeue:
$shmq->deQueue();
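One detail of the class that is easy to miss is how many items actually fit. The sketch below replays only the pointer arithmetic from ptrInc and getLength with the class defaults (256-byte blocks, 1280 bytes of memory); it touches no shared memory, and the variable names are illustrative. Because the full check is ptrInc(rear) == front, one block always stays unused so that a full queue can be told apart from an empty one.

```php
<?php
// Same constants as the ShmQueue defaults; offsets only, no shared memory.
$blockSize = 256;
$memSize = 1280;
$front = 0;
$rear = 0;

// Advance an offset by one block, wrapping at the end of the segment.
$ptrInc = fn(int $ptr): int => ($ptr + $blockSize) % $memSize;

// Try to enqueue more items than the segment has blocks.
$accepted = 0;
for ($i = 0; $i < 10; $i++) {
    if ($ptrInc($rear) === $front) {
        break; // full: advancing rear would collide with front
    }
    $rear = $ptrInc($rear);
    $accepted++;
}

// Same formula as getLength().
$length = (($rear - $front + $memSize) % $memSize) / $blockSize;
echo "accepted=$accepted rear=$rear length=$length\n";
```

With these defaults the segment has five blocks, but only four items are accepted before the queue reports full.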
