axon架构特征
领域驱动模型
命令查询责任隔离
事件驱动架构
事件驱动偏向事件,主要由command,event,query,aggregate,saga等概念组成,存储的是事件偏立体。
而数据库中存的是结果,更类似平面的数据。
axon server集群策略(不用EE的情况下)
方案1. server直接连同一个事件存储文件。客户端配置多个axon server服务器,收到重连事件后重新load event索引,快照索引,断掉以后重连会连到不同server上(没有办法使用)
docker 部署修axon集群
实现功能,axonserver 挂掉自动切换。
- 在axonserver-se源码内建dockerfile文件
1 | FROM openjdk:8-alpine |
- 项目目录下执行命令构建docker镜像
1 | docker build -t axonserver . |
- 启动镜像
1 | ➜ axon-server-se git:(master) ✗ docker images |
4 客户端配置文件对应修改为
1 | axon: |
下一阶段工作内容展望
在方案1的基础上加上主从
node之间数据同步
数据文件split及merge
重建索引速度提升
基版说明
Axon Server 4.5 Standard Edition Based
功能介绍
本方案依赖于Axon Server 4.5(SE)SPOF的解决方案。在方案设计中,将SPOF问题域划分为服务SPOF和数据SPOF两个问题子域分别解决。
本文仅描述服务SPOF的解决方案。
实现思路
客户端与server通信握手成功,会广播TopologyEvents.ApplicationConnected的消息通知,监听该消息重建EventStore存储索引。
源码改动
1 | @EventListener ① 监听消息 |
配置文件
1 | server: |
安装部署
清单
名称 说明 axonserver-4.5.jar axonserver 升级包 axonserver.yml 服务配置文件,与升级包同一目录 startup.sh 服务启动脚本,与升级包同一目录 服务规划及配置
node编号 server.port axoniq.axonserver.port node-01 8024 8124 node-02 8025 8125 node-03 8026 8126
本安装说明中默认配置:
axoniq.axonserver.event.storage = ../data/default/events
axoniq.axonserver.event.snapshot = ../data/default/snapshots
需要说明的是,该两项配置需要确保不同axonserver实例的配置保持绝对路径一致。
- 服务启动
在已安装Java环境的情况下,
linux环境下执行启动脚本 sh startup.sh;
windows环境下执行启动脚本 .\startup.bat
1 | nohup java -jar axonserver-4.5.jar & |
shutdown.sh
1 | kill -s 9 `lsof -t -i:8024` |
- 服务活跃状态监测
telnet ${ip} ${server.port}
axon组件自定义配置
定义聚合根
聚合根是整个Axon应用的核心业务承担组件,定义一个聚合根主要实现两方面的功能:
###funtion
定义一组CommandHandler,接收指定CommandMessage作用于Aggregate,自定义命令下达的业务逻辑并生成语义唯一性的EvntMessage
定义一组EventSourcingHandler,接收EventMessage作用于Aggregate,自定义事件溯源的业务逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54@NoArgsConstructor ① 强制要求提供Aggregate的无参构造函数
@Aggregate ② 声明一个聚合根,创建有继承关系的Aggregate时,父类声明无效
public class ExchangeStockIstrAggr extends InstructionAggregate {
@CommandHandler ③ 声明一个**构造子**CommandHandler
public ExchangeStockIstrAggr(ESCreateIstrCmd cmd) {
ESIstrCreatedEvt esIstrCreatedEvt = new ESIstrCreatedEvt();
esIstrCreatedEvt.setId(cmd.getId());
esIstrCreatedEvt.setAccountId(cmd.getAccountId());
esIstrCreatedEvt.setQuantity(cmd.getQuantity());
esIstrCreatedEvt.setSecurityCode(cmd.getSecurityCode());
esIstrCreatedEvt.setTradeType(cmd.getTradeType());
esIstrCreatedEvt.setUnitId(cmd.getUnitId());
esIstrCreatedEvt.setUserId(cmd.getUserId());
AggregateLifecycle.apply(esIstrCreatedEvt); ④ Command发送成功一定会同时生成一条操作日志,
该日志强制要求以EventMessage的形式记录下来
}
@EventSourcingHandler ⑤ 声明指定类型EventMessage在溯源时的逻辑
public void on(ESIstrCreatedEvt evt) {
setId(evt.getId());
setInstructionState(new CreatedInstructionState());
setAccountId(evt.getAccountId());
setUnitId(evt.getUnitId());
IstrTradeElement istrTradeElement = new IstrTradeElement();
istrTradeElement.setQuantity(evt.getQuantity());
istrTradeElement.setSecurityCode(evt.getSecurityCode());
istrTradeElement.setTradeType(evt.getTradeType());
setIstrTradeElement(istrTradeElement);
setUserId(evt.getUserId());
}
@CommandHandler ⑥ 声明一个**非构造子**CommandHandler
public void handle(ESCancelIstrCmd cmd) {
...
}
@EventSourcingHandler ⑦ CommandHandler和EventSourcingHandler推荐成对儿出现
public void on(ESIstrCancellingEvt evt) {
...
}
}
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class DomainAggregate {
@AggregateIdentifier ⑧ 声明聚合根唯一标识,该标识**全局唯一**
private String id;
}
聚合根资源库
在Axon中,聚合根承担的职责主要有两个:
###如下
接收CommandBus中的命令(Command)完成聚合根状态的更新,形成操作日志(Event)并将其发送至EventBus
在溯源阶段,接收资源库加载的操作日志**重塑聚合根的最新状态
聚合根资源库的职责在于提供完整可靠的操作日志,并按照有序追加的方式永久的保存下来,但需要注意的是,聚合根资源库只提供资源保存和加载的抽象接口,而具体的功能实现则被委托给Event Store,如下是一个样例:1
2
3
4Class aggregateType = ClassUtil.instance(aggregateQualifiedName);
EventSourcingRepository
.builder(aggregateType)
.build();
快照策略
Event快照(Snapshot)是为解决Event数量大时溯源变慢的性能问题产生的解决方案,快照策略则对快照产生的时机、触发机制所做的代码描述。
在存在Snapshot的情况下,重塑聚合根状态只需加载最新的Snapshot和该快照之后所产生的的Event。Axon提供了SnapshotTriggerDefinition接口定义快照触发时间,并提供了开箱即用的基于事件数量为触发条件的实现方案EventCountSnapshotTriggerDefinition,入参snapshotThreshold为生成快照的聚合根事件事件阈值限制:
1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public SnapshotTriggerDefinition snapshotTrigger(
Snapshotter snapshotter,
@Value("${axon.snapshot.event.count.threshold}") int snapshotThreshold) {
return new EventCountSnapshotTriggerDefinition(snapshotter, snapshotThreshold);
}
@Bean
public SpringAggregateSnapshotterFactoryBean snapshotter() {
SpringAggregateSnapshotterFactoryBean bean = new SpringAggregateSnapshotterFactoryBean();
bean.setExecutor(Executors.newSingleThreadExecutor());
return bean;
}
完成快照的配置前提下,需要在聚合根定义中做声明:
1
2
3
4
@Aggregate(snapshotTriggerDefinition = "snapshotTrigger")
public class ExchangeStockIstrAggr extends InstructionAggregate {
...
}
消息拦截
在Axon实现了Message在发送端和处理端(Handler)的透明传送:发送端通过xxxGateway或者xxxBus发送Message,并不关心发送给谁;于此同时,xxxHandler收到xxxBus转送的Message而并不关心该Message是谁发出的,是一个异步解耦的过程。Axon另外两种Message(Event、Query)的发送机制于此相似,区别在于Command及Query的转发在Bus层面做了LoadBanlance,而Event并没有,该过程被称为Message Routing。Axon可以在发送的不同阶段提供消息拦截以完成信息的更新。DispatchInterceptor和HandlerInterceptor是两个可插拔的拦截组件,前者在Message Routing到xxxBus前拦截,后者在处理端接收Message前进行拦截处理,如下是一个CommandMessage Interceptor配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void registerInterceptor(CommandBus commandBus) {
commandBus.registerDispatchInterceptor(new CommandDispatchInterceptor());
commandBus.registerHandlerInterceptor(new CommandHandlerInterceptor());
}
public class CommandDispatchInterceptor implements MessageDispatchInterceptor<CommandMessage<?>> {
@Override
public BiFunction<Integer, CommandMessage<?>, CommandMessage<?>> handle(List<? extends CommandMessage<?>> messages) {
return (index, command) -> {
...
return command;
};
}
}
public class CommandHandlerInterceptor implements MessageHandlerInterceptor<CommandMessage<?>> {
@Override
public Object handle(UnitOfWork<? extends CommandMessage<?>> unitOfWork, InterceptorChain interceptorChain) throws Exception {
...
return interceptorChain.proceed();
}
}
CommandGateway重试机制
当Command执行失败时,可以配置重新调度的。Axon使用接口RetryScheduler定义重试策略。ExponentialBackOffIntervalRetryScheduler提供重试时间间隔按指数动态增长的重试策略实现,IntervalRetryScheduler提供按固定时间重试的策略实现。RetryScheduler终止条件是Command重试发送成功或者重试次数达到重试策略定义的最大重试次数,如下自定义IntervalRetryScheduler配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public CommandGateway commandGatewayWithRetry(
CommandBus commandBus,
@Value("${axon.command.retry.executor.count}") int retryExecutorCount,
@Value("${axon.command.retry.interval}") int retryInterval,
@Value("${axon.command.retry.max-count}") int maxRetryCount){
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(retryExecutorCount);
RetryScheduler rs = IntervalRetryScheduler
.builder()
.retryExecutor(scheduledExecutorService)
.maxRetryCount(maxRetryCount)
.retryInterval(retryInterval)
.build();
CommandGateway commandGateway = DefaultCommandGateway
.builder()
.commandBus(commandBus)
.retryScheduler(rs)
.build();
return commandGateway;
}
public CommandGateway commandGatewayWithRetry(
CommandBus commandBus,
@Value("${axon.command.retry.executor.count}") int retryExecutorCount,
@Value("${axon.command.retry.interval}") int retryInterval,
@Value("${axon.command.retry.backoffFactor}") int backoffFactor){
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(retryExecutorCount);
RetryScheduler rs = ExponentialBackOffIntervalRetryScheduler
.builder()
.backoffFactor(backoffFactor)
.build();
CommandGateway commandGateway = DefaultCommandGateway
.builder()
.commandBus(commandBus)
.retryScheduler(rs)
.build();
return commandGateway;
}
需要注意的是,只有运行时异常RuntimeException才会触发重试机制,可检查异常被认为是业务异常而永远不会触发重试机制。
聚合根状态机
saga自定义事务
Saga是一个长活事务可被分解成可以交错运行的子事务集合。
- 向后补偿
- 向前撤销
axon实现的saga
axon用的是向前撤销方式,把对应得操作写个补偿命令,然后认为补偿命令的事件一定返回成功,
saga内可以存数据,通过持久化到数据库中来保持saga状态。
基础用法
https://www.processon.com/view/link/5f61676d0791295dccbf246b
1 |
|
自己封装的saga
saga可以被分成交错的子事务,我们认为saga是由多个事务组成的。
所以需要封装出一个事务接口,根据事务的概念可以知道有以下方法。
- 事务有成功失败两种状态
- 每个事务的开始事件
- 事务都支持回滚
- 事务需要处理不同事件
- 每个事务能处理的事件集合
- 每个事务结束时需返回值填充整体saga结果
1 | public enum SagaStatus { |
1 | public interface ITransaction { |
事务可以区分为单个实际的事务也和多个子事务的组合事务。
单个的事务需要有自己的状态。
1 | public abstract class TransactionUnit implements ITransaction { |
组合的事务需知道自己到底由哪些事务组成,都处理哪些事件,以及每个子事务返回结果
1 | public abstract class TransactionGroup implements ITransaction { |
组合事务分为串行和并行两种。
串行事务需要有一个当前执行哪个事务的记录。
事务状态第一个失败为失败,最后一个成功为成功
事务整体执行时只需要从前往后执行
事务整体回滚时需要从后往前回滚
当前事务执行完事件触发下一个事务的执行,回滚时相反。
1 | public class SerialTransaction extends TransactionGroup { |
并行事务需要知道事件对应的事务
事务状态为全部成功或全部失败,如果没有全部结束则为空
开始事件为子事务的开始,回滚事件为以成功子事务的回滚
事务执行的判断为所有事件结束,且全部为成功或全部为失败,如果有失败则回滚
1 | public class ParallelTransaction extends TransactionGroup { |
读写分离处理
查询服务监听聚合根发出的事件,拼成业务需要的数据格式,存进数据库,可以根据不同的业务来拆分、组合事件。
网关对接
网关对接包括两种情况,目前群济的流程包括
系统发给外部系统的请求
- 请求数据验证
- 记录请求参数流水数据
- 命令与外部系统数据转换
- 发送给外部系统
- 记录返回参数流水数据
- 返回结果转换成事件
外部系统通过各种渠道(mq)返回给交易系统的结果。
- 通过mq收到方达返回消息
- 在流水表里找到对应聚合根
- 与当前数据对比判断业务流程
- 转换成对应的event
- 系统监听对应的event转换成聚合根的command
外部系统主键和聚合根对应关系
目前系统有三套id
1. 外部系统的id
2. 交易系统的聚合根id
3. 数据库对应的主键id
网关服务交易流水中记录三者的关系,聚合根为数据库中的业务主键(指令id),且都会冗余存外部系统的id
现在做网关服务时候对聚合根利用过少,还是依靠数据库来做的状态判断,没有用到聚合根的状态机,需要从数据库中查询出原始数据,然后比对,逻辑都写到判断里,由于数据查询和修改在一个方法里就会有数据不一致的风险。
对接步骤
- 委托id和聚合根之间没有直接关系,修改后新创建一条报价,原委托报价作废。而聚合根不变。
- 质押式回购代码现在每个流程都分7步。
- 初始化
- 验证参数
- 落库(网关服务没有)
- 组装与外部系统对接的参数
- 发送请求
- 成功处理
- 失败处理
axon+cucumber单元测试
Command/Event测试
- 构造测试组件
1 | public class AggregateEndpoint { |
- cucumber测试因子注入
1 | @Test |
- cucumber行为驱动测试
1 | public class AggrStepDefinitions extends AggregateEndpoint{ |
Saga测试
- 构造测试组件
1 | public class SagaEndpoint { |
- cucumber测试因子注入
参考Command/Event测试
- cucumber行为驱动测试
参考Command/Event测试
事件异步转同步
栅栏 请求开始创建锁,结束时候释放锁。
1 | @Slf4j |
axon还缺哪些东西
安全配置
https://www.infoq.cn/article/fcrbkbkvath5fnfjvk1o
事件处理器
订阅处理器,订阅事件流,从订阅那一刻起处理事件。
跟踪处理器,会自动跟踪处理进度,默认从一开始重放事件存储中的所有事件。
1 | @Autowired |
未解决的代码问题
- common basic 中的 response/pledgerepo文件夹挪走
- common basic 中的 response/risk文件夹挪走
- saga中的静态commandgateway 需要改成接口,可测试的类。
- 异步转同步时候map操作同步操作。
- 带业务的公共类 (券信息)单提出一个包放
- Post link: http://dongkw.github.io/2021/01/25/%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%A1%86%E6%9E%B6%E6%80%BB%E7%BB%93%E6%96%87%E6%A1%A3/
- Copyright Notice: All articles in this blog are licensed under unless stating additionally.
若没有本文 Issue,您可以使用 Comment 模版新建。
GitHub Issues