axon架构特征

领域驱动模型

命令查询责任隔离

事件驱动架构

事件驱动偏向事件,主要由command,event,query,aggregate,saga等概念组成,存储的是事件偏立体。

而数据库中存的是结果,更类似平面的数据。

axon server集群策略(不用EE的情况下)

方案1. server直接连同一个事件存储文件。客户端配置多个axon server服务器,收到重连事件后重新load event索引,快照索引,断掉以后重连会连到不同server上(没有办法使用)

docker 部署修axon集群

实现功能,axonserver 挂掉自动切换。

  1. 在axonserver-se源码内建dockerfile文件
1
2
3
4
5
6
7
8
9
10
FROM openjdk:8-alpine

MAINTAINER dongkw

ADD /axonserver/target/axonserver-4.5-SNAPSHOT.jar app.jar

EXPOSE 8024 8124

ENTRYPOINT [ "java", "-jar","/app.jar" ]

  1. 项目目录下执行命令构建docker镜像
1
docker build -t axonserver  .
  1. 启动镜像
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
➜  axon-server-se git:(master) ✗ docker images                
REPOSITORY TAG IMAGE ID CREATED SIZE
axonserver latest 1911895acf6a 3 hours ago 191MB
openjdk 8-alpine a3562aa0b991 21 months ago 105MB


➜ axon-server-se git:(master) ✗ docker run -d -p 8024:8024 -p 8124:8124 -v /Users/dongkewei/work/trade/axon-server-se/dt/default:/data/default --name axonserver1 --rm axonserver:latest

abfc1c598f78eba64ecd16d5d6924c13cbfd95ff3298af1d5536a833b72c53d9

➜ axon-server-se git:(master) ✗ docker run -d -p 8025:8024 -p 8125:8124 -v /Users/dongkewei/work/trade/axon-server-se/dt/default:/data/default --name axonserver2 --rm axonserver:latest

9a748dad2a8819084525eeeff1ff695f81121ddffdc48fac9ea8f97b61d3a915

➜ axon-server-se git:(master) ✗ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
9a748dad2a88 axonserver:latest "java -jar /app.jar" 24 seconds ago Up 23 seconds 0.0.0.0:8025->8024/tcp, 0.0.0.0:8125->8124/tcp axonserver2
abfc1c598f78 axonserver:latest "java -jar /app.jar" About a minute ago Up About a minute 0.0.0.0:8024->8024/tcp, 0.0.0.0:8124->8124/tcp axonserver1

4 客户端配置文件对应修改为

1
2
3
4
axon:
axonserver:
servers: localhost:8124,localhost:8125

下一阶段工作内容展望

  1. 在方案1的基础上加上主从

  2. node之间数据同步

  3. 数据文件split及merge

  4. 重建索引速度提升

基版说明

Axon Server 4.5 Standard Edition Based

功能介绍

本方案依赖于Axon Server 4.5(SE)SPOF的解决方案。在方案设计中,将SPOF问题域划分为服务SPOF和数据SPOF两个问题子域分别解决。
本文仅描述服务SPOF的解决方案。

实现思路

客户端与server通信握手成功,会广播TopologyEvents.ApplicationConnected的消息通知,监听该消息重建EventStore存储索引。

源码改动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@EventListener													① 监听消息
public void on(TopologyEvents.ApplicationConnected event) {
if( event.isProxied()) {
logger.info("Application re-connected via {}: {}, clientId = {}, clientStreamId = {}, context = {}",
event.getProxy(),
event.getComponentName(),
event.getClientId(),
event.getClientStreamId(),
event.getContext());
} else {
logger.info("Application re-connected: {}, clientId = {}, clientStreamId = {}, context = {}",
event.getComponentName(),
event.getClientId(),
event.getClientStreamId(),
event.getContext());
}
reloadService.reloadFromFile(); ② 重建索引(Event索引、快照索引)
}

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
server:
port: 8024

spring:
application:
name: Axon Server

jpa:
open-in-view: false
hibernate:
ddl-auto: none
use-new-id-generator-mappings: false

datasource:
url: jdbc:h2:retry:${axoniq.axonserver.controldb-path:./data}/axonserver-controldb

h2:
console:
enabled: true
path: /h2-console

