axon架构特征
领域驱动模型
命令查询责任隔离
事件驱动架构
事件驱动偏向事件,主要由command,event,query,aggregate,saga等概念组成,存储的是事件偏立体。
而数据库中存的是结果,更类似平面的数据。
axon server集群策略(不用EE的情况下)
方案1. server直接连同一个事件存储文件。客户端配置多个axon server服务器,收到重连事件后重新load event索引,快照索引,断掉以后重连会连到不同server上(没有办法使用)
docker 部署修axon集群
实现功能,axonserver 挂掉自动切换。
- 在axonserver-se源码内建dockerfile文件
| 1 | FROM openjdk:8-alpine | 
- 项目目录下执行命令构建docker镜像
| 1 | docker build -t axonserver . | 
- 启动镜像
| 1 | ➜ axon-server-se git:(master) ✗ docker images | 
4 客户端配置文件对应修改为
| 1 | axon: | 
下一阶段工作内容展望
- 在方案1的基础上加上主从 
- node之间数据同步 
- 数据文件split及merge 
- 重建索引速度提升 
基版说明
Axon Server 4.5 Standard Edition Based功能介绍
本方案依赖于Axon Server 4.5(SE)SPOF的解决方案。在方案设计中,将SPOF问题域划分为服务SPOF和数据SPOF两个问题子域分别解决。
本文仅描述服务SPOF的解决方案。实现思路
客户端与server通信握手成功,会广播TopologyEvents.ApplicationConnected的消息通知,监听该消息重建EventStore存储索引。源码改动
| 1 | @EventListener ① 监听消息 | 
配置文件
| 1 | server: | 
安装部署
- 清单 - 名称 - 说明 - axonserver-4.5.jar - axonserver 升级包 - axonserver.yml - 服务配置文件,与升级包同一目录 - startup.sh - 服务启动脚本,与升级包同一目录 
- 服务规划及配置 - node编号 - server.port - axoniq.axonserver.port - node-01 - 8024 - 8124 - node-02 - 8025 - 8125 - node-03 - 8026 - 8126 
本安装说明中默认配置:
    axoniq.axonserver.event.storage  =  ../data/default/events
    axoniq.axonserver.event.snapshot =  ../data/default/snapshots
需要说明的是,该两项配置需要确保不同axonserver实例的配置保持绝对路径一致。
- 服务启动
在已安装Java环境的情况下,
linux环境下执行启动脚本 sh startup.sh;
windows环境下执行启动脚本 .\startup.bat
| 1 | nohup java -jar axonserver-4.5.jar & | 
shutdown.sh
| 1 | kill -s 9 `lsof -t -i:8024` | 
- 服务活跃状态监测
telnet ${ip} ${server.port}
axon组件自定义配置
定义聚合根
聚合根是整个Axon应用的核心业务承担组件,定义一个聚合根主要实现两方面的功能:
###funtion
- 定义一组CommandHandler,接收指定CommandMessage作用于Aggregate,自定义命令下达的业务逻辑并生成语义唯一性的EvntMessage 
- 定义一组EventSourcingHandler,接收EventMessage作用于Aggregate,自定义事件溯源的业务逻辑 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54- @NoArgsConstructor ① 强制要求提供Aggregate的无参构造函数 
 @Aggregate ② 声明一个聚合根,创建有继承关系的Aggregate时,父类声明无效
 public class ExchangeStockIstrAggr extends InstructionAggregate {
 @CommandHandler ③ 声明一个**构造子**CommandHandler
 public ExchangeStockIstrAggr(ESCreateIstrCmd cmd) {
 ESIstrCreatedEvt esIstrCreatedEvt = new ESIstrCreatedEvt();
 esIstrCreatedEvt.setId(cmd.getId());
 esIstrCreatedEvt.setAccountId(cmd.getAccountId());
 esIstrCreatedEvt.setQuantity(cmd.getQuantity());
 esIstrCreatedEvt.setSecurityCode(cmd.getSecurityCode());
 esIstrCreatedEvt.setTradeType(cmd.getTradeType());
 esIstrCreatedEvt.setUnitId(cmd.getUnitId());
 esIstrCreatedEvt.setUserId(cmd.getUserId());
 AggregateLifecycle.apply(esIstrCreatedEvt); ④ Command发送成功一定会同时生成一条操作日志,
 该日志强制要求以EventMessage的形式记录下来
 }
 @EventSourcingHandler ⑤ 声明指定类型EventMessage在溯源时的逻辑
 public void on(ESIstrCreatedEvt evt) {
 setId(evt.getId());
 setInstructionState(new CreatedInstructionState());
 setAccountId(evt.getAccountId());
 setUnitId(evt.getUnitId());
 IstrTradeElement istrTradeElement = new IstrTradeElement();
 istrTradeElement.setQuantity(evt.getQuantity());
 istrTradeElement.setSecurityCode(evt.getSecurityCode());
 istrTradeElement.setTradeType(evt.getTradeType());
 setIstrTradeElement(istrTradeElement);
 setUserId(evt.getUserId());
 }
 @CommandHandler ⑥ 声明一个**非构造子**CommandHandler
 public void handle(ESCancelIstrCmd cmd) {
 ...
 }
 @EventSourcingHandler ⑦ CommandHandler和EventSourcingHandler推荐成对儿出现
 public void on(ESIstrCancellingEvt evt) {
 ...
 }
 }
 @Getter
 @Setter
 @NoArgsConstructor
 @AllArgsConstructor
 public class DomainAggregate {
 @AggregateIdentifier ⑧ 声明聚合根唯一标识,该标识**全局唯一**
 private String id;
 }
