SpringBoot与Axon Framework整合,实现事件溯源驱动的分布式业务系统

Axon Framework 是一个用于构建复杂分布式系统的开源框架,特别适用于实现事件溯源(Eve
首页 新闻资讯 行业资讯 SpringBoot与Axon Framework整合,实现事件溯源驱动的分布式业务系统

Axon Framework 是一个用于构建复杂分布式系统的开源框架,特别适用于实现事件溯源(Event Sourcing)和命令查询责任分离(CQRS)模式,提供强大的工具来简化事件驱动架构的开发。

选择Axon Framework的理由

1. 事件溯源(Event Sourcing)

  • 数据完整性: 事件溯源通过记录每个业务操作的变化事件来保持数据的完整性和一致性。这对于金融系统尤为重要,因为它需要精确跟踪每一笔交易的历史记录。

  • 审计和合规性: 银行业务对审计和合规性有严格的要求。事件溯源可以帮助我们轻松地重建历史状态,并提供详细的变更日志。

2. CQRS 模式(Command Query Responsibility Segregation)

  • 分离读写操作: CQRS 将读操作和写操作分开,使得系统可以在不同的优化方向上独立发展。这有助于提高系统的性能和可扩展性。

  • 灵活的设计: 分离读写逻辑可以简化复杂查询的设计,同时允许使用不同类型的数据库来满足不同的性能需求。

3. 高性能和可扩展性

  • 分布式架构: Axon 支持构建分布式的微服务架构,适用于大规模的应用场景。它可以处理高并发请求,并且易于水平扩展。

  • 异步处理: Axon 提供了强大的异步命令处理机制,减少了事务的锁定时间,提高了系统的吞吐量。

4. 丰富的生态系统

  • 内置支持: Axon 框架提供了许多开箱即用的功能,如事件存储、聚合管理、命令总线等,大大减少了开发工作量。

  • 社区和支持: Axon 拥有一个活跃的开发者社区和技术文档,便于解决在开发过程中遇到的问题。

5. 领域驱动设计(DDD)的支持

  • 模型驱动: Axon 强调领域驱动设计,鼓励将复杂的业务逻辑分解为小的、自治的聚合根,从而更好地反映真实的业务场景。

  • 清晰的职责划分: 通过使用 DDD 原则,我们可以确保每个模块都有明确的职责,提高了代码的可维护性和可理解性。

6. 安全性

  • 细粒度控制: Axon 提供了细粒度的安全控制机制,可以根据不同的角色和权限执行不同的操作。

  • 加密和认证: 结合 Spring Security 等安全框架,可以进一步增强系统的安全性,保护敏感信息。

应用案例

1. ING Bank

ING 银行是最早采用 Axon Framework 的大型金融机构之一。他们利用 Axon 构建了多个分布式系统,包括支付处理、账户管理和风险评估等关键业务流程。

  • 项目: ING 使用 Axon 来构建其下一代银行平台,实现了高可用性和可扩展性。

  • 优势: 通过事件溯源提高了数据一致性和审计能力。

2. KLM Royal Dutch Airlines

荷兰皇家航空(KLM)使用 Axon Framework 来重构其核心预订系统,以提高系统的灵活性和响应速度。

  • 项目: KLM 通过 Axon 实现了订单管理系统的现代化,支持复杂的业务规则和多渠道集成。

  • 优势: 增强了系统的可维护性和可扩展性。

3. Baloise Insurance Group

巴洛伊兹保险集团是一家瑞士保险公司,使用 Axon Framework 来改进其理赔处理系统。

  • 项目: 巴洛伊兹利用 Axon 构建了一个灵活且可扩展的理赔处理平台。

  • 优势: 提升了理赔处理的速度和准确性,并简化了系统的维护工作。

4. Adyen

Adyen 是一家全球领先的支付服务提供商,使用 Axon Framework 来处理复杂的支付交易和结算流程。

  • 项目: Adyen 利用 Axon 实现了一个高性能的支付处理引擎,支持实时交易处理。

  • 优势: 确保了交易的可靠性和一致性,提升了系统的性能。

5. Deutsche Bahn

德意志铁路公司使用 Axon Framework 来优化其票务系统。

  • 项目: 德意志铁路利用 Axon 构建了一个现代化的票务平台,支持在线购票和退票等功能。

  • 优势: 提高了系统的稳定性和用户体验。

6. Zalando SE

Zalando 是一家德国电商平台,使用 Axon Framework 来构建其订单管理系统。

  • 项目: Zalando 利用 Axon 实现了一个高度可扩展的订单管理系统,支持复杂的业务流程。

  • 优势: 提升了系统的响应能力和可维护性。

代码实操

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.5</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>axon-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>axon-demo</name><description>Demo projectforSpring BootandAxon FrameworkwithMySQL</description><properties><java.version>11</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.axonframework</groupId><artifactId>axon-spring-boot-starter</artifactId><version>4.6.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.axonframework</groupId><artifactId>axon-test</artifactId><version>4.6.0</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

src/main/java/com/example/axondemo/aggregate/BankAccountAggregate.java

package com.example.axondemo.aggregate;importlombok.extern.slf4j.Slf4j;importorg.axonframework.commandhandling.CommandHandler;importorg.axonframework.eventsourcing.EventSourcingHandler;importorg.axonframework.modelling.command.AggregateIdentifier;importorg.axonframework.spring.stereotype.Aggregate;importorg.axonframework.modelling.command.AggregateLifecycle;@Slf4j@Aggregatepublicclass BankAccountAggregate {@AggregateIdentifierprivate String accountId;// 账户ID,作为聚合根的标识符privatedoublebalance;// 账户余额publicBankAccountAggregate(){}// 默认构造函数// 处理创建账户命令@CommandHandlerpublicBankAccountAggregate(com.example.axondemo.command.CreateBankAccountCommand command){if(command.getInitialDeposit()<0){
            throw new IllegalArgumentException("初始存款必须为正数");}
        log.info("处理创建账户命令,账户ID: {}",command.getAccountId());// 应用事件来更改状态AggregateLifecycle.apply(new com.example.axondemo.event.BankAccountCreatedEvent(command.getAccountId(),command.getInitialDeposit()));}// 处理存款命令@CommandHandlerpublicvoid handle(com.example.axondemo.command.DepositMoneyCommand command){if(command.getAmount()<=0){
            throw new IllegalArgumentException("存款金额必须为正数");}
        log.info("处理存款命令,账户ID: {}",command.getAccountId());// 应用事件来更改状态AggregateLifecycle.apply(new com.example.axondemo.event.MoneyDepositedEvent(command.getAccountId(),command.getAmount()));}// 处理取款命令@CommandHandlerpublicvoid handle(com.example.axondemo.command.WithdrawMoneyCommand command){if(command.getAmount()>balance||command.getAmount()<=0){
            throw new IllegalArgumentException("无效的取款金额");}
        log.info("处理取款命令,账户ID: {}",command.getAccountId());// 应用事件来更改状态AggregateLifecycle.apply(new com.example.axondemo.event.MoneyWithdrewEvent(command.getAccountId(),command.getAmount()));}// 处理账户创建事件@EventSourcingHandlerprotected voidon(com.example.axondemo.event.BankAccountCreatedEvent event){
        this.accountId=event.getAccountId();this.balance=event.getInitialDeposit();log.info("应用账户创建事件,账户ID: {}",event.getAccountId());}// 处理存款事件@EventSourcingHandlerprotected voidon(com.example.axondemo.event.MoneyDepositedEvent event){
        this.balance+=event.getAmount();log.info("应用存款事件,账户ID: {}, 金额: {}",event.getAccountId(),event.getAmount());}// 处理取款事件@EventSourcingHandlerprotected voidon(com.example.axondemo.event.MoneyWithdrewEvent event){
        this.balance-=event.getAmount();log.info("应用取款事件,账户ID: {}, 金额: {}",event.getAccountId(),event.getAmount());}
}

