__handlePropertyHandler(__CLASS__);
        // The first argument is the log name, the second is the key inside config/autoload/logger.php.
        $this->logger = $loggerFactory->get('log', 'default');
        // Read the server definitions from server.php.
        $this->server_config = $this->config->get('server.servers', '');
        $this->allow_ip = $this->server_config[0]['allow_ip'];
    }

    /**
     * Cleans up the bookkeeping of a closed connection.
     *
     * Looks up the group list the connection was bound to (stored under
     * "host:fd:group"), removes the fd from that list and deletes the marker.
     *
     * @param mixed $server    server instance passed by the framework (unused)
     * @param int   $fd        descriptor of the closed connection
     * @param int   $reactorId reactor thread id passed by the framework (unused)
     */
    public function onClose($server, int $fd, int $reactorId): void
    {
        $host = $this->allow_ip;
        $markerKey = $host . ':' . $fd . ':group';

        // A connection that never sent "reg" has no marker; skip the list
        // removal instead of calling lrem with an empty key.
        $groupKey = $this->transferService->get($markerKey);
        if (!empty($groupKey)) {
            $this->transferService->lrem($groupKey, $fd, 0);
        }

        $this->transferService->delete($markerKey);
    }

    /**
     * Deletes every stored key that contains this node's host when the server starts.
     *
     * @param mixed $server server instance passed by the framework (unused)
     */
    public function onStart($server): void
    {
        $host = $this->allow_ip;
        $staleKeys = $this->transferService->keys("*{$host}*");
        if (!is_array($staleKeys)) {
            return;
        }

        foreach ($staleKeys as $staleKey) {
            $this->transferService->delete($staleKey);
        }
    }

    /**
     * Dispatches an incoming frame by the last path segment of its "url" field.
     *
     * The frame payload is expected to be a JSON object with "url" and "data".
     * Supported actions:
     *  - reg:  bind the connection to data.group_id and reply with the cached
     *          session data for data.session_id (event "re_bind")
     *  - data: push data to every fd of the group data.group_id, or to every
     *          connection of the server when no group is given (event "data")
     *  - ping: reply with data followed by "pong"
     * Any other action is answered with code 404, an undecodable payload with code -1.
     *
     * @param Response|Server $server
     */
    public function onMessage($server, Frame $frame): void
    {
        $ret = ['code' => 0, 'data' => null];

        $frameData = $this->is_json($frame->data, true);
        if (!is_array($frameData)) {
            $ret['code'] = -1;
            $ret['msg'] = 'data is null or data no json';
            $server->push($frame->fd, json_encode($ret));
            return;
        }

        $url = $frameData['url'] ?? '';
        if (!is_string($url)) {
            $url = '';
        }
        $data = $frameData['data'] ?? null;
        // Field lookups below need an array; a scalar payload (e.g. for ping) has no fields.
        $fields = is_array($data) ? $data : [];

        // strrpos() yields false when there is no "/"; adding 1 to it would
        // silently cut off the first character of the action.
        $slashPos = strrpos($url, '/');
        $action = $slashPos === false ? $url : substr($url, $slashPos + 1);

        switch ($action) {
            case 'reg':
                $groupId = $fields['group_id'] ?? '';
                $session = $fields['session_id'] ?? '';
                $this->bind($groupId, '', $frame->fd);

                $key = 'practice_scenes_ws_cur_session_info:' . $session;
                $node_key = 'practice_scenes_ws_cur_session_node_info:' . $session;

                $redisData = $this->transferService->get($key);
                if ($redisData) {
                    // The session entry is handed out once and then removed.
                    $ret['data'] = json_decode($redisData, true);
                    $this->transferService->delete($key);
                } else {
                    $redisData = $this->transferService->get($node_key);
                    $ret['data'] = $redisData ? json_decode($redisData, true) : '';
                }

                $ret['event'] = 're_bind';
                $server->push($frame->fd, json_encode($ret));
                break;

            case 'data':
                $groupId = $fields['group_id'] ?? '';
                $ret['event'] = 'data';
                $ret['data'] = $data;

                $connections = [];
                if (empty($groupId)) {
                    // No group given: target every connection of this server.
                    // $server->connections is an iterator, so copy it into a
                    // plain array before handing it to array_unique().
                    $online = $server->connections ?? [];
                    if (is_iterable($online)) {
                        foreach ($online as $onlineFd) {
                            $connections[] = $onlineFd;
                        }
                    }
                } else {
                    // Group given: target the fds stored under "groupID:host".
                    $key = $groupId . ':' . $this->allow_ip;
                    $members = $this->transferService->lrange($key, 0, -1);
                    if (is_array($members)) {
                        $connections = $members;
                    }
                }

                // Broadcast to the collected connections, each fd once.
                $payload = json_encode($ret);
                foreach (array_unique($connections) as $fd) {
                    $server->push((int) $fd, $payload);
                }
                break;

            case 'ping':
                // Only scalar data can be echoed back as text.
                $echo = is_scalar($data) ? $data : '';
                $server->push($frame->fd, $echo . 'pong');
                break;

            default:
                $ret['code'] = 404;
                $ret['msg'] = 'event no existent';
                $server->push($frame->fd, json_encode($ret));
                break;
        }
    }

    /**
     * Called when a connection is opened; intentionally does nothing.
     *
     * @param mixed $server server instance passed by the framework (unused)
     */
    public function onOpen($server, Request $request): void
    {
    }

    /**
     * Decodes a JSON document and returns it only when it is an object or a
     * non-empty array.
     *
     * @param string $data  JSON text to decode
     * @param bool   $assoc when true, JSON objects are returned as associative arrays
     * @return array|object|false the decoded value, or false when $data is not
     *                            a string, is not valid JSON, decodes to a
     *                            scalar/null, or decodes to an empty array
     */
    public function is_json($data = '', $assoc = false)
    {
        if (!is_string($data)) {
            return false;
        }

        $decoded = json_decode($data, $assoc);
        if (is_object($decoded)) {
            return $decoded;
        }

        // Only the emptiness of the array matters; the value of its first
        // element (0, "", false, ...) must not make valid JSON look invalid.
        if (is_array($decoded) && $decoded !== []) {
            return $decoded;
        }

        return false;
    }

    /**
     * Binds a connection to a group list.
     *
     * Stored keys:
     *  1. Without a group
     *     host          -> fds      (list of connections on this node)
     *     host:fd       -> value    (business data bound to the connection)
     *     host:fd:group -> host     (list key the connection belongs to)
     *  2. With a group
     *     groupID:host  -> fds
     *     host:fd       -> value
     *     host:fd:group -> groupID:host
     *
     * All three arguments have to be passed: $fd is required and comes last,
     * so the earlier parameters can never be omitted.
     *
     * @param string $groupID group identifier; when empty the node-wide list is used
     * @param string $value   business data to bind to the connection; when empty nothing is stored
     * @param int    $fd      descriptor of the connection to bind
     */
    public function bind($groupID, $value, $fd)
    {
        $host = $this->allow_ip;
        $markerKey = $host . ':' . $fd . ':group';
        $listKey = empty($groupID) ? $host : $groupID . ':' . $host;

        // Drop the fd from the list it was bound to before, so that binding
        // again (to the same or another group) leaves no stale or duplicate entry.
        $previousKey = $this->transferService->get($markerKey);
        if (!empty($previousKey)) {
            $this->transferService->lrem($previousKey, $fd, 0);
        }

        // Bind the connection to its list and remember which list that is.
        $this->transferService->rpush($listKey, $fd);
        $this->transferService->set($markerKey, $listKey);

        // Bind the business data.
        if (!empty($value)) {
            $this->transferService->set($host . ':' . $fd, $value);
        }
    }
}