聚合根资源库
在Axon中,聚合根承担的职责主要有两个:
###如下
- 接收CommandBus中的命令(Command)完成聚合根状态的更新,形成操作日志(Event)并将其发送至EventBus 
- 在溯源阶段,接收资源库加载的操作日志**重塑聚合根的最新状态 
 聚合根资源库的职责在于提供完整可靠的操作日志,并按照有序追加的方式永久的保存下来,但需要注意的是,聚合根资源库只提供资源保存和加载的抽象接口,而具体的功能实现则被委托给Event Store,如下是一个样例:- 1 
 2
 3
 4- Class aggregateType = ClassUtil.instance(aggregateQualifiedName); 
 EventSourcingRepository
 .builder(aggregateType)
 .build();
快照策略
Event快照(Snapshot)是为解决Event数量大时溯源变慢的性能问题产生的解决方案,快照策略则对快照产生的时机、触发机制所做的代码描述。
在存在Snapshot的情况下,重塑聚合根状态只需加载最新的Snapshot和该快照之后所产生的的Event。Axon提供了SnapshotTriggerDefinition接口定义快照触发时间,并提供了开箱即用的基于事件数量为触发条件的实现方案EventCountSnapshotTriggerDefinition,入参snapshotThreshold为生成快照的聚合根事件事件阈值限制:
1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public SnapshotTriggerDefinition snapshotTrigger(
		Snapshotter snapshotter, 
		@Value("${axon.snapshot.event.count.threshold}") int snapshotThreshold) {
    return new EventCountSnapshotTriggerDefinition(snapshotter, snapshotThreshold);
}
@Bean
public SpringAggregateSnapshotterFactoryBean snapshotter() {
	SpringAggregateSnapshotterFactoryBean bean = new SpringAggregateSnapshotterFactoryBean();
	bean.setExecutor(Executors.newSingleThreadExecutor());
    return bean;
}
完成快照的配置前提下,需要在聚合根定义中做声明:
1
2
3
4
@Aggregate(snapshotTriggerDefinition = "snapshotTrigger")
public class ExchangeStockIstrAggr extends InstructionAggregate {
...
}
消息拦截
在Axon实现了Message在发送端和处理端(Handler)的透明传送:发送端通过xxxGateway或者xxxBus发送Message,并不关心发送给谁;于此同时,xxxHandler收到xxxBus转送的Message而并不关心该Message是谁发出的,是一个异步解耦的过程。Axon另外两种Message(Event、Query)的发送机制于此相似,区别在于Command及Query的转发在Bus层面做了LoadBanlance,而Event并没有,该过程被称为Message Routing。Axon可以在发送的不同阶段提供消息拦截以完成信息的更新。DispatchInterceptor和HandlerInterceptor是两个可插拔的拦截组件,前者在Message Routing到xxxBus前拦截,后者在处理端接收Message前进行拦截处理,如下是一个CommandMessage Interceptor配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void registerInterceptor(CommandBus commandBus) {
	commandBus.registerDispatchInterceptor(new CommandDispatchInterceptor());	
	commandBus.registerHandlerInterceptor(new CommandHandlerInterceptor());
}
public class CommandDispatchInterceptor implements MessageDispatchInterceptor<CommandMessage<?>> {
   @Override
   public BiFunction<Integer, CommandMessage<?>, CommandMessage<?>> handle(List<? extends CommandMessage<?>> messages) {
       return (index, command) -> {
       	...
           return command;
       };
   }
}
public class CommandHandlerInterceptor implements MessageHandlerInterceptor<CommandMessage<?>> {
   @Override
   public Object handle(UnitOfWork<? extends CommandMessage<?>> unitOfWork, InterceptorChain interceptorChain) throws Exception {
   	...
       return interceptorChain.proceed();
   }
}
CommandGateway重试机制
当Command执行失败时,可以配置重新调度的。Axon使用接口RetryScheduler定义重试策略。ExponentialBackOffIntervalRetryScheduler提供重试时间间隔按指数动态增长的重试策略实现,IntervalRetryScheduler提供按固定时间重试的策略实现。RetryScheduler终止条件是Command重试发送成功或者重试次数达到重试策略定义的最大重试次数,如下自定义IntervalRetryScheduler配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public CommandGateway commandGatewayWithRetry(
   		CommandBus commandBus,
   		@Value("${axon.command.retry.executor.count}") int retryExecutorCount,
   		@Value("${axon.command.retry.interval}") int retryInterval,
   		@Value("${axon.command.retry.max-count}") int maxRetryCount){		
       ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(retryExecutorCount);
       RetryScheduler rs = IntervalRetryScheduler
       		.builder()
       		.retryExecutor(scheduledExecutorService)
       		.maxRetryCount(maxRetryCount)
       		.retryInterval(retryInterval)
       		.build();
       CommandGateway commandGateway = DefaultCommandGateway
       		.builder()
       		.commandBus(commandBus)
       		.retryScheduler(rs)
       		.build();
       return commandGateway;
   }
public CommandGateway commandGatewayWithRetry(
   		CommandBus commandBus,
   		@Value("${axon.command.retry.executor.count}") int retryExecutorCount,
   		@Value("${axon.command.retry.interval}") int retryInterval,
   		@Value("${axon.command.retry.backoffFactor}") int backoffFactor){
       ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(retryExecutorCount);
	RetryScheduler rs = ExponentialBackOffIntervalRetryScheduler
		.builder()
		.backoffFactor(backoffFactor)
		.build();
	CommandGateway commandGateway = DefaultCommandGateway
       		.builder()
       		.commandBus(commandBus)
       		.retryScheduler(rs)
       		.build();
       return commandGateway;
       }
需要注意的是,只有运行时异常RuntimeException才会触发重试机制,可检查异常被认为是业务异常而永远不会触发重试机制。
聚合根状态机
saga自定义事务
Saga是一个长活事务可被分解成可以交错运行的子事务集合。
- 向后补偿
- 向前撤销
axon实现的saga
axon用的是向前撤销方式,把对应得操作写个补偿命令,然后认为补偿命令的事件一定返回成功,
saga内可以存数据,通过持久化到数据库中来保持saga状态。
基础用法
https://www.processon.com/view/link/5f61676d0791295dccbf246b
| 1 | 
 | 
自己封装的saga

saga可以被分成交错的子事务,我们认为saga是由多个事务组成的。
所以需要封装出一个事务接口,根据事务的概念可以知道有以下方法。
- 事务有成功失败两种状态
- 每个事务的开始事件
- 事务都支持回滚
- 事务需要处理不同事件
- 每个事务能处理的事件集合
- 每个事务结束时需返回值填充整体saga结果
| 1 | public enum SagaStatus { | 
| 1 | public interface ITransaction { | 
事务可以区分为单个实际的事务也和多个子事务的组合事务。
单个的事务需要有自己的状态。
| 1 | public abstract class TransactionUnit implements ITransaction { | 
组合的事务需知道自己到底由哪些事务组成,都处理哪些事件,以及每个子事务返回结果
| 1 | public abstract class TransactionGroup implements ITransaction { | 
组合事务分为串行和并行两种。
- 串行事务需要有一个当前执行哪个事务的记录。 
- 事务状态第一个失败为失败,最后一个成功为成功 
- 事务整体执行时只需要从前往后执行 
- 事务整体回滚时需要从后往前回滚 
- 当前事务执行完事件触发下一个事务的执行,回滚时相反。 
| 1 | public class SerialTransaction extends TransactionGroup { | 
- 并行事务需要知道事件对应的事务 
- 事务状态为全部成功或全部失败,如果没有全部结束则为空 
- 开始事件为子事务的开始,回滚事件为以成功子事务的回滚 
- 事务执行的判断为所有事件结束,且全部为成功或全部为失败,如果有失败则回滚 
| 1 | public class ParallelTransaction extends TransactionGroup { | 
读写分离处理
查询服务监听聚合根发出的事件,拼成业务需要的数据格式,存进数据库,可以根据不同的业务来拆分、组合事件。

网关对接
网关对接包括两种情况,目前群济的流程包括
- 系统发给外部系统的请求 - 请求数据验证
- 记录请求参数流水数据
- 命令与外部系统数据转换
- 发送给外部系统
- 记录返回参数流水数据
- 返回结果转换成事件
 
- 外部系统通过各种渠道(mq)返回给交易系统的结果。 - 通过mq收到方达返回消息
- 在流水表里找到对应聚合根
- 与当前数据对比判断业务流程
- 转换成对应的event
- 系统监听对应的event转换成聚合根的command
 
外部系统主键和聚合根对应关系
目前系统有三套id
1. 外部系统的id
2. 交易系统的聚合根id
3. 数据库对应的主键id网关服务交易流水中记录三者的关系,聚合根为数据库中的业务主键(指令id),且都会冗余存外部系统的id
现在做网关服务时候对聚合根利用过少,还是依靠数据库来做的状态判断,没有用到聚合根的状态机,需要从数据库中查询出原始数据,然后比对,逻辑都写到判断里,由于数据查询和修改在一个方法里就会有数据不一致的风险。
对接步骤
- 委托id和聚合根之间没有直接关系,修改后新创建一条报价,原委托报价作废。而聚合根不变。
- 质押式回购代码现在每个流程都分7步。- 初始化
- 验证参数
- 落库(网关服务没有)
- 组装与外部系统对接的参数
- 发送请求
- 成功处理
- 失败处理
 
axon+cucumber单元测试
Command/Event测试
- 构造测试组件
| 1 | public class AggregateEndpoint { | 
- cucumber测试因子注入
| 1 | @Test | 
- cucumber行为驱动测试
| 1 | public class AggrStepDefinitions extends AggregateEndpoint{ | 
Saga测试
- 构造测试组件
| 1 | public class SagaEndpoint { | 
- cucumber测试因子注入
参考Command/Event测试
- cucumber行为驱动测试
参考Command/Event测试
事件异步转同步
栅栏 请求开始创建锁,结束时候释放锁。
| 1 | @Slf4j | 
axon还缺哪些东西
安全配置
https://www.infoq.cn/article/fcrbkbkvath5fnfjvk1o
事件处理器
- 订阅处理器,订阅事件流,从订阅那一刻起处理事件。 
- 跟踪处理器,会自动跟踪处理进度,默认从一开始重放事件存储中的所有事件。 
| 1 | @Autowired | 
未解决的代码问题
- common basic 中的 response/pledgerepo文件夹挪走
- common basic 中的 response/risk文件夹挪走
- saga中的静态commandgateway 需要改成接口,可测试的类。
- 异步转同步时候map操作同步操作。
- 带业务的公共类 (券信息)单提出一个包放
- Post link: http://dongkw.github.io/2021/01/25/%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%A1%86%E6%9E%B6%E6%80%BB%E7%BB%93%E6%96%87%E6%A1%A3/
- Copyright Notice: All articles in this blog are licensed under unless stating additionally.


若没有本文 Issue,您可以使用 Comment 模版新建。
GitHub Issues