logger = $loggerFactory->get('log','default'); //获取 server.php 里的内容 $this->server_config = $this->config->get('server.servers',''); $this->allow_ip = $this->server_config[0]['allow_ip']; } public function onClose($server, int $fd, int $reactorId): void { //客户端异常关闭 //语音文件传输完毕,关闭文件。 $host = $this->allow_ip; $group_key = $this->transferService->get($host.':'.$fd.':group'); $this->transferService->lrem($group_key,$fd,0); $set_ket = $host.':'.$fd.':group'; $this->transferService->delete($set_ket); } public function onStart($server): void { $host = $this->allow_ip; $keyList = $this->transferService->keys("*{$host}*"); foreach ($keyList as $key => $value) { $this->transferService->delete($value); } } /** * @param Response|Server $server */ public function onMessage($server, Frame $frame): void { $ret = array('code' => 0, 'data' => null); $msgData = $this->is_json($frame->data,true); if($msgData){ $frameData = $msgData; $url = $frameData['url']; $data = $frameData['data']; $action = substr($url, strrpos($url, "/") + 1); switch ($action) { case "reg": $groupId = $data['group_id']; $session = $data['session_id']; $this->bind($groupId,'',$frame->fd); $key = "practice_scenes_ws_cur_session_info:".$session; $node_key = 'practice_scenes_ws_cur_session_node_info:'.$session; $redisData = $this->transferService->get($key); if($redisData){ $ret['data'] =json_decode($redisData,true); $this->transferService->delete($key); }else{ $redisData = $this->transferService->get($node_key); if($redisData){ $ret['data'] = json_decode($redisData,true); }else{ $ret['data'] = ''; } } $ret['event'] = "re_bind"; $server->push($frame->fd, json_encode($ret)); break; case "data": $groupId = $data['group_id']; $ret['event'] = "data"; $ret['data'] = $data; $connections = array(); if(empty($groupId)){ // 获取所有无分组标识在线终端 $onlines = count($server->connections); if ($onlines > 0) { $connections = $server->connections; } }else{ // 获取所有分组标识在线终端 key = groupID:host $key = $groupId . ':' . $this->allow_ip; $connections = $this->transferService->lrange($key, 0, -1); } $connections = array_unique($connections); // 广播获取的在线终端 foreach ($connections as $fd) { $ingfd = (int)$fd; $server->push($ingfd,json_encode($ret)); } break; case "ping": $server->push($frame->fd, $data.'pong'); break; default: $ret['code'] = 404; $ret['msg'] = 'event no existent'; $server->push($frame->fd,json_encode($ret)); return; } }else{ $ret['code'] = -1; $ret['msg'] = 'data is null or data no json'; $server->push($frame->fd, json_encode($ret)); return; } } public function onOpen($server, Request $request): void { // $server->push($request->fd, 'Opened'); // var_dump($request); } /** * 功能:1、判断是否是json * @param string $data * @param bool $assoc * @return bool|mixed|string */ public function is_json($data = '', $assoc = false) { $data = json_decode($data, $assoc); if ($data && (is_object($data)) || (is_array($data) && ! empty(current($data)))) { return $data; } return false; } /** * 当前链接的绑定事件。 * 绑定的几种状态: * 1.无分组 * host -> fds (获取当前节点的在线链接列表) * host:fd -> value (获取当前节点当前链接的绑定数据) * host:fd:group -> host (获取当前节点当前链接的绑定分组) * 2.有分组 * groupID:host -> fds * host:fd -> value * host:fd:group -> groupID:host * @param string $groupID 需要绑定当前链接所在分组的分组标识,如果为空则使用默认分组 * @param string $value 需要绑定当前链接对应的业务数据,如果为空则不绑定 */ public function bind($groupID = '', $value = '',$fd){ $host = $this->allow_ip; // 绑定分组和当前链接对应的分组标识 if(empty($groupID)){ $this->transferService->rpush($host, $fd); $this->transferService->set($host.':'.$fd.':group', $host); }else{ $this->transferService->rpush($groupID.':'.$host, $fd); $this->transferService->set($host.':'.$fd.':group', $groupID.':'.$host); } // 绑定业务数据 if(!empty($value)){ $this->transferService->set($host.':'.$fd, $value); } } }