Webman使用RabbitMQ消息中间件实现系统异步解耦实战教程

AMQP 即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
首页 新闻资讯 行业资讯 Webman使用RabbitMQ消息中间件实现系统异步解耦实战教程

简介

RabbitMQ是一个开源的消息代理软件,它使用高级消息队列协议(AMQP)来实现消息的发送和接收。RabbitMQ支持多种消息协议,包括STOMP、MQTT等,并且能够与多种编程语言和平台集成,如Java、.NET、Python等。

AMQP 即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

基本架构设计

图片

图片来源:https://blog.csdn.net/cuierdan/article/details/123824300

基本概念

  • Message Broker:(消息代理服务器)是一个虚拟的概念,而RabbitMQ是Message Broker的一个实例。

  • Producer:(生产者)产生数据并将数据发送到消息代理服务器(Message Broker)的程序被称作消息的生产者。

  • Connection:(连接)生产者与消费者通过TCP协议与消息代理服务器(Message Broker)创建的连接。

  • Channel:(信道)创建在Connection中的虚拟连接,类似于连接数据库时的连接池的概念,生产者和消费者并不是直接与MQ通过Connection进行通讯的,而是通过Channel进行连接通讯的,数据的流动是在Channel中进行的。

  • VirtualHost:(虚拟消息服务器)就像mysql数据库中有数据库实例的概念,并且可以指定用户对库和表等操作的设置权限。也可以类别成LINUX系统中的不同用户,不同用户之间是相互独立的。每个VirtualHost相当于一个相对独立mini的RabbitMQ服务器。每个VirtutalHost之间是相互隔离的,exchange,queue,message等不能互通。

  • Exchange:(交换机)交换机直接与Channel(信道)连接,接收来自于消息生产者产生的数据,在由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ的交换机有fanout(扇出),direct(直接),topic(主题),headers(标题)四种类型,每种交换机类型都对应着不同的路由规则,根据不同的路由规则,交换机会将消息路由到不同的队列中。

  • Binding: (绑定)交换机与队列之间的虚拟连接,在这个绑定中可以设置Binding Key,一个绑定就是用一个Binding Key将交换器和队列连接起来,设置的Binding Key存在着一定的规则,Exchange会将消息中携带的Routing Key与Binding Key 中设置的规则进行匹配,将消息发送到相应的队列中。Binding信息被保存到Exchange中的查询表中,用于Exchange将消息分发到队列的依据。

  • Routing Key:(路由键)用于匹配路由规则的依据,生产者在将消息发送到Exchange时,一般会指定一个Routing Key,交换机会根据Routing Key 来匹配Binding中设置的路由规则,将符合规则的消息发送到指定的队列中。

  • Queue:(消息队列)RabbitMQ中的内部对象用于存放消息的容器,RabbitMQ会将消息按照RabbitMQ的六大模式中的一种将队列中的消息发送给消费者,RabbitMQ会根据选择模式的不同将队列中的消息发送给一个或多个消费者,在连接到消费者之前,消息一直在等待消费者到队列中将消息取走。

  • Consumer:(消费者)消息的消费者,表示一个从队列中取消息的应用程序。

特点

  1. 可靠性:RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。

  2. 灵活的路由:在消息进入队列之前,通过交换器来路由消息。

  3. 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。

  4. 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。

  5. 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息 中间件协议。

  6. 支持多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。

主要功能

  1. 消息队列:允许应用程序将消息发送到队列中,然后由另一个应用程序从队列中取出并处理。

  2. 消息路由:支持将消息从发送者路由到一个或多个接收者。

  3. 消息持久化:确保消息在系统故障后不会丢失。

  4. 消息确认:确保消息被正确处理,如果处理失败,可以重新发送。

  5. 集群:支持在多个节点上运行,以提供高可用性和负载均衡。

安装

这里使用1Panel安装,1Panel 是一个现代化、开源的 Linux 服务器运维管理面板

应用商店

图片图片

安装镜像

图片图片

注:这里需要勾选【端口外部访问】,方便本地调试

镜像

安装成功后,就可以在镜像中找到已安装好的RabbitMQ镜像容器

图片图片

外网可访问地址:http://{{你的公网ip}}:15672,如果是在云服务器,记得安全策略放开端口15672

图片图片

  • RabbitMQ管理界面端口:15672。是一个Web应用程序,用于管理和监控RabbitMQ消息代理

  • AMQP默认端口:5672。是一种网络协议,用于在应用程序之间传递消息,通常用于消息队列系统。

控制台

登陆控制台,账号和密码都是rabbitmq

图片图片

使用

这里使用webman插件RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67,

非常感谢兔子大佬的插件贡献!🌻

