编程技术记录 Good Luck To You!

thinkphp队列+supervisor实践 非原创 转


原创idkuangxiao 最后发布于2018-09-21 20:59:55 阅读数 2924  收藏

分类专栏: php

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://blog.csdn.net/idkuangxiao/article/details/82765107

收起

thinkphp5之后,官方自带了一个队列扩展,thinkphp-queue:https://github.com/top-think/think-queue,以下为thinkphp-queue结合supervisor结合的一次任务队列实践,使用的队列驱动为redis。


环境准备

thinkphp主框架

thinkphp-queue扩展

supervisor

redis

以上环境为在linux下安装,网上已经有很多相关教程,这里不再说明。


发送任务以及任务处理

thinkphp-queue的详细教程可参考 https://www.kancloud.cn/gyh9711/thinkphp5_1_1-tpsnoopy/506339


假设现在的任务是往队列里面发送一些数据,然后队列从队列中取出数据,插入到数据库里面


准备数据库表

CREATE TABLE `task` (

  `id` int(11) NOT NULL AUTO_INCREMENT,

  `task_data` varchar(255) DEFAULT NULL,

  `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,

  PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

发送任务到队列

class Task

{

    public function addTask()

    {

        $jobClass = 'app\\index\\job\\AddData';

        $data = ['name' => 'myname'];

        $queueName = 'addData';

        Queue::push($jobClass, $data, $queueName);

        echo 'add task';

    }

}

        $jobClass 为任务处理类的类名,$data为发送到队列中的数据,$queueName为队列名。调用该方法,则redis中对应的队列会有对应的数据



      可以看到redis中的队列queues:addData中已经有一条任务数据


消费任务

namespace app\index\job;

 

use think\Db;

use think\queue\Job;

 

class AddData

{

    public function fire(Job $job, $data)

    {

        Db::table('task')->insert(['task_data' => json_encode($data)]);

        $job->delete();

    }

}

     验证队列消费是否成功,命令行执行单次任务


sudo -u www php think queue:work --queue addData

结果可以看到数据库中增加了相应的数据,



 


使用supervisor管理消费进程

      上面的方式一次只能处理一个任务,配合supervisor可以以守护进程的模式不断的处理任务


supervisor配置

[supervisord]

logfile=/www/supervisor/log/supervisord.log ; 日志文件路径,可根据需要修改

logfile_maxbytes=50MB        ; 单个日志文件的最大大小

logfile_backups=10           ; 保留的日志文件数量

loglevel=info                ; 日志等级

pidfile=/www/supervisor/var/supervisord.pid ; supervisord pid 文件,可根据需要更改

nodaemon=false               ; start in foreground if true; default false

minfds=1024                  ; min. avail startup file descriptors; default 1024

minprocs=200                 ; min. avail process descriptors;default 200

 

[unix_http_server]

file=/www/supervisor/var/supervisor.sock   ; the path to the socket file

 

[rpcinterface:supervisor]

supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

 

[supervisorctl]

serverurl=unix:///www/supervisor/var/supervisor.sock ; 用于supervisorctl,路径需要与unix_http_server中的sock文件相同

 

[include]

files = /www/supervisor/etc/*.conf ; 包含的其他配置文件

        配置文件这里我保存在/www/supervisor/supervisord.conf,基本使用默认配置,改动的地方主要为log,pid,sock文件的路径,以及增加其他配置文件的路径,用于加载不同进程的配置文件。默认的配置文件,可以使用 echo_supervisord_conf 命令查看,以及 echo_supervisord_conf > file_to_save 保存默认配置文件到指定文件。


队列消费进程配置

以下配置文件保存在 /www/supervisor/etc/queue_worker.conf


[program:queue_worker] ;项目名称

directory = /www/tp5 ; 程序的启动目录

command = php think queue:work --queue addData --daemon  ; 启动命令

process_name=%(program_name)s_%(process_num)02d

numprocs = 3           ; 开启的进程数量

autostart = true     ; 在 supervisord 启动的时候也自动启动

startsecs = 5        ; 启动 5 秒后没有异常退出,就当作已经正常启动了

autorestart = true   ; 程序异常退出后自动重启

startretries = 3     ; 启动失败自动重试次数,默认是 3

user = www          ; 用哪个用户启动

redirect_stderr = true  ; 把 stderr 重定向到 stdout,默认 false

stdout_logfile_maxbytes = 50MB  ; stdout 日志文件大小,默认 50MB

stdout_logfile_backups = 20     ; stdout 日志文件备份数

; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)

stdout_logfile = /www/supervisor/log/queue_worker.log

loglevel=info

启动supervisor

    命令行执行以下命令,即可以启动supervisor,命令中指定了supervisor配置文件的路径


supervisord -c /www/supervisor/supervisord.conf 

    使用supervisorctl 查看运行状态


supervisorctl -c /www/supervisor/supervisord.conf

queue_worker:queue_worker_00     RUNNING   pid 12270, uptime 0:00:08

queue_worker:queue_worker_01     RUNNING   pid 12271, uptime 0:00:08

queue_worker:queue_worker_02     RUNNING   pid 12269, uptime 0:00:08

    可以看到配置的3个进程已经正常启动。


    此时,如果执行 kill 12270,关闭queue_worker:queue_worker_00,会看到supervisor会自动重启对应进程。也可以配和官方的命令:


php think queue:restart

    达到重启队列的效果。


 


处理supervisor重启

  当修改了supervisor配置后,更新配置时会重启相关的进程。这就可能导致在任务处理进行到一半的时候,被supervisor杀掉然后重启。当然,可以选择在没有任务的时候进行重启来避免这种情况。下面给出一种通过信号量的方式处理这中情况的方法。


  在supervisor需要重启管理的进程时,默认会向进程发送TERM信号,如果无法关闭进程,则超过配置的时间(默认是10秒),会发送KILL信号强制关闭进程。我们可以通过修改supervisor的默认超时时间,并且处理TERM信号的方式,使进程安全的退出。


  首先,我们需要修改supervisor中的配置,修改处理TERM信号的超时时间,修改为我们预计的单个任务最大执行时间,这里假设任务最大执行时间为1分钟


stopwaitsecs=60      ; max num secs to wait b4 SIGKILL (default 10)

   然后,修改我们的php代码,在任务处理结束后,增加调用信号量分发方法


public function fire(Job $job, $data)

{

    Db::table('task')->insert(['task_data' => json_encode($data)]);

    $job->delete();

    // 增加信号量分发方法

    pcntl_signal_dispatch();

}

  然后,当没有任务时,会sleep一段时间后再获取是否有任务需要处理,这时也需要进行信号量分发。thinkphp-queue有对应的hook可以处理这种情况,在tags.php配置文件下增加以下配置


'worker_before_sleep' => ['app\\common\\behavior\\DispatchSignalBehavior']

  对应的处理方法


class DispatchSignalBehavior

{

    public function run()

    {

        pcntl_signal_dispatch();

    }

}

最后,我们增加对TERM信号的处理方法,在tags.php中添加以下配置


'worker_daemon_start' => ['app\\common\\behavior\\SignalBehavior']

对应的处理方法


class SignalBehavior

{

    public function run(&$params)

    {

        pcntl_signal(SIGTERM, function ($signo) {

            exit;

        });

    }

}

处理方法很简单,直接退出脚本就行。


这样,当supervisor关闭进程时,就不会中断当前所执行的任务了。

————————————————

版权声明:本文为CSDN博主「idkuangxiao」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/idkuangxiao/article/details/82765107


作者:admin 分类:未分类 浏览:491 评论:0