基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

<?phpreturn [    'mode' => SWOOLE_PROCESS,    'servers' => [        [            'name' => 'http',            'type' => Server::SERVER_HTTP,            'host' => '0.0.0.0',            'port' => 11111,            'sock_type' => SWOOLE_SOCK_TCP,            'callbacks' => [                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],            ],        ],        [            'name' => 'ws',            'type' => Server::SERVER_WEBSOCKET,            'host' => '0.0.0.0',            'port' => 12222,            'sock_type' => SWOOLE_SOCK_TCP,            'callbacks' => [                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],            ],        ],    ],

WebSocket 服务器端代码示例

<?phpdeclare(strict_types=1);/** * This file is part of Hyperf. * * @link     https://www.hyperf.io * @document https://doc.hyperf.io * @contact  group@hyperf.io * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE */namespace App\Controller;use Hyperf\Contract\OnCloseInterface;use Hyperf\Contract\OnMessageInterface;use Hyperf\Contract\OnOpenInterface;use Swoole\Http\Request;use Swoole\Server;use Swoole\Websocket\Frame;use Swoole\WebSocket\Server as WebSocketServer;class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface{    /**     * 发送消息     * @param WebSocketServer $server     * @param Frame $frame     */    public function onMessage(WebSocketServer $server, Frame $frame): void    {        //心跳刷新缓存        $redis = $this->container->get(\Redis::class);        //获取所有的客户端id        $fdList = $redis->sMembers('websocket_sjd_1');        //如果当前客户端在客户端集合中,就刷新        if (in_array($frame->fd, $fdList)) {            $redis->sAdd('websocket_sjd_1', $frame->fd);            $redis->expire('websocket_sjd_1', 7200);        }        $server->push($frame->fd, 'Recv: ' . $frame->data);    }    /**     * 客户端失去链接     * @param Server $server     * @param int $fd     * @param int $reactorId     */    public function onClose(Server $server, int $fd, int $reactorId): void    {        //删掉客户端id        $redis = $this->container->get(\Redis::class);        //移除集合中指定的value        $redis->sRem('websocket_sjd_1', $fd);        var_dump('closed');    }    /**     * 客户端链接     * @param WebSocketServer $server     * @param Request $request     */    public function onOpen(WebSocketServer $server, Request $request): void    {        //保存客户端id        $redis = $this->container->get(\Redis::class);        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);        var_dump($res1);        $res = $redis->expire('websocket_sjd_1', 7200);        var_dump($res);        $server->push($request->fd, 'Opened');    }}

WebSocket 前端代码

function WebSocketTest() {        if ("WebSocket" in window) {            console.log("您的浏览器支持 WebSocket!");            var num = 0            // 打开一个 web socket            var ws = new WebSocket("ws://127.0.0.1:12222");            ws.onopen = function () {                // Web Socket 已连接上,使用 send() 方法发送数据                //alert("数据发送中...");                //ws.send("发送数据");            };            window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开                var ping = {"type": "ping"};                ws.send(JSON.stringify(ping));            }, 5000);            ws.onmessage = function (evt) {                var d = JSON.parse(evt.data);                console.log(d);                if (d.code == 300) {                    $(".address").text(d.address)                }                if (d.code == 200) {                    var v = d.data                    console.log(v);                    num++                    var str = `<div class="item">                                    <p>${v.recordOutTime}</p>                                    <p>${v.userOutName}</p>                                    <p>${v.userOutNum}</p>                                    <p>${v.doorOutName}</p>                                </div>`                    $(".tableHead").after(str)                    if (num > 7) {                        num--                        $(".table .item:nth-last-child(1)").remove()                    }                }            };            ws.error = function (e) {                console.log(e)                alert(e)            }            ws.onclose = function () {                // 关闭 websocket                alert("连接已关闭...");            };        } else {            alert("您的浏览器不支持 WebSocket!");        }    }

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

<?phpreturn [    'default' => [        'host' => 'localhost',        'port' => 5672,        'user' => 'guest',        'password' => 'guest',        'vhost' => '/',        'pool' => [            'min_connections' => 1,            'max_connections' => 10,            'connect_timeout' => 10.0,            'wait_timeout' => 3.0,            'heartbeat' => -1,        ],        'params' => [            'insist' => false,            'login_method' => 'AMQPLAIN',            'login_response' => null,            'locale' => 'en_US',            'connection_timeout' => 3.0,            'read_write_timeout' => 6.0,            'context' => null,            'keepalive' => false,            'heartbeat' => 3,        ],    ],];

MQ 消费者代码

<?phpdeclare(strict_types=1);namespace App\Amqp\Consumer;use Hyperf\Amqp\Annotation\Consumer;use Hyperf\Amqp\Message\ConsumerMessage;use Hyperf\Amqp\Result;use Hyperf\Server\Server;use Hyperf\Server\ServerFactory;/** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */class DemoConsumer extends ConsumerMessage{    /**     * rabbmitMQ消费端代码     * @param $data     * @return string     */    public function consume($data): string    {        print_r($data);        //获取集合中所有的value        $redis = $this->container->get(\Redis::class);        $fdList=$redis->sMembers('websocket_sjd_1');        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();        foreach($fdList as $key=>$v){            if(!empty($v)){                $server->push((int)$v, $data);            }        }        return Result::ACK;    }}

控制器代码

  /**     * test     * @return array     */    public function test()    {        $data = array(            'code' => 200,            'data' => [                'userOutName' => 'ccflow',                'userOutNum' => '9999',                'recordOutTime' => date("Y-m-d H:i:s", time()),                'doorOutName' => '教师公寓',            ]        );        $data = \GuzzleHttp\json_encode($data);        $message = new DemoProducer($data);        $producer = ApplicationContext::getContainer()->get(Producer::class);        $result = $producer->produce($message);        var_dump($result);        $user = $this->request->input('user', 'Hyperf');        $method = $this->request->getMethod();        return [            'method' => $method,            'message' => "{$user}.",        ];    }

最终效果

微信截图_20200605091315.png

推荐教程:《PHP教程》

更多相关文章

  1. 使用 PHPStan 强化PHP代码质量
  2. use关键字在php中的使用(含代码)
  3. 详解php中的几种常见排序方法(附代码)
  4. 代码分析php中的回调函数
  5. php代码如何在html文件里面执行(详解)
  6. PHP让人不知道的匿名函数的几种写法(附代码)
  7. php中的可变变量(代码详解)
  8. PHP中一些常用操作类代码解析
  9. use在php中的使用方法(代码示例)

随机推荐

  1. 让Python在Android系统上飞一会儿
  2. Androidの自定义Spinner实现
  3. 【Android Studio使用教程4】Android Stu
  4. 修改android升级系统后启动系统,提示andro
  5. Android TextView 设置行间距字间距
  6. Android 入门前言之 --布局
  7. android selector 背景选择器的使用, butt
  8. Android Studio系列(四)Version Control II
  9. 8大你不得不知的Android调试工具
  10. android 去掉顶部状态栏