<?php
|
/*
|
* @Author: zhoudw
|
* @Date: 2021-12-13 09:57:20
|
* @Last Modified by: zhoudw
|
* @Last Modified time: 2021-12-13 09:58:34
|
*/
|
class ws_server
|
{
|
|
protected $server;
|
protected $request;
|
protected $frame;
|
protected $server_config;
|
protected $redis = null;
|
protected $redis_config;
|
public function __construct()
|
{
|
$config = require './config/config.php';
|
$this->server_config = $config['server_config'];
|
$this->redis_config = $config['redis'];
|
|
$this->server = new Swoole\WebSocket\Server($this->server_config['host'], $this->server_config['port']); // swoole连接
|
$this->server->set([
|
'task_worker_num' => 8,
|
'enable_coroutine' => true,
|
'task_enable_coroutine' => true
|
]);
|
$this->server->on('WorkerStart', function (Swoole\WebSocket\Server $server) {
|
if (!$this->redis) {
|
$this->redis = new Redis();
|
$this->redis->connect($this->redis_config['host'],$this->redis_config['port'] );
|
$host = $this->server_config['allow_ip'];
|
$keyList = $this->redis->keys("*{$host}*");
|
foreach ($keyList as $key => $value) {
|
$this->redis->del($value);
|
}
|
$this->redis->del("{$host}*");
|
$this->redis->close();
|
}
|
});
|
$this->server->on('start', function (Swoole\WebSocket\Server $server) {
|
echo "Websocket Server is started at ws://".$this->server_config['host'].":".$this->server_config['port']."\n";
|
});
|
|
$this->server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
|
$server->task($frame);
|
// $ret = array('code' => 0, 'data' => null);
|
// $msgData = $this->is_json($frame->data,true);
|
// if($msgData){
|
// $frameData = $msgData;
|
// $this->dealMsg($frameData,$frame->fd);
|
// }else{
|
// $ret['code'] = -1;
|
// $ret['msg'] = 'data is null or data no json';
|
// $server->push($frame->fd, json_encode($ret));
|
// return;
|
// }
|
});
|
|
$this->server->on('task', function ($server, $task) {
|
$ret = array('code' => 0, 'data' => null);
|
$frame = $task->data;
|
$msgData = $this->is_json($frame->data,true);
|
if($msgData){
|
$frameData = $msgData;
|
$this->dealMsg($frameData,$frame->fd);
|
}else{
|
$ret['code'] = -1;
|
$ret['msg'] = 'data is null or data no json';
|
$server->push($frame->fd, json_encode($ret));
|
return;
|
}
|
});
|
$this->server->on('close', function ($ser, $fd) {
|
$host = $this->server_config['allow_ip'];
|
$this->redis->connect($this->redis_config['host'],$this->redis_config['port'] );
|
$group_key = $this->redis->get($host.':'.$fd.':group');
|
$this->redis->lrem($group_key,$fd,0);
|
});
|
|
$this->server->start();
|
|
}
|
|
/**
|
* 功能:1、判断是否是json
|
* @param string $data
|
* @param bool $assoc
|
* @return bool|mixed|string
|
*/
|
function is_json($data = '', $assoc = false)
|
{
|
$data = json_decode($data, $assoc);
|
if ($data && (is_object($data)) || (is_array($data) && ! empty(current($data)))) {
|
return $data;
|
}
|
return false;
|
}
|
|
/**
|
* 功能:1、消息处理
|
*/
|
public function dealMsg($msgContent,$fd){
|
$ret = array('code' => 0, 'data' => null);
|
// $this->server->push($fd,json_encode($ret));
|
$url = $msgContent['url'];
|
$data = $msgContent['data'];
|
$action = substr($url, strrpos($url, "/") + 1);
|
$this->log('action','ws','ws',$action);
|
switch ($action)
|
{
|
case "reg":
|
$this->log('reg','reg','ws',['content'=>$msgContent,'fd'=>$fd]);
|
$groupId = $data['group_id'];
|
$session = $data['session_id'];
|
$this->bind($groupId,'',$fd);
|
$key = "practice_scenes_ws_cur_session_info:".$session;
|
$node_key = 'practice_scenes_ws_cur_session_node_info:'.$session;
|
$redisData = $this->redis->get($key);
|
if($redisData){
|
$ret['data'] =json_decode($redisData,true);
|
$this->redis->del($key);
|
}else{
|
$redisData = $this->redis->get($node_key);
|
$ret['data'] =json_decode($redisData,true);
|
}
|
$ret['event'] = "re_bind";
|
$this->replay($fd,$ret);
|
break;
|
case "data":
|
$this->log('data','data','ws',['content'=>$msgContent,'fd'=>$fd]);
|
$groupId = $data['group_id'];
|
$ret['event'] = "data";
|
$ret['data'] = $data;
|
$this->broadcastself($ret , $groupId);
|
break;
|
case "ping":
|
$this->replay($fd,$data.'pong', false);
|
break;
|
default:
|
$ret['code'] = 404;
|
$ret['msg'] = 'event no existent';
|
$this->server->push($fd,json_encode($ret));
|
return;
|
}
|
}
|
|
|
/**
|
* 当前链接的绑定事件。
|
* 绑定的几种状态:
|
* 1.无分组
|
* host -> fds (获取当前节点的在线链接列表)
|
* host:fd -> value (获取当前节点当前链接的绑定数据)
|
* host:fd:group -> host (获取当前节点当前链接的绑定分组)
|
* 2.有分组
|
* groupID:host -> fds
|
* host:fd -> value
|
* host:fd:group -> groupID:host
|
* @param string $groupID 需要绑定当前链接所在分组的分组标识,如果为空则使用默认分组
|
* @param string $value 需要绑定当前链接对应的业务数据,如果为空则不绑定
|
*/
|
public function bind($groupID = '', $value = '',$fd){
|
$this->redis->connect($this->redis_config['host'],$this->redis_config['port'] );
|
$host = $this->server_config['allow_ip'];
|
// 绑定分组和当前链接对应的分组标识
|
if(empty($groupID)){
|
$this->redis->rpush($host, $fd);
|
$this->redis->set($host.':'.$fd.':group', $host);
|
}else{
|
$this->redis->rpush($groupID.':'.$host, $fd);
|
$this->redis->set($host.':'.$fd.':group', $groupID.':'.$host);
|
}
|
// 绑定业务数据
|
if(!empty($value)){
|
$this->redis->set($host.':'.$fd, $value);
|
}
|
}
|
|
/**
|
* 对指定数据和分组对当前集群节点进行广播。
|
* @param \stdClass $data 广播数据
|
* @param string $groupID 分组标识
|
*/
|
public function broadcastself($data, $groupID = '') {
|
$host = $this->server_config['allow_ip'];
|
$connections = array();
|
if(empty($groupID)){
|
// 获取所有无分组标识在线终端
|
$onlines = count($this->server->connections);
|
if ($onlines > 0) {
|
$connections = $this->server->connections;
|
}
|
}else{
|
// 获取所有分组标识在线终端 key = groupID:host
|
$key = $groupID . ':' . $host;
|
try {
|
$this->redis->connect($this->redis_config['host'],$this->redis_config['port'] );
|
$connections = $this->redis->lrange($key, 0, -1);
|
if (!is_array($connections)) {//redis异常
|
$this->log('redis异常','ws','broadcastself',['key'=>$key,'data'=>$data]);
|
return;
|
}
|
} catch (\Exception $e) {
|
$this->log('redis异常','ws','broadcastself',['key'=>$key,'data'=>$data,'msg'=>$e->getMessage()]);
|
return;
|
}
|
}
|
if(is_array($data)){
|
$data['group_id'] = $groupID;
|
}else{
|
$data->group_id = $groupID;
|
}
|
$connections = array_unique($connections);
|
// 广播获取的在线终端
|
foreach ($connections as $fd) {
|
$this->server->push($fd,json_encode($data));
|
}
|
}
|
/**
|
* 回复当前链接。
|
* @param \stdClass $data 回复数据
|
* @param bool $encode 如果$data是string格式,则encode为false
|
*/
|
public function replay($fd,$data,$encode = true) {
|
$json_data = $data;
|
if($encode){
|
$json_data = json_encode($data, JSON_UNESCAPED_UNICODE);
|
}
|
$this->server->push($fd, $json_data);
|
}
|
|
function log($title,$folder,$finename,$msg)
|
{
|
$logs = json_encode($msg);
|
$msg = "[".date('Y-m-d H:i:s')."]\t- INFO - ".$title." - ".$logs."\n";
|
//判断目的文件夹是否存在? 如果不存在就生成
|
//简单处理: 实际应用需要修改
|
$dst ='./logs/'.$folder;
|
if (is_dir($dst) === FALSE) {
|
@mkdir($dst, 0777, true);
|
}
|
$log_file_name = $dst.'/'.$finename.'-'.date('Y-m-d').'.log';
|
return @file_put_contents($log_file_name, $msg, FILE_APPEND);
|
}
|
}
|
$ws = new ws_server();
|
?>
|