<?php
|
|
/*
|
* @Author: your name
|
* @Date: 2021-12-24 10:26:10
|
* @LastEditTime: 2021-12-24 17:21:42
|
* @LastEditors: Please set LastEditors
|
* @Description: 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
|
* @FilePath: /hy-websocket/app/Controller/WebsocketController.php
|
*/
|
declare (strict_types=1);
|
namespace App\Controller;
|
|
use Hyperf\Contract\OnCloseInterface;
|
use Hyperf\Contract\OnMessageInterface;
|
use Hyperf\Contract\OnOpenInterface;
|
use Swoole\Http\Request;
|
use Swoole\Http\Response;
|
use Swoole\WebSocket\Frame;
|
use Swoole\WebSocket\Server;
|
use Hyperf\Di\Annotation\Inject;
|
use App\Service\TransferService;
|
use Hyperf\Logger\LoggerFactory;
|
use Hyperf\Contract\ConfigInterface;
|
/**
 * WebSocket entry point: registers connections into groups and relays
 * client messages to the members of a group.
 *
 * Bookkeeping lives in the key-value store behind TransferService
 * (list/key style commands), using these keys, where "host" is the
 * allow_ip value of the first configured server:
 *   - host              -> list of fds bound without a group
 *   - groupID:host      -> list of fds bound to that group
 *   - host:fd           -> optional business data bound to the connection
 *   - host:fd:group     -> name of the list the connection currently sits in
 */
class WebsocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    use \Hyperf\Di\Aop\ProxyTrait;
    use \Hyperf\Di\Aop\PropertyHandlerTrait;

    /**
     * Store used for all group / session bookkeeping.
     *
     * @Inject
     * @var TransferService
     */
    private $transferService;

    /**
     * Logger obtained from the LoggerFactory in the constructor.
     *
     * @var \Psr\Log\LoggerInterface
     */
    public $logger;

    /**
     * Application configuration.
     *
     * @Inject()
     * @var ConfigInterface
     */
    private $config;

    /** @var array Value of the "server.servers" configuration entry (empty array when absent). */
    private $server_config;

    /** @var string "allow_ip" of the first configured server; used as the host part of every key. */
    private $allow_ip;

    /**
     * @param LoggerFactory $loggerFactory Factory the controller's logger is taken from.
     */
    public function __construct(LoggerFactory $loggerFactory)
    {
        // Must stay first: $this->config is read a few lines below and nothing else in
        // this class assigns it, so this call is presumably what fills the injected properties.
        $this->__handlePropertyHandler(__CLASS__);

        // First argument is the log name, second is the key inside config/autoload/logger.php.
        $this->logger = $loggerFactory->get('log', 'default');

        // Contents of server.php. Default to an empty array (not '') so the index
        // access below cannot end up offsetting into a string.
        $servers = $this->config->get('server.servers', []);
        $this->server_config = is_array($servers) ? $servers : [];
        $this->allow_ip = (string) ($this->server_config[0]['allow_ip'] ?? '');
    }

    /**
     * Connection closed (including abnormal client disconnects): remove the fd
     * from the group list it was bound to and drop its group pointer key.
     *
     * @param mixed $server    Server object handed in by the framework (unused).
     * @param int   $fd        Descriptor of the connection that closed.
     * @param int   $reactorId Reactor id handed in by the framework (unused).
     */
    public function onClose($server, int $fd, int $reactorId) : void
    {
        $this->detachFromGroup($fd);
        $this->transferService->delete($this->allow_ip . ':' . $fd . ':group');
        // NOTE(review): the optional "host:fd" business-data key written by bind() is
        // not removed here (it never was) - confirm whether it should be.
    }

    /**
     * Remove every stored key whose name contains this node's host value.
     *
     * @param mixed $server Server object handed in by the framework (unused).
     */
    public function onStart($server) : void
    {
        $host = $this->allow_ip;

        // With an empty host the pattern below would be "**" and match every key
        // in the store, so refuse to clean up instead of wiping unrelated data.
        if ($host === null || $host === '') {
            $this->logger->warning('allow_ip is empty, skipping websocket key cleanup on start');
            return;
        }

        $staleKeys = $this->transferService->keys("*{$host}*");
        if (!is_array($staleKeys)) {
            return;
        }

        foreach ($staleKeys as $staleKey) {
            $this->transferService->delete($staleKey);
        }
    }

    /**
     * Handle one client frame. The payload must be a JSON object of the form
     * {"url": ".../<action>", "data": ...}; the action is the last path segment
     * of "url" and is one of:
     *   - reg  : bind the connection to data.group_id and reply with the stored
     *            session info for data.session_id (event "re_bind")
     *   - data : relay "data" to every fd of data.group_id, or to every
     *            connection of the server when group_id is empty
     *   - ping : reply with data followed by "pong"
     * Anything else is answered with code 404; undecodable payloads with code -1.
     *
     * @param Response|Server $server
     * @param Frame           $frame
     */
    public function onMessage($server, Frame $frame) : void
    {
        $ret = ['code' => 0, 'data' => null];

        $message = $this->is_json($frame->data, true);
        if (!is_array($message)) {
            $ret['code'] = -1;
            $ret['msg'] = 'data is null or data no json';
            $server->push($frame->fd, json_encode($ret));
            return;
        }

        // The payload comes from the client: every field may be missing or of the wrong type.
        $data = $message['data'] ?? null;
        $action = $this->actionFromUrl($message['url'] ?? '');

        switch ($action) {
            case 'reg':
                $groupId = $this->scalarField($data, 'group_id');
                $session = $this->scalarField($data, 'session_id');
                $this->bind($groupId, '', $frame->fd);

                $key = 'practice_scenes_ws_cur_session_info:' . $session;
                $node_key = 'practice_scenes_ws_cur_session_node_info:' . $session;

                $stored = $this->transferService->get($key);
                if ($stored) {
                    // The session info is handed out once: it is deleted after being read.
                    $ret['data'] = json_decode($stored, true);
                    $this->transferService->delete($key);
                } else {
                    // Fall back to the node info, which is left in place.
                    $stored = $this->transferService->get($node_key);
                    $ret['data'] = $stored ? json_decode($stored, true) : '';
                }

                $ret['event'] = 're_bind';
                $server->push($frame->fd, json_encode($ret));
                break;

            case 'data':
                $ret['event'] = 'data';
                $ret['data'] = $data;

                // Encode once; the same payload goes to every recipient.
                $payload = json_encode($ret);
                $targets = $this->broadcastTargets($server, $this->scalarField($data, 'group_id'));
                foreach ($targets as $targetFd) {
                    $server->push((int) $targetFd, $payload);
                }
                break;

            case 'ping':
                // Only scalar data can be echoed back as a string prefix.
                $prefix = is_scalar($data) ? (string) $data : '';
                $server->push($frame->fd, $prefix . 'pong');
                break;

            default:
                $ret['code'] = 404;
                $ret['msg'] = 'event no existent';
                $server->push($frame->fd, json_encode($ret));
                return;
        }
    }

    /**
     * Connection opened. Intentionally does nothing.
     *
     * @param mixed   $server  Server object handed in by the framework (unused).
     * @param Request $request Handshake request (unused).
     */
    public function onOpen($server, Request $request) : void
    {
    }

    /**
     * Decode a JSON document and return it only if it is a container.
     *
     * @param string $data  Raw JSON text.
     * @param bool   $assoc Passed to json_decode(): true decodes objects as arrays.
     * @return bool|mixed|string The decoded non-empty array, or the decoded object;
     *                           false for anything else (invalid JSON, scalars, null,
     *                           empty arrays, non-string input).
     */
    public function is_json($data = '', $assoc = false)
    {
        if (!is_string($data) || $data === '') {
            return false;
        }

        $decoded = json_decode($data, $assoc);

        if (is_array($decoded)) {
            // Judge the array as a whole. Looking only at its first element would
            // reject valid documents such as {"data":null,"url":"/ping"}.
            return $decoded !== [] ? $decoded : false;
        }

        return is_object($decoded) ? $decoded : false;
    }

    /**
     * Bind a connection to a group.
     *
     * Resulting keys:
     *   1. without a group
     *      host           -> fds   (connections of this node without a group)
     *      host:fd        -> value (business data bound to the connection)
     *      host:fd:group  -> host  (list the connection belongs to)
     *   2. with a group
     *      groupID:host   -> fds
     *      host:fd        -> value
     *      host:fd:group  -> groupID:host
     *
     * Binding an fd again first removes it from the list it was bound to before,
     * so a connection is a member of at most one list and appears in it once.
     *
     * The first two parameters no longer declare a default: they precede the
     * required $fd, so their defaults could never be used and only triggered
     * PHP 8's "optional parameter before required parameter" deprecation.
     *
     * @param string $groupID Group to bind the connection to; empty selects the default (host-wide) list.
     * @param string $value   Business data to bind to the connection; nothing is stored when empty.
     * @param int    $fd      Descriptor of the connection being bound.
     */
    public function bind($groupID, $value, $fd)
    {
        $host = $this->allow_ip;
        $listKey = empty($groupID) ? $host : $groupID . ':' . $host;

        // Drop the membership left by an earlier bind of this fd; otherwise a
        // re-registration under another group would keep receiving the old
        // group's broadcasts and onClose could never clean that entry up.
        $this->detachFromGroup($fd);

        // Bind the group list and the connection's group pointer.
        $this->transferService->rpush($listKey, $fd);
        $this->transferService->set($host . ':' . $fd . ':group', $listKey);

        // Bind the business data.
        if (!empty($value)) {
            $this->transferService->set($host . ':' . $fd, $value);
        }
    }

    /**
     * Remove $fd from the list named by its "host:fd:group" pointer, if one is stored.
     *
     * @param int $fd Connection descriptor.
     */
    private function detachFromGroup($fd) : void
    {
        $listKey = $this->transferService->get($this->allow_ip . ':' . $fd . ':group');

        // No pointer means the connection was never bound; there is nothing to remove.
        if ($listKey === null || $listKey === false || $listKey === '') {
            return;
        }

        $this->transferService->lrem($listKey, $fd, 0);
    }

    /**
     * Extract the action name: the part of $url after its last "/".
     *
     * @param mixed $url Client supplied "url" field.
     * @return string The action, the whole url when it has no "/", or '' when it is not a string.
     */
    private function actionFromUrl($url) : string
    {
        if (!is_string($url)) {
            return '';
        }

        $slash = strrpos($url, '/');
        if ($slash === false) {
            // Without this guard "false + 1" would silently drop the first character.
            return $url;
        }

        return (string) substr($url, $slash + 1);
    }

    /**
     * Read a scalar field from the client supplied "data" member.
     *
     * @param mixed  $fields Decoded "data" member; anything but an array yields ''.
     * @param string $name   Field name.
     * @return string The field cast to string, or '' when missing or not scalar.
     */
    private function scalarField($fields, string $name) : string
    {
        if (!is_array($fields) || !isset($fields[$name]) || !is_scalar($fields[$name])) {
            return '';
        }

        return (string) $fields[$name];
    }

    /**
     * Resolve the fds a "data" message is relayed to.
     *
     * @param Response|Server $server
     * @param string          $groupId Target group; empty means every connection of the server.
     * @return array Unique fds.
     */
    private function broadcastTargets($server, string $groupId) : array
    {
        if (empty($groupId)) {
            // $server->connections may be an iterator object rather than an array,
            // and array_unique() only accepts arrays, so materialise it first.
            $pool = $server->connections ?? [];
            if ($pool instanceof \Traversable) {
                $pool = iterator_to_array($pool, false);
            }
            $fds = is_array($pool) ? $pool : [];
        } else {
            // Members of the group on this node: key = groupID:host.
            $fds = $this->transferService->lrange($groupId . ':' . $this->allow_ip, 0, -1);
            if (!is_array($fds)) {
                $fds = [];
            }
        }

        return array_unique($fds);
    }
}
|