Introduction

think-queue is an extension for ThinkPHP 5.1.x (tp5.1.x) and must be installed before it can be used.

The basic usage is shown below. This article uses the database driver.

1. Install the think-queue extension

Note: the latest version of think-queue requires ThinkPHP 6.x, so this article installs version 2.0.4.

composer require topthink/think-queue 2.0.4

2. Create the jobs table

CREATE TABLE `prefix_jobs` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) NOT NULL,
  `payload` longtext NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3. Modify the configuration file and add the corresponding driver configuration

File location: root directory /config/queue.php

<?php
return [
    'connector' => 'Database', // database driver
    'expire'    => 60,         // task expiry time in seconds (default 60); set to null to disable
    'default'   => 'default',  // default queue name
    'table'     => 'jobs',     // name of the table that stores messages, without the prefix
    'dsn'       => [],
];

About the expire parameter in the configuration file:

The expire parameter is the expiry time of a task, in seconds.

When expire is not null, think-queue checks for expired tasks and reissues them before it fetches the next task.

When expire is null, think-queue does not check for expired tasks, which gives somewhat better performance. Note, however, that timed-out tasks then stay in the message queue and must be handled by the developer (either deleted or re-pushed).
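The expiry rule can be sketched in plain PHP. This only illustrates the rule described above and is not think-queue's own code: the function name isReservedJobExpired and the exact boundary comparison are assumptions made for the example.

```php
<?php
/**
 * Hypothetical helper: decide whether a reserved job counts as expired.
 *
 * @param int|null $reservedAt Unix time at which a worker reserved the job, or null if unreserved
 * @param int|null $expire     the 'expire' setting in seconds, or null when the check is disabled
 * @param int      $now        current Unix time
 * @return bool
 */
function isReservedJobExpired($reservedAt, $expire, $now)
{
    if ($expire === null) {
        return false; // expire = null: expired tasks are never checked
    }
    if ($reservedAt === null) {
        return false; // the job is not reserved, so it cannot have timed out
    }
    return $reservedAt <= $now - $expire;
}

$now = 1596769400;
var_dump(isReservedJobExpired(1596769329, 60, $now));   // reserved 71 s ago: bool(true)
var_dump(isReservedJobExpired(1596769350, 60, $now));   // reserved 50 s ago: bool(false)
var_dump(isReservedJobExpired(1596769329, null, $now)); // check disabled:    bool(false)
```

With expire set to null, the first job would stay reserved indefinitely, which is why such tasks have to be cleaned up by hand.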

4. Message creation and push

1. Create a new controller: \application\admin\controller\JobTest.php
2. Add a new method to the controller: test()
3. The complete controller code:

<?php
namespace app\admin\controller;

use think\Queue;
use think\Controller;

class JobTest extends Controller
{
    public function test()
    {
        // 1. The class that will handle this task.
        //    When it is the task's turn, the system creates an instance of this class and calls its fire() method.
        $jobHandlerClassName = 'app\jobs\JobTest';

        // 2. The name of the queue this task belongs to.
        $jobQueueName = "helloJobQueue";

        // 3. The business data the task needs. It cannot be a resource;
        //    other types are eventually converted to a JSON string
        //    (for an object, the key/value pairs of its public properties are stored).
        $jobData = ['ts' => time(), 'uniqId' => uniqid(), 'a' => 1];

        // 4. Push the task onto the queue.
        $isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);

        // Database driver: the return value is 1|false; Redis driver: a random string|false
        if ($isPushed !== false) {
            echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";
        } else {
            echo 'Oops, something went wrong.';
        }
    }
}
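To see what "converted to a JSON string" means when the job data is an object, here is a minimal sketch using PHP's built-in json_encode. The Order class is hypothetical, and wrapping the handler name and data in one array is only assumed to resemble what think-queue stores in the payload column.

```php
<?php
// Hypothetical data object: only its public properties survive JSON encoding.
class Order
{
    public $id = 42;
    public $status = 'paid';
    private $secret = 'hidden'; // private, so it is dropped from the JSON
}

// Assumed payload shape: handler class name plus the job data.
$payload = json_encode([
    'job'  => 'app\jobs\JobTest',
    'data' => new Order(),
]);

echo $payload, PHP_EOL;
// {"job":"app\\jobs\\JobTest","data":{"id":42,"status":"paid"}}
```

Anything the handler needs must therefore be reachable through public properties or, more simply, be passed as a plain array as in the controller above.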

5. Consumption and deletion of messages

1. Add a new consumer class: \application\jobs\JobTest.php

<?php
namespace app\jobs;

use think\queue\Job;
use app\common\model\JobsTest as JobsTestModel;

class JobTest
{
    /**
     * fire() is the method the message queue calls by default
     * @param Job         $job  the current task object
     * @param array|mixed $data custom data supplied when the task was published
     */
    public function fire(Job $job, $data)
    {
        // Check in advance whether the task still needs to be performed
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if (!$isJobStillNeedToBeDone) {
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            // The task succeeded: delete it
            $job->delete();
        } else {
            if ($job->attempts() > 3) {
                // The task has been retried more than 3 times: delete it
                $job->delete();

                // Alternatively, re-release the task; the argument is the delay,
                // so the task runs again 2 seconds later
                // $job->release(2);
            }
        }
    }

    /**
     * Some messages may no longer need to be executed by the time they reach the consumer
     * @param array|mixed $data custom data supplied when the task was published
     * @return bool whether the task still needs to be executed
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data)
    {
        return true;
    }

    /**
     * Actual business processing based on the data in the message
     * @param array|mixed $data
     * @return bool result of the task
     */
    private function doHelloJob($data)
    {
        $model = new JobsTestModel();
        $inData = [
            'uniqId'  => $data['uniqId'],
            'time'    => $data['ts'],
            'content' => 'The queue successfully inserted data',
        ];
        $res = $model->save($inData);
        if (!$res) {
            return false;
        }
        return true;
    }
}
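The delete/retry branching in fire() can be tried out without the framework. The sketch below is one possible variant, not the article's handler: FakeJob is a hypothetical stand-in that only mimics the delete(), release() and attempts() methods of think\queue\Job, and handleJob() releases a failed task for another try until the attempt limit is exceeded.

```php
<?php
// Hypothetical stand-in for think\queue\Job: it only records what the handler did.
class FakeJob
{
    public $log = [];
    private $attempts;

    public function __construct($attempts)
    {
        $this->attempts = $attempts;
    }

    public function attempts()
    {
        return $this->attempts;
    }

    public function delete()
    {
        $this->log[] = 'delete';
    }

    public function release($delay = 0)
    {
        $this->log[] = "release($delay)";
    }
}

// Variant of the fire() branching: delete on success, delete once the attempt
// limit is exceeded, otherwise release the task to run again after $delay seconds.
function handleJob(FakeJob $job, $isJobDone, $maxAttempts = 3, $delay = 2)
{
    if ($isJobDone) {
        $job->delete();
        return;
    }
    if ($job->attempts() > $maxAttempts) {
        $job->delete();
        return;
    }
    $job->release($delay);
}

$failedOnce = new FakeJob(1);
handleJob($failedOnce, false);
echo implode(',', $failedOnce->log), PHP_EOL; // release(2)

$failedTooOften = new FakeJob(4);
handleJob($failedTooOften, false);
echo implode(',', $failedTooOften->log), PHP_EOL; // delete
```

In the article's handler a failed task with 3 or fewer attempts is neither deleted nor released, so whether it runs again depends on the expire setting described earlier.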

At this point, the code is complete.

Debug/test

1. Create table jobs_test

CREATE TABLE `st_jobs_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `uniqId` varchar(255) DEFAULT NULL,
  `time` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;

2. Publish tasks: call the message creation method by visiting http://xxx/admin/job_test/test

If the request succeeds, the page displays something like: "2020-08-07 10:53:00 a new Hello Job is Pushed to the MQ"

After a few requests, the jobs table contains rows like these:

id  queue          payload                                                                       attempts  reserved  reserved_at  available_at  created_at
6   helloJobQueue  {"job":"app\jobs\JobTest","data":{"ts":1596769329,"uniqId":"5f2cc4317c6a3"}}  0         0         NULL         1596769329    1596769329
7   helloJobQueue  {"job":"app\jobs\JobTest","data":{"ts":1596769332,"uniqId":"5f2cc4349c42d"}}  0         0         NULL         1596769332    1596769332
8   helloJobQueue  {"job":"app\jobs\JobTest","data":{"ts":1596769334,"uniqId":"5f2cc4367f8f1"}}  0         0         NULL         1596769334    1596769334
9   helloJobQueue  {"job":"app\jobs\JobTest","data":{"ts":1596769335,"uniqId":"5f2cc437cfcaf"}}  0         0         NULL         1596769335    1596769335
10  helloJobQueue  {"job":"app\jobs\JobTest","data":{"ts":1596769337,"uniqId":"5f2cc439086dd"}}  0         0         NULL         1596769337    1596769337
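The payload column is a JSON string holding the handler class and the job data, so it can be inspected with PHP's json_decode. The sketch below uses the values of row 6 and assumes the stored string escapes backslashes the way json_encode does; gmdate is used so the printed time does not depend on the server's time zone.

```php
<?php
// Payload of row 6, written with JSON-escaped backslashes (assumed storage form).
$payload = '{"job":"app\\\\jobs\\\\JobTest","data":{"ts":1596769329,"uniqId":"5f2cc4317c6a3"}}';

// Decode into an associative array.
$decoded = json_decode($payload, true);

echo $decoded['job'], PHP_EOL;                                // app\jobs\JobTest
echo $decoded['data']['uniqId'], PHP_EOL;                     // 5f2cc4317c6a3
echo gmdate('Y-m-d H:i:s', $decoded['data']['ts']), PHP_EOL;  // 2020-08-07 03:02:09 (UTC)
```

The decoded data array is what the handler's fire() method receives as its $data argument.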

3. Consume tasks: in a terminal, from the project root, run:

php think queue:work --queue helloJobQueue

The command output references the handler class app\jobs\JobTest. Checking the database now shows that one message in the jobs table has been consumed and a new row has been added to jobs_test:

id  uniqId         time        content
6   5f2cc4317c6a3  1596769329  The queue successfully inserted data

At this point, we have successfully gone through the basic process of creating -> pushing -> consuming -> deleting a message

Appendix

Notes on using queues in tp5.1.x: github.com/coolseven/n…