- 浏览: 58759 次
- 性别:
- 来自: 广州
文章分类
最新评论
laravel/lumen的事件、任务调度等都是基于队列来实现的,但由于php是进程模式,除非部署第三方模块,否则无法像java那样通过创建线程来实现并发执行。这对于laravel来说实在非常不方便,只能一个任务执行完再执行下一个任务,效率极其低下,而且容易阻塞。但并发任务的需求又非常常见,下面就基于laravel自行实现一个并发任务模块。
1、要实现并发,就要先解决在PHP进程内创建新进程的问题。通过PHP本身是没有办法的,因此采用执行外部Shell脚本的方式启动新的PHP进程。但默认情况下,system、exec等方式执行shell时都是阻塞的,按网上文章说的用popen测试,也不成功。最后采用的是
上面的方法只能用在linux/unix环境,因为windows没有/dev/null这个空设备句柄。
2、任务模块不再依赖laravel队列来实现,触发任务时使用数据库保存任务信息,表结构如下:
CREATE TABLE `sq_task` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`creationtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
`modifiedtime` int(11) NOT NULL DEFAULT '0' COMMENT '最后更新时间',
`starttime` int(11) NOT NULL DEFAULT '0' COMMENT '启动时间',
`endtime` int(11) NOT NULL DEFAULT '0' COMMENT '完成时间',
`status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '状态,0未执行,1正在执行,2已完成,3出错',
`task` varchar(50) NOT NULL DEFAULT '' COMMENT '任务名,与Commands中的$signature一致',
`parameters` varchar(1000) NOT NULL DEFAULT '' COMMENT '启动参数',
`total_num` int(11) NOT NULL DEFAULT '0' COMMENT '总共完成子任务数量',
`success` int(11) NOT NULL DEFAULT '0' COMMENT '成功数量',
`fail` int(11) NOT NULL DEFAULT '0' COMMENT '失败数量',
`memo` varchar(2000) NOT NULL DEFAULT '' COMMENT '备注',
PRIMARY KEY (`id`),
KEY `idx_status` (`status`),
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
3、还需要一个单独执行的Command来替代原有队列的功能,负责检查并执行待执行的任务
4、为了符合laravel的编程习惯,shell命令使用的是artisan命令行方式开发。使用artisan的好处是可以直接复用laravel里面的各种类,并且开发出来的命令,也可以单独执行。为了统一命令的行为,自定义了一个Command基类
5、剩下的就是开发具体的任务了,下面是一个发放向用户优惠券示例:
1、要实现并发,就要先解决在PHP进程内创建新进程的问题。通过PHP本身是没有办法的,因此采用执行外部Shell脚本的方式启动新的PHP进程。但默认情况下,system、exec等方式执行shell时都是阻塞的,按网上文章说的用popen测试,也不成功。最后采用的是
shell_exec($shell . ' > /dev/null 2>&1 &');
上面的方法只能用在linux/unix环境,因为windows没有/dev/null这个空设备句柄。
2、任务模块不再依赖laravel队列来实现,触发任务时使用数据库保存任务信息,表结构如下:
引用
CREATE TABLE `sq_task` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`creationtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
`modifiedtime` int(11) NOT NULL DEFAULT '0' COMMENT '最后更新时间',
`starttime` int(11) NOT NULL DEFAULT '0' COMMENT '启动时间',
`endtime` int(11) NOT NULL DEFAULT '0' COMMENT '完成时间',
`status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '状态,0未执行,1正在执行,2已完成,3出错',
`task` varchar(50) NOT NULL DEFAULT '' COMMENT '任务名,与Commands中的$signature一致',
`parameters` varchar(1000) NOT NULL DEFAULT '' COMMENT '启动参数',
`total_num` int(11) NOT NULL DEFAULT '0' COMMENT '总共完成子任务数量',
`success` int(11) NOT NULL DEFAULT '0' COMMENT '成功数量',
`fail` int(11) NOT NULL DEFAULT '0' COMMENT '失败数量',
`memo` varchar(2000) NOT NULL DEFAULT '' COMMENT '备注',
PRIMARY KEY (`id`),
KEY `idx_status` (`status`),
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
<?php namespace App\Http\Model; use App\Http\Model\BaseModel; use Illuminate\Support\Facades\DB; class TaskModel extends BaseModel { public $table = 'sq_task'; public $primaryKey = 'id'; const CREATED_AT = 'creationtime'; const UPDATED_AT = 'modifiedtime'; public function __construct() { parent::__construct(); } public function search($shopId=false, $status=false, $skip=0, $limit=10){ $db = DB::table($this->table); if($shopId !== false){ $db->where('shopid', $shopId); } if($status !== false){ $db->where('status', $status); } return $db->skip($skip)->take($limit)->get()->toArray(); } /** * 保存商家管理员 * @param $data * @return mixed */ public function add($data){ return DB::table($this->table)->insertGetId($data); } /** * @param $id * @return mixed */ public function get($id){ return DB::table($this->table)->where('id', $id)->first(); } /** * @param $id * @param $array * @return mixed */ public function modify($id, $array){ if ($array) { $array['modifiedtime'] = time(); } return DB::table($this->table) ->where('id', $id) ->update($array); } public function incrSuccess($id){ return DB::table($this->table) ->where('id', $id) ->increment('success'); } public function incrFail($id){ return DB::table($this->table) ->where('id', $id) ->increment('fail'); } }
3、还需要一个单独执行的Command来替代原有队列的功能,负责检查并执行待执行的任务
<?php namespace App\Console\Commands; use App\Http\Model\TaskModel; class ShopCommandHandler extends Command { protected $signature = 'CommandHandler'; protected $description = '任务触发器'; public function handle() { info($this->description . "启动"); $taskModel = new TaskModel(); while(true) { $tasks = $taskModel->search(false, 0, 0, 10); foreach ($tasks as $task) { try { $shell = 'php artisan ' . $task['task'] . ' ' . $task['id']; shell_exec($shell . ' > /dev/null 2>&1 &'); info("准备执行任务:" . $task['task'] . '(' . $task['id'] . ')'); } catch(\Exception $e){ info(json_encode($task) . '===>' . $e->getMessage() ." : " . $e->getTraceAsString()); } } sleep(2); } info($this->description ."结束"); } }
4、为了符合laravel的编程习惯,shell命令使用的是artisan命令行方式开发。使用artisan的好处是可以直接复用laravel里面的各种类,并且开发出来的命令,也可以单独执行。为了统一命令的行为,自定义了一个Command基类
<?php namespace App\Console\Commands; use App\Http\Model\TaskModel; abstract class BaseCommand extends Command{ private $key = null; private $subTasks = []; private $taskModel; protected $task; protected $taskId; /** * Create a new command instance. */ public function __construct(){ parent::__construct(); $this->taskModel = new TaskModel(); } /** * 获取待处理对象 * @return array - 处理对象集合 */ abstract protected function getSubTasks(); /** * 执行单个子任务 * @param $item - 单个处理对象 * @return bool - 处理结果 */ abstract protected function do($item); public function handle(){ try{ // 获取命令行参数 $this->task = $this->getTask(); if(!$this->task){ return; } info($this->logName); info('任务开始:' . $this->signature . $this->taskId); // 设置任务名称 $name = $this->task['task']; $this->key = $this->lockPrefix . $name . '_' . $this->task['id']; // 获取任务信息 $this->subTasks = $this->getSubTasks(); // 计算任务子任务数量 $this->updateSubTaskTotal(); // 更新任务信息(子任务数量、任务状态、启动时间) $this->taskModel->modify($this->taskId, ['starttime'=>time(), 'status' => 1]); // 设置任务锁 $this->setOrCheckLocked($this->key, $ttlSecond = 60 * 60 * 24); // 执行任务 foreach ($this->subTasks as $item) { // 执行子任务 try { $r = $this->do($item); } catch(\Exception $e){ info('任务异常,' . json_encode($item) . '===>' . $e->getMessage() .': ' . $e->getTraceAsString()); $r = false; } // 更新当前进度 $this->incrProcess($r); } // 更新任务信息(任务状态、完成时间) $this->taskModel->modify($this->taskId, ['endtime'=>time(), 'status' => 2]); info('任务结束:' . $this->signature . $this->taskId); } catch(\Exception $e){ // 更新任务信息(任务状态、完成时间) $this->taskModel->modify($this->taskId, ['endtime'=>time(), 'status' => 3, 'memo'=>$e->getMessage()]); info('任务异常(' . $e->getMessage() .'): ' . $e->getTraceAsString()); } finally{ // 释放任务锁 if($this->key) { $this->releaseLocked($this->key); } } } /** * @throws \Exception */ private function getTaskId(){ $taskId = $this->argument('id'); if(!$taskId){ throw new \Exception('获取任务ID失败', 9901); } return $taskId; } /** * @throws \Exception */ private function getTask(){ $this->taskId = $this->getTaskId(); $task = $this->taskModel->get($this->taskId); if(!$task){ throw new \Exception('任务不存在', 9902); } if($task['status'] === 2){ $this->log("任务[$this->taskId]已完成,放弃执行。"); return false; } if($task['status'] === 1 && time()-$task['starttime']<86400*2){ info("任务[$this->taskId]正在执行,且未超过2小时,放弃执行。"); return false; } return $task; } private function updateSubTaskTotal(){ $this->taskModel->modify($this->taskId, ['total_num'=>count($this->subTasks)]); } private function incrProcess($result){ if($result) { $this->taskModel->incrSuccess($this->taskId); } else { $this->taskModel->incrFail($this->taskId); } } protected function getConf($name){ $conf = json_decode($this->task['parameters'],true); if($conf && isset($conf[$name])){ return $conf[$name]; } else { return null; } } }
5、剩下的就是开发具体的任务了,下面是一个发放向用户优惠券示例:
<?php <?php namespace App\Console\Commands; use App\Http\Service\OrderService; use App\Libs\MicroService\CouponMicroService; class SendActiveUserCoupon extends BaseCommand { protected $signature = 'SendActiveUserCoupon {id}'; protected $description = '向活跃用户发放优惠券'; /** * 获取待处理对象 * @return array - 处理对象集合 */ protected function getSubTasks(){ $result = (new OrderService())->searchUser($this->task['shopid'], $this->getConf('conditions')); return array_column($result, 'cid'); } /** * 执行单个子任务 * @param $item - 单个处理对象 * @return bool - 处理结果 * @throws */ protected function do($item){ $couponMicroService = new CouponMicroService(); $couponMicroService->entity_create($this->getConf('provide_id'), $item); return true; } }
发表评论
-
ElementUI上传组件在Lumen环境下跨域问题的解决
2020-08-25 11:10 841在后台的路由中间件中要增加跨域设置: namespace ... -
lumen集成结巴分词
2020-03-31 16:46 279常规的方法是通过compoer集成 composer re ... -
Lumen/laravel动态分库的实现
2019-07-04 10:39 758lumen默认支持多数据源,但如果系统存在多个结构相同的数据库 ... -
lumen操作mongodb
2019-01-30 14:40 801前提:使用Eloquent访问mongodb class ... -
lumen5.5使用rabbitmq
2019-01-21 10:53 1075在composer.json中的require中增加以下语句 ... -
lumen使用mongodb
2019-01-09 16:16 16821. 安装mongodb扩展 执行sudo pecl in ... -
在lumen中开发和执行artisan命令行任务
2018-12-22 17:28 1850lumen是laravel的简化版,其中artisan部分删除 ... -
在lumen安装阿里云短信服务SDK
2018-12-12 12:15 8091、下载SDK:https://help.aliyun.com ... -
lumen日志权限冲突问题
2016-11-02 11:44 1721运行lumen项目一般使用nginx作为webserver,因 ... -
lumen中使用调度任务
2016-04-22 12:21 2971需要在crontab中增加一行 * * * * * php ... -
在lumen中使用smtp方式发送txt/plain邮件
2016-04-22 11:46 12411、安装邮件组件 修改composer.json,在re ... -
lumen下操作excel
2016-04-22 11:34 19851、安装excel组件 修改composer.json, ... -
lumen中使用redis队列
2016-04-22 11:18 18191、采用redis作为队列驱动 修改.env文件 QU ... -
lumen中安装及使用redis作为cache
2016-04-22 10:54 20771、安装redis模块 在compose.json的re ...
相关推荐
laravel分布式并发锁
易语言websocket模块,多线程并发稳定模块,实测稳定,保证可用
NULL 博文链接:https://mammahao.iteye.com/blog/2226890
多任务多线程管理模块,任务有优先级,一个任务执行完毕,按照优先级高低执行另一个任务 1)如何使用: 1. 声明一个HashTaskList,或在栈上动态获取; 2. 调用InitTaskList初始化上一步的HashTaskList 3. 调用...
仅用于测试学习EAS用,不能用于商业等其他用途。到期日2014年12月31日,全模块并发20。
并发和并行的区别:并发是指同时处理多个任务的能力,而并行是指同时执行多个任务的能力。并发通常发生在单处理器系统中,通过任务切换的方式实现多个任务之间的迅速切换。而并行则需要多个处理器或多核处理器,并且...
基于libcurl实现http post支持并发,异步 方便新手学习
用线程池控制的大并发量工作任务性能优化,实现工作任务的并发控制
MySql高并发操作模块源码 效率很高修改一下马上就可以用
C#版支持高并发的HTTP服务器源码,异步处理并发调用,应用于WINFORM程序中,创建自己的HTTP SERVER的首选办法。
基于springboot+mybatis redis构建的在线抽奖系统,管理后台,采用队列处理,支持高并发 项目经过严格测试,确保可以运行! 基于springboot+mybatis redis构建的在线抽奖系统,管理后台,采用队列处理,支持高并发...
Laravel开发-lock 支持redis与memcached的并发锁。
我一直自己在用的Curl写的http访问模块,功能经过深度封装,使用并不复杂,支持同步和异步高并发访问,里面写了两个例子,一个火山论坛登陆POST例子,用的是同步访问,一个是简单的HTTP异步高并发访问网站。...
Java + Netty 实现的高并发高可用MQTT服务broker,轻松支持10万并发,已用于生产环境 技术体系:(使用 netty 实现通信及协议解析,使用 nutzboot 提供依赖注入及属性配置,使用 redis 实现消息缓存,集群,使用 ...
volatile关键字的非原子性、volatile关键字的使用、AtomicInteger原子性操作、线程安全小例子:多个线程竞争问题、多个线程多个锁问题、创建一个缓存的线程池、多线程使用Vector或者HashTable的示例(简单线程同步...
并发执行线程thread-1和thread-2, 结果在同一时间执行了thread-1和thread-2。python实现在import _thread时,需pip install threadpool.
在项目的性能测试过程中,怎样确定一个实际系统的并发用户数。
自从java创建以来就已经支持并发的理念,如线程和锁。这篇指南主要是为帮助java多线程开发人员理解并发的核心概念以及如何应用这些理念。本文的主题是关于具有java语言风格的Thread、synchronized、volatile,以及...
1) 支持用户并发请求(注册,登录,聊天) 2) 当用户登录时,应该提示其所有在线好友“自己已经在线” 3) 当用户退出时,应该提示其所有在线好友“自己已经下线...6) 删除好友(目前没有做,只要添加一个函数就可以了)
并发实现模块和相关包1