非常感谢兔子大佬的插件贡献!🌻

非常感谢兔子大佬的插件贡献!🌻

插件特点

  • 支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;

  • 支持延迟队列(RabbitMQ须安装插件);

  • 异步无阻塞消费、异步无阻塞生产、同步阻塞生产

安装插件

通过composer包管理安装:

composerrequireworkbunny/webman-rabbitmq

注:安装该插件前,请确保你已经安装好webman框架,相关安装文档:https://www.workerman.net/doc/webman/install.html

配置

插件所有配置文件路径:config/plugin/workbunny/webman-rabbitmq/app.php

<?phpreturn['enable'=>true,'host'=>'120.120.120.74','vhost'=>'/','port'=>5672,'username'=>'rabbitmq','password'=>'rabbitmq','mechanisms'=>'AMQPLAIN',...];
  • host 修改为服务器公网ip

  • port 修改为15672

消费者

这里使用命令创建一个拥有单进程消费者的RestyBuilder

./webman workbunny:rabbitmq-builder resty--mode=queueℹ️ Config updated.ℹ️ Builder created.✅ Builder RestyBuilder created successfully.

创建完成后完整的消费者文件位置process/workbunny/rabbitmq/RestyBuilder.php

<?phpdeclare(strict_types=1);namespace process\workbunny\rabbitmq;useBunny\ChannelasBunnyChannel;useBunny\Async\ClientasBunnyClient;useBunny\MessageasBunnyMessage;useWorkbunny\WebmanRabbitMQ\Constants;useWorkbunny\WebmanRabbitMQ\Builders\QueueBuilder;class RestyBuilder extends QueueBuilder
{/**
     * @var array = [
     *   'name'           => 'example',
     *   'delayed'        => false,
     *   'prefetch_count' => 1,
     *   'prefetch_size'  => 0,
     *   'is_global'      => false,
     *   'routing_key'    => '',
     * ]
     */protected array $queueConfig=[// 队列名称 ,默认由类名自动生成'name'=>'process.workbunny.rabbitmq.RestyBuilder',// 是否延迟'delayed'=>false,// QOS 数量'prefetch_count'=>0,// QOS size'prefetch_size'=>0,// QOS 全局'is_global'=>false,// 路由键'routing_key'=>'',];/** @var string 交换机类型 */protected string $exchangeType=Constants::DIRECT;/** @var string|null 交换机名称,默认由类名自动生成 */protected ?string $exchangeName='process.workbunny.rabbitmq.RestyBuilder';/** @inheritDoc */publicfunctionhandler(BunnyMessage $message,BunnyChannel $channel,BunnyClient $client): string 
    {
        echo'[RabbitMQ][队列消费] Tag:'.$message->consumerTag.PHP_EOL;echo'[RabbitMQ][队列消费着] Content:'.$message->content.PHP_EOL;echo'[RabbitMQ][队列消费着] Exchange:'.$message->exchange.PHP_EOL;returnConstants::ACK;}
}

启动webman

phpstart.phpstartWorkerman[start.php]startinDEBUGmode----------------------------------------------------------------------- WORKERMAN -----------------------------------------------------------------------Workerman version:4.1.15PHP version:8.2.18Event-Loop:\Workerman\Events\Event------------------------------------------------------------------------ WORKERS ------------------------------------------------------------------------protouserworker                                                                      listen                 processesstatustcp     root            webman                                                                      http://0.0.0.0:8217    2             [OK]tcp     root            monitor                                                                     none1[OK]tcp     root            plugin.workbunny.webman-rabbitmq.process.workbunny.rabbitmq.RestyBuilder    none1[OK]---------------------------------------------------------------------------------------------------------------------------------------------------------

生产者

usefunctionWorkbunny\WebmanRabbitMQ\sync_publish;useprocess\workbunny\rabbitmq\RestyBuilder;sync_publish(RestyBuilder::instance(),'兔子大佬你好呀!');# return bool

消费结果

[RabbitMQ][队列消费]Tag:process.workbunny.rabbitmq.RestyBuilder[RabbitMQ][队列消费着]Content:开源技术小栈你好呀![RabbitMQ][队列消费着]Exchange:process.workbunny.rabbitmq.RestyBuilder...[RabbitMQ][队列消费]Tag:process.workbunny.rabbitmq.RestyBuilder[RabbitMQ][队列消费着]Content:兔子大佬你好呀![RabbitMQ][队列消费着]Exchange:process.workbunny.rabbitmq.RestyBuilder

图片图片

RabbitMQ管理界面

图片

通过RabbitMQ管理界面端发送消息

图片

消费者消费情况

图片图片

25    2024-06-11 00:00:05    RabbitMQ AMQP 协议