<?php
|
/*
|
* @Author: your name
|
* @Date: 2021-12-24 10:26:10
|
* @LastEditTime: 2021-12-24 17:21:42
|
* @LastEditors: Please set LastEditors
|
* @Description: Open koroFileHeader to view the configuration and adjust the settings: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
|
* @FilePath: /hy-websocket/app/Controller/WebsocketController.php
|
*/
|
|
declare(strict_types=1);
|
|
namespace App\Controller;
|
|
use Hyperf\Contract\OnCloseInterface;
|
use Hyperf\Contract\OnMessageInterface;
|
use Hyperf\Contract\OnOpenInterface;
|
use Swoole\Http\Request;
|
use Swoole\Http\Response;
|
use Swoole\WebSocket\Frame;
|
use Swoole\WebSocket\Server;
|
use Hyperf\Di\Annotation\Inject;
|
use App\Service\TransferService;
|
use Hyperf\Logger\LoggerFactory;
|
use Hyperf\Contract\ConfigInterface;
|
|
/**
 * WebSocket controller handling the "reg", "data" and "ping" actions.
 *
 * Connection bookkeeping is delegated to TransferService (presumably a Redis
 * wrapper — confirm against its implementation). The keys written by bind()
 * and cleaned up by onClose()/onStart() are:
 *
 *   host            -> list of fds registered without a group
 *   groupID:host    -> list of fds registered to a group
 *   host:fd:group   -> name of the list the fd was pushed to
 *   host:fd         -> optional business data bound to the connection
 *
 * "host" is the `allow_ip` value of the first entry in `server.servers`.
 */
class WebsocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
     * @Inject
     * @var TransferService
     */
    private $transferService;

    /**
     * @var \Psr\Log\LoggerInterface
     */
    public $logger;

    /**
     * @Inject()
     * @var ConfigInterface
     */
    private $config;

    private $server_config;

    private $allow_ip;

    public function __construct(LoggerFactory $loggerFactory)
    {
        // First argument is the log channel name, the second is the key inside
        // config/autoload/logger.php.
        $this->logger = $loggerFactory->get('log', 'default');

        // NOTE(review): $this->config is filled by @Inject; reading it here relies on
        // the framework injecting properties before the constructor body runs — confirm.
        $servers = $this->config->get('server.servers', []);
        $this->server_config = is_array($servers) ? $servers : [];

        // Fall back to an empty host instead of failing on a missing config entry.
        $this->allow_ip = $this->server_config[0]['allow_ip'] ?? '';
    }

    /**
     * Removes the bookkeeping entries of a closed connection.
     *
     * @param mixed $server    Server object handed over by the framework (unused).
     * @param int   $fd        Descriptor of the connection that was closed.
     * @param int   $reactorId Reactor id supplied by the framework (unused).
     */
    public function onClose($server, int $fd, int $reactorId): void
    {
        $host = $this->allow_ip;
        $bindingKey = $host . ':' . $fd . ':group';

        // The binding only exists when the connection sent a "reg" message; skip the
        // list removal otherwise so lrem is never called with an empty key.
        $groupKey = $this->transferService->get($bindingKey);
        if ($groupKey !== null && $groupKey !== false && $groupKey !== '') {
            $this->transferService->lrem($groupKey, $fd, 0);
        }

        $this->transferService->delete($bindingKey);

        // bind() may also store business data under host:fd; drop it so it cannot be
        // picked up by a later connection that reuses the same fd.
        $this->transferService->delete($host . ':' . $fd);
    }

    /**
     * Deletes every stored key that contains this node's host.
     *
     * @param mixed $server Server object handed over by the framework (unused).
     */
    public function onStart($server): void
    {
        $host = $this->allow_ip;

        // An empty host would turn the pattern into "**" and match every key.
        if ($host === null || (string) $host === '') {
            return;
        }

        $staleKeys = $this->transferService->keys("*{$host}*");
        if (! is_array($staleKeys)) {
            return;
        }

        foreach ($staleKeys as $staleKey) {
            $this->transferService->delete($staleKey);
        }
    }

    /**
     * Dispatches an incoming frame to the handler of its action.
     *
     * The frame payload must be a JSON object with a "url" (the action is the part
     * after the last "/") and a "data" member. Replies are JSON objects with a
     * "code" (0 on success, -1 for an unreadable payload, 404 for an unknown action).
     *
     * @param Response|Server $server
     */
    public function onMessage($server, Frame $frame): void
    {
        $ret = ['code' => 0, 'data' => null];

        $frameData = $this->is_json($frame->data, true);
        if (! is_array($frameData)) {
            $ret['code'] = -1;
            $ret['msg'] = 'data is null or data no json';
            $server->push($frame->fd, json_encode($ret));
            return;
        }

        $data = $frameData['data'] ?? null;

        switch ($this->resolveAction($frameData['url'] ?? '')) {
            case 'reg':
                $this->handleReg($server, $frame, $frameData, $data, $ret);
                break;
            case 'data':
                $this->handleData($server, $frameData, $data, $ret);
                break;
            case 'ping':
                // Only scalar payloads can be echoed back as a string prefix.
                $prefix = is_scalar($data) ? (string) $data : '';
                $server->push($frame->fd, $prefix . 'pong');
                break;
            default:
                $ret['code'] = 404;
                $ret['msg'] = 'event no existent';
                $server->push($frame->fd, json_encode($ret));
        }
    }

    /**
     * Called when a connection is opened; intentionally does nothing.
     *
     * @param mixed $server
     */
    public function onOpen($server, Request $request): void
    {
    }

    /**
     * Decodes a JSON document and returns it only when it is a structure.
     *
     * @param string $data  JSON text to decode.
     * @param bool   $assoc When true, objects are decoded as associative arrays.
     * @return bool|mixed The decoded object or non-empty array; false for invalid
     *                    JSON, scalars, null, empty arrays and non-string input.
     */
    public function is_json($data = '', $assoc = false)
    {
        if (! is_string($data)) {
            return false;
        }

        $decoded = json_decode($data, (bool) $assoc);

        // Accept any object or non-empty array; the value of the first member must
        // not matter, otherwise the result would depend on the JSON key order.
        if (is_object($decoded) || (is_array($decoded) && ! empty($decoded))) {
            return $decoded;
        }

        return false;
    }

    /**
     * Registers a connection in its group list and records the binding.
     *
     * Without a group:
     *   host          -> fds   (connections of this node)
     *   host:fd       -> value (business data of the connection)
     *   host:fd:group -> host  (list the connection belongs to)
     * With a group:
     *   groupID:host  -> fds
     *   host:fd       -> value
     *   host:fd:group -> groupID:host
     *
     * All three arguments are required: defaults on $groupID/$value could never be
     * used because the required $fd follows them.
     *
     * @param string $groupID Group identifier; when empty the node-wide list is used.
     * @param string $value   Business data to bind to the connection; skipped when empty.
     * @param int    $fd      Descriptor of the connection being bound.
     */
    public function bind($groupID, $value, $fd)
    {
        $host = $this->allow_ip;

        // Pick the list this connection belongs to and remember it for onClose().
        $listKey = empty($groupID) ? $host : $groupID . ':' . $host;
        $this->transferService->rpush($listKey, $fd);
        $this->transferService->set($host . ':' . $fd . ':group', $listKey);

        // Bind the optional business data.
        if (! empty($value)) {
            $this->transferService->set($host . ':' . $fd, $value);
        }
    }

    /**
     * Extracts the action name: the part of the url after its last "/".
     *
     * @param mixed $url Value of the "url" member of the message.
     * @return string The action, or '' when the url is not a string.
     */
    private function resolveAction($url): string
    {
        if (! is_string($url)) {
            return '';
        }

        $slash = strrpos($url, '/');

        // Without a "/" the whole url is the action.
        return $slash === false ? $url : (string) substr($url, $slash + 1);
    }

    /**
     * Reads a scalar member of the message payload as a string.
     *
     * @param mixed  $payload The "data" member of the message.
     * @param string $key     Name of the member to read.
     * @return string The value, or '' when it is missing or not a scalar.
     */
    private function scalarField($payload, string $key): string
    {
        if (! is_array($payload) || ! isset($payload[$key]) || ! is_scalar($payload[$key])) {
            return '';
        }

        return (string) $payload[$key];
    }

    /**
     * Handles "reg": binds the connection to its group and replies with the
     * stored session info (deleted once delivered) or, failing that, the stored
     * session node info.
     *
     * @param Response|Server $server
     * @param array           $frameData Decoded message, used as log context.
     * @param mixed           $data      The "data" member of the message.
     * @param array           $ret       Reply skeleton.
     */
    private function handleReg($server, Frame $frame, array $frameData, $data, array $ret): void
    {
        $this->logger->info('reg', $frameData);

        $session = $this->scalarField($data, 'session_id');
        $this->bind($this->scalarField($data, 'group_id'), '', $frame->fd);

        $sessionKey = 'practice_scenes_ws_cur_session_info:' . $session;
        $nodeKey = 'practice_scenes_ws_cur_session_node_info:' . $session;

        $stored = $this->transferService->get($sessionKey);
        if ($stored) {
            $ret['data'] = json_decode($stored, true);
            $this->transferService->delete($sessionKey);
        } else {
            $stored = $this->transferService->get($nodeKey);
            $ret['data'] = $stored ? json_decode($stored, true) : '';
        }

        $ret['event'] = 're_bind';
        $server->push($frame->fd, json_encode($ret));
    }

    /**
     * Handles "data": broadcasts the payload to the members of the given group,
     * or to every connection of the server when no group is given.
     *
     * @param Response|Server $server
     * @param array           $frameData Decoded message, used as log context.
     * @param mixed           $data      The "data" member of the message.
     * @param array           $ret       Reply skeleton.
     */
    private function handleData($server, array $frameData, $data, array $ret): void
    {
        $this->logger->info('data', $frameData);

        $groupId = $this->scalarField($data, 'group_id');
        $ret['event'] = 'data';
        $ret['data'] = $data;

        if (empty($groupId)) {
            // No group: target every connection of the server. The connection pool
            // is an iterator object, so it must be converted before array_unique().
            $pool = $server->connections ?? null;
            if (is_array($pool)) {
                $targets = $pool;
            } elseif ($pool instanceof \Traversable) {
                $targets = iterator_to_array($pool, false);
            } else {
                $targets = [];
            }
        } else {
            // Group members are stored under groupID:host.
            $members = $this->transferService->lrange($groupId . ':' . $this->allow_ip, 0, -1);
            $targets = is_array($members) ? $members : [];
        }

        $message = json_encode($ret);
        foreach (array_unique($targets) as $fd) {
            $server->push((int) $fd, $message);
        }
    }
}
|