环境:springboot2.4.12 + RocketMQ4.8.0
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
server: port:8080---rocketmq: nameServer: localhost:9876producer:group: demo-mq
@Resourceprivate RocketMQTemplate rocketMQTemplate;publicvoid send(String message){ rocketMQTemplate.convertAndSend("test-topic:tag2",MessageBuilder.withPayload(message).build());}
@RocketMQMessageListener(topic="test-topic",consumerGroup="consumer01-group",selectorExpression="tag1 || tag2")@Componentpublicclass ConsumerListener implements RocketMQListener<String>{@Overridepublicvoid onMessage(String message){ System.out.println("接收到消息:"+message);} }
@Resourceprivate RocketMQTemplate rocketMQTemplate;publicvoid sendOrder(String topic,String message,String tags,intid){ rocketMQTemplate.asyncSendOrderly(topic+":"+tags,MessageBuilder.withPayload(message).build(),"order-"+id,new SendCallback(){@Overridepublicvoid onSuccess(SendResult sendResult){ System.err.println("msg-id: "+sendResult.getMsgId()+": "+message+"\tqueueId: "+sendResult.getMessageQueue().getQueueId());}@Overridepublicvoid onException(Throwable e){ e.printStackTrace();} });}
这里是根据hashkey将消息发送到不同的队列中
@RocketMQMessageListener(topic="order-topic",consumerGroup="consumer02-group",selectorExpression="tag3 || tag4",consumeMode=ConsumeMode.ORDERLY)@Componentpublicclass ConsumerOrderListener implements RocketMQListener<String>{@Overridepublicvoid onMessage(String message){ System.out.println(Thread.currentThread().getName()+" 接收到Order消息:"+message);} }
consumeMode = ConsumeMode.ORDERLY,指明了消息模式为顺序模式,一个队列,一个线程。
图片
当consumeMode = ConsumeMode.CONCURRENTLY执行结果如下:
图片
@Resourceprivate RocketMQTemplate rocketMQTemplate;publicvoid send(String topic,String message,String tags){ rocketMQTemplate.send(topic+":"+tags,MessageBuilder.withPayload(message).build());}
@RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",selectorExpression="tag6 || tag7",messageModel=MessageModel.CLUSTERING)@Componentpublicclass ConsumerBroadListener implements RocketMQListener<String>{@Overridepublicvoid onMessage(String message){ System.out.println("ConsumerBroadListener1接收到消息:"+message);} }
messageModel = MessageModel.CLUSTERING
启动两个服务分别端口是8080,8081
8080服务
图片
图片
集群消息模式下,每个服务分别接收一部分消息,实现了负载均衡
@RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",selectorExpression="tag6 || tag7",messageModel=MessageModel.BROADCASTING)@Componentpublicclass ConsumerBroadListener implements RocketMQListener<String>{@Overridepublicvoid onMessage(String message){ System.out.println("ConsumerBroadListener1接收到消息:"+message);} }
messageModel = MessageModel.BROADCASTING
启动两个服务分别端口是8080,8081
图片
图片
集群消息模式下,每个服务分别都接受了同样的消息。
RocketMQ事务的3个状态
TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。
RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:
正常事务发送与提交阶段
1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)2、服务端响应消息写入结果,半消息发送成功3、开始执行本地事务4、根据本地事务的执行状态执行Commit或者Rollback操作
事务信息的补偿流程1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求2、生产者收到确认回查请求后,检查本地事务的执行状态3、根据检查后的结果执行Commit或者Rollback操作补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。
@Resourceprivate RocketMQTemplate rocketMQTemplate;publicvoid sendTx(String topic,Long id,String tags){ rocketMQTemplate.sendMessageInTransaction(topic+":"+tags,MessageBuilder.withPayload(new Users(id,UUID.randomUUID().toString().replaceAll("-",""))).setHeader("BID",UUID.randomUUID().toString().replaceAll("-","")).build(),UUID.randomUUID().toString().replaceAll("-",""));}
@RocketMQTransactionListenerpublicclass ProducerTxListener implements RocketMQLocalTransactionListener {@Resourceprivate BusinessService bs;@OverridepublicRocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg){// 这里执行本地的事务操作,比如保存数据。try {// 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据String id=(String)msg.getHeaders().get("BID");Users users=new JsonMapper().readValue((byte[])msg.getPayload(),Users.class);System.out.println("消息内容:"+users+"\t参与数据:"+arg+"\t本次事务的唯一编号:"+id);bs.save(users,new UsersLog(users.getId(),id));} catch(Exception e){ e.printStackTrace();returnRocketMQLocalTransactionState.ROLLBACK;}returnRocketMQLocalTransactionState.COMMIT;}@OverridepublicRocketMQLocalTransactionState checkLocalTransaction(Message msg){// 这里检查本地事务是否执行成功String id=(String)msg.getHeaders().get("BID");System.out.println("执行查询ID为:"+id+" 的数据是否存在");UsersLog usersLog=bs.queryUsersLog(id);if(usersLog==null){returnRocketMQLocalTransactionState.ROLLBACK;}returnRocketMQLocalTransactionState.COMMIT;} }
@RocketMQMessageListener(topic="tx-topic",consumerGroup="consumer05-group",selectorExpression="tag10")@Componentpublicclass ConsumerTxListener implements RocketMQListener<Users>{@Overridepublicvoid onMessage(Users users){ System.out.println("TX接收到消息:"+users);} }
@Transactionalpublicbooleansave(Users users,UsersLog usersLog){ usersRepository.save(users);usersLogRepository.save(usersLog);if(users.getId()==1){ throw new RuntimeException("数据错误");}returntrue;}publicUsersLog queryUsersLog(String bid){returnusersLogRepository.findByBid(bid);}
@GetMapping("/tx/{id}")publicObject sendTx(@PathVariable("id")Long id){ ps.sendTx("tx-topic",id,"tag10");return"send transaction success";}
调用接口后,控制台输出:
图片
从打印日志看出来都保存完毕了后 消费端才接受到消息。
图片
图片
删除数据,再测试ID为1会报错的。
图片
数据库中没有数据。。。
是不是也不是很复杂,2个阶段来处理。
完毕!!!