zhoudw
2021-12-21 4a71b8b7fd4497e9d689e3c0eecfc35f7b5558eb
task
9 files added
343 ■■■■■ changed files
.gitignore 3 ●●●●● patch | view | raw | blame | history
config/config.php 23 ●●●●● patch | view | raw | blame | history
config/dev.php patch | view | raw | blame | history
config/pro.php patch | view | raw | blame | history
config/test.php patch | view | raw | blame | history
index.html 75 ●●●●● patch | view | raw | blame | history
logs/test.log 1 ●●●● patch | view | raw | blame | history
start.sh 2 ●●●●● patch | view | raw | blame | history
ws_server.php 239 ●●●●● patch | view | raw | blame | history
.gitignore
New file
@@ -0,0 +1,3 @@
.DS_Store
logs/*/
ws.out
config/config.php
New file
@@ -0,0 +1,23 @@
<?php
/*
 * @Author: your name
 * @Date: 2021-12-15 11:33:00
 * @LastEditTime: 2021-12-21 13:40:02
 * @LastEditors: Please set LastEditors
 * @Description: 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
 * @FilePath: /websocket/config/config.php
 */
return array(
    //WebSocket绑定、分组、在线等信息存储使用的Redis配置
    'redis' => array(
        'host' => '120.76.101.87',
        'port' => '51101'
    ),
    //本机WebSocket服务配置
    'server_config' => array(
        'allow_ip' => '120.25.156.26:58003',
        'host' => '0.0.0.0',
        'port' => '9501',
    ),
);
config/dev.php
config/pro.php
config/test.php
index.html
New file
@@ -0,0 +1,75 @@
<!DOCTYPE html>
<head>
    <title>socket demo</title>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <script type="text/javascript" src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js"></script>
</head>
<body>
<input type="text" id="msg" />
<button id="send">发送</button>
</body>
</html>
<script>
    $(function()
    {
        // 创建socket对象
        var socket = new WebSocket('ws://127.0.0.1:9501');
        // 打开Socket
        socket.onopen = function(event)
        {
            $('#send').on('click', function()
            {
                var msg = $('#msg').val();
                var url = 'application/index/data';
                var fullData = {
                    url:url,
                    data:{
                        session_id:'1a2e0ca9c7404b765a37a5ab07546fa8',
                        group_id:'1a2e0ca9c7404b765a37a5ab07546fa8',
                        msg:msg
                    }
                };
                socket.send(JSON.stringify(fullData));
                console.log(msg);
            });
                var url = 'application/index/reg';
                var fullData = {
                    url:url,
                    data:{
                        session_id:'1a2e0ca9c7404b765a37a5ab07546fa8',
                        group_id:'1a2e0ca9c7404b765a37a5ab07546fa8'
                    }
                };
            // 发送一个初始化消息
            socket.send(JSON.stringify(fullData));
            if (socket.readyState == 1) {
                setInterval(function(){
                    var fullData = {
                        url: 'plumeWSService/cluster/ping',
                        data: 'ping'
                    };
                    socket.send(JSON.stringify(fullData));
                },1000)
            }
            // 监听消息
            socket.onmessage = function(event) {
                if(event.data == 'pingpong'){//处理心跳接收事件
                }else{
                    var fullData = JSON.parse(event.data);
                    console.log("客户端监听到的消息:", fullData);
                }
            };
            // 监听Socket的关闭
            socket.onclose = function(event) {
                console.log("Socket关闭了", event);
            };
            // 关闭Socket....
            //socket.close()
        };
    });
</script>
logs/test.log
New file
@@ -0,0 +1 @@
日志
start.sh
New file
@@ -0,0 +1,2 @@
#!/bin/bash
php ws_server.php
ws_server.php
New file
@@ -0,0 +1,239 @@
<?php
/*
 * @Author: zhoudw
 * @Date: 2021-12-13 09:57:20
 * @Last Modified by: zhoudw
 * @Last Modified time: 2021-12-13 09:58:34
 */