logging:
level:
root: INFO
AUDIT: DEBUG
io.axoniq.axonserver.AxonServer: DEBUG
io.axoniq.axonserver.grpc.Gateway: DEBUG
io.axoniq.axonserver.logging: DEBUG
io.axoniq.axonserver.grpc.internal.MessagingClusterServer: DEBUG
org.springframework.boot.web.embedded.tomcat.TomcatWebServer: INFO
org.springframework.http.converter.json.Jackson2ObjectMapperBuilder: ERROR
org.hibernate.orm.deprecation: ERROR


management:
security:
enabled: false
endpoints:
web:
exposure:
include: metrics,info,health,loggers,prometheus,env

axoniq:
axonserver:
port: 8124
heartbeat:
enabled: true
#controldb-path: ../data
event:
storage: ../data/default/events
snapshot:
storage: ../data/default/snapshots

info:
app:
name: ${spring.application.name}
description: AxonIQ
#version: @project.version@


安装部署

  1. 清单

    名称 说明
    axonserver-4.5.jar axonserver 升级包
    axonserver.yml 服务配置文件,与升级包同一目录
    startup.sh 服务启动脚本,与升级包同一目录
  2. 服务规划及配置

    node编号 server.port axoniq.axonserver.port
    node-01 8024 8124
    node-02 8025 8125
    node-03 8026 8126

本安装说明中默认配置:
axoniq.axonserver.event.storage = ../data/default/events
axoniq.axonserver.event.snapshot = ../data/default/snapshots
需要说明的是,该两项配置需要确保不同axonserver实例的配置保持绝对路径一致。

  1. 服务启动

在已安装Java环境的情况下,
linux环境下执行启动脚本 sh startup.sh;
windows环境下执行启动脚本 .\startup.bat

1
2
nohup java -jar axonserver-4.5.jar &

shutdown.sh

1
kill -s 9 `lsof -t -i:8024`
  1. 服务活跃状态监测

telnet ${ip} ${server.port}

axon组件自定义配置

定义聚合根

聚合根是整个Axon应用的核心业务承担组件,定义一个聚合根主要实现两方面的功能:
###funtion

  • 定义一组CommandHandler,接收指定CommandMessage作用于Aggregate,自定义命令下达的业务逻辑并生成语义唯一性的EvntMessage

  • 定义一组EventSourcingHandler,接收EventMessage作用于Aggregate,自定义事件溯源的业务逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    @NoArgsConstructor                                                     ① 强制要求提供Aggregate的无参构造函数
    @Aggregate ② 声明一个聚合根,创建有继承关系的Aggregate时,父类声明无效
    public class ExchangeStockIstrAggr extends InstructionAggregate {

    @CommandHandler ③ 声明一个**构造子**CommandHandler
    public ExchangeStockIstrAggr(ESCreateIstrCmd cmd) {
    ESIstrCreatedEvt esIstrCreatedEvt = new ESIstrCreatedEvt();
    esIstrCreatedEvt.setId(cmd.getId());
    esIstrCreatedEvt.setAccountId(cmd.getAccountId());
    esIstrCreatedEvt.setQuantity(cmd.getQuantity());
    esIstrCreatedEvt.setSecurityCode(cmd.getSecurityCode());
    esIstrCreatedEvt.setTradeType(cmd.getTradeType());
    esIstrCreatedEvt.setUnitId(cmd.getUnitId());
    esIstrCreatedEvt.setUserId(cmd.getUserId());
    AggregateLifecycle.apply(esIstrCreatedEvt); ④ Command发送成功一定会同时生成一条操作日志,
    该日志强制要求以EventMessage的形式记录下来
    }

    @EventSourcingHandler ⑤ 声明指定类型EventMessage在溯源时的逻辑
    public void on(ESIstrCreatedEvt evt) {
    setId(evt.getId());
    setInstructionState(new CreatedInstructionState());
    setAccountId(evt.getAccountId());
    setUnitId(evt.getUnitId());
    IstrTradeElement istrTradeElement = new IstrTradeElement();
    istrTradeElement.setQuantity(evt.getQuantity());
    istrTradeElement.setSecurityCode(evt.getSecurityCode());
    istrTradeElement.setTradeType(evt.getTradeType());
    setIstrTradeElement(istrTradeElement);
    setUserId(evt.getUserId());
    }

    @CommandHandler ⑥ 声明一个**非构造子**CommandHandler
    public void handle(ESCancelIstrCmd cmd) {
    ...
    }

    @EventSourcingHandler ⑦ CommandHandler和EventSourcingHandler推荐成对儿出现
    public void on(ESIstrCancellingEvt evt) {
    ...
    }
    }

    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public class DomainAggregate {

    @AggregateIdentifier ⑧ 声明聚合根唯一标识,该标识**全局唯一**
    private String id;

    }

