`

建立一个支持并发的Laravel任务模块

 
阅读更多
laravel/lumen的事件、任务调度等都是基于队列来实现的,但由于php是进程模式,除非部署第三方模块,否则无法像java那样通过创建线程来实现并发执行。这对于laravel来说实在非常不方便,只能一个任务执行完再执行下一个任务,效率极其低下,而且容易阻塞。但并发任务的需求又非常常见,下面就基于laravel自行实现一个并发任务模块。

1、要实现并发,就要先解决在PHP进程内创建新进程的问题。通过PHP本身是没有办法的,因此采用执行外部Shell脚本的方式启动新的PHP进程。但默认情况下,system、exec等方式执行shell时都是阻塞的,按网上文章说的用popen测试,也不成功。最后采用的是
shell_exec($shell . ' > /dev/null 2>&1 &');

上面的方法只能用在linux/unix环境,因为windows没有/dev/null这个空设备句柄。

2、任务模块不再依赖laravel队列来实现,触发任务时使用数据库保存任务信息,表结构如下:
引用

CREATE TABLE `sq_task` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `creationtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
  `modifiedtime` int(11) NOT NULL DEFAULT '0' COMMENT '最后更新时间',
  `starttime` int(11) NOT NULL DEFAULT '0' COMMENT '启动时间',
  `endtime` int(11) NOT NULL DEFAULT '0' COMMENT '完成时间',
  `status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '状态,0未执行,1正在执行,2已完成,3出错',
  `task` varchar(50) NOT NULL DEFAULT '' COMMENT '任务名,与Commands中的$signature一致',
  `parameters` varchar(1000) NOT NULL DEFAULT '' COMMENT '启动参数',
  `total_num` int(11) NOT NULL DEFAULT '0' COMMENT '总共完成子任务数量',
  `success` int(11) NOT NULL DEFAULT '0' COMMENT '成功数量',
  `fail` int(11) NOT NULL DEFAULT '0' COMMENT '失败数量',
  `memo` varchar(2000) NOT NULL DEFAULT '' COMMENT '备注',
  PRIMARY KEY (`id`),
  KEY `idx_status` (`status`),
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;


<?php

namespace App\Http\Model;

use App\Http\Model\BaseModel;
use Illuminate\Support\Facades\DB;

class TaskModel extends BaseModel
{
    public $table = 'sq_task';
    public $primaryKey = 'id';
    const CREATED_AT = 'creationtime';
    const UPDATED_AT = 'modifiedtime';


    public function __construct()
    {
        parent::__construct();
    }

    public function search($shopId=false, $status=false, $skip=0, $limit=10){
        $db = DB::table($this->table);
        if($shopId !== false){
            $db->where('shopid', $shopId);
        }
        if($status !== false){
            $db->where('status', $status);
        }
        return $db->skip($skip)->take($limit)->get()->toArray();
    }

    /**
     * 保存商家管理员
     * @param $data
     * @return mixed
     */
    public function add($data){
        return DB::table($this->table)->insertGetId($data);
    }

    /**
     * @param $id
     * @return mixed
     */
    public function get($id){
        return DB::table($this->table)->where('id', $id)->first();
    }

    /**
     * @param $id
     * @param $array
     * @return mixed
     */
    public function modify($id, $array){
        if ($array) {
            $array['modifiedtime'] = time();
        }
        return DB::table($this->table)
            ->where('id', $id)
            ->update($array);
    }

    public function incrSuccess($id){
        return DB::table($this->table)
            ->where('id', $id)
            ->increment('success');
    }

    public function incrFail($id){
        return DB::table($this->table)
            ->where('id', $id)
            ->increment('fail');
    }
}


3、还需要一个单独执行的Command来替代原有队列的功能,负责检查并执行待执行的任务
<?php
namespace App\Console\Commands;


use App\Http\Model\TaskModel;

class ShopCommandHandler extends Command
{
    protected $signature = 'CommandHandler';

    protected $description = '任务触发器';

    public function handle()
    {
        info($this->description . "启动");

        $taskModel = new TaskModel();
        while(true) {
            $tasks = $taskModel->search(false, 0, 0, 10);
            foreach ($tasks as $task) {
                try {
                    $shell = 'php artisan ' . $task['task'] . ' ' . $task['id'];
                    shell_exec($shell . ' > /dev/null 2>&1 &');
                    info("准备执行任务:" . $task['task'] . '(' . $task['id'] . ')');
                } catch(\Exception $e){
                    info(json_encode($task) . '===>' . $e->getMessage() ."  :  " . $e->getTraceAsString());
                }
            }
            sleep(2);
        }
        info($this->description  ."结束");
    }
}


4、为了符合laravel的编程习惯,shell命令使用的是artisan命令行方式开发。使用artisan的好处是可以直接复用laravel里面的各种类,并且开发出来的命令,也可以单独执行。为了统一命令的行为,自定义了一个Command基类
<?php

namespace App\Console\Commands;

use App\Http\Model\TaskModel;

abstract class BaseCommand extends Command{
    private $key = null;
    private $subTasks = [];
    private $taskModel;
    protected $task;
    protected $taskId;
	/**
	 * Create a new command instance.
	 */
	public function __construct(){
		parent::__construct();

                $this->taskModel = new TaskModel();
	}

    /**
     * 获取待处理对象
     * @return array - 处理对象集合
     */
    abstract protected function getSubTasks();

    /**
     * 执行单个子任务
     * @param $item - 单个处理对象
     * @return bool - 处理结果
     */
    abstract protected function do($item);

    public function handle(){
        try{
            // 获取命令行参数
            $this->task = $this->getTask();
            if(!$this->task){
                return;
            }
            info($this->logName);
            info('任务开始:' . $this->signature . $this->taskId);

            // 设置任务名称
            $name = $this->task['task'];
            $this->key = $this->lockPrefix . $name . '_' . $this->task['id'];

            // 获取任务信息
            $this->subTasks = $this->getSubTasks();

            // 计算任务子任务数量
            $this->updateSubTaskTotal();

            // 更新任务信息(子任务数量、任务状态、启动时间)
            $this->taskModel->modify($this->taskId, ['starttime'=>time(), 'status' => 1]);

            // 设置任务锁
            $this->setOrCheckLocked($this->key, $ttlSecond = 60 * 60 * 24);

            // 执行任务
            foreach ($this->subTasks as $item) {
                // 执行子任务
                try {
                    $r = $this->do($item);
                } catch(\Exception $e){
                    info('任务异常,' . json_encode($item) . '===>' . $e->getMessage() .': ' . $e->getTraceAsString());
                    $r = false;
                }
                // 更新当前进度
                $this->incrProcess($r);
            }
            // 更新任务信息(任务状态、完成时间)
            $this->taskModel->modify($this->taskId, ['endtime'=>time(), 'status' => 2]);

            info('任务结束:' . $this->signature . $this->taskId);
        } catch(\Exception $e){
            // 更新任务信息(任务状态、完成时间)
            $this->taskModel->modify($this->taskId, ['endtime'=>time(), 'status' => 3, 'memo'=>$e->getMessage()]);

            info('任务异常(' . $e->getMessage() .'): ' . $e->getTraceAsString());
        } finally{
            // 释放任务锁
            if($this->key) {
                $this->releaseLocked($this->key);
            }
        }
    }

    /**
     * @throws \Exception
     */
    private function getTaskId(){
        $taskId = $this->argument('id');
        if(!$taskId){
            throw new \Exception('获取任务ID失败', 9901);
        }
        return $taskId;
    }

    /**
     * @throws \Exception
     */
    private function getTask(){
        $this->taskId = $this->getTaskId();
        $task = $this->taskModel->get($this->taskId);
        if(!$task){
            throw new \Exception('任务不存在', 9902);
        }
        if($task['status'] === 2){
            $this->log("任务[$this->taskId]已完成,放弃执行。");
            return false;
        }
        if($task['status'] === 1 && time()-$task['starttime']<86400*2){
            info("任务[$this->taskId]正在执行,且未超过2小时,放弃执行。");
            return false;
        }
        return $task;
    }

    private function updateSubTaskTotal(){
        $this->taskModel->modify($this->taskId, ['total_num'=>count($this->subTasks)]);
    }

    private function incrProcess($result){
        if($result) {
            $this->taskModel->incrSuccess($this->taskId);
        } else {
            $this->taskModel->incrFail($this->taskId);
        }
    }

    protected function getConf($name){
        $conf = json_decode($this->task['parameters'],true);
        if($conf && isset($conf[$name])){
            return $conf[$name];
        } else {
            return null;
        }
    }
}


5、剩下的就是开发具体的任务了,下面是一个发放向用户优惠券示例:
<?php
<?php
namespace App\Console\Commands;

use App\Http\Service\OrderService;
use App\Libs\MicroService\CouponMicroService;

class SendActiveUserCoupon extends BaseCommand
{
    protected $signature = 'SendActiveUserCoupon {id}';

    protected $description = '向活跃用户发放优惠券';


    /**
     * 获取待处理对象
     * @return array - 处理对象集合
     */
    protected function getSubTasks(){

        $result = (new OrderService())->searchUser($this->task['shopid'], $this->getConf('conditions'));
        return array_column($result, 'cid');

    }

    /**
     * 执行单个子任务
     * @param $item - 单个处理对象
     * @return bool - 处理结果
     * @throws
     */
    protected function do($item){
        $couponMicroService = new CouponMicroService();
        $couponMicroService->entity_create($this->getConf('provide_id'), $item);
        return true;
    }

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics