queue type Think Queue消息队列介绍:操作、安装使用流程及消息处理方式

网安智编 厦门萤点网络科技 2025-08-12 00:07 90 0
Think Queue 是 官方提供的一个消息队列服务,是专门支持队列服务的扩展包。Think Queue 消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。Think Queue...

Think Queue 是 官方提供的一个消息队列服务,是专门支持队列服务的扩展包。Think Queue 消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。Think Queue 消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。

支持消息队列的基本特性

安装

composer require topthink/think-queue

使用流程

消息的消费与删除

消息的创建与推送

任务处理

任务发布

消息的消费与删除

创建消费者类


/**
 * @desc TinywanJob
 */

declare(strict_types=1);

namespaceapp\job;

usethink\facade\Log;
usethink\queue\Job;

class TinywanJob
{
    /**
     * @desc fire 是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param $data array|mixed $data 发布任务时自定义的数据
     * @author Tinywan(ShaoBo Wan)
     */

    publicfunction fire(Job $job, $data)
    
{
        // 有效消息到达消费者时可能已经不再需要执行了
        if (!$this->checkJob($data)) {
            $job->delete();
            return;
        }
        //执行业务处理
        if ($this->doJob($data)) {
            $job->delete();//任务执行成功后删除
            Log::info("dismiss job has been down and deleted");
        } else {
            //检查任务重试次数
            if ($job->attempts() > 3) {
                Log::info('dismiss job has been retried more that 3 times');
                $job->delete();
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param array|mixed $data 发布任务时自定义的数据
     * @return boolean 任务执行的结果
     */

    privatefunction checkJob($data)
    
{
        $ts = $data["ts"];
        $bizid = $data["bizid"];
        $params = $data["params"];
        returntrue;
    }

    /**
     * 根据消息中的数据进行实际的业务处理
     */

    privatefunction doJob($data)bool
    
{
        // 实际业务流程处理
        returntrue;
    }
}

消息创建与推送

在业务控制器中创建一个新消息并推送到指定的队列中


/**
 * @desc TinywanJob
 */

declare(strict_types=1);

namespaceapp\controller;

useapp\BaseController;

class Queue extends BaseController
{
    /**
     * @desc 立即执行
     * @author Tinywan(ShaoBo Wan)
     */

    publicfunction push()
    
{
        // 1. 当前任务将由哪个类来负责处理。当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
        $jobHandlerClassName = \app\job\TinywanJob::class;

        // 2. 当前任务归属的队列名称,如果为新队列,会自动创建
        $jobQueueName = 'tinywanJobQueue';

        // 3. 当前任务所需的业务数据, 不能为 resource 类型,其他类型最终将转化为json形式的字符串。(jobData 为对象时,存储其public属性的键值对 )
        $jobData = [
            'type' => 'test',
            'record_id' => 123,
        ];

        // 4. 将该任务推送到消息队列,等待对应的消费者去执行
        $isPushed = \think\facade\Queue::push($jobHandlerClassName, json_encode($jobData, JSON_UNESCAPED_UNICODE), $jobQueueName);

        // database 驱动时,返回值为 1|false;redis 驱动时,返回值为 随机字符串|false
        if ($isPushed !== false) {
            echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "
"
;
        } else {
            echo'Oops, something went wrong.';
        }
        return'push';
    }
}

Think Queue消息队列安装教程_ThinkPHP消息队列使用_queue type

这里是采用手动指定消息处理类的方式,更合理的做法是事先定义好消息名称与消费者类名的映射关系,然后根据某个可以获取该映射关系的类来推送消息,这样生产者只需要知道消息的名称,而无需指定具体哪个消费者来处理。

处理任务

切换到当前终端到项目根目录

php think queue:work --queue tinywanJobQueue

实际使用过程中应安装这样的通用进程管理工具,它会监控php think queue:work的进程,一旦失败会帮助重启

简单来总结下使用流程

任务发布

访问接口 :8786/queue/push 查看推送是否成功

命令 Work模式 queue:work

用于启动一个工作进程来处理消息队列

php think queue:work --queue tinywanJobQueue

参数说明

模式 queue:

用于启动一个进程,然后由 进程通过 ('php think queue:work --queue="%s" --delay=%s --=%s --sleep=%s --tries=%s') 来周期性地创建一次性的work进程来消费消息队列,并且限制该work进程的执行事件,同时通过管道来监听work进程的输出。

php think queue:listen --queue tinywanJobQueue