聚合根资源库

在Axon中,聚合根承担的职责主要有两个:
###如下

  • 接收CommandBus中的命令(Command)完成聚合根状态的更新,形成操作日志(Event)并将其发送至EventBus

  • 在溯源阶段,接收资源库加载的操作日志**重塑聚合根的最新状态
    聚合根资源库的职责在于提供完整可靠的操作日志,并按照有序追加的方式永久的保存下来,但需要注意的是,聚合根资源库只提供资源保存和加载的抽象接口,而具体的功能实现则被委托给Event Store,如下是一个样例:

    1
    2
    3
    4
    Class aggregateType = ClassUtil.instance(aggregateQualifiedName);
    EventSourcingRepository
    .builder(aggregateType)
    .build();

快照策略

Event快照(Snapshot)是为解决Event数量大时溯源变慢的性能问题产生的解决方案,快照策略则对快照产生的时机触发机制所做的代码描述。
在存在Snapshot的情况下,重塑聚合根状态只需加载最新的Snapshot和该快照之后所产生的的Event。Axon提供了SnapshotTriggerDefinition接口定义快照触发时间,并提供了开箱即用的基于事件数量为触发条件的实现方案EventCountSnapshotTriggerDefinition,入参snapshotThreshold为生成快照的聚合根事件事件阈值限制:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public SnapshotTriggerDefinition snapshotTrigger(
Snapshotter snapshotter,
@Value("${axon.snapshot.event.count.threshold}") int snapshotThreshold) {
return new EventCountSnapshotTriggerDefinition(snapshotter, snapshotThreshold);
}

@Bean
public SpringAggregateSnapshotterFactoryBean snapshotter() {
SpringAggregateSnapshotterFactoryBean bean = new SpringAggregateSnapshotterFactoryBean();
bean.setExecutor(Executors.newSingleThreadExecutor());
return bean;
}

完成快照的配置前提下,需要在聚合根定义中做声明:

1
2
3
4
@Aggregate(snapshotTriggerDefinition = "snapshotTrigger")
public class ExchangeStockIstrAggr extends InstructionAggregate {
...
}

消息拦截

在Axon实现了Message在发送端和处理端(Handler)的透明传送:发送端通过xxxGateway或者xxxBus发送Message,并不关心发送给谁;于此同时,xxxHandler收到xxxBus转送的Message而并不关心该Message是谁发出的,是一个异步解耦的过程。Axon另外两种Message(Event、Query)的发送机制于此相似,区别在于Command及Query的转发在Bus层面做了LoadBanlance,而Event并没有,该过程被称为Message Routing。Axon可以在发送的不同阶段提供消息拦截以完成信息的更新。DispatchInterceptor和HandlerInterceptor是两个可插拔的拦截组件,前者在Message Routing到xxxBus前拦截,后者在处理端接收Message前进行拦截处理,如下是一个CommandMessage Interceptor配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void registerInterceptor(CommandBus commandBus) {
commandBus.registerDispatchInterceptor(new CommandDispatchInterceptor());
commandBus.registerHandlerInterceptor(new CommandHandlerInterceptor());
}

public class CommandDispatchInterceptor implements MessageDispatchInterceptor<CommandMessage<?>> {

@Override
public BiFunction<Integer, CommandMessage<?>, CommandMessage<?>> handle(List<? extends CommandMessage<?>> messages) {
return (index, command) -> {
...
return command;
};
}
}

public class CommandHandlerInterceptor implements MessageHandlerInterceptor<CommandMessage<?>> {

@Override
public Object handle(UnitOfWork<? extends CommandMessage<?>> unitOfWork, InterceptorChain interceptorChain) throws Exception {
...
return interceptorChain.proceed();
}
}

CommandGateway重试机制

当Command执行失败时,可以配置重新调度的。Axon使用接口RetryScheduler定义重试策略。ExponentialBackOffIntervalRetryScheduler提供重试时间间隔按指数动态增长的重试策略实现,IntervalRetryScheduler提供按固定时间重试的策略实现。RetryScheduler终止条件是Command重试发送成功或者重试次数达到重试策略定义的最大重试次数,如下自定义IntervalRetryScheduler配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public CommandGateway commandGatewayWithRetry(
CommandBus commandBus,
@Value("${axon.command.retry.executor.count}") int retryExecutorCount,
@Value("${axon.command.retry.interval}") int retryInterval,
@Value("${axon.command.retry.max-count}") int maxRetryCount){
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(retryExecutorCount);
RetryScheduler rs = IntervalRetryScheduler
.builder()
.retryExecutor(scheduledExecutorService)
.maxRetryCount(maxRetryCount)
.retryInterval(retryInterval)
.build();
CommandGateway commandGateway = DefaultCommandGateway
.builder()
.commandBus(commandBus)
.retryScheduler(rs)
.build();
return commandGateway;
}

