RabbitMQ是一个开源的消息代理软件,它使用高级消息队列协议(AMQP)来实现消息的发送和接收。RabbitMQ支持多种消息协议,包括STOMP、MQTT等,并且能够与多种编程语言和平台集成,如Java、.NET、Python等。
AMQP 即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
图片来源:https://blog.csdn.net/cuierdan/article/details/123824300
Message Broker:(消息代理服务器)是一个虚拟的概念,而RabbitMQ是Message Broker的一个实例。
Producer:(生产者)产生数据并将数据发送到消息代理服务器(Message Broker)的程序被称作消息的生产者。
Connection:(连接)生产者与消费者通过TCP协议与消息代理服务器(Message Broker)创建的连接。
Channel:(信道)创建在Connection中的虚拟连接,类似于连接数据库时的连接池的概念,生产者和消费者并不是直接与MQ通过Connection进行通讯的,而是通过Channel进行连接通讯的,数据的流动是在Channel中进行的。
VirtualHost:(虚拟消息服务器)就像mysql数据库中有数据库实例的概念,并且可以指定用户对库和表等操作的设置权限。也可以类别成LINUX系统中的不同用户,不同用户之间是相互独立的。每个VirtualHost相当于一个相对独立mini的RabbitMQ服务器。每个VirtutalHost之间是相互隔离的,exchange,queue,message等不能互通。
Exchange:(交换机)交换机直接与Channel(信道)连接,接收来自于消息生产者产生的数据,在由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ的交换机有fanout(扇出),direct(直接),topic(主题),headers(标题)四种类型,每种交换机类型都对应着不同的路由规则,根据不同的路由规则,交换机会将消息路由到不同的队列中。
Binding: (绑定)交换机与队列之间的虚拟连接,在这个绑定中可以设置Binding Key,一个绑定就是用一个Binding Key将交换器和队列连接起来,设置的Binding Key存在着一定的规则,Exchange会将消息中携带的Routing Key与Binding Key 中设置的规则进行匹配,将消息发送到相应的队列中。Binding信息被保存到Exchange中的查询表中,用于Exchange将消息分发到队列的依据。
Routing Key:(路由键)用于匹配路由规则的依据,生产者在将消息发送到Exchange时,一般会指定一个Routing Key,交换机会根据Routing Key 来匹配Binding中设置的路由规则,将符合规则的消息发送到指定的队列中。
Queue:(消息队列)RabbitMQ中的内部对象用于存放消息的容器,RabbitMQ会将消息按照RabbitMQ的六大模式中的一种将队列中的消息发送给消费者,RabbitMQ会根据选择模式的不同将队列中的消息发送给一个或多个消费者,在连接到消费者之前,消息一直在等待消费者到队列中将消息取走。
Consumer:(消费者)消息的消费者,表示一个从队列中取消息的应用程序。
可靠性:RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
灵活的路由:在消息进入队列之前,通过交换器来路由消息。
扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。
高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。
多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息 中间件协议。
支持多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。
消息队列:允许应用程序将消息发送到队列中,然后由另一个应用程序从队列中取出并处理。
消息路由:支持将消息从发送者路由到一个或多个接收者。
消息持久化:确保消息在系统故障后不会丢失。
消息确认:确保消息被正确处理,如果处理失败,可以重新发送。
集群:支持在多个节点上运行,以提供高可用性和负载均衡。
这里使用1Panel安装,1Panel 是一个现代化、开源的 Linux 服务器运维管理面板
图片
图片
注:这里需要勾选【端口外部访问】,方便本地调试
安装成功后,就可以在镜像中找到已安装好的RabbitMQ镜像容器
图片
外网可访问地址:http://{{你的公网ip}}:15672,如果是在云服务器,记得安全策略放开端口15672
图片
RabbitMQ管理界面端口:15672。是一个Web应用程序,用于管理和监控RabbitMQ消息代理
AMQP默认端口:5672。是一种网络协议,用于在应用程序之间传递消息,通常用于消息队列系统。
登陆控制台,账号和密码都是rabbitmq
图片
这里使用webman插件RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67,
非常感谢兔子大佬的插件贡献!🌻
非常感谢兔子大佬的插件贡献!🌻
非常感谢兔子大佬的插件贡献!🌻
支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;
支持延迟队列(RabbitMQ须安装插件);
异步无阻塞消费、异步无阻塞生产、同步阻塞生产
通过composer包管理安装:
composerrequireworkbunny/webman-rabbitmq
注:安装该插件前,请确保你已经安装好webman框架,相关安装文档:https://www.workerman.net/doc/webman/install.html
插件所有配置文件路径:config/plugin/workbunny/webman-rabbitmq/app.php
<?phpreturn['enable'=>true,'host'=>'120.120.120.74','vhost'=>'/','port'=>5672,'username'=>'rabbitmq','password'=>'rabbitmq','mechanisms'=>'AMQPLAIN',...];
host 修改为服务器公网ip
port 修改为15672
这里使用命令创建一个拥有单进程消费者的RestyBuilder
./webman workbunny:rabbitmq-builder resty--mode=queueℹ️ Config updated.ℹ️ Builder created.✅ Builder RestyBuilder created successfully.
创建完成后完整的消费者文件位置process/workbunny/rabbitmq/RestyBuilder.php
<?phpdeclare(strict_types=1);namespace process\workbunny\rabbitmq;useBunny\ChannelasBunnyChannel;useBunny\Async\ClientasBunnyClient;useBunny\MessageasBunnyMessage;useWorkbunny\WebmanRabbitMQ\Constants;useWorkbunny\WebmanRabbitMQ\Builders\QueueBuilder;class RestyBuilder extends QueueBuilder {/** * @var array = [ * 'name' => 'example', * 'delayed' => false, * 'prefetch_count' => 1, * 'prefetch_size' => 0, * 'is_global' => false, * 'routing_key' => '', * ] */protected array $queueConfig=[// 队列名称 ,默认由类名自动生成'name'=>'process.workbunny.rabbitmq.RestyBuilder',// 是否延迟'delayed'=>false,// QOS 数量'prefetch_count'=>0,// QOS size'prefetch_size'=>0,// QOS 全局'is_global'=>false,// 路由键'routing_key'=>'',];/** @var string 交换机类型 */protected string $exchangeType=Constants::DIRECT;/** @var string|null 交换机名称,默认由类名自动生成 */protected ?string $exchangeName='process.workbunny.rabbitmq.RestyBuilder';/** @inheritDoc */publicfunctionhandler(BunnyMessage $message,BunnyChannel $channel,BunnyClient $client): string { echo'[RabbitMQ][队列消费] Tag:'.$message->consumerTag.PHP_EOL;echo'[RabbitMQ][队列消费着] Content:'.$message->content.PHP_EOL;echo'[RabbitMQ][队列消费着] Exchange:'.$message->exchange.PHP_EOL;returnConstants::ACK;} }
phpstart.phpstartWorkerman[start.php]startinDEBUGmode----------------------------------------------------------------------- WORKERMAN -----------------------------------------------------------------------Workerman version:4.1.15PHP version:8.2.18Event-Loop:\Workerman\Events\Event------------------------------------------------------------------------ WORKERS ------------------------------------------------------------------------protouserworker listen processesstatustcp root webman http://0.0.0.0:8217 2 [OK]tcp root monitor none1[OK]tcp root plugin.workbunny.webman-rabbitmq.process.workbunny.rabbitmq.RestyBuilder none1[OK]---------------------------------------------------------------------------------------------------------------------------------------------------------
usefunctionWorkbunny\WebmanRabbitMQ\sync_publish;useprocess\workbunny\rabbitmq\RestyBuilder;sync_publish(RestyBuilder::instance(),'兔子大佬你好呀!');# return bool
[RabbitMQ][队列消费]Tag:process.workbunny.rabbitmq.RestyBuilder[RabbitMQ][队列消费着]Content:开源技术小栈你好呀![RabbitMQ][队列消费着]Exchange:process.workbunny.rabbitmq.RestyBuilder...[RabbitMQ][队列消费]Tag:process.workbunny.rabbitmq.RestyBuilder[RabbitMQ][队列消费着]Content:兔子大佬你好呀![RabbitMQ][队列消费着]Exchange:process.workbunny.rabbitmq.RestyBuilder
图片
通过RabbitMQ管理界面端发送消息
消费者消费情况