Hyperf的redis异步队列实战
突发奇想想写一个RTS战略类游戏。
RTS战略类游戏核心玩法肯定是有养成啊,比如建筑升级啊,征兵队列啊,之类的,总之就是用户现在点了操作但是拿不到结果,可能要等几十分钟或者更久拿到结果。
现在我们来实现以下建筑升级,主要逻辑是
用户升级城堡>后台为用户添加一个建造队列>建造时间达到>为城堡升级>删除建造队列>返回用户升级成功
这个建造时间不一定,随着城堡升级越高,肯定时间越久,那么用客服端轮询的方式去请求服务器问:我的城堡升级好没有啊?然后服务器再返回。。
这种方式既落后,又傻。。。
所以今天研究一下hyperf的redis异步队列,通过队列的方式,到时间了,自动把城堡升级好,然后通过websocket告诉用户城堡升级好了。
不过用redis有个不好的地方,就是持久化的问题,后面再研究一下RabbitMQ的实现方法吧,先上代码:
先创建一个QueueService方法,主要用来投递消息,其实有无这个也无所谓,主要是为了封装一下push方法。。
<?php
/**
* Author:陈杰
* Blog:http://blog.95shouyou.com
* Email:823380606@qq.com
* Git:https://gitee.com/chen95
* Date:2020/12/11 0011
* Time:15:57
*/
declare(strict_types=1);
namespace App\Service;
//
//use App\Job\ExampleJob;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Driver\DriverInterface;
use Hyperf\AsyncQueue\Job;
class QueueService
{
/**
* @var DriverInterface
*/
protected $driver;
public function __construct(DriverFactory $driverFactory)
{
$this->driver = $driverFactory->get('default');
}
public function push(Job $job, int $delay = 0): bool
{
// 这里的 `ExampleJob` 会被序列化存到 Redis 中,所以内部变量最好只传入普通数据
// 同理,如果内部使用了注解 @Value 会把对应对象一起序列化,导致消息体变大。
// 所以这里也不推荐使用 `make` 方法来创建 `Job` 对象。
return $this->driver->push($job, $delay);
}
}
然后我们要去创建我们的生产者
<?php
declare(strict_types=1);
namespace App\Controller;
use App\Job\UserBuildJob;
use App\Model\Pub\ConfigCityModel;
use App\Model\User\UserBuildModel;
use App\Model\User\UserCityModel;
use App\Service\QueueService;
class CityController extends AbstractController
{
public function get_city()
{
$rules = [
'id' => 'required|integer|min:1',
];
$params = $this->checkValidate($this->request->all(), $rules, ['id' => '城市编号']);
$data = UserCityModel::find($params['id']);
return $this->success($data);
}
public function build_city(QueueService $queueService)
{
$rules = [
'id' => 'required|integer|min:1',
'type' => 'required|min:1|max:20|alpha_num',
];
$user = $this->request->getAttribute('user');
$params = $this->checkValidate($this->request->all(), $rules, ['id' => '城市编号', 'type' => '建筑类型']);
$user = $this->request->getAttribute('user');
$city = UserCityModel::find($params['id']);
if (!isset($city) || $city['uid'] != $user['id'])
return $this->error('没有找到这座城市');
if (!isset($city[$params['type']]))
return $this->error('没有找到这种建筑');
$level = intval($city[$params['type']]) + 1;
$config_city = ConfigCityModel::where('type', $params['type'])->where('level', $level)->first();
if (!isset($config_city))
return $this->error('该建筑不能再升级了');
UserBuildModel::create([
'uid' => $user['id'],
'city_id' => $params['id'],
'create_time' => time(),
'need_time' => $config_city['time'],
'end_time' => time() + $config_city['time'],
'type' => $params['type'],
'build' => 0
]);
$queueService->push(new UserBuildJob($city->toArray()), $config_city['time']);
return $this->success($city);
}
}
生产者把逻辑业务判断写一下,一旦验证成功,就可以写入数据库一个建造信息了,然后就把消息投递出去
看看消费者
<?php
/**
* Author:陈杰
* Blog:http://blog.95shouyou.com
* Email:823380606@qq.com
* Git:https://gitee.com/chen95
* Date:2020/12/11 0011
* Time:15:58
*/
declare(strict_types=1);
namespace App\Job;
use App\Model\User\UserBuildModel;
use App\Model\User\UserCityModel;
use Hyperf\AsyncQueue\Job;
use Hyperf\DbConnection\Db;
class UserBuildJob extends Job
{
public $params;
/**
* 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
*
* @var int
*/
protected $maxAttempts = 2;
public function __construct($params)
{
$this->params = $params;
}
public function handle()
{
// 根据参数处理具体逻辑
// 通过具体参数获取模型等
// 这里的逻辑会在 ConsumerProcess 进程中执行
Db::beginTransaction();
try {
$userBuildModel = UserBuildModel::find($this->params['id']);
if (!isset($userBuildModel))
return false;//建造数据找不到了或者已经被处理了
$userCityModel = UserCityModel::find($this->params['city_id']);
if (!isset($userCityModel))
return false;//城市信息找不到了
$userCityModel[$this->params['type']] += 1;
$userCityModel->save();
$userBuildModel->delete();
/**
* 发送websocket消息,还没实现
*/
//Websocket::sendToUid($params['uid'],$msg);
return true;
} catch (\Exception $exception) {
Db::rollBack();
return false;
}
}
}
至此我们的队列就可以使用了。
后面再研究一下RabbitMQ吧,因为如果服务器宕机或者redis宕机,那么我们的消息队列就没了,用户就收不到建造好了的消息
解决办法有两个:
第一个是用RabbitMQ来持久化我们的消息队列,即时服务宕机,当我下次重启服务的时候一样可以把之前的消息处理掉。
第二个是写一个定时脚本定时扫描数据库中明明已经到了建造时间的建造队列却没被处理的数据(这种数据要么是处理失败,要么就是消息丢失,没有走队列)
从长远来说肯定第一个方法更好,但是带来了更多的维护成本,第二个方法简单有效,但不是长久之计,所以还是学习一下RabbitMQ吧。。