public CommandGateway commandGatewayWithRetry(
CommandBus commandBus,
@Value("${axon.command.retry.executor.count}") int retryExecutorCount,
@Value("${axon.command.retry.interval}") int retryInterval,
@Value("${axon.command.retry.backoffFactor}") int backoffFactor){
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(retryExecutorCount);
RetryScheduler rs = ExponentialBackOffIntervalRetryScheduler
.builder()
.backoffFactor(backoffFactor)
.build();

CommandGateway commandGateway = DefaultCommandGateway
.builder()
.commandBus(commandBus)
.retryScheduler(rs)
.build();
return commandGateway;
}

需要注意的是,只有运行时异常RuntimeException才会触发重试机制,可检查异常被认为是业务异常而永远不会触发重试机制。

聚合根状态机

saga自定义事务

git地址

Saga是一个长活事务可被分解成可以交错运行的子事务集合。

  1. 向后补偿
  2. 向前撤销

axon实现的saga

axon用的是向前撤销方式,把对应得操作写个补偿命令,然后认为补偿命令的事件一定返回成功,

saga内可以存数据,通过持久化到数据库中来保持saga状态。

基础用法
https://www.processon.com/view/link/5f61676d0791295dccbf246b

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

@Saga
public class CreateSaga {

private CreateEvent createEvent;

@Autowired
private transient CommandGateway commandGateway;

@StartSaga
@SagaEventHandler(associationProperty = "id")
public void startSaga(CreateEvent event) {

this.createEvent = event;
CmplCmd cmd = new CmplCmd();
cmd.setId(event.getId());
commandGateway.send(cmd);

}
@EndSaga
@SagaEventHandler(associationProperty = "id")
public void handler(CmplFailEvt evt) {
FailCmd cmd = new FailCmd();
cmd.setId(this.createEvent.getId());
commandGateway.send(cmd);
}

@SagaEventHandler(associationProperty = "id")
public void handler(CmplSuccEvt evt) {
ConfirmCmd cmd = new ConfirmCmd();
cmd.setId(evt.getId());
commandGateway.send(cmd);
SagaLifecycle.end();

}

}

自己封装的saga

图片

saga可以被分成交错的子事务,我们认为saga是由多个事务组成的。

所以需要封装出一个事务接口,根据事务的概念可以知道有以下方法。

  1. 事务有成功失败两种状态
  2. 每个事务的开始事件
  3. 事务都支持回滚
  4. 事务需要处理不同事件
  5. 每个事务能处理的事件集合
  6. 每个事务结束时需返回值填充整体saga结果
1
2
3
public enum SagaStatus {
SUCCESS, FAIL
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ITransaction {

SagaStatus getStatus();

List<Class> getEventRegList();

void start();

void rollback();

void eventHandler(Object event);

Object fill(Object cmd);
}

事务可以区分为单个实际的事务也和多个子事务的组合事务。

单个的事务需要有自己的状态。

1
2
3
4
5
6
7
8
9
10
public abstract class TransactionUnit implements ITransaction {

protected SagaStatus sagaStatus;

@Override
public SagaStatus getStatus() {
return sagaStatus;
}
}

组合的事务需知道自己到底由哪些事务组成,都处理哪些事件,以及每个子事务返回结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract class TransactionGroup implements ITransaction {

protected List<ITransaction> transactions;

public void setTransactions(List<ITransaction> transaction) {
this.transactions = transaction;
}
@Override
public List<Class> getEventRegList() {
return transactions.stream().flatMap(t -> t.getEventRegList().stream()).collect(Collectors.toList());
}
@Override
public Object fill(Object cmd) {
transactions.forEach(t -> t.fill(cmd));
return cmd;
}
}

组合事务分为串行和并行两种。

  1. 串行事务需要有一个当前执行哪个事务的记录。

  2. 事务状态第一个失败为失败,最后一个成功为成功

  3. 事务整体执行时只需要从前往后执行

  4. 事务整体回滚时需要从后往前回滚

  5. 当前事务执行完事件触发下一个事务的执行,回滚时相反。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class SerialTransaction extends TransactionGroup {

int curIndex;

@Override
public SagaStatus getStatus() {
if (Objects.equals(transactions.get(0).getStatus(), SagaStatus.FAIL)) {
return SagaStatus.FAIL;
} else if (Objects.equals(transactions.get(transactions.size() - 1).getStatus(), SagaStatus.SUCCESS)) {
return SagaStatus.SUCCESS;
} else {
return null;
}
}

@Override
public void start() {
transactions.get(curIndex).start();
}

@Override
public void rollback() {
transactions.get(curIndex).rollback();
}

@Override
public void eventHandler(Object event) {
ITransaction transaction = transactions.get(curIndex);
transaction.eventHandler(event);
if (transaction.getStatus().equals(SagaStatus.SUCCESS)) {
if (curIndex < transactions.size() - 1) {
transactions.get(++curIndex).start();
}
} else {
if (curIndex > 0) {
transactions.get(--curIndex).rollback();
}
}
}
}

  1. 并行事务需要知道事件对应的事务

  2. 事务状态为全部成功或全部失败,如果没有全部结束则为空

  3. 开始事件为子事务的开始,回滚事件为以成功子事务的回滚

  4. 事务执行的判断为所有事件结束,且全部为成功或全部为失败,如果有失败则回滚

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ParallelTransaction extends TransactionGroup {


public Map<Class, ITransaction> getMap() {
Map<Class, ITransaction> map = new HashMap<>();
transactions.forEach(t -> t.getEventRegList().forEach(e -> map.put(e, t)));
return map;
}

@Override
public SagaStatus getStatus() {

if (transactions.stream().allMatch(t -> Objects.equals(t.getStatus(), SagaStatus.SUCCESS))) {
return SagaStatus.SUCCESS;
} else if (transactions.stream().allMatch(t -> Objects.equals(t.getStatus(), SagaStatus.FAIL))) {
return SagaStatus.FAIL;
} else {
return null;
}
}

@Override
public void start() {
transactions.forEach(t -> t.start());
}

@Override
public void rollback() {
transactions.stream().filter(t ->
Objects.equals(t.getStatus(), SagaStatus.SUCCESS))
.forEach(t -> t.rollback());
}

@Override
public void eventHandler(Object event) {
ITransaction transaction = getMap().get(event.getClass());
transaction.eventHandler(event);
if (transactions.stream().allMatch(t -> Objects.nonNull(t.getStatus()))) {
if (transactions.stream().anyMatch(t -> Objects.equals(t.getStatus(), SagaStatus.FAIL))) {
rollback();
}
}
}

}

读写分离处理

查询服务监听聚合根发出的事件,拼成业务需要的数据格式,存进数据库,可以根据不同的业务来拆分、组合事件。

查询服务流程代码设计

查询服务流程图

网关对接

网关对接包括两种情况,目前群济的流程包括

  1. 系统发给外部系统的请求

    1. 请求数据验证
    2. 记录请求参数流水数据
    3. 命令与外部系统数据转换
    4. 发送给外部系统
    5. 记录返回参数流水数据
    6. 返回结果转换成事件
  2. 外部系统通过各种渠道(mq)返回给交易系统的结果。

    1. 通过mq收到方达返回消息
    2. 在流水表里找到对应聚合根
    3. 与当前数据对比判断业务流程
    4. 转换成对应的event
    5. 系统监听对应的event转换成聚合根的command

外部系统主键和聚合根对应关系

目前系统有三套id

1. 外部系统的id
2. 交易系统的聚合根id
3. 数据库对应的主键id

网关服务交易流水中记录三者的关系,聚合根为数据库中的业务主键(指令id),且都会冗余存外部系统的id

现在做网关服务时候对聚合根利用过少,还是依靠数据库来做的状态判断,没有用到聚合根的状态机,需要从数据库中查询出原始数据,然后比对,逻辑都写到判断里,由于数据查询和修改在一个方法里就会有数据不一致的风险。

对接步骤

  1. 委托id和聚合根之间没有直接关系,修改后新创建一条报价,原委托报价作废。而聚合根不变。
  2. 质押式回购代码现在每个流程都分7步。
    1. 初始化
    2. 验证参数
    3. 落库(网关服务没有)
    4. 组装与外部系统对接的参数
    5. 发送请求
    6. 成功处理
    7. 失败处理

axon+cucumber单元测试

Command/Event测试