class ws_server
{
    protected $server;
    protected $request;
    protected $frame;
    protected $server_config;
    protected $redis = null;
    protected $redis_config;
    public function __construct()
    {
        $config = require './config/config.php';
        $this->server_config = $config['server_config'];
        $this->redis_config  = $config['redis'];
        $this->server = new Swoole\WebSocket\Server($this->server_config['host'], $this->server_config['port']); // swoole连接
        $this->server->set([
            'task_worker_num'       => 8,
            'enable_coroutine'      => true,
            'task_enable_coroutine' => true
        ]);
        if (!$this->redis) {
            $this->redis = new Redis();
        }
        $this->server->on('start', function (Swoole\WebSocket\Server $server) {
            echo "Websocket Server is started at ws://".$this->server_config['host'].":".$this->server_config['port']."\n";
        });
        $this->server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
            $server->task($frame);
            // $ret = array('code' => 0, 'data' => null);
            // $msgData = $this->is_json($frame->data,true);
            // if($msgData){
            //     $frameData = $msgData;
            //     $this->dealMsg($frameData,$frame->fd);
            // }else{
            //     $ret['code'] = -1;
            //     $ret['msg'] = 'data is null or data no json';
            //     $server->push($frame->fd, json_encode($ret));
            //     return;
            // }
        });
        $this->server->on('task', function ($server, $task) {
            $ret = array('code' => 0, 'data' => null);
            $frame = $task->data;
            $msgData = $this->is_json($frame->data,true);
            if($msgData){
                $frameData = $msgData;
                $this->dealMsg($frameData,$frame->fd);
            }else{
                $ret['code'] = -1;
                $ret['msg'] = 'data is null or data no json';
                $server->push($frame->fd, json_encode($ret));
                return;
            }
        });
        $this->server->on('close', function ($ser, $fd) {
            $host  = $this->server_config['allow_ip'];
            $this->redis->connect($this->redis_config['host'],$this->redis_config['port'] );
            $group_key =  $this->redis->get($host.':'.$fd.':group');
            $this->redis->lrem($group_key,$fd,0);
        });
        $this->server->start();
    }
    /**
     * 功能:1、判断是否是json
     * @param string $data
     * @param bool $assoc
     * @return bool|mixed|string
     */
    function is_json($data = '', $assoc = false)
    {
        $data = json_decode($data, $assoc);
        if ($data && (is_object($data)) || (is_array($data) && ! empty(current($data)))) {
            return $data;
        }
        return false;
    }
    /**
     * 功能:1、消息处理
     */
    public function dealMsg($msgContent,$fd){
        $ret = array('code' => 0, 'data' => null);
        // $this->server->push($fd,json_encode($ret));
        $url = $msgContent['url'];
        $data = $msgContent['data'];
        $action = substr($url, strrpos($url, "/") + 1);
        $this->log('action','ws','ws',$action);
        switch ($action)
        {
        case "reg":
            $this->log('reg','reg','ws',['content'=>$msgContent,'fd'=>$fd]);
            $groupId = $data['group_id'];
            $session = $data['session_id'];
            $this->bind($groupId,'',$fd);
            $key = "practice_scenes_ws_cur_session_info:".$session;
            $node_key = 'practice_scenes_ws_cur_session_node_info:'.$session;
            $redisData = $this->redis->get($key);
            if($redisData){
                $ret['data'] =json_decode($redisData,true);
                $this->redis->del($key);
            }else{
                $redisData = $this->redis->get($node_key);
                $ret['data'] =json_decode($redisData,true);
            }
            $ret['event'] = "re_bind";
            $this->replay($fd,$ret);
            break;
        case "data":
            $this->log('data','data','ws',['content'=>$msgContent,'fd'=>$fd]);
            $groupId = $data['group_id'];
            $ret['event'] = "data";
            $ret['data']  = $data;
            $this->broadcastself($ret , $groupId);
            break;
        case "ping":
            $this->replay($fd,$data.'pong', false);
            break;
        default:
            $ret['code'] = 404;
            $ret['msg'] = 'event no existent';
            $this->server->push($fd,json_encode($ret));
            return;
        }
    }
    /**
     * 当前链接的绑定事件。
     * 绑定的几种状态:
     * 1.无分组
     * host -> fds (获取当前节点的在线链接列表)
     * host:fd -> value (获取当前节点当前链接的绑定数据)
     * host:fd:group -> host (获取当前节点当前链接的绑定分组)
     * 2.有分组
     * groupID:host -> fds
     * host:fd -> value
     * host:fd:group -> groupID:host
     * @param string $groupID 需要绑定当前链接所在分组的分组标识,如果为空则使用默认分组
     * @param string $value 需要绑定当前链接对应的业务数据,如果为空则不绑定
     */
    public function bind($groupID = '', $value = '',$fd){
        $this->redis->connect($this->redis_config['host'],$this->redis_config['port'] );
        $host  = $this->server_config['allow_ip'];
        // 绑定分组和当前链接对应的分组标识
        if(empty($groupID)){
            $this->redis->rpush($host, $fd);
            $this->redis->set($host.':'.$fd.':group', $host);
        }else{
            $this->redis->rpush($groupID.':'.$host, $fd);
            $this->redis->set($host.':'.$fd.':group', $groupID.':'.$host);
        }
        // 绑定业务数据
        if(!empty($value)){
            $this->redis->set($host.':'.$fd, $value);
        }
    }
      /**
     * 对指定数据和分组对当前集群节点进行广播。
     * @param \stdClass $data 广播数据
     * @param string $groupID 分组标识
     */
    public function broadcastself($data, $groupID = '') {
        $host  = $this->server_config['allow_ip'];
        $connections = array();
        if(empty($groupID)){
            // 获取所有无分组标识在线终端
            $onlines = count($this->server->connections);
            if ($onlines > 0) {
                $connections = $this->server->connections;
            }
        }else{
            // 获取所有分组标识在线终端 key = groupID:host
            $key = $groupID . ':' . $host;
            try {
                $this->redis->connect($this->redis_config['host'],$this->redis_config['port'] );
                $connections = $this->redis->lrange($key, 0, -1);
                if (!is_array($connections)) {//redis异常
                    $this->log('redis异常','ws','broadcastself',['key'=>$key,'data'=>$data]);
                    return;
                }
            } catch (\Exception $e) {
                $this->log('redis异常','ws','broadcastself',['key'=>$key,'data'=>$data,'msg'=>$e->getMessage()]);
                return;
            }
        }
        if(is_array($data)){
            $data['group_id'] = $groupID;
        }else{
            $data->group_id = $groupID;
        }
        $connections = array_unique($connections);
        // 广播获取的在线终端
        foreach ($connections as $fd) {
            $this->server->push($fd,json_encode($data));
        }
    }
    /**
     * 回复当前链接。
     * @param \stdClass $data 回复数据
     * @param bool $encode 如果$data是string格式,则encode为false
     */
    public function replay($fd,$data,$encode = true) {
        $json_data = $data;
        if($encode){
            $json_data = json_encode($data, JSON_UNESCAPED_UNICODE);
        }
        $this->server->push($fd, $json_data);
    }
    function log($title,$folder,$finename,$msg)
    {
        $logs = json_encode($msg);
        $msg = "[".date('Y-m-d H:i:s')."]\t- INFO - ".$title." - ".$logs."\n";
        //判断目的文件夹是否存在? 如果不存在就生成
        //简单处理: 实际应用需要修改
        $dst ='./logs/'.$folder;
        if (is_dir($dst) === FALSE) {
            @mkdir($dst, 0777, true);
        }
        $log_file_name = $dst.'/'.$finename.'-'.date('Y-m-d').'.log';
        return @file_put_contents($log_file_name, $msg, FILE_APPEND);
    }
}
$ws = new ws_server();
?>