server_config = $config['server_config']; $this->redis_config = $config['redis']; $this->server = new Swoole\WebSocket\Server($this->server_config['host'], $this->server_config['port']); // swoole连接 $this->server->set([ 'task_worker_num' => 8, 'enable_coroutine' => true, 'task_enable_coroutine' => true ]); if (!$this->redis) { $this->redis = new Redis(); $this->redis->connect($this->redis_config['host'],$this->redis_config['port'] ); $host = $this->server_config['allow_ip']; $keyList = $this->redis->keys("*{$host}*"); foreach ($keyList as $key => $value) { $this->redis->del($value); } $this->redis->close(); } $this->server->on('start', function (Swoole\WebSocket\Server $server) { echo "Websocket Server is started at ws://".$this->server_config['host'].":".$this->server_config['port']."\n"; }); $this->server->on('message', function (Swoole\WebSocket\Server $server, $frame) { $server->task($frame); // $ret = array('code' => 0, 'data' => null); // $msgData = $this->is_json($frame->data,true); // if($msgData){ // $frameData = $msgData; // $this->dealMsg($frameData,$frame->fd); // }else{ // $ret['code'] = -1; // $ret['msg'] = 'data is null or data no json'; // $server->push($frame->fd, json_encode($ret)); // return; // } }); $this->server->on('task', function ($server, $task) { $ret = array('code' => 0, 'data' => null); $frame = $task->data; $msgData = $this->is_json($frame->data,true); if($msgData){ $frameData = $msgData; $this->dealMsg($frameData,$frame->fd); }else{ $ret['code'] = -1; $ret['msg'] = 'data is null or data no json'; $server->push($frame->fd, json_encode($ret)); return; } }); $this->server->on('close', function ($ser, $fd) { $host = $this->server_config['allow_ip']; $this->redis->connect($this->redis_config['host'],$this->redis_config['port'] ); $group_key = $this->redis->get($host.':'.$fd.':group'); $this->redis->lrem($group_key,$fd,0); }); $this->server->start(); } /** * 功能:1、判断是否是json * @param string $data * @param bool $assoc * @return 
array|object|false The decoded value, or false when $data is not a JSON object or a non-empty JSON array. */
    public function is_json($data = '', $assoc = false)
    {
        $decoded = json_decode($data, $assoc);

        // Objects can only appear when $assoc is false; they are always accepted.
        if (is_object($decoded)) {
            return $decoded;
        }

        // Accept every non-empty array. The previous check looked at the truthiness of the
        // FIRST element only, so payloads such as {"code":0,"url":"x"} were rejected as
        // "not JSON" even though they decoded fine.
        if (is_array($decoded) && count($decoded) > 0) {
            return $decoded;
        }

        // Scalars, null, empty arrays and undecodable input are not usable messages.
        return false;
    }

    /**
     * Dispatches one decoded client message.
     *
     * The action is the part of the message's "url" field after its last "/"
     * (or the whole url when it contains no "/"):
     *  - "reg":  binds $fd to data.group_id, then replies with event "re_bind" carrying the
     *            session info cached in Redis for data.session_id.
     *  - "data": broadcasts data to the connections bound to data.group_id.
     *  - "ping": replies with the raw (un-encoded) string data . 'pong'.
     * Any other action is answered with {"code":404,"msg":"event no existent"}.
     *
     * @param array $msgContent Decoded message; the keys read here are "url" and "data".
     * @param int   $fd         Connection the message was received on.
     * @return void
     */
    public function dealMsg($msgContent, $fd)
    {
        $ret = ['code' => 0, 'data' => null];

        $url = $this->messageField($msgContent, 'url', '');
        if (!is_string($url)) {
            $url = '';
        }
        $data = $this->messageField($msgContent, 'data');

        // strrpos() returns false when there is no "/"; adding 1 to that would silently
        // drop the first character of the url ("ping" -> "ing"), so handle it explicitly.
        $slash = strrpos($url, '/');
        $action = ($slash === false) ? $url : (string) substr($url, $slash + 1);

        $this->log('action', 'ws', 'ws', $action);

        switch ($action) {
            case 'reg':
                $this->log('reg', 'reg', 'ws', ['content' => $msgContent, 'fd' => $fd]);

                $groupId = $this->messageField($data, 'group_id');
                $session = $this->messageField($data, 'session_id');

                // bind() also (re)connects $this->redis, which the reads below rely on.
                $this->bind($groupId, '', $fd);

                $key = "practice_scenes_ws_cur_session_info:" . $session;
                $node_key = 'practice_scenes_ws_cur_session_node_info:' . $session;

                $cached = $this->redis->get($key);
                if ($cached) {
                    // The session-info entry is consumed once it has been delivered.
                    $ret['data'] = json_decode($cached, true);
                    $this->redis->del($key);
                } else {
                    // Fall back to the node-info entry, which is left in place.
                    $cached = $this->redis->get($node_key);
                    $ret['data'] = is_string($cached) ? json_decode($cached, true) : null;
                }

                $ret['event'] = "re_bind";
                $this->replay($fd, $ret);
                break;

            case 'data':
                $this->log('data', 'data', 'ws', ['content' => $msgContent, 'fd' => $fd]);

                $groupId = $this->messageField($data, 'group_id');
                $ret['event'] = "data";
                $ret['data'] = $data;
                $this->broadcastself($ret, $groupId);
                break;

            case 'ping':
                // Only scalars can be concatenated; arrays/objects are treated as empty.
                $prefix = is_scalar($data) ? $data : '';
                $this->replay($fd, $prefix . 'pong', false);
                break;

            default:
                $ret['code'] = 404;
                $ret['msg'] = 'event no existent';
                $this->server->push($fd, json_encode($ret));
                return;
        }
    }

    /**
     * Reads $key from $source when $source is an array that has a non-null value for it.
     *
     * @param mixed      $source  Value expected to be an array.
     * @param string|int $key     Key to read.
     * @param mixed      $default Returned when $source is not an array or the key is absent/null.
     * @return mixed
     */
    private function messageField($source, $key, $default = null)
    {
        return (is_array($source) && isset($source[$key])) ? $source[$key] : $default;
    }

    /**
     * Binds a connection to a group (and optionally to business data) in Redis.
     *
     * Keys written, where host is server_config['allow_ip']:
     *  1. No group:
     *       host            -> list of fds
     *       host:fd         -> value
     *       host:fd:group   -> host
     *  2. With group:
     *       groupID:host    -> list of fds
     *       host:fd         -> value
     *       host:fd:group   -> groupID:host
     *
     * The first two parameters used to declare '' defaults, but a default placed before the
     * required $fd can never be omitted by a caller and is deprecated since PHP 8.0, so the
     * defaults were dropped; every existing three-argument call keeps working.
     *
     * @param string $groupID Group to bind the connection to; empty means the default group.
     * @param string $value   Business data to store for the connection; empty means none.
     * @param int    $fd      Connection to bind.
     * @return void
     */
    public function bind($groupID, $value, $fd)
    {
        $this->redis->connect($this->redis_config['host'], $this->redis_config['port']);
        $host = $this->server_config['allow_ip'];

        $groupKey = empty($groupID) ? $host : $groupID . ':' . $host;
        $pointerKey = $host . ':' . $fd . ':group';

        // host:fd:group can only remember ONE group per fd, and that is the only list that
        // gets cleaned up later. Drop the fd from whatever list it was bound to before so a
        // re-bind neither leaves it in the old group nor duplicates it in the same one.
        $previousKey = $this->redis->get($pointerKey);
        if (is_string($previousKey) && $previousKey !== '') {
            $this->redis->lrem($previousKey, $fd, 0);
        }

        // Bind the group list and the fd -> group pointer.
        $this->redis->rpush($groupKey, $fd);
        $this->redis->set($pointerKey, $groupKey);

        // Bind the business data.
        if (!empty($value)) {
            $this->redis->set($host . ':' . $fd, $value);
        }
    }

    /**
     * Broadcasts $data, tagged with group_id, to connections on this node.
     *
     * With an empty $groupID every connection currently known to the server is targeted;
     * otherwise the fds stored in the Redis list "groupID:host" are used. On a Redis
     * failure the problem is logged and nothing is sent.
     *
     * @param array|\stdClass $data    Payload; receives a group_id member before encoding.
     * @param string          $groupID Group identifier, or empty for all connections.
     * @return void
     */
    public function broadcastself($data, $groupID = '')
    {
        $host = $this->server_config['allow_ip'];
        $connections = [];

        if (empty($groupID)) {
            // $server->connections is an iterator object in Swoole, not a plain array, so it
            // must be copied before it can be handed to array_unique() below.
            foreach ($this->server->connections as $onlineFd) {
                $connections[] = $onlineFd;
            }
        } else {
            // Group members are stored under key = groupID:host.
            $key = $groupID . ':' . $host;
            try {
                $this->redis->connect($this->redis_config['host'], $this->redis_config['port']);
                $connections = $this->redis->lrange($key, 0, -1);
                if (!is_array($connections)) {
                    // Redis returned something unexpected.
                    $this->log('redis异常', 'ws', 'broadcastself', ['key' => $key, 'data' => $data]);
                    return;
                }
            } catch (\Exception $e) {
                $this->log('redis异常', 'ws', 'broadcastself', ['key' => $key, 'data' => $data, 'msg' => $e->getMessage()]);
                return;
            }
        }

        if (is_array($data)) {
            $data['group_id'] = $groupID;
        } elseif (is_object($data)) {
            $data->group_id = $groupID;
        }
        // Any other payload type cannot carry a group_id and is sent unchanged.

        // The payload is identical for every receiver, so encode it once.
        $payload = json_encode($data);

        foreach (array_unique($connections) as $fd) {
            // Redis hands fds back as strings.
            $fd = (int) $fd;
            // Skip fds that are gone or have not completed the WebSocket handshake.
            if ($this->server->isEstablished($fd)) {
                $this->server->push($fd, $payload);
            }
        }
    }

    /**
     * Replies to a single connection.
     *
     * @param int   $fd     Connection to reply to.
     * @param mixed $data   Reply data.
     * @param bool  $encode Pass false when $data is already a string that must be sent verbatim.
     * @return void
     */
    public function replay($fd, $data, $encode = true)
    {
        $payload = $encode ? json_encode($data, JSON_UNESCAPED_UNICODE) : $data;
        $this->server->push($fd, $payload);
    }

    /**
     * Appends one INFO line to ./logs/<folder>/<finename>-<Y-m-d>.log.
     *
     * Logging is best-effort: failures to create the folder or write the file are
     * suppressed and only reflected in the return value.
     *
     * @param string $title    Short label written before the message.
     * @param string $folder   Sub-folder of ./logs to write into.
     * @param string $finename Log file name prefix.
     * @param mixed  $msg      Value to log; it is JSON-encoded.
     * @return int|false Bytes written, or false on failure.
     */
    public function log($title, $folder, $finename, $msg)
    {
        $line = "[" . date('Y-m-d H:i:s') . "]\t- INFO - " . $title . " - " . json_encode($msg) . "\n";

        // Create the target folder when it does not exist yet.
        // Simplistic handling: real deployments should revise this.
        $dst = './logs/' . $folder;
        if (is_dir($dst) === false) {
            @mkdir($dst, 0777, true);
        }

        $log_file_name = $dst . '/' . $finename . '-' . date('Y-m-d') . '.log';
        return @file_put_contents($log_file_name, $line, FILE_APPEND);
    }
}

$ws = new ws_server();
?>