SpringBoot整合RocketMQ实现事务/广播/顺序消息详解

TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费
首页 新闻资讯 行业资讯 SpringBoot整合RocketMQ实现事务/广播/顺序消息详解

环境:springboot2.4.12 + RocketMQ4.8.0

依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>

配置文件

server:
  port:8080---rocketmq:
  nameServer: localhost:9876producer:group: demo-mq

普通消息

发送

@Resourceprivate RocketMQTemplate rocketMQTemplate;publicvoid send(String message){
  rocketMQTemplate.convertAndSend("test-topic:tag2",MessageBuilder.withPayload(message).build());}

接收

@RocketMQMessageListener(topic="test-topic",consumerGroup="consumer01-group",selectorExpression="tag1 || tag2")@Componentpublicclass ConsumerListener implements RocketMQListener<String>{@Overridepublicvoid onMessage(String message){
    System.out.println("接收到消息:"+message);}


}

顺序消息

发送

@Resourceprivate RocketMQTemplate rocketMQTemplate;publicvoid sendOrder(String topic,String message,String tags,intid){
  rocketMQTemplate.asyncSendOrderly(topic+":"+tags,MessageBuilder.withPayload(message).build(),"order-"+id,new SendCallback(){@Overridepublicvoid onSuccess(SendResult sendResult){
        System.err.println("msg-id: "+sendResult.getMsgId()+": "+message+"\tqueueId: "+sendResult.getMessageQueue().getQueueId());}@Overridepublicvoid onException(Throwable e){
        e.printStackTrace();}
    });}

这里是根据hashkey将消息发送到不同的队列中

@RocketMQMessageListener(topic="order-topic",consumerGroup="consumer02-group",selectorExpression="tag3 || tag4",consumeMode=ConsumeMode.ORDERLY)@Componentpublicclass ConsumerOrderListener implements RocketMQListener<String>{@Overridepublicvoid onMessage(String message){
    System.out.println(Thread.currentThread().getName()+" 接收到Order消息:"+message);}


}

consumeMode = ConsumeMode.ORDERLY,指明了消息模式为顺序模式,一个队列,一个线程。

结果

图片图片

当consumeMode = ConsumeMode.CONCURRENTLY执行结果如下:

图片图片

集群/广播消息模式

发送端

@Resourceprivate RocketMQTemplate rocketMQTemplate;publicvoid send(String topic,String message,String tags){
  rocketMQTemplate.send(topic+":"+tags,MessageBuilder.withPayload(message).build());}

集群消息模式

消费端

@RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",selectorExpression="tag6 || tag7",messageModel=MessageModel.CLUSTERING)@Componentpublicclass ConsumerBroadListener implements RocketMQListener<String>{@Overridepublicvoid onMessage(String message){
    System.out.println("ConsumerBroadListener1接收到消息:"+message);}


}

messageModel = MessageModel.CLUSTERING

测试

启动两个服务分别端口是8080,8081

8080服务

图片图片

8081服务

图片图片

集群消息模式下,每个服务分别接收一部分消息,实现了负载均衡

广播消息模式

消费端

@RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",selectorExpression="tag6 || tag7",messageModel=MessageModel.BROADCASTING)@Componentpublicclass ConsumerBroadListener implements RocketMQListener<String>{@Overridepublicvoid onMessage(String message){
    System.out.println("ConsumerBroadListener1接收到消息:"+message);}


}

messageModel = MessageModel.BROADCASTING

测试

启动两个服务分别端口是8080,8081

8080服务

图片图片

8081服务

图片图片


集群消息模式下,每个服务分别都接受了同样的消息。

事务消息

RocketMQ事务的3个状态

TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:

正常事务发送与提交阶段

1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)2、服务端响应消息写入结果,半消息发送成功3、开始执行本地事务4、根据本地事务的执行状态执行Commit或者Rollback操作

事务信息的补偿流程1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求2、生产者收到确认回查请求后,检查本地事务的执行状态3、根据检查后的结果执行Commit或者Rollback操作补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

发送端

@Resourceprivate RocketMQTemplate rocketMQTemplate;publicvoid sendTx(String topic,Long id,String tags){
  rocketMQTemplate.sendMessageInTransaction(topic+":"+tags,MessageBuilder.withPayload(new Users(id,UUID.randomUUID().toString().replaceAll("-",""))).setHeader("BID",UUID.randomUUID().toString().replaceAll("-","")).build(),UUID.randomUUID().toString().replaceAll("-",""));}

生产者对应的监听器

@RocketMQTransactionListenerpublicclass ProducerTxListener implements RocketMQLocalTransactionListener {@Resourceprivate BusinessService bs;@OverridepublicRocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg){// 这里执行本地的事务操作,比如保存数据。try {// 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据String id=(String)msg.getHeaders().get("BID");Users users=new JsonMapper().readValue((byte[])msg.getPayload(),Users.class);System.out.println("消息内容:"+users+"\t参与数据:"+arg+"\t本次事务的唯一编号:"+id);bs.save(users,new UsersLog(users.getId(),id));} catch(Exception e){
      e.printStackTrace();returnRocketMQLocalTransactionState.ROLLBACK;}returnRocketMQLocalTransactionState.COMMIT;}@OverridepublicRocketMQLocalTransactionState checkLocalTransaction(Message msg){// 这里检查本地事务是否执行成功String id=(String)msg.getHeaders().get("BID");System.out.println("执行查询ID为:"+id+" 的数据是否存在");UsersLog usersLog=bs.queryUsersLog(id);if(usersLog==null){returnRocketMQLocalTransactionState.ROLLBACK;}returnRocketMQLocalTransactionState.COMMIT;}


}

消费端

@RocketMQMessageListener(topic="tx-topic",consumerGroup="consumer05-group",selectorExpression="tag10")@Componentpublicclass ConsumerTxListener implements RocketMQListener<Users>{@Overridepublicvoid onMessage(Users users){
    System.out.println("TX接收到消息:"+users);}


}

Service

@Transactionalpublicbooleansave(Users users,UsersLog usersLog){
  usersRepository.save(users);usersLogRepository.save(usersLog);if(users.getId()==1){
    throw new RuntimeException("数据错误");}returntrue;}publicUsersLog queryUsersLog(String bid){returnusersLogRepository.findByBid(bid);}

Controller

@GetMapping("/tx/{id}")publicObject sendTx(@PathVariable("id")Long id){
  ps.sendTx("tx-topic",id,"tag10");return"send transaction success";}

测试

调用接口后,控制台输出:

图片图片

从打印日志看出来都保存完毕了后 消费端才接受到消息。

图片图片


图片图片


删除数据,再测试ID为1会报错的。

图片图片

数据库中没有数据。。。

是不是也不是很复杂,2个阶段来处理。

完毕!!!

20    2023-09-04 08:00:53    提交 事务 消息