Tools
首页
画图
音乐
采集
记事
博客
实验室
登录
lypeng
146
文章
11
分类
46
记事
分类
生活-[23]
Linux-[24]
前端-[9]
数据库-[16]
PHP-[31]
git-[7]
其他-[6]
python-[20]
算法-[4]
React-Native-[4]
中草药-[2]
广告位1
广告位2
首页
/ PHP
返回列表
Beanstalk队列学习
阅读:428
发布:2019-08-20
作者:lypeng
## 先看作者给的两张流程图: 来自:https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt?_blank ``` Here is a picture of the typical job lifecycle: put reserve delete -----> [READY] ---------> [RESERVED] --------> *poof* Here is a picture with more possibilities: put with delay release with delay ----------------> [DELAYED] <------------. | | | (time passes) | | | put v reserve | delete -----------------> [READY] ---------> [RESERVED] --------> *poof* ^ ^ | | | \ release | | | `-------------' | | | | kick | | | | bury | [BURIED] <---------------' | | delete `--------> *poof* ``` 再放一张来自CSDN的图:  ## 简要描述 一个beanstalk服务,可以开启很多个tube(即,发邮件的,发短信的,执行抓取文章的,好多任务),每一个tube有很多个job组成(给用户一发AAA,给用户二发BBB),job有ready(准备好了)、dalayed(延迟稍候执行)、 reserved(正在执行)、bury(掩埋起来,以后再执行)几个状态。 示例: 我们有一个发送邮件的tube,send_mail ``` { job1 : {username:123@qq.com,content:'con1'} status:ready job2 : {username:456@qq.com,content:'con2'} status:bury } ``` 如果安装了beanstalkd console可以看到每个job的更多字段信息,如下图:  我们每次从状态值为ready里面读取job,然后执行!执行成功可以删除,执行失败,可以放入延迟里面稍候执行,或者掩埋起来先不处理 ## 上代码: 安装略过,php使用的Pheanstalk\Pheanstalk类库 ```php namespace app\controller; use Pheanstalk\Pheanstalk; class BeanstalkController { public function __construct(){ $this->p = Pheanstalk::create('192.168.1.190',11300); } // 这个是交给定时任务crontab执行的 // */1 * * * * php index.php /beanstalk/index public function index(){ if($this->check_tube('send_mail')){ $this->consumer(); } } // 生成任务 public function productor(){ // put命令的四个参数(string $data, int $priority, int $delay, int $ttr) // priority 优先级,数字小的优先执行,范围0-2^32-1 // delay 延迟秒数,范围0-2^32-1 // ttr 运行时间,(没有理解透),范围1-2^32-1 // max-job-size (default: 2**16) $mail_json = json_encode(['name'=>'ddd@qq.com','content'=>'hello world!','try_time'=>0]); $this->p->useTube('send_mail')->put($mail_json); //正常放入reday队列 // $this->p->useTube('send_mail')->put($mail_json,10,3600); //延迟1个小时执行,优先级10,(必须传递个优先级,否则报错) } // 消费任务 public function consumer(){ $job = $this->p->watch('send_mail')->reserve(); $data = json_decode($job->getData(),true); // 发送邮件 sleep(3); $success = true; if($success){ // 发送成功处理 // 记录日志 $log_str = date('Y-m-d H:i:s',time()).":给".$data['name']."发送邮件成功!".PHP_EOL; file_put_contents('beanstalk.log', $log_str, FILE_APPEND); // 删除任务 $this->p->delete($job); }else{ // 发送失败处理 $data['try_time'] = empty($data['try_time']) ? 1 : $data['try_time']+1; $log_str = date('Y-m-d H:i:s',time()).":给".$data['name']."发送邮件失败!第".$data['try_time']."次.".PHP_EOL; file_put_contents('beanstalk.log', $log_str, FILE_APPEND); if($data['try_time'] == 3){ $this->p->bury($job); //掩埋 }else{ // 放入新任务(try_time+1),延迟60秒执行 $this->p->useTube('send_mail')->put(json_encode($data),10,60); //删除原任务 $this->p->delete($job); } // 或者线下记录try_time,$job->getId()可以获取到任务id,然后用release // $this->p->release($job,10,60); //延迟一分钟后放入ready执行 // $this->p->watch('send_mail')->peekBuried(); //取出bury状态的任务 } } // 检测队列是否需要执行 public function check_tube($tube_name=''){ if($tube_name == ''){ return false; } $tubes = $this->p->listTubes(); if(!in_array($tube_name, $tubes)){ // 该tube不存在 return false; }else{ $r = $this->p->StatsTube($tube_name); if($r->current_jobs_ready == 0){ // ready状态没有任务需要执行 return false; } } return true; } } ``` 参见: https://github.com/beanstalkd/beanstalkd?_blank https://blog.csdn.net/m_nanle_xiaobudiu/article/details/80466702?_blank
------本文结束
感谢阅读------
上一篇:
redis cluster构建
下一篇:
PHP图像识别测试