  1. 构造测试组件
1
2
3
4
5
6
public class AggregateEndpoint {

AggregateTestFixture<ExchangeStockOrderAggregate> fixture = new AggregateTestFixture<ExchangeStockOrderAggregate>(ExchangeStockOrderAggregate.class); ① 声明Axon测试Mock组件

ResultValidator<ExchangeStockOrderAggregate> resultValidator; ② 执行状态池
}
  1. cucumber测试因子注入
1
2
3
4
5
6
7
8
9
@Test
Feature: Command/Event测试
Scenario Outline: 发送ESCreateOrderCmd,得到ESOrderCreatedEvt
Given 啥也不给
When 发送ESCreateOrderCmd命令,其中id="<id>"
Then 预期成功产生一个ESOrderCreatedEvt事件,其中id="<id>"
Examples:
| id |
| 0001 |
  1. cucumber行为驱动测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class AggrStepDefinitions extends AggregateEndpoint{
@Given("啥也不给")
public void 啥也不给() {

}
@When("发送ESCreateOrderCmd命令,其中id={string}")
public void 发送es_create_order_cmd命令_其中id(String id) {
ESCreateOrderCmd cmd = new ESCreateOrderCmd();
cmd.setId(id);
cmd.initData();
resultValidator = fixture.when(cmd);
}
@Then("预期成功产生一个ESOrderCreatedEvt事件,其中id={string}")
public void 预期成功产生一个es_order_created_evt事件_其中id(String id) {
ESOrderCreatedEvt evt = new ESOrderCreatedEvt();
evt.setId(id);
evt.initData();
resultValidator.expectEvents(evt);
}

}

Saga测试

  1. 构造测试组件
1
2
3
4
5
6
7
public class SagaEndpoint {

SagaTestFixture<ESCancelOrderSaga> fixture = new SagaTestFixture<>(ESCancelOrderSaga.class); ① 声明AxonSaga组件

FixtureExecutionResult executionResult; ② 执行状态池

}
  1. cucumber测试因子注入

参考Command/Event测试

  1. cucumber行为驱动测试

参考Command/Event测试

事件异步转同步

栅栏 请求开始创建锁,结束时候释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Slf4j
public class SyncController {

private final Map<String, Object> responseMap;

private final Map<String, CyclicBarrier> lockMap;

private final ReentrantLock reentrantLock;


public SyncController() {
responseMap = new ConcurrentHashMap<>();
lockMap = new ConcurrentHashMap<>();
reentrantLock = new ReentrantLock();
}

public Object waitResponse(String requestId) {
reentrantLock.lock();
if (responseMap.get(requestId) != null) {
return responseMap.remove(requestId);
}
lockMap.put(requestId, new CyclicBarrier(2));
reentrantLock.unlock();
try {
lockMap.get(requestId).await(8, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
e.printStackTrace();
}
return responseMap.remove(requestId);
}

public void syncResponse(String requestId, Object response) {
reentrantLock.lock();
responseMap.put(requestId, response);
if (Objects.isNull(lockMap.get(requestId))) {
return;
}
reentrantLock.unlock();
try {
lockMap.get(requestId).await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
lockMap.remove(requestId);
log.info("sync response:{},{}", requestId, response);

}
}


axon还缺哪些东西

安全配置

https://www.infoq.cn/article/fcrbkbkvath5fnfjvk1o

事件处理器

  1. 订阅处理器,订阅事件流,从订阅那一刻起处理事件。

  2. 跟踪处理器,会自动跟踪处理进度,默认从一开始重放事件存储中的所有事件。

1
2
3
4
5
6
7
8
9
10
11
12
 @Autowired
public void config(EventProcessingConfigurer configurer) {
//跟踪处理器
configurer.registerTrackingEventProcessorConfiguration(
c -> TrackingEventProcessorConfiguration.forParallelProcessing(2)
);
//订阅处理器 不好用,待研究!!!!!!
// configurer.usingSubscribingEventProcessors();
// configurer.registerSubscribingEventProcessor("sb");

}

未解决的代码问题

  1. common basic 中的 response/pledgerepo文件夹挪走
  2. common basic 中的 response/risk文件夹挪走
  3. saga中的静态commandgateway 需要改成接口,可测试的类。
  4. 异步转同步时候map操作同步操作。
  5. 带业务的公共类 (券信息)单提出一个包放