src/main/java/com/example/axondemo/command/CreateBankAccountCommand.java

package com.example.axondemo.command;importlombok.Builder;importlombok.Data;importorg.axonframework.modelling.command.TargetAggregateIdentifier;@Data@Builderpublicclass CreateBankAccountCommand {@TargetAggregateIdentifier// 标记目标聚合根的标识符private final String accountId;// 账户IDprivate finaldoubleinitialDeposit;// 初始存款}

src/main/java/com/example/axondemo/command/DepositMoneyCommand.java

package com.example.axondemo.command;importlombok.Builder;importlombok.Data;importorg.axonframework.modelling.command.TargetAggregateIdentifier;@Data@Builderpublicclass DepositMoneyCommand {@TargetAggregateIdentifier// 标记目标聚合根的标识符private final String accountId;// 账户IDprivate finaldoubleamount;// 存款金额}

src/main/java/com/example/axondemo/command/WithdrawMoneyCommand.java

package com.example.axondemo.command;importlombok.Builder;importlombok.Data;importorg.axonframework.modelling.command.TargetAggregateIdentifier;@Data@Builderpublicclass WithdrawMoneyCommand {@TargetAggregateIdentifier// 标记目标聚合根的标识符private final String accountId;// 账户IDprivate finaldoubleamount;// 取款金额}

src/main/java/com/example/axondemo/controller/AccountController.java

package com.example.axondemo.controller;importcom.example.axondemo.command.*;importcom.example.axondemo.dto.CreateBankAccountRequest;importcom.example.axondemo.dto.DepositRequest;importcom.example.axondemo.dto.WithdrawRequest;importcom.example.axondemo.exception.InsufficientFundsException;importcom.example.axondemo.exception.InvalidAmountException;importcom.example.axondemo.repository.BankAccountRepository;importlombok.RequiredArgsConstructor;importorg.axonframework.commandhandling.gateway.CommandGateway;importorg.springframework.http.ResponseEntity;importorg.springframework.web.bind.annotation.*;importjavax.validation.Valid;importjava.util.UUID;importjava.util.concurrent.CompletableFuture;@RestController@RequestMapping("/accounts")@RequiredArgsConstructorpublicclass AccountController {

    private final CommandGateway commandGateway;// 命令网关,用于发送命令private final BankAccountRepository bankAccountRepository;// 银行账户仓库// 创建账户@PostMapping("/")publicResponseEntity<String>createAccount(@Valid@RequestBodyCreateBankAccountRequest request){
        String accountId=UUID.randomUUID().toString();// 生成唯一的账户IDCompletableFuture<Object>future=commandGateway.send(CreateBankAccountCommand.builder().accountId(accountId).initialDeposit(request.getInitialDeposit()).build());returnfuture.thenApply(response->ResponseEntity.ok(accountId))// 成功时返回账户ID.exceptionally(ex->ResponseEntity.badRequest().body(ex.getMessage()))// 失败时返回错误信息.join();}// 存款@PostMapping("/{accountId}/deposit")publicResponseEntity<Void>deposit(@PathVariableString accountId,@Valid@RequestBodyDepositRequest request){
        CompletableFuture<Object>future=commandGateway.send(DepositMoneyCommand.builder().accountId(accountId).amount(request.getAmount()).build());returnfuture.thenApply(response->ResponseEntity.ok().<Void>build())// 成功时返回200 OK.exceptionally(ex->ResponseEntity.badRequest().body(null))// 失败时返回400 Bad Request.join();}// 取款@PostMapping("/{accountId}/withdraw")publicResponseEntity<Void>withdraw(@PathVariableString accountId,@Valid@RequestBodyWithdrawRequest request){
        CompletableFuture<Object>future=commandGateway.send(WithdrawMoneyCommand.builder().accountId(accountId).amount(request.getAmount()).build());returnfuture.thenApply(response->ResponseEntity.ok().<Void>build())// 成功时返回200 OK.exceptionally(ex->ResponseEntity.badRequest().body(null))// 失败时返回400 Bad Request.join();}// 查询账户余额@GetMapping("/{accountId}/balance")publicResponseEntity<Double>getBalance(@PathVariableString accountId){Doublebalance=bankAccountRepository.findById(accountId).map(it->it.getBalance()).orElse(0.0);// 获取账户余额returnResponseEntity.ok(balance);// 返回账户余额}
}

src/main/java/com/example/axondemo/dto/CreateBankAccountRequest.java

package com.example.axondemo.dto;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importjavax.validation.constraints.DecimalMin;importjavax.validation.constraints.NotNull;@Data@NoArgsConstructor@AllArgsConstructorpublicclass CreateBankAccountRequest {@NotNull(message="初始存款不能为空")// 验证初始存款不为空@DecimalMin(value="0",message="初始存款必须非负")// 验证初始存款非负privatedoubleinitialDeposit;// 初始存款}

src/main/java/com/example/axondemo/dto/DepositRequest.java

package com.example.axondemo.dto;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importjavax.validation.constraints.DecimalMin;importjavax.validation.constraints.NotNull;@Data@NoArgsConstructor@AllArgsConstructorpublicclass DepositRequest {@NotNull(message="金额不能为空")// 验证金额不为空@DecimalMin(value="0",message="金额必须非负")// 验证金额非负privatedoubleamount;// 存款金额}

src/main/java/com/example/axondemo/dto/WithdrawRequest.java

package com.example.axondemo.dto;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importjavax.validation.constraints.DecimalMin;importjavax.validation.constraints.NotNull;@Data@NoArgsConstructor@AllArgsConstructorpublicclass WithdrawRequest {@NotNull(message="金额不能为空")// 验证金额不为空@DecimalMin(value="0",message="金额必须非负")// 验证金额非负privatedoubleamount;// 取款金额}

src/main/java/com/example/axondemo/event/BankAccountCreatedEvent.java

账户创建: 通过事件 BankAccountCreatedEvent 记录账户的初始状态。

package com.example.axondemo.event;importlombok.Builder;importlombok.Data;importorg.axonframework.serialization.Revision;@Data@Builder@Revision("1")publicclass BankAccountCreatedEvent {
    private final String accountId;// 账户IDprivate finaldoubleinitialDeposit;// 初始存款}

src/main/java/com/example/axondemo/event/MoneyDepositedEvent.java

存款和取款: 通过事件 MoneyDepositedEvent 和 MoneyWithdrewEvent 记录每一次的资金变动。

package com.example.axondemo.event;importlombok.Builder;importlombok.Data;importorg.axonframework.serialization.Revision;@Data@Builder@Revision("1")publicclass MoneyDepositedEvent {
    private final String accountId;// 账户IDprivate finaldoubleamount;// 存款金额}

src/main/java/com/example/axondemo/event/MoneyWithdrewEvent.java

package com.example.axondemo.event;importlombok.Builder;importlombok.Data;importorg.axonframework.serialization.Revision;@Data@Builder@Revision("1")publicclass MoneyWithdrewEvent {
    private final String accountId;// 账户IDprivate finaldoubleamount;// 取款金额}

src/main/java/com/example/axondemo/exception/InsufficientFundsException.java

package com.example.axondemo.exception;publicclass InsufficientFundsException extends RuntimeException {publicInsufficientFundsException(String message){
        super(message);}
}

src/main/java/com/example/axondemo/exception/InvalidAmountException.java

package com.example.axondemo.exception;publicclass InvalidAmountException extends RuntimeException {publicInvalidAmountException(String message){
        super(message);}
}

src/main/java/com/example/axondemo/exception/GlobalExceptionHandler.java

package com.example.axondemo.exception;importorg.springframework.http.HttpStatus;importorg.springframework.http.ResponseEntity;importorg.springframework.web.bind.MethodArgumentNotValidException;importorg.springframework.web.bind.annotation.ExceptionHandler;importorg.springframework.web.bind.annotation.ResponseStatus;importorg.springframework.web.bind.annotation.RestControllerAdvice;importjava.util.HashMap;importjava.util.Map;@RestControllerAdvicepublicclass GlobalExceptionHandler {// 处理验证异常@ResponseStatus(HttpStatus.BAD_REQUEST)@ExceptionHandler(MethodArgumentNotValidException.class)publicMap<String,String>handleValidationExceptions(MethodArgumentNotValidException ex){
        Map<String,String>errors=new HashMap<>();ex.getBindingResult().getFieldErrors().forEach(error->errors.put(error.getField(),error.getDefaultMessage()));returnerrors;}// 处理非法参数异常@ExceptionHandler(IllegalArgumentException.class)publicResponseEntity<String>handleIllegalArgumentException(IllegalArgumentException ex){returnResponseEntity.badRequest().body(ex.getMessage());}// 处理资金不足异常@ExceptionHandler(InsufficientFundsException.class)publicResponseEntity<String>handleInsufficientFundsException(InsufficientFundsException ex){returnResponseEntity.status(HttpStatus.CONFLICT).body(ex.getMessage());}// 处理解析金额异常@ExceptionHandler(InvalidAmountException.class)publicResponseEntity<String>handleInvalidAmountException(InvalidAmountException ex){returnResponseEntity.badRequest().body(ex.getMessage());}
}

src/main/java/com/example/axondemo/projection/BankAccountProjection.java

余额查询: 使用投影类 BankAccountProjection 将事件转换为可供查询的数据视图。

package com.example.axondemo.projection;importcom.example.axondemo.event.BankAccountCreatedEvent;importcom.example.axondemo.event.MoneyDepositedEvent;importcom.example.axondemo.event.MoneyWithdrewEvent;importcom.example.axondemo.repository.BankAccountEntity;importcom.example.axondemo.repository.BankAccountRepository;importlombok.extern.slf4j.Slf4j;importorg.axonframework.eventhandling.EventHandler;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclass BankAccountProjection {@Autowiredprivate BankAccountRepository bankAccountRepository;// 银行账户仓库// 处理账户创建事件@EventHandlerpublicvoidon(BankAccountCreatedEvent event){
        BankAccountEntity bankAccountEntity=BankAccountEntity.builder().accountId(event.getAccountId()).balance(event.getInitialDeposit()).build();bankAccountRepository.save(bankAccountEntity);log.info("投影账户创建事件,账户ID: {}",event.getAccountId());}// 处理存款事件@EventHandlerpublicvoidon(MoneyDepositedEvent event){
        bankAccountRepository.findById(event.getAccountId()).ifPresentOrElse(bankAccountEntity->{
                            bankAccountEntity.setBalance(bankAccountEntity.getBalance()+event.getAmount());bankAccountRepository.save(bankAccountEntity);log.info("投影存款事件,账户ID: {}, 金额: {}",event.getAccountId(),event.getAmount());},()->log.error("未找到账户ID: {}",event.getAccountId()));}// 处理取款事件@EventHandlerpublicvoidon(MoneyWithdrewEvent event){
        bankAccountRepository.findById(event.getAccountId()).ifPresentOrElse(bankAccountEntity->{
                            bankAccountEntity.setBalance(bankAccountEntity.getBalance()-event.getAmount());bankAccountRepository.save(bankAccountEntity);log.info("投影取款事件,账户ID: {}, 金额: {}",event.getAccountId(),event.getAmount());},()->log.error("未找到账户ID: {}",event.getAccountId()));}
}

src/main/java/com/example/axondemo/repository/BankAccountEntity.java

package com.example.axondemo.repository;importlombok.AllArgsConstructor;importlombok.Builder;importlombok.Data;importlombok.NoArgsConstructor;importjavax.persistence.Entity;importjavax.persistence.Id;@Entity@Data@Builder@NoArgsConstructor@AllArgsConstructorpublicclass BankAccountEntity {@Idprivate String accountId;// 账户IDprivatedoublebalance;// 账户余额}

src/main/java/com/example/axondemo/repository/BankAccountRepository.java

package com.example.axondemo.repository;importorg.springframework.data.jpa.repository.JpaRepository;publicinterface BankAccountRepository extends JpaRepository<BankAccountEntity,String>{
}

src/main/resources/application.yml

server:
  port:8080spring:
  datasource:
    url: jdbc:mysql://localhost:3306/banktest?useSSL=false&serverTimezone=UTCusername: root
    password:12345678jpa:
    hibernate:
      ddl-auto:updateshow-sql:truelogging:level:
    org.axonframework: INFO

src/main/java/com/example/axondemo/AxonDemoApplication.java

package com.example.axondemo;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublicclass AxonDemoApplication {publicstatic void main(String[]args){
        SpringApplication.run(AxonDemoApplication.class,args);}
}

测试

创建账户

  • URL: http://localhost:8080/accounts/

  • Method: POST

  • Headers:

a.Content-Type: application/json

  • Body (raw, JSON):

{"initialDeposit":100}
  • Response Body:

9f4c1b8e-2a0f-4e5f-b2f2-f8f1e5f1e5f1

存款

  • URL: http://localhost:8080/accounts/9f4c1b8e-2a0f-4e5f-b2f2-f8f1e5f1e5f1/deposit

  • Method: POST

  • Headers:

  • Content-Type: application/json

  • Body (raw, JSON):

{"amount":50}
  • Status Code: 200 OK

  • Response Body: (空)

取款

  • URL: http://localhost:8080/accounts/9f4c1b8e-2a0f-4e5f-b2f2-f8f1e5f1e5f1/withdraw

  • Method: POST

  • Headers:

a.Content-Type: application/json

  • Body (raw, JSON):

{"amount":30}
  • Status Code: 200 OK

  • Response Body: (空)

查询账户余额

  • URL: http://localhost:8080/accounts/9f4c1b8e-2a0f-4e5f-b2f2-f8f1e5f1e5f1/balance

  • Method: GET

  • Status Code: 200 OK

  • Response Body:

120.0


54    2025-03-10 00:15:00    Axon 开源 框架