跳到主要内容

· 阅读需 6 分钟

现状 & 痛点

对于Seata而言,是通过记录DML操作的前后的数据用于进行后续可能的回滚操作的,并且把这些数据保存到数据库的一个blob的字段里面。对于批量插入,更新,删除等操作,其影响的行数可能会比较多,拼接成一个大的字段插入到数据库,可能会带来以下问题:

  1. 超出数据库单次操作的最大写入限制(比如MySQL的max_allowed_package参数);
  2. 较大的数据量带来的网络IO和数据库磁盘IO开销比较大。

头脑风暴

对于第1点的问题,可以根据业务的实际情况,调大max_allowed_package参数的限制,从而避免出现query is too large的问题;对于第2点,可以通过提高带宽和选用高性能的SSD作为数据库的存储介质。

以上都是通过外部方案或者加钱方案去解决的。那么有没有框架层面解决方案以解决上面的痛点?

此时结合到以上的痛点出现的根源,在于生成的数据字段过大。为此,如果可以把对应的数据进行业务方压缩之后,再进行数据传输以及落库,理论上也可以解决上面的问题。

可行性分析

结合以上头脑风暴的内容,考虑在实际开发中,当需要进行大批量操作的时候,大多会选在较少用户操作,并发相对较低的时间点执行,此时CPU,内存等资源可以相对占用多一点以快速完成对应的操作。因此,可以通过消耗CPU资源和内存资源,来对对应的回滚的数据进行压缩,从而缩小数据传输和存储的大小。

此时,还需要证明以下两件事:

  1. 经过压缩之后,可以减少网络IO和数据库磁盘IO的压力,这里可以采用数据压缩+落库完成的总时间作为侧面参考指标。
  2. 经过压缩之后,数据大小跟原来比较的压缩效率有多高,这里使用压缩前后的数据大小来作为指标。

压缩网络用时指标测试:

image

压缩比测试:

image

通过以上的测试结果,可以明显的看出,使用gzip或zip进行压缩的情况下,可以较大程度的减少数据库的压力和网络传输的压力,同时也可以较大幅度的减少保存的数据的大小。

实现

实现思路

压缩

部分代码

properties配置:

# 是否开启undo_log压缩,默认为true
seata.client.undo.compress.enable=true
# 压缩器类型,默认为zip,一般建议都是zip
seata.client.undo.compress.type=zip
# 启动压缩的阈值,默认为64k
seata.client.undo.compress.threshold=64k

判断是否开启了undo_log压缩功能以及是否达到压缩的阈值:

protected boolean needCompress(byte[] undoLogContent) {
// 1. 判断是否开启了undo_log压缩功能(1.4.2默认开启)
// 2. 判断是否达到了压缩的阈值(默认64k)
// 如果都满足返回需要对对应的undoLogContent进行压缩
return ROLLBACK_INFO_COMPRESS_ENABLE
&& undoLogContent.length > ROLLBACK_INFO_COMPRESS_THRESHOLD;
}

确定需要压缩后,对undo_log进行压缩:

// 如果需要压缩,对undo_log进行压缩
if (needCompress(undoLogContent)) {
// 获取压缩类型,默认zip
compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
// 获取对应的压缩器,并且进行压缩
undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
}
// else 不需要压缩就不需要做任何操作

将压缩类型同步保存到数据库,供回滚时使用:

protected String buildContext(String serializer, CompressorType compressorType) {
Map<String, String> map = new HashMap<>();
map.put(UndoLogConstants.SERIALIZER_KEY, serializer);
// 保存压缩类型到数据库
map.put(UndoLogConstants.COMPRESSOR_TYPE_KEY, compressorType.name());
return CollectionUtils.encodeMap(map);
}

回滚时解压缩对应的信息:

protected byte[] getRollbackInfo(ResultSet rs) throws SQLException  {
// 获取保存到数据库的回滚信息的字节数组
byte[] rollbackInfo = rs.getBytes(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
// 获取压缩类型
// getOrDefault使用默认值CompressorType.NONE来兼容1.4.2之前的版本直接升级1.4.2+
String rollbackInfoContext = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = CollectionUtils.decodeMap(rollbackInfoContext);
CompressorType compressorType = CompressorType.getByName(context.getOrDefault(UndoLogConstants.COMPRESSOR_TYPE_KEY,
CompressorType.NONE.name()));
// 获取对应的压缩器,并且解压缩
return CompressorFactory.getCompressor(compressorType.getCode())
.decompress(rollbackInfo);
}

结语

通过对undo_log的压缩,在框架层面,进一步提高Seata在处理数据量较大的时候的性能。同时,也提供了对应的开关和相对合理的默认值,既方便用户进行开箱即用,也方便用户根据实际需求进行一定的调整,使得对应的功能更适合实际使用场景。

· 阅读需 16 分钟
  1. seata版本:1.4.0,但1.4以下的所有版本也都有这个问题
  2. 问题描述:在一个全局事务中,一个分支事务上的纯查询操作突然卡住了,没有任何反馈(日志/异常),直到消费端RPC超时

image.png

问题排查

  1. 整个流程在一个全局事务中,消费者和提供者可以看成是全局事务中的两个分支事务,消费者 --> 提供者
  2. 消费者先执行本地的一些逻辑,然后向提供者发送RPC请求,确定消费者发出了请求已经并且提供者接到了请求
  3. 提供者先打印一条日志,然后执行一条纯查询SQL,如果SQL正常执行会打印日志,但目前的现象是只打印了执行SQL前的那条日志,而没有打印任何SQL相关的日志。找DBA查SQL日志,发现该SQL没有执行
  4. 确定了该操作只是全局事务下的一个纯查询操作,在该操作之前,全局事务中的整体流程完全正常
  5. 其实到这里现象已经很明显了,不过当时想法没转变过来,一直关注那条查询SQL,总在想就算查询超时等原因也应该抛出异常啊,不应该什么都没有。DBA都找不到查询记录,那是不是说明SQL可能根本就没执行啊,而是在执行SQL前就出问题了,比如代理?
  6. 借助arthas的watch命令,一直没有东西输出。第一条日志的输出代表这个方法一定执行了,迟迟没有结果输出说明当前请求卡住了,为什么卡住了呢?
  7. 借助arthas的thread命令 thread -bthread -n,就是要找出当前最忙的线程。这个效果很好,有一个线程CPU使用率92%,并且因为该线程导致其它20多个Dubbo线程BLOCKED了。堆栈信息如下
  8. 分析堆栈信息,已经可以很明显的发现和seata相关的接口了,估计和seata的数据源代理有关;同时发现CPU占用最高的那个线程卡在了ConcurrentHashMap#computeIfAbsent方法中。难道ConcurrentHashMap#computeIfAbsent方法有bug?
  9. 到这里,问题的具体原因我们还不知道,但应该和seata的数据源代理有点关系,具体原因我们需要分析业务代码和seata代码

image.png

问题分析

ConcurrentHashMap#computeIfAbsent

这个方法确实有可能出问题:如果两个key的hascode相同,并且在对应的mappingFunction中又进行了computeIfAbsent操作,则会导致死循环,具体分析参考这篇文章:https://juejin.cn/post/6844904191077384200

Seata数据源自动代理

相关内容之前有分析过,我们重点来看看以下几个核心的类:

  1. SeataDataSourceBeanPostProcessor
  2. SeataAutoDataSourceProxyAdvice
  3. DataSourceProxyHolder
SeataDataSourceBeanPostProcessor

SeataDataSourceBeanPostProcessorBeanPostProcessor实现类,在postProcessAfterInitialization方法(即Bean初始化之后)中,会为业务方配置的数据源创建对应的seata代理数据源

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}
//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the bean of the data source," +
" and return the original data source to replace the data source proxy.");
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean;
}
}
SeataAutoDataSourceProxyAdvice

SeataAutoDataSourceProxyAdvice是一个MethodInterceptor,seata中的SeataAutoDataSourceProxyCreator会针对DataSource类型的Bean创建动态代理对象,代理逻辑就是SeataAutoDataSourceProxyAdvice#invoke逻辑。即:执行数据源AOP代理对象的相关方法时候,会经过其invoke方法,在invoke方法中再根据当原生数据源,找到对应的seata代理数据源,最终达到执行seata代理数据源逻辑的目的

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
......
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
return invocation.proceed();
}
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m != null) {
SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}
}
DataSourceProxyHolder

流程上我们已经清楚了,现在还有一个问题,如何维护原生数据源seata代理数据源之间的关系?通过DataSourceProxyHolder维护,这是一个单例对象,该对象中通过一个ConcurrentHashMap维护两者的关系:原生数据源为key --> seata代理数据源 为value

public class DataSourceProxyHolder {
public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
DataSource originalDataSource = dataSource;
......
return CollectionUtils.computeIfAbsent(this.dataSourceProxyMap, originalDataSource,
BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
}
}


// CollectionUtils.java
public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
V value = map.get(key);
if (value != null) {
return value;
}
return map.computeIfAbsent(key, mappingFunction);
}

客户端数据源配置

  1. 配置了两个数据源:DynamicDataSourceP6DataSource
  2. P6DataSource可以看成是对DynamicDataSource的一层包装
  3. 我们暂时不去管这个配置合不合理,现在只是单纯的基于这个数据源配置分析问题
@Qualifier("dsMaster")
@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}

@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(dsMaster());
return p6DataSource;
}

分析过程

假设现在大家都已经知道了 ConcurrentHashMap#computeIfAbsent 可能会产生的问题,已知现在产生了这个问题,结合堆栈信息,我们可以知道大概哪里产生了这个问题。

1、ConcurrentHashMap#computeIfAbsent会产生这个问题的前提条件是:两个key的hashcode相同mappingFunction中对应了一个put操作。结合我们seata的使用场景,mappingFunction对应的是DataSourceProxy::new,说明在DataSourceProxy的构造方法中可能会触发put操作

image.png

执行AOP代理数据源相关方法 =>
进入SeataAutoDataSourceProxyAdvice切面逻辑 =>
执行DataSourceProxyHolder#putDataSource方法 =>
执行DataSourceProxy::new =>
AOP代理数据源的getConnection方法 =>
原生数据源的getConnection方法 =>
进入SeataAutoDataSourceProxyAdvice切面逻辑 =>
执行DataSourceProxyHolder#putDataSource方法 =>
执行DataSourceProxy::new =>
DuridDataSource的getConnection方法

2、步骤1中说的AOP代理数据源原生数据源分别是什么?看下面这张图 image.png

3、上面还说到了产生这个问题还有一个条件两个key的hashcode相同,但我看这两个数据源对象都没有重写hashcode方法,所以按理来说,这两个对象的hashcode一定是不同的。后面又再看了一遍ConcurrentHashMap这个问题,感觉两个key的hashcode相同这个说法是不对的,两个key会产生hash冲突更合理一些,这样就能解释两个hashcode不同的对象为啥会遇上这个问题了。为了证明这个,下面我给了一个例子

public class Test {
public static void main(String[] args) {
ConcurrentHashMap map = new ConcurrentHashMap(8);
Num n1 = new Num(3);
Num n2 = new Num(19);
Num n3 = new Num(20);

// map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)); // 这行代码不会导致程序死循环
map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)); // 这行代码会导致程序死循环
}

static class Num{
private int i;
public Num(int i){
this.i = i;
}

@Override
public int hashCode() {
return i;
}
}
}
  1. 为了方便重现问题,我们重写了Num#hashCode方法,保证构造函数入参就是hashcode的值
  2. 创建一个ConcurrentHashMap对象,initialCapacity为8,sizeCtl计算出来的值为16,即该map中数组长度默认为16
  3. 创建对象n1,入参为3,即hashcode为3,计算得出其对应的数组下标为3
  4. 创建对象n2,入参为19,即hashcode为19,计算得出其对应的数组下标为3,此时我们可以认为n1和n2产生了hash冲突
  5. 创建对象n3,入参为20,即hashcode为20,计算得出其对应的数组下标为4
  6. 执行map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n3, k2 -> 200)),程序正常退出:因为n1和n3没有hash冲突,所以正常结束
  7. 执行map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200)),程序正常退出:因为n1和n2产生了hash冲突,所以陷入死循环

4、在对象初始化的时候,SeataDataSourceBeanPostProcessor不是已经将对象对应的数据源代理初始化好了吗?为什么在SeataAutoDataSourceProxyAdvice中还是会创建对应的数据源代理呢?

  1. 首先,SeataDataSourceBeanPostProcessor执行时期是晚于AOP代理对象创建的,所以在执行SeataDataSourceBeanPostProcessor相关方法的时候,SeataAutoDataSourceProxyAdvice其实应生效了
  2. SeataDataSourceBeanPostProcessor中向map中添加元素时,key为AOP代理数据源SeataAutoDataSourceProxyAdvice中的invocation.getThis()中拿到的是原生数据源,所以key不相同

image.png

5、还有一个问题,SeataAutoDataSourceProxyAdvic#invoke方法中并没有过滤toString、hashCode等方法,cglib创建的代理对象默认会重写这几个方法,如果在向map中put元素的时候触发了代理对象的这些方法,此时又会重新进入SeataAutoDataSourceProxyAdvic#invoke切面,直到线程堆栈益处

问题总结

  1. 在两个key会产生hash冲突的时候,会触发ConcurrentHashMap#computeIfAbsentBUG,该BUG的表现就是让当前线程陷入死循环
  2. 业务反馈,该问题是偶现的,偶现的原因有两种:首先,该应用是多节点部署,但线上只有一个节点触发了该BUG(hashcode冲突),所以只有当请求打到这个节点的时候才有可能会触发该BUG;其次,因为每次重启对象地址(hashcode)都是不确定的,所以并不是每次应用重启之后都会触发,但如果一旦触发,该节点就会一直存在这个问题。有一个线程一直在死循环,并将其它尝试从map中获取代理数据源的线程阻塞了,这种现象在业务上的反馈就是请求卡住了。如果连续请求都是这样,此时业务方可能会重启服务,然后因为重启后hash冲突不一定存在,可能重启后业务表现就正常了,但也有可能在下次重启的时候又会触发了这个BUG
  3. 当遇到这个问题时,从整个问题上来看,确实就是死锁了,因为那个死循环的线程占者锁一直不释放,导致其它操作该map的线程被BLOCK了
  4. 本质上还是因为ConcurrentHashMap#computeIfAbsent方法可能会触发BUG,而seata的使用场景刚好就触发了该BUG
  5. 下面这个demo其实就完整的模拟了线上出问题时的场景,如下:
public class Test {
public static void main(String[] args) {

ConcurrentHashMap map = new ConcurrentHashMap(8);

Num n1 = new Num(3);
Num n2 = new Num(19);

for(int i = 0; i< 20; i++){
new Thread(()-> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

map.computeIfAbsent(n1, k-> 200);
}).start();
}
map.computeIfAbsent(n1, k1 -> map.computeIfAbsent(n2, k2 -> 200));
}


static class Num{
private int i;

public Num(int i){
this.i = i;
}
@Override
public int hashCode() {
return i;
}
}
}

image.png

解决问题

可以从两方面解决这个问题:

  1. 业务方改动:P6DataSource 和 DynamicDataSource 没必要都被代理,直接代理P6DataSource就可以了,DynamicDataSource没有必要声明成一个Bean;或者通过excluds属性排除P6DataSource,这样就不会存在重复代理问题
  2. Seata完善:完善数据源代理相关逻辑
业务方改动

1、数据源相关的配置改成如下即可:

@Primary
@Qualifier("p6DataSource")
@Bean("p6DataSource")
P6DataSource p6DataSource(@Qualifier("dsMaster") DataSource dataSource) {
P6DataSource p6DataSource = new P6DataSource(new TuYaDynamicDataSource(masterDsRoute));
logger.warn("dsMaster={}, hashcode={}",p6DataSource, p6DataSource.hashCode());
return p6DataSource;
}

2、或者不改变目前的数据源配置,添加excluds属性

@EnableAutoDataSourceProxy(excludes={"P6DataSource"})
Seata完善

1、ConcurrentHashMap#computeIfAbsent方法改成双重检查,如下:

SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
synchronized (dataSourceProxyMap) {
dsProxy = dataSourceProxyMap.get(originalDataSource);
if (dsProxy == null) {
dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
dataSourceProxyMap.put(originalDataSource, dsProxy);
}
}
}
return dsProxy;

之前我想直接改CollectionUtils#computeIfAbsent方法,群里大佬反馈这样可能会导致数据源多次创建,确实有这个问题:如下

public static <K, V> V computeIfAbsent(Map<K, V> map, K key, Function<? super K, ? extends V> mappingFunction) {
V value = map.get(key);
if (value != null) {
return value;
}
value = mappingFunction.apply(key);
return map.computeIfAbsent(key, value);
}

2、SeataAutoDataSourceProxyAdvice切面逻辑中添加一些过滤

Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}

遗留问题

SeataDataSourceBeanPostProcessorSeataAutoDataSourceProxyAdvice对应方法中,向map中初始化seata数据源代理时对应的key根本就不同,SeataDataSourceBeanPostProcessor中对应的key是AOP代理数据源SeataAutoDataSourceProxyAdvice中对应的key是原生对象,此时就造成了不必要的seata数据源代理对象的创建。

针对这个问题,大家有什么好的建议?能不能为SeataDataSourceBeanPostProcessor指定一个order,让其在创建AOP代理对象之前生效

原文链接

https://juejin.cn/post/6939041336964153352/

· 阅读需 19 分钟

“刚上手Seata,对其各个模块了解还不够深入?
想深入研究Seata源码,却还未付诸实践?
想探究下在集成Seata后,自己的应用在启动过程中“偷偷”干了些啥?
想学习Seata作为一款优秀开源框架蕴含的设计理念和最佳实践?
如果你有上述任何想法之一,那么今天这篇文章,就是为你量身打造的~

前言

在Seata的应用侧(RM、TM)启动过程中,首先要做的就是与协调器侧(TC)建立通信,这是Seata能够完成分布式事务协调的前提,那么Seata在完成应用侧初始化以及与TC建立连接的过程中,是如何找到TC事务协调器的集群和地址的?又是如何从配置模块中获取各种配置信息的呢?这正是本文要探究的重点。

给个限定

Seata作为一款中间件级的底层组件,是很谨慎引入第三方框架具体实现的,感兴趣的同学可以深入了解下Seata的SPI机制,看看Seata是如何通过大量扩展点(Extension),来将依赖组件的具体实现倒置出去,转而依赖抽象接口的,同时,Seata为了更好地融入微服务、云原生等流行架构所衍生出来的生态中,也基于SPI机制对多款主流的微服务框架、注册中心、配置中心以及Java开发框架界“扛把子”——SpringBoot等做了主动集成,在保证微内核架构、松耦合、可扩展的同时,又可以很好地与各类组件“打成一片”,使得采用了各种技术栈的环境都可以比较方便地引入Seata。

本文为了贴近大家刚引入Seata试用时的场景,在以下介绍中,选择应用侧的限定条件如下:使用File(文件)作为配置中心与注册中心,并基于SpringBoot启动。

有了这个限定条件,接下来就让我们深入Seata源码,一探究竟吧。

多模块交替协作的RM/TM初始化过程

Seata客户端启动过程剖析(一)中,我们分析了Seata应用侧TM与RM的初始化、以及应用侧如何创建Netty Channel并向TC Server发送注册请求的过程。除此之外,在RM初始化过程中,Seata的其他多个模块(注册中心、配置中心、负载均衡)也都纷纷登场,相互协作,共同完成了连接TC Server的过程。

当执行Client重连TC Server的方法:NettyClientChannelManager.Channreconnect()时,首先需要根据当前的事务分组获取可用的TC Server地址列表:

    /**
* NettyClientChannelManager.reconnect()
* Reconnect to remote server of current transaction service group.
*
* @param transactionServiceGroup transaction service group
*/
void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
//从注册中心中获取可用的TC Server地址
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
//以下代码略
}

关于事务分组的详细概念介绍,大家可以参考官方文档事务分组介绍。这里简单介绍一下:

  • 每个Seata应用侧的RM、TM,都具有一个事务分组
  • 每个Seata协调器侧的TC,都具有一个集群名地址 应用侧连接协调器侧时,经历如下两步:
  • 通过事务分组的名称,从配置中获取到该应用侧对应的TC集群名
  • 通过集群名称,可以从注册中心中获取TC集群的地址列表 以上概念、关系与过程,如下图所示: Seata事务分组与建立连接的关系

注册中心获取TC Server集群地址

了解RM/TC连接TC时涉及的主要概念与步骤后,我们继续探究getAvailServerList方法:

    private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
//① 使用注册中心工厂,获取注册中心实例
//② 调用注册中心的查找方法lookUp(),根据事务分组名称获取TC集群中可用Server的地址列表
List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);
if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
return Collections.emptyList();
}

return availInetSocketAddressList.stream()
.map(NetUtil::toStringAddress)
.collect(Collectors.toList());
}

用哪个注册中心?Seata元配置文件给出答案

上面已提到,Seata支持多种注册中心的实现,那么,Seata首先需要从一个地方先获取到“注册中心的类型”这个信息。

从哪里获取呢?Seata设计了一个“配置文件”用于存放其框架内所用组件的一些基本信息,我更愿意称这个配置文件为 『元配置文件』,这是因为它包含的信息,其实是“配置的配置”,也即“元”的概念,大家可以对比数据库表中的信息,和数据库表本身结构的信息(表数据和表元数据)来理解。

我们可以把注册中心、配置中心中的信息,都看做是配置信息本身,而这些配置信息的配置是什么?这些信息,就包含在Seata的元配置文件中。实际上,『元配置文件』中只包含两类信息

  • 一是注册中心的类型:registry.type,以及该类型注册中心的一些基本信息,比如当注册中心类型为文件时,元配置文件中存放了文件的名字信息;当注册中心类型是Nacos时,元配置文件中则存放着Nacos的地址、命名空间、集群名等信息
  • 二是配置中心的类型:config.type,以及该类型配置中心的一些基本信息,比如当配置中心为文件时,元配置文件中存放了文件的名字信息;当注册中心类型为Consul时,元配置文件中存放了Consul的地址信息

Seata的元配置文件支持Yaml、Properties等多种格式,而且可以集成到SpringBoot的application.yaml文件中(使用seata-spring-boot-starter即可),方便与SpringBoot集成。

Seata中自带的默认元配置文件是registry.conf,当我们采用文件作为注册与配置中心时,registry.conf中的内容设置如下:

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
file {
name = "file.conf"
}
}

在如下源码中,我们可以发现,Seata使用的注册中心的类型,是从ConfigurationFactory.CURRENT_FILE_INSTANCE中获取的,而这个CURRENT_FILE_INSTANCE,就是我们所说的,Seata元配置文件的实例

    //在getInstance()中,调用buildRegistryService,构建具体的注册中心实例
public static RegistryService getInstance() {
if (instance == null) {
synchronized (RegistryFactory.class) {
if (instance == null) {
instance = buildRegistryService();
}
}
}
return instance;
}

private static RegistryService buildRegistryService() {
RegistryType registryType;
//获取注册中心类型
String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
try {
registryType = RegistryType.getType(registryTypeName);
} catch (Exception exx) {
throw new NotSupportYetException("not support registry type: " + registryTypeName);
}
if (RegistryType.File == registryType) {
return FileRegistryServiceImpl.getInstance();
} else {
//根据注册中心类型,使用SPI的方式加载注册中心的实例
return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
}
}

我们来看一下元配置文件的初始化过程,当首次获取静态字段CURRENT_FILE_INSTANCE时,触发ConfigurationFactory类的初始化:

    //ConfigurationFactory类的静态块
static {
load();
}

/**
* load()方法中,加载Seata的元配置文件
*/
private static void load() {
//元配置文件的名称,支持通过系统变量、环境变量扩展
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (seataConfigName == null) {
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}
if (seataConfigName == null) {
seataConfigName = REGISTRY_CONF_DEFAULT;
}
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (envValue == null) {
envValue = System.getenv(ENV_SYSTEM_KEY);
}
//根据元配置文件名称,创建一个实现了Configuration接口的文件配置实例
Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName,
false) : new FileConfiguration(seataConfigName + "-" + envValue, false);
Configuration extConfiguration = null;
//通过SPI加载,来判断是否存在扩展配置提供者
//当应用侧使用seata-spring-boot-starer时,将通过SpringBootConfigurationProvider作为扩展配置提供者,这时当获取元配置项时,将不再从file.conf(默认)中获取,而是从application.properties/application.yaml中获取
try {
//通过ExtConfigurationProvider的provide方法,将原有的Configuration实例替换为扩展配置的实例
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("load Configuration:{}", extConfiguration == null ? configuration.getClass().getSimpleName()
: extConfiguration.getClass().getSimpleName());
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
//存在扩展配置,则返回扩展配置实例,否则返回文件配置实例
CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

load()方法的调用序列图如下: Seata元配置文件的加载过程

上面的序列图中,大家可以关注以下几点:

  • Seata元配置文件名称支持扩展
  • Seata元配置文件后缀支持3种后缀,分别为yaml/properties/conf,在创建元配置文件实例时,会依次尝试匹配
  • Seata中配置能力相关的顶级接口为Configuration,各种配置中心均需实现此接口,Seata的元配置文件就是使用FileConfiguration(文件类型的配置中心)实现了此接口
/**
* Seata配置能力接口
* package:io.seata.config
*/

public interface Configuration {
/**
* Gets short.
*
* @param dataId the data id
* @param defaultValue the default value
* @param timeoutMills the timeout mills
* @return the short
*/
short getShort(String dataId, int defaultValue, long timeoutMills);

//以下内容略,主要能力为配置的增删改查
}
  • Seata提供了一个类型为ExtConfigurationProvider的扩展点,开放了对配置具体实现的扩展能力,它具有一个provide()方法,接收原有的Configuration,返回一个全新的Configuration,此接口方法的形式决定了,一般可以采用静态代理、动态代理、装饰器等设计模式来实现此方法,实现对原有Configuration的增强
/**
* Seata扩展配置提供者接口
* package:io.seata.config
*/
public interface ExtConfigurationProvider {
/**
* provide a AbstractConfiguration implementation instance
* @param originalConfiguration
* @return configuration
*/
Configuration provide(Configuration originalConfiguration);
}
  • 当应用侧基于seata-seata-spring-boot-starter启动时,将采用『SpringBootConfigurationProvider』作为扩展配置提供者,在其provide方法中,使用动态字节码生成(CGLIB)的方式为『FileConfiguration』实例创建了一个动态代理类,拦截了所有以"get"开头的方法,来从application.properties/application.yaml中获取元配置项。

关于SpringBootConfigurationProvider类,本文只说明下实现思路,不再展开分析源码,这也仅是ExtConfigurationProvider接口的一种实现方式,从Configuration可扩展、可替换的角度来看,Seata正是通过ExtConfigurationProvider这样一个扩展点,为多种配置的实现提供了一个广阔的舞台,允许配置的多种实现与接入方案。

经历过上述加载流程后,如果我们没有扩展配置提供者,我们将从Seata元配置文件中获取到注册中心的类型为file,同时创建了一个文件注册中心实例:FileRegistryServiceImpl

从注册中心获取TC Server地址

获取注册中心的实例后,需要执行lookup()方法(RegistryFactory.getInstance().lookup(transactionServiceGroup)),FileRegistryServiceImpl.lookup()的实现如下:

    /**
* 根据事务分组名称,获取TC Server可用地址列表
* package:io.seata.discovery.registry
* class:FileRegistryServiceImpl
*/
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
//获取TC Server集群名称
String clusterName = getServiceGroup(key);
if (clusterName == null) {
return null;
}
//从配置中心中获取TC集群中所有可用的Server地址
String endpointStr = CONFIG.getConfig(
PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);
if (StringUtils.isNullOrEmpty(endpointStr)) {
throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");
}
//将地址封装为InetSocketAddress并返回
String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);
List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
for (String endpoint : endpoints) {
String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);
if (ipAndPort.length != 2) {
throw new IllegalArgumentException("endpoint format should like ip:port");
}
inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));
}
return inetSocketAddresses;
}

/**
* 注册中心接口中的default方法
* package:io.seata.discovery.registry
* class:RegistryService
*/
default String getServiceGroup(String key) {
key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
//在配置缓存中,添加事务分组名称变化监听事件
if (!SERVICE_GROUP_NAME.contains(key)) {
ConfigurationCache.addConfigListener(key);
SERVICE_GROUP_NAME.add(key);
}
//从配置中心中获取事务分组对应的TC集群名称
return ConfigurationFactory.getInstance().getConfig(key);
}

可以看到,代码逻辑与第一节中图Seata事务分组与建立连接的关系中的流程相符合, 这时,注册中心将需要配置中心的协助,来获取事务分组对应的集群名称、并查找集群中可用的服务地址。

配置中心获取TC集群名称

配置中心的初始化

配置中心的初始化(在ConfigurationFactory.buildConfiguration()),与注册中心的初始化流程类似,都是先从元配置文件中获取配置中心的类型等信息,然后初始化一个具体的配置中心实例,有了之前的分析基础,这里不再赘述。

获取配置项的值

上方代码段的两个方法:*FileRegistryServiceImpl.lookup()以及RegistryService.getServiceGroup()*中,都从配置中心中获取的配置项的值:

  • lookup()需要由具体的注册中心实现,使用文件作为注册中心,其实是一种直连TC Server的情况,其特殊点在于TC Server的地址是写死在配置中的的(正常应存于注册中心中),因此FileRegistryServiceImpl.lookup()方法,是通过配置中心获取的TC集群中Server的地址信息
  • getServiceGroup()是RegistryServer接口中的default方法,即所有注册中心的公共实现,Seata中任何一种注册中心,都需要通过配置中心来根据事务分组名称来获取TC集群名称

负载均衡

经过上述环节配置中心、注册中心的协作,现在我们已经获取到了当前应用侧所有可用的TC Server地址,那么在发送真正的请求之前,还需要通过特定的负载均衡策略,选择一个TC Server地址,这部分源码比较简单,就不带着大家分析了。

关于负载均衡的源码,大家可以阅读AbstractNettyRemotingClient.doSelect(),因本文分析的代码是RMClient/TMClient的重连方法,此方法中,所有获取到的Server地址,都会通过遍历依次连接(重连),因此这里不需要再做负载均衡。

以上就是Seata应用侧在启动过程中,注册中心与配置中心这两个关键模块之间的协作关系与工作流程,欢迎共同探讨、学习!

后记:本文及其上篇 Seata客户端启动过程剖析(一),是本人撰写的首批技术博客,将上手Seata时,个人认为Seata中较为复杂、需要研究和弄通的部分源码进行了分析和记录。 在此欢迎各位读者提出各种改进建议,谢谢!

· 阅读需 12 分钟

“刚上手 Seata,对其各个模块了解还不够深入?
想深入研究 Seata 源码,却还未付诸实践?
想探究下在集成 Seata 后,自己的应用在启动过程中“偷偷”干了些啥?
想学习 Seata 作为一款优秀开源框架蕴含的设计理念和最佳实践?
如果你有上述任何想法之一,那么今天这篇文章,就是为你量身打造的~

前言

看过官网 README 的第一张图片的同学都应该清楚,Seata 协调分布式事务的原理便在于通过其协调器侧的 TC,来与应用侧的 TM、RM 进行各种通信与交互,来保证分布式事务中,多个事务参与者的数据一致性。那么 Seata 的协调器侧与应用侧之间,是如何建立连接并进行通信的呢?

没错,答案就是 Netty,Netty 作为一款高性能的 RPC 通信框架,保证了 TC 与 RM 之间的高效通信,关于 Netty 的详细介绍,本文不再展开,今天我们探究的重点,在于应用侧在启动过程中,如何通过一系列 Seata 关键模块之间的协作(如 RPC、Config/Registry Center 等),来建立与协调器侧之间的通信

从 GlobalTransactionScanner 说起

我们知道 Seata 提供了多个开发期注解,比如用于开启分布式事务的@GlobalTransactional、用于声明 TCC 两阶段服务的@TwoPhraseBusinessAction 等,它们都是基于 Spring AOP 机制,对使用了注解的 Bean 方法分配对应的拦截器进行增强,来完成对应的处理逻辑。而 GlobalTransactionScanner 这个 Spring Bean,就承载着为各个注解分配对应的拦截器的职责,从其 Scanner 的命名,我们也不难推断出,它是为了在 Spring 应用启动过程中,对与全局事务(GlobalTransactionScanner)相关的 Bean 进行扫描、处理的。

除此之外,应用侧 RPC 客户端(TMClient、RMClient)初始化、与 TC 建立连接的流程,也是在 GlobalTransactionScanner#afterPropertiesSet()中发起的:

    /**
* package:io.seata.spring.annotation
* class:GlobalTransactionScanner
*/
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
//在Bean属性初始化之后,执行TM、RM的初始化
initClient();

}

RM & TM 的初始化与连接过程

这里,我们以 RMClient.init()为例说明,TMClient 的初始化过程亦同理。

类关系的设计

查看 RMClient#init()的源码,我们发现,RMClient 先构造了一个 RmNettyRemotingClient,然后执行其初始化init()方法。而 RmNettyRemotingClient 的构造器初始化方法,都会逐层调用父类的构造器与初始化方法

    /**
* RMClient的初始化逻辑
* package:io.seata.rm
* class:RMClient
*/
public static void init(String applicationId, String transactionServiceGroup) {
//① 首先从RmNettyRemotingClient类开始,依次调用父类的构造器
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
//② 然后从RmNettyRemotingClient类开始,依次调用父类的init()
rmNettyRemotingClient.init();
}

上述 RMClient 系列各类之间的关系以及调用构造器和 init()初始化方法的过程如下图示意: RMClient.init简化版流程与主要类之间的关系

那么为何要将 RMClient 设计成这样较为复杂的继承关系呢?其实是为了将各层的职责、边界划分清楚,使得各层可以专注于特定逻辑处理,实现更好的扩展性,这部分的详细设计思路,可参考 Seata RPC 模块重构 PR 的操刀者乘辉兄的文章Seata-RPC 重构之路

初始化的完整流程

各类的构造器与初始化方法中的主要逻辑,大家可以借助下面这个能表意的序列图来梳理下,此图大家也可先跳过不看,在下面我们分析过几个重点类后,再回头来看这些类是何时登场、如何交互的协作的。 RMClient的初始化流程

抓住核心——Channel 的创建

首先我们需要知道,应用侧与协调器侧的通信是借助 Netty 的 Channel(网络通道)来完成的,因此通信过程的关键在于 Channel 的创建,在 Seata 中,通过池化的方式(借助了 common-pool 中的对象池)方式来创建、管理 Channel。

这里我们有必要简要介绍下对象池的简单概念及其在 Seata 中的实现: 涉及到的 common-pool 中的主要类:

  • GenericKeydObjectPool<K, V>:KV 泛型对象池,提供对所有对象的存取管理,而对象的创建由其内部的工厂类来完成
  • KeyedPoolableObjectFactory<K, V>:KV 泛型对象工厂,负责池化对象的创建,被对象池持有

涉及到的 Seata 中对象池实现相关的主要类:

  • 首先,被池化管理的对象就是Channel,对应 common-pool 中的泛型 V
  • NettyPoolKey:Channel 对应的 Key,对应 common-pool 中的泛型 K,NettyPoolKey 主要包含两个信息:
    • address:创建 Channel 时,对应的 TC Server 地址
    • message:创建 Channel 时,向 TC Server 发送的 RPC 消息体
  • GenericKeydObjectPool<NettyPoolKey,Channel>:Channel 对象池
  • NettyPoolableFactory:创建 Channel 的工厂类

认识了上述对象池相关的主要类之后,我们再来看看 Seata 中涉及 Channel 管理以及与 RPC 相关的几个主要类:

  • NettyClientChannelManager:
    • 持有 Channel 对象池
    • 与 Channel 对象池交互,对应用侧 Channel 进行管理(获取、释放、销毁、缓存等)
  • RpcClientBootstrap:RPC 客户端核心引导类,持有 Netty 框架的 Bootstrap 对象,具备启停能力;具有根据连接地址来获取新 Channel 的能力,供 Channel 工厂类调用
  • AbstractNettyRemotingClient:
    • 初始化并持有 RpcClientBootstrap
    • 应用侧 Netty 客户端的顶层抽象,抽象了应用侧 RM/TM 取得各自 Channel 对应的 NettyPoolKey 的能力,供 NettyClientChannelManager 调用
    • 初始化 NettyPoolableFactory

了解上述概念后,我们可以把 Seata 中创建 Channel 的过程简化如下: 创建Channel对象过程

看到这里,大家可以回过头再看看上面的RMClient 的初始化序列图,应该会对图中各类的职责、关系,以及整个初始化过程的意图有一个比较清晰的理解了。

建立连接的时机与流程

那么,RMClient 是何时与 Server 建立连接的呢?

在 RMClient 初始化的过程中,大家会发现,很多 init()方法都设定了一些定时任务,而 Seata 应用侧与协调器的重连(连接)机制,就是通过定时任务来实现的:

    /**
* package:io.seata.core.rpcn.netty
* class:AbstractNettyRemotingClient
*/
public void init() {
//设置定时器,定时重连TC Server
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}

我们通过跟踪一次 reconnect 的执行,看看上面探究的几个类之间是如何协作,完成 RMClient 与 TC 的连接的(实际上首次连接可能发生在 registerResource 的过程中,但流程一致) RMClient与TC Server连接过程

这个图中,大家可以重点关注这几个点:

  • NettyClientChannelManager 执行具体 AbstractNettyRemotingClient 中,获取 NettyPoolKey 的回调函数(getPoolKeyFunction()):应用侧的不同 Client(RMClient 与 TMClient),在创建 Channel 时使用的 Key 不同,使两者在重连 TC Server 时,发送的注册消息不同,这也是由两者在 Seata 中扮演的角色不同而决定的:
    • TMClient:扮演事务管理器角色,创建 Channel 时,仅向 TC 发送 TM 注册请求(RegisterTMRequest)即可
    • RMClient:扮演资源管理器角色,需要管理应用侧所有的事务资源,因此在创建 Channel 时,需要在发送 RM 注册请求(RegesterRMRequest)前,获取应用侧所有事务资源(Resource)信息,注册至 TC Server
  • 在 Channel 对象工厂 NettyPoolableFactory 的 makeObject(制造 Channel)方法中,使用 NettyPoolKey 中的两项信息,完成了两项任务:
    • 使用 NettyPoolKey 的 address 创建新的 Channel
    • 使用 NettyPoolKey 的 message 以及新的 Channel 向 TC Server 发送注册请求,这就是 Client 向 TC Server 的连接(首次执行)或重连(非首次,由定时任务驱动执行)请求

以上内容,就是关于 Seata 应用侧的初始化及其与 TC Server 协调器侧建立连接的全过程分析。

更深层次的细节,建议大家再根据本文梳理的脉络和提到的几个重点,细致地阅读下源码,相信定会有更深层次的理解和全新的收获!

后记:考虑到篇幅以及保持一篇源码分析文章较为合适的信息量,本文前言中所说的配置、注册等模块协作配合并没有在文章中展开和体现。
在下篇源码剖析中,我会以配置中心注册中心为重点,为大家分析,在 RMClient/TM Client 与 TC Server 建立连接之前,Seata 应用侧是如何通过服务发现找到 TC Server、如何从配置模块获取各种信息的。

· 阅读需 8 分钟

本文将介绍基于Spring Cloud + feign 如何集成 Seata(1.4.0)的TCC模式。实际上,Seata的AT模式基本上能满足我们使用分布式事务80%的需求,但涉及不支持事务的数据库与中间件(如redis)等的操作,或AT模式暂未支持的数据库(目前AT支持Mysql、Oracle与PostgreSQL)、跨公司服务的调用、跨语言的应用调用或有手动控制整个二阶段提交过程的需求,则需要结合TCC模式。不仅如此,TCC模式还支持与AT模式混合使用。

本文作者:弓行(谭志坚)

一、TCC模式的概念

一个分布式的全局事务,整体是两阶段提交Try-[Comfirm/Cancel] 的模型。在Seata中,AT模式与TCC模式事实上都是两阶段提交的具体实现。他们的区别在于:

AT 模式基于支持本地 ACID 事务关系型数据库(目前支持Mysql、Oracle与PostgreSQL):

一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。 二阶段 commit 行为:马上成功结束,自动异步批量清理回滚日志。 二阶段 rollback 行为:通过回滚日志,自动生成补偿操作,完成数据回滚。

相应的,TCC 模式,不依赖于底层数据资源的事务支持:

一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。 二阶段 commit 行为:调用 自定义的 commit 逻辑。 二阶段 rollback 行为:调用 自定义的 rollback 逻辑。

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

简单点概括,SEATA的TCC模式就是手工的AT模式,它允许你自定义两阶段的处理逻辑而不依赖AT模式的undo_log。

二、前提准备

三、TM与TCC-RM的搭建

本章着重讲基于Spring Cloud + Feign的TCC的实现,项目的搭建直接看源码(本工程提供了AT模式与TCC模式的DEMO)

DEMO工程源码

3.1 seata服务端的搭建

服务端搭建文档

3.2 TM的搭建

service-tm

3.3 RM-TCC的搭建

3.3.1 定义TCC接口

由于我们使用的是 SpringCloud + Feign,Feign的调用基于http,因此此处我们使用@LocalTCC便可。值得注意的是,@LocalTCC一定需要注解在接口上,此接口可以是寻常的业务接口,只要实现了TCC的两阶段提交对应方法便可,TCC相关注解如下:

  • @LocalTCC 适用于SpringCloud+Feign模式下的TCC
  • @TwoPhaseBusinessAction 注解try方法,其中name为当前tcc方法的bean名称,写方法名便可(全局唯一),commitMethod指向提交方法,rollbackMethod指向事务回滚方法。指定好三个方法之后,seata会根据全局事务的成功或失败,去帮我们自动调用提交方法或者回滚方法。
  • @BusinessActionContextParameter 注解可以将参数传递到二阶段(commitMethod/rollbackMethod)的方法。
  • BusinessActionContext 便是指TCC事务上下文

实例如下:

/**
* 这里定义tcc的接口
* 一定要定义在接口上
* 我们使用springCloud的远程调用
* 那么这里使用LocalTCC便可
*
* @author tanzj
*/
@LocalTCC
public interface TccService {

/**
* 定义两阶段提交
* name = 该tcc的bean名称,全局唯一
* commitMethod = commit 为二阶段确认方法
* rollbackMethod = rollback 为二阶段取消方法
* BusinessActionContextParameter注解 传递参数到二阶段中
*
* @param params -入参
* @return String
*/
@TwoPhaseBusinessAction(name = "insert", commitMethod = "commitTcc", rollbackMethod = "cancel")
String insert(
@BusinessActionContextParameter(paramName = "params") Map<String, String> params
);

/**
* 确认方法、可以另命名,但要保证与commitMethod一致
* context可以传递try方法的参数
*
* @param context 上下文
* @return boolean
*/
boolean commitTcc(BusinessActionContext context);

/**
* 二阶段取消方法
*
* @param context 上下文
* @return boolean
*/
boolean cancel(BusinessActionContext context);
}

3.3.2 TCC接口的业务实现

为了保证代码的简洁,此处将路由层与业务层结合讲解,实际项目则不然。

  • 在try方法中使用@Transational可以直接通过spring事务回滚关系型数据库中的操作,而非关系型数据库等中间件的回滚操作可以交给rollbackMethod方法处理。
  • 使用context.getActionContext("params")便可以得到一阶段try中定义的参数,在二阶段对此参数进行业务回滚操作。
  • **注意1:**此处亦不可以捕获异常(同理切面处理异常),否则TCC将识别该操作为成功,二阶段直接执行commitMethod。
  • 注意2:TCC模式要开发者自行保证幂等和事务防悬挂
@Slf4j
@RestController
public class TccServiceImpl implements TccService {

@Autowired
TccDAO tccDAO;

/**
* tcc服务t(try)方法
* 根据实际业务场景选择实际业务执行逻辑或者资源预留逻辑
*
* @param params - name
* @return String
*/
@Override
@PostMapping("/tcc-insert")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public String insert(@RequestBody Map<String, String> params) {
log.info("xid = " + RootContext.getXID());
//todo 实际的操作,或操作MQ、redis等
tccDAO.insert(params);
//放开以下注解抛出异常
//throw new RuntimeException("服务tcc测试回滚");
return "success";
}

/**
* tcc服务 confirm方法
* 若一阶段采用资源预留,在二阶段确认时要提交预留的资源
*
* @param context 上下文
* @return boolean
*/
@Override
public boolean commitTcc(BusinessActionContext context) {
log.info("xid = " + context.getXid() + "提交成功");
//todo 若一阶段资源预留,这里则要提交资源
return true;
}

/**
* tcc 服务 cancel方法
*
* @param context 上下文
* @return boolean
*/
@Override
public boolean cancel(BusinessActionContext context) {
//todo 这里写中间件、非关系型数据库的回滚操作
System.out.println("please manually rollback this data:" + context.getActionContext("params"));
return true;
}
}

3.3.3 在TM中开启全局事务,调用RM-TCC接口

工程源码见3.2


至此,Spring Cloud整合TCC模式完成

· 阅读需 13 分钟

说到Seata中的配置管理,大家可能会想到Seata中适配的各种配置中心,其实今天要说的不是这个,虽然也会简单分析Seata和各配置中心的适配过程,但主要还是讲解Seata配置管理的核心实现

在讲配置中心之前,先简单介绍一下Server端的启动流程,因为这一块就涉及到配置管理的初始化,核心流程如下:

  1. 程序入口在Server#main方法中
  2. 获取port的几种形式:从容器中获取;从命令行获取;默认端口
  3. 解析命令行参数:host、port、storeMode等参数,这个过程可能涉及到配置管理的初始化
  4. Metric相关:无关紧要,跳过
  5. NettyServer初始化
  6. 核心控制器初始化:Server端的核心,还包括几个定时任务
  7. NettyServer启动

代码如下,删除了非核心代码

public static void main(String[] args) throws IOException {
// 获取port的几种形式:从容器中获取;从命令行获取;默认端口, use to logback.xml
int port = PortHelper.getPort(args);
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

// 解析启动参数,分容器和非容器两种情况
ParameterParser parameterParser = new ParameterParser(args);

// Metric相关
MetricsManager.get().init();

// NettyServer初始化
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

// 核心控制器初始化
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();

// NettyServer启动
nettyRemotingServer.init();
}

为社么说步骤3中肯能涉及到配置管理的初始化呢?核心代码如下:

if (StringUtils.isBlank(storeMode)) {
storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
SERVER_DEFAULT_STORE_MODE);
}

如果在启动参数中没有特别指定storeMode,就会通过ConfigurationFactory相关API去获取该配置,在ConfigurationFactory.getInstance()这行代码中,涉及到两部分内容:ConfigurationFactory静态代码初始化和Configuration初始化。接下来我们重点分析这部分内容

配置管理初始化

ConfigurationFactory初始化

我们知道在Seata中有两个关键配置文件:一个是registry.conf,这是核心配置文件,必须要有;另一个是file.conf,只有在配置中心是File的情况下才需要用到。ConfigurationFactory静态代码块中,其实主要就是加载registry.conf文件,大概如下:

1、在没有手动配置的情况,默认读取registry.conf文件,封装成一个FileConfiguration对象,核心代码如下:

Configuration configuration = new FileConfiguration(seataConfigName,false);
FileConfigFactory.load("registry.conf", "registry");
FileConfig fileConfig = EnhancedServiceLoader.load(FileConfig.class, "CONF", argsType, args);

2、配置增强:类似代理模式,获取配置时,在代理对象里面做一些其他处理,下面详细介绍

Configuration extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);

3、将步骤2中的代理对象赋值给CURRENT_FILE_INSTANCE引用,在很多地方都直接用到了CURRENT_FILE_INSTANCE这个静态引用

CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;

可以简单的认为:CURRENT_FILE_INSTANCE对应的就是registry.conf里面的内容。我认为registry.conf这个文件名取的不太好,歧义太大,叫做bootstrap.conf是不是更好一些?

Configuration初始化

Configuration其实就是对应配置中心,Seata目前支持的配置中心很多,几乎主流的配置中心都支持,如:apollo、consul、etcd、nacos、zk、springCloud、本地文件。当使用本地文件作为配置中心的时候,涉及到file.conf的加载,当然这是默认的名字,可以自己配置。到这里,大家也基本上知道了registry.conffile.conf的关系了。

Configuration作为单例放在ConfigurationFactory中,所以Configuration的初始化逻辑也是在ConfigurationFactory中,大概流程如下: 1、先从registry.conf文件中读取config.type属性,默认就是file

configTypeName = CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR+ ConfigurationKeys.FILE_ROOT_TYPE);

2、基于config.type属性值加载配置中心,比如默认是:file,则先从registry.conf文件中读取config.file.name读取本地文件配置中心对应的文件名,然后基于该文件名创建FileConfiguration对象,这样就将file.conf中的配置加载到内存中了。同理,如果配置的是其他配置中心,则会通过SPI初始化其他配置中心。还有一点需要注意的是,在这阶段,如果配置中心是本地文件,则会为其创建代理对象;如果不是本地文件,则通过SPI加载对应的配置中心

if (ConfigType.File == configType) {
String pathDataId = String.join("config.file.name");
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
configuration = new FileConfiguration(name);
try {
// 配置增强 代理
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
} else {
configuration = EnhancedServiceLoader
.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
}

3、基于步骤2创建的Configuration对象,为其再创建一层代理,这个代理对象有两个作用:一个是本地缓存,不需要每次获取配置的时候都从配置中获取;另一个是监听,当配置发生变更会更新它维护的缓存。如下:

if (null != extConfiguration) {
configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
} else {
configurationCache = ConfigurationCache.getInstance().proxy(configuration);
}

到这里,配置管理的初始化就完成了。Seata通过先先加载registry.conf文件获取对应的配置中心信息、注册中心,然后再根据获取到的的对应信息初始化配置中心。在使用本地文件作为配置中心的情况下,默认是加载file.conf文件。然后再为对应的配置中心创建对应的代理对象,使其支持本地缓存和配置监听

整理流程还是比较简单,在这里我要抛出几个问题:

  1. 什么是配置增强?Seata中的配置增强是怎么做的?
  2. 如果使用本地文件作为配置中心,就必须要将配置定义在file.conf文件中。如果是Spring应用,能不能直接将对应的配置项定义在application.yaml中?
  3. 在上面说的步骤2中,为什么在使用本地文件配置中心的情况下,要先为Configuration创建对应配置增强代理对象,而其他配置中心不用?

这3个问题都是紧密联系的,都和Seata的配置增加相关。下面详细介绍

配置管理增强

配置增强,简单来说就是为其创建一个代理对象,在执行目标独对象的目标方法时候,执行代理逻辑,从配置中心的角度来讲,就是在通过配置中心获取对应配置的时候,执行代理逻辑。

  1. 通过ConfigurationFactory.CURRENT_FILE_INSTANCE.getgetConfig(key)获取配置
  2. 加载registry.conf文件创建FileConfiguration对象configuration
  3. 基于SpringBootConfigurationProviderconfiguration创建代理对象configurationProxy
  4. configurationProxy中获取配置中心的连接信息file zk nacos 等
  5. 基于连接信息创建配中心Configuration对象configuration2
  6. 基于SpringBootConfigurationProviderconfiguration2创建代理对象configurationProxy2
  7. 执行configurationProxy2的代理逻辑
  8. 基于key找到对应的SpringBean
  9. 执行SpringBean的getXxx方法

配置增强实现

上面也简单提到过配置增强,相关代码如下:

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
  1. 首先通过SPI机获取一个ExtConfigurationProvider对象,在Seata中默认只有一个实现:SpringBootConfigurationProvider
  2. 通过ExtConfigurationProvider#provider方法为Configuration创建代理对象

核心代码如下:

public Configuration provide(Configuration originalConfiguration) {
return (Configuration) Enhancer.create(originalConfiguration.getClass(), new MethodInterceptor() {
@Override
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy)
throws Throwable {
if (method.getName().startsWith("get") && args.length > 0) {
Object result = null;
String rawDataId = (String) args[0];
if (args.length == 1) {
result = get(convertDataId(rawDataId));
} else if (args.length == 2) {
result = get(convertDataId(rawDataId), args[1]);
} else if (args.length == 3) {
result = get(convertDataId(rawDataId), args[1], (Long) args[2]);
}
if (result != null) {
//If the return type is String,need to convert the object to string
if (method.getReturnType().equals(String.class)) {
return String.valueOf(result);
}
return result;
}
}

return method.invoke(originalConfiguration, args);
}
});
}

private Object get(String dataId) throws IllegalAccessException, InstantiationException {
String propertyPrefix = getPropertyPrefix(dataId);
String propertySuffix = getPropertySuffix(dataId);
ApplicationContext applicationContext = (ApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT);
Class<?> propertyClass = PROPERTY_BEAN_MAP.get(propertyPrefix);
Object valueObject = null;
if (propertyClass != null) {
try {
Object propertyBean = applicationContext.getBean(propertyClass);
valueObject = getFieldValue(propertyBean, propertySuffix, dataId);
} catch (NoSuchBeanDefinitionException ignore) {

}
} else {
throw new ShouldNeverHappenException("PropertyClass for prefix: [" + propertyPrefix + "] should not be null.");
}
if (valueObject == null) {
valueObject = getFieldValue(propertyClass.newInstance(), propertySuffix, dataId);
}

return valueObject;
}

1、如果方法是以get开头,并且参数个数为1/2/3,则执行其他的获取配置的逻辑,否则执行原生Configuration对象的逻辑 2、我们没必要纠结为啥是这样的规则,这就是Seata的一个约定 3、其他获取配置的逻辑,就是指通过Spring的方式获取对应配置值

到这里已经清楚了配置增强的原理,同时,也可以猜测得出唯一的ExtConfigurationProvider实现SpringBootConfigurationProvider,肯定是和Spring相关

配置增强与Spring

在介绍这块内容之前,我们先简单介绍一下Seata的使用方式:

  1. 非Starter方式:引入依赖 seata-all, 然后手动配置几个核心的Bean
  2. Starter方式: 引入依赖seata-spring-boot-starter,全自动准配,不需要自动注入核心Bean

SpringBootConfigurationProvider就在seata-spring-boot-starter模块中,也就是说,当我们通过引入seata-all的方式来使用Seata时,配置增强其实没有什么作用,因为此时根本找不到ExtConfigurationProvider实现类,自然就不会增强。

seata-spring-boot-starter是如何将这些东西串联起来的?

1、首先,在seata-spring-boot-starter模块的resources/META-INF/services目录下,存在一个spring.factories文件,内容分如下

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\

# 暂时不管
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration

2、在SeataAutoConfiguration文件中,会创建以下Bean: GlobalTransactionScanner 、SeataDataSourceBeanPostProcessor、SeataAutoDataSourceProxyCreator、SpringApplicationContextProvider。前3个和我们本文要讲的内容不相关,主要关注SpringApplicationContextProvider,核心代码非常简单,就是将ApplicationContext保存下来:

public class SpringApplicationContextProvider implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext);
}
}

3、然后,在SeataAutoConfiguration文件中,还会将一些xxxProperties.Class和对应的Key前缀缓存到PROPERTY_BEAN_MAP中。``xxxProperties就简单理解成application.yaml`中的各种配置项:

static {
PROPERTY_BEAN_MAP.put(SEATA_PREFIX, SeataProperties.class);
PROPERTY_BEAN_MAP.put(CLIENT_RM_PREFIX, RmProperties.class);
PROPERTY_BEAN_MAP.put(SHUTDOWN_PREFIX, ShutdownProperties.class);
...省略...
}

至此,整个流程其实已经很清晰,在有SpringBootConfigurationProvider配置增强的时候,我们获取一个配置项的流程如下:

  1. 先根据p配置项Key获取对应的xxxProperties对象
  2. 通过ObjectHolder中的ApplicationContext获取对应xxxProperties的SpringBean
  3. 基于xxxProperties的SpringBean获取对应配置的值
  4. 至此,通过配置增强,我们成功的获取到application.yaml中的值

· 阅读需 19 分钟

作者 | 刘晓敏 于雨

一、简介

Java 的世界里,大家广泛使用的一个高性能网络通信框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里,getty 也是一个类似 netty 的高性能网络通信库。getty 最初由 dubbogo 项目负责人于雨开发,作为底层通信库在 dubbo-go 中使用。随着 dubbo-go 捐献给 apache 基金会,在社区小伙伴的共同努力下,getty 也最终进入到 apache 这个大家庭,并改名 dubbo-getty

18 年的时候,我在公司里实践微服务,当时遇到最大的问题就是分布式事务问题。同年,阿里在社区开源他们的分布式事务解决方案,我也很快关注到这个项目,起初还叫 fescar,后来更名 seata。由于我对开源技术很感兴趣,加了很多社区群,当时也很关注 dubbo-go 这个项目,在里面默默潜水。随着对 seata 的了解,逐渐萌生了做一个 go 版本的分布式事务框架的想法。

要做一个 golang 版的分布式事务框架,首要的一个问题就是如何实现 RPC 通信。dubbo-go 就是很好的一个例子摆在眼前,遂开始研究 dubbo-go 的底层 getty。

二、如何基于 getty 实现 RPC 通信

getty 框架的整体模型图如下:

image.png

下面结合相关代码,详述 seata-golang 的 RPC 通信过程。

1. 建立连接

实现 RPC 通信,首先要建立网络连接吧,我们从 client.go 开始看起。

func (c *client) connect() {
var (
err error
ss Session
)

for {
// 建立一个 session 连接
ss = c.dial()
if ss == nil {
// client has been closed
break
}
err = c.newSession(ss)
if err == nil {
// 收发报文
ss.(*session).run()
// 此处省略部分代码

break
}
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
ss.Conn().Close()
}
}

connect() 方法通过 dial() 方法得到了一个 session 连接,进入 dial() 方法:

func (c *client) dial() Session {
switch c.endPointType {
case TCP_CLIENT:
return c.dialTCP()
case UDP_CLIENT:
return c.dialUDP()
case WS_CLIENT:
return c.dialWS()
case WSS_CLIENT:
return c.dialWSS()
}

return nil
}

我们关注的是 TCP 连接,所以继续进入 c.dialTCP() 方法:

func (c *client) dialTCP() Session {
var (
err error
conn net.Conn
)

for {
if c.IsClosed() {
return nil
}
if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
d := &net.Dialer{Timeout: connectTimeout}
// 建立加密连接
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
}
} else {
// 建立 tcp 连接
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
}
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err == nil {
// 返回一个 TCPSession
return newTCPSession(conn, c)
}

log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}

至此,我们知道了 getty 如何建立 TCP 连接,并返回 TCPSession。

2. 收发报文

那它是怎么收发报文的呢,我们回到 connection 方法接着往下看,有这样一行 ss.(*session).run(),在这行代码之后代码都是很简单的操作,我们猜测这行代码运行的逻辑里面一定包含收发报文的逻辑,接着进入 run() 方法:

func (s *session) run() {
// 省略部分代码

go s.handleLoop()
go s.handlePackage()
}

这里起了两个 goroutine,handleLoophandlePackage,看字面意思符合我们的猜想,进入 handleLoop() 方法:

func (s *session) handleLoop() {
// 省略部分代码

for {
// A select blocks until one of its cases is ready to run.
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select {
// 省略部分代码

case outPkg, ok = <-s.wQ:
// 省略部分代码

iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
// 通过 s.writer 将 interface{} 类型的 outPkg 编码成二进制的比特
pkgBytes, err = s.writer.Write(s, outPkg)
// 省略部分代码

iovec = append(iovec, pkgBytes)

//省略部分代码
}
// 将这些二进制比特发送出去
err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}

case <-wheel.After(s.period):
if flag {
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
// 定时执行的逻辑,心跳等
s.listener.OnCron(s)
}
}
}
}

通过上面的代码,我们不难发现,handleLoop() 方法处理的是发送报文的逻辑,RPC 需要发送的消息首先由 s.writer 编码成二进制比特,然后通过建立的 TCP 连接发送出去。这个 s.writer 对应的 Writer 接口是 RPC 框架必须要实现的一个接口。

继续看 handlePackage() 方法:

func (s *session) handlePackage() {
// 省略部分代码

if _, ok := s.Connection.(*gettyTCPConn); ok {
if s.reader == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
log.Error(errStr)
panic(errStr)
}

err = s.handleTCPPackage()
} else if _, ok := s.Connection.(*gettyWSConn); ok {
err = s.handleWSPackage()
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
err = s.handleUDPPackage()
} else {
panic(fmt.Sprintf("unknown type session{%#v}", s))
}
}

进入 handleTCPPackage() 方法:

func (s *session) handleTCPPackage() error {
// 省略部分代码

conn = s.Connection.(*gettyTCPConn)
for {
// 省略部分代码

bufLen = 0
for {
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
// 从 TCP 连接中收到报文
bufLen, err = conn.recv(buf)
// 省略部分代码

break
}
// 省略部分代码

// 将收到的报文二进制比特写入 pkgBuf
pktBuf.Write(buf[:bufLen])
for {
if pktBuf.Len() <= 0 {
break
}
// 通过 s.reader 将收到的报文解码成 RPC 消息
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// 省略部分代码

s.UpdateActive()
// 将收到的消息放入 TaskQueue 供 RPC 消费端消费
s.addTask(pkg)
pktBuf.Next(pkgLen)
// continue to handle case 5
}
if exit {
break
}
}

return perrors.WithStack(err)
}

从上面的代码逻辑我们分析出,RPC 消费端需要将从 TCP 连接收到的二进制比特报文解码成 RPC 能消费的消息,这个工作由 s.reader 实现,所以,我们要构建 RPC 通信层也需要实现 s.reader 对应的 Reader 接口。

3. 底层处理网络报文的逻辑如何与业务逻辑解耦

我们都知道,netty 通过 boss 线程和 worker 线程实现了底层网络逻辑和业务逻辑的解耦。那么,getty 是如何实现的呢?

handlePackage() 方法最后,我们看到,收到的消息被放入了 s.addTask(pkg) 这个方法,接着往下分析:

func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return
}
f()
}

pkg 参数传递到了一个匿名方法,这个方法最终放入了 taskPool。这个方法很关键,在我后来写 seata-golang 代码的时候,就遇到了一个坑,这个坑后面分析。

接着我们看一下 taskPool 的定义:

// NewTaskPoolSimple build a simple task pool
func NewTaskPoolSimple(size int) GenericTaskPool {
if size < 1 {
size = runtime.NumCPU() * 100
}
return &taskPoolSimple{
work: make(chan task),
sem: make(chan struct{}, size),
done: make(chan struct{}),
}
}

构建了一个缓冲大小为 size (默认为  runtime.NumCPU() * 100) 的 channel sem。再看方法 AddTaskAlways(t task)

func (p *taskPoolSimple) AddTaskAlways(t task) {
select {
case <-p.done:
return
default:
}

select {
case p.work <- t:
return
default:
}
select {
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
default:
goSafely(t)
}
}

加入的任务,会先由 len(p.sem) 个 goroutine 去消费,如果没有 goroutine 空闲,则会启动一个临时的 goroutine 去运行 t()。相当于有  len(p.sem) 个 goroutine 组成了 goroutine pool,pool 中的 goroutine 去处理业务逻辑,而不是由处理网络报文的 goroutine 去运行业务逻辑,从而实现了解耦。写 seata-golang 时遇到的一个坑,就是忘记设置 taskPool 造成了处理业务逻辑和处理底层网络报文逻辑的 goroutine 是同一个,我在业务逻辑中阻塞等待一个任务完成时,阻塞了整个 goroutine,使得阻塞期间收不到任何报文。

4. 具体实现

下面的代码见 getty.go

// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
Read(Session, []byte) (interface{}, int, error)
}

// Writer is used to marshal pkg and write to session
type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) ([]byte, error)
}

// ReadWriter interface use for handle application packages
type ReadWriter interface {
Reader
Writer
}
// EventListener is used to process pkg that received from remote session
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(Session) error

// invoked when session closed.
OnClose(Session)

// invoked when got error.
OnError(Session, error)

// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(Session)

// invoked when getty received a package. Pls attention that do not handle long time
// logic processing in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, u should should return err in
// Reader{Read} and getty will close this connection soon.
//
// If ur logic processing in this func will take a long time, u should start a goroutine
// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u
// can do the logic processing in other asynchronous way.
// !!!In short, ur OnMessage callback func should return asap.
//
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}

通过对整个 getty 代码的分析,我们只要实现  ReadWriter 来对 RPC  消息编解码,再实现 EventListener 来处理 RPC 消息的对应的具体逻辑,将 ReadWriter 实现和 EventLister 实现注入到 RPC 的 Client 和 Server 端,则可实现 RPC 通信。

4.1 编解码协议实现

下面是 seata 协议的定义: image-20201205214556457.png

在 ReadWriter 接口的实现 RpcPackageHandler 中,调用 Codec 方法对消息体按照上面的格式编解码:

// 消息编码为二进制比特
func MessageEncoder(codecType byte, in interface{}) []byte {
switch codecType {
case SEATA:
return SeataEncoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil
}
}

// 二进制比特解码为消息体
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
switch codecType {
case SEATA:
return SeataDecoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil, 0
}
}

4.2 Client 端实现

再来看 client 端 EventListener 的实现 RpcRemotingClient

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
go func()
request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{
ApplicationId: client.conf.ApplicationId,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
}}
// 建立连接后向 Transaction Coordinator 发起注册 TransactionManager 的请求
_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err == nil {
// 将与 Transaction Coordinator 建立的连接保存在连接池供后续使用
clientSessionManager.RegisterGettySession(session)
client.GettySessionOnOpenChannel <- session.RemoteAddr()
}
}()

return nil
}

// OnError ...
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {
clientSessionManager.ReleaseGettySession(session)
}

// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {
clientSessionManager.ReleaseGettySession(session)
}

// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
log.Info("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {
log.Debugf("received PONG from %s", session.RemoteAddr())
}
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)

// 处理事务消息,提交 or 回滚
client.onMessage(rpcMessage, session.RemoteAddr())
} else {
resp, loaded := client.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
client.futures.Delete(rpcMessage.Id)
}
}
}

// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
// 发送心跳
client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}

clientSessionManager.RegisterGettySession(session) 的逻辑 4.4 小节分析。

4.3 Server 端 Transaction Coordinator 实现

代码见 DefaultCoordinator

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {
log.Infof("got getty_session:%s", session.Stat())
return nil
}

func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {
// 释放 TCP 连接
SessionManager.ReleaseGettySession(session)
session.Close()
log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)
}

func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {
log.Info("getty_session{%s} is closing......", session.Stat())
}

func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {
log.Debugf("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
if isRegTM {
// 将 TransactionManager 信息和 TCP 连接建立映射关系
coordinator.OnRegTmMessage(rpcMessage, session)
return
}

heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {
coordinator.OnCheckMessage(rpcMessage, session)
return
}

if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
if isRegRM {
// 将 ResourceManager 信息和 TCP 连接建立映射关系
coordinator.OnRegRmMessage(rpcMessage, session)
} else {
if SessionManager.IsRegistered(session) {
defer func() {
if err := recover(); err != nil {
log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err)
}
}()
// 处理事务消息,全局事务注册、分支事务注册、分支事务提交、全局事务回滚等
coordinator.OnTrxMessage(rpcMessage, session)
} else {
session.Close()
log.Infof("close a unhandled connection! [%v]", session)
}
}
} else {
resp, loaded := coordinator.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
coordinator.futures.Delete(rpcMessage.Id)
}
}
}
}

func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {

}

coordinator.OnRegTmMessage(rpcMessage, session) 注册 Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session) 注册 Resource Manager。具体逻辑分析见 4.4 小节。 消息进入 coordinator.OnTrxMessage(rpcMessage, session) 方法,将按照消息的类型码路由到具体的逻辑当中:

	switch msg.GetTypeCode() {
case protocal.TypeGlobalBegin:
req := msg.(protocal.GlobalBeginRequest)
resp := coordinator.doGlobalBegin(req, ctx)
return resp
case protocal.TypeGlobalStatus:
req := msg.(protocal.GlobalStatusRequest)
resp := coordinator.doGlobalStatus(req, ctx)
return resp
case protocal.TypeGlobalReport:
req := msg.(protocal.GlobalReportRequest)
resp := coordinator.doGlobalReport(req, ctx)
return resp
case protocal.TypeGlobalCommit:
req := msg.(protocal.GlobalCommitRequest)
resp := coordinator.doGlobalCommit(req, ctx)
return resp
case protocal.TypeGlobalRollback:
req := msg.(protocal.GlobalRollbackRequest)
resp := coordinator.doGlobalRollback(req, ctx)
return resp
case protocal.TypeBranchRegister:
req := msg.(protocal.BranchRegisterRequest)
resp := coordinator.doBranchRegister(req, ctx)
return resp
case protocal.TypeBranchStatusReport:
req := msg.(protocal.BranchReportRequest)
resp := coordinator.doBranchReport(req, ctx)
return resp
default:
return nil
}

4.4 session manager 分析

Client 端同 Transaction Coordinator 建立连接起连接后,通过 clientSessionManager.RegisterGettySession(session) 将连接保存在 serverSessions = sync.Map{} 这个 map 中。map 的 key 为从 session 中获取的 RemoteAddress 即 Transaction Coordinator 的地址,value 为 session。这样,Client 端就可以通过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见 getty_client_session_manager.go Transaction Manager 和 Resource Manager 注册到 Transaction Coordinator 后,一个连接既有可能用来发送 TM 消息也有可能用来发送 RM 消息。我们通过 RpcContext 来标识一个连接信息:

type RpcContext struct {
Version string
TransactionServiceGroup string
ClientRole meta.TransactionRole
ApplicationId string
ClientId string
ResourceSets *model.Set
Session getty.Session
}

当收到事务消息时,我们需要构造这样一个 RpcContext 供后续事务处理逻辑使用。所以,我们会构造下列 map 来缓存映射关系:

var (
// session -> transactionRole
// TM will register before RM, if a session is not the TM registered,
// it will be the RM registered
session_transactionroles = sync.Map{}

// session -> applicationId
identified_sessions = sync.Map{}

// applicationId -> ip -> port -> session
client_sessions = sync.Map{}

// applicationId -> resourceIds
client_resources = sync.Map{}
)

这样,Transaction Manager 和 Resource Manager 分别通过 coordinator.OnRegTmMessage(rpcMessage, session)coordinator.OnRegRmMessage(rpcMessage, session) 注册到 Transaction Coordinator 时,会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系,在 client_resources map 中缓存 applicationId 与 resourceIds(一个应用可能存在多个 Resource Manager) 的关系。在需要时,我们就可以通过上述映射关系构造一个 RpcContext。这部分的实现和 java 版 seata 有很大的不同,感兴趣的可以深入了解一下。具体代码见 getty_session_manager.go 至此,我们就分析完了 seata-golang 整个 RPC 通信模型的机制。

三、seata-golang 的未来

seata-golang  从今年 4 月份开始开发,到 8 月份基本实现和 java 版 seata 1.2 协议的互通,对 mysql 数据库实现了 AT 模式(自动协调分布式事务的提交回滚),实现了 TCC 模式,TC 端使用 mysql 存储数据,使 TC 变成一个无状态应用支持高可用部署。下图展示了 AT 模式的原理:image20201205-232516.png

后续,还有许多工作可以做,比如:对注册中心的支持、对配置中心的支持、和 java 版 seata 1.4 的协议互通、其他数据库的支持、raft transaction coordinator 的实现等,希望对分布式事务问题感兴趣的开发者可以加入进来一起来打造一个完善的 golang 的分布式事务框架。

如果你有任何疑问,欢迎钉钉扫码加入交流群【钉钉群号 33069364】:

作者简介

刘晓敏 (GitHubID dk-lockdown),目前就职于 h3c 成都分公司,擅长使用 Java/Go 语言,在云原生和微服务相关技术方向均有涉猎,目前专攻分布式事务。 于雨(github @AlexStocks),dubbo-go 项目和社区负责人,一个有十多年服务端基础架构研发一线工作经验的程序员,陆续参与改进过 Muduo/Pika/Dubbo/Sentinel-go 等知名项目,目前在蚂蚁金服可信原生部从事容器编排和 service mesh 工作。

参考资料

seata 官方:https://seata.apache.org

java 版 seata:https://github.com/apache/incubator-seata

seata-golang 项目地址:https://github.com/apache/incubator-seata-go

seata-golang go 夜读 b站分享:https://www.bilibili.com/video/BV1oz411e72T

· 阅读需 37 分钟

在Seata1.3.0版本中,数据源自动代理和手动代理一定不能混合使用,否则会导致多层代理,从而导致以下问题:

  1. 单数据源情况下:导致分支事务提交时,undo_log本身也被代理,即为 undo_log 生成了 undo_log, 假设为undo_log2,此时undo_log将被当作分支事务来处理;分支事务回滚时,因为undo_log2生成的有问题,在undo_log对应的事务分支回滚时会将业务表关联的undo_log也一起删除,从而导致业务表对应的事务分支回滚时发现undo_log不存在,从而又多生成一条状态为1的undo_log。这时候整体逻辑已经乱了,很严重的问题
  2. 多数据源和逻辑数据源被代理情况下:除了单数据源情况下会出现的问题,还可能会造成死锁问题。死锁的原因就是针对undo_log的操作,本该在一个事务中执行的select for updatedelete 操作,被分散在多个事务中执行,导致一个事务在执行完select for update后一直不提交,一个事务在执行delete时一直等待锁,直到超时

代理描述

即对DataSource代理一层,重写一些方法。比如getConnection方法,这时不直接返回一个Connection,而是返回ConnectionProxy,其它的以此类推

// DataSourceProxy

public DataSourceProxy(DataSource targetDataSource) {
this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}

private void init(DataSource dataSource, String resourceGroupId) {
DefaultResourceManager.get().registerResource(this);
}

public Connection getPlainConnection() throws SQLException {
return targetDataSource.getConnection();
}

@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}

手动代理

即手动注入一个DataSourceProxy,如下

@Bean
public DataSource druidDataSource() {
return new DruidDataSource()
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}

自动代理

针对DataSource创建一个代理类,在代理类里面基于DataSource获取DataSourceProxy(如果没有就创建),然后调用DataSourceProxy的相关方法。核心逻辑在SeataAutoDataSourceProxyCreator

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final String[] excludes;
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());

public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
this.excludes = excludes;
setProxyTargetClass(!useJdkProxy);
}

@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
return new Object[]{advisor};
}

@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}
}

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m != null) {
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}

@Override
public Class<?>[] getInterfaces() {
return new Class[]{SeataProxy.class};
}
}

数据源多层代理

@Bean
@DependsOn("strangeAdapter")
public DataSource druidDataSource(StrangeAdapter strangeAdapter) {
doxx
return new DruidDataSource()
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
  1. 首先我们在配置类里面注入了两个DataSource,分别为: DruidDataSourceDataSourceProxy, 其中DruidDataSource 作为 DataSourceProxy 的 targetDataSource 属性,并且DataSourceProxy为使用了@Primary注解声明
  2. 应用默认开启了数据源自动代理,所以在调用DruidDataSource相关方法时,又会为为DruidDataSource创建一个对应的数据源代理DataSourceProxy2
  3. 当我们在程序中想获取一个Connection时会发生什么?
    1. 先获取一个DataSource,因为DataSourceProxyPrimary,所以此时拿到的是DataSourceProxy
    2. 基于DataSource获取一个Connection,即通过DataSourceProxy获取Connection。此时会先调用targetDataSource 即 DruidDataSource 的 getConnection 方法,但因为切面会对DruidDataSource进行拦截,根据步骤2的拦截逻辑可以知道,此时会自动创建一个DataSourceProxy2,然后调用DataSourceProxy2#getConnection,然后再调用DruidDataSource#getConnection。最终形成了双层代理, 返回的Connection也是一个双层的ConnectionProxy

上面其实是改造之后的代理逻辑,Seata默认的自动代理会对DataSourceProxy再次进行代理,后果就是代理多了一层此时对应的图如下

数据源多层代理会导致的两个问题在文章开头处已经总结了,下面会有案例介绍。

分支事务提交

通过ConnectionProxy中执行对应的方法,会发生什么?以update操作涉及到的一个分支事务提交为例:

  1. 执行ConnectionProxy#prepareStatement, 返回一个PreparedStatementProxy
  2. 执行PreparedStatementProxy#executeUpdatePreparedStatementProxy#executeUpdate大概会帮做两件事情: 执行业务SQL和提交undo_log

提交业务SQL

// ExecuteTemplate#execute
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}

主要流程就是: 先执行业务SQL,然后执行ConnectionProxy的commit方法,在这个方法中,会先帮我们执行对应的 undo_log SQL,然后提交事务

PreparedStatementProxy#executeUpdate => 
ExecuteTemplate#execute =>
BaseTransactionalExecutor#execute =>
AbstractDMLBaseExecutor#doExecute =>
AbstractDMLBaseExecutor#executeAutoCommitTrue =>
AbstractDMLBaseExecutor#executeAutoCommitFalse => 在这一步操中,会触发statementCallback#execute方法,即调用调用原生PreparedStatement#executeUpdate方法
ConnectionProxy#commit
ConnectionProxy#processGlobalTransactionCommit

UNDO_LOG插入

// ConnectionProxy#processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
try {
// 注册分支事务,简单理解向server发一个请求,然后server在branch_table表里插入一条记录,不关注
register();
} catch (TransactionException e) {
// 如果没有for update 的sql,会直接在commit之前做注册,此时不止插入一条branch记录,而会附带锁信息进行竞争,下方的异常一般就是在注册时没拿到锁抛出,一般就是纯update语句的并发下会触发竞争锁失败的异常 @FUNKYE
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// undo_log处理,期望用 targetConnection 处理 @1
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);

// 提交本地事务,期望用 targetConnection 处理 @2
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
  1. undo_log处理@1,解析当前事务分支涉及到的undo_log,然后使用TargetConnection, 写到数据库
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}

String xid = connectionContext.getXid();
long branchId = connectionContext.getBranchId();

BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}

insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,cp.getTargetConnection());
}
  1. 提交本地事务@2,即通过TargetConnection提交事务。即 务SQL执行undo_log写入即事务提交 用的都是同一个TargetConnection

lcn的内置数据库方案,lcn是将undolog写到他内嵌的h2(忘了是不是这个来着了)数据库上,此时会变成2个本地事务,一个是h2的undolog插入事务,一个是业务数据库的事务,如果在h2插入后,业务数据库异常,lcn的方案就会出现数据冗余,回滚数据的时候也是一样,删除undolog跟回滚业务数据不是一个本地事务. 但是lcn这样的好处就是入侵小,不需要另外添加undolog表。 感谢@FUNKYE大佬给的建议,对lcn不太了解,有机会好好研究一下

分支事务回滚

  1. Server端向Client端发送回滚请求
  2. Client端接收到Server发过来的请求,经过一系列处理,最终会到DataSourceManager#branchRollback方法
  3. 先根据resourceId从DataSourceManager.dataSourceCache中获取对应的DataSourceProxy,此时为masterSlaveProxy(回滚阶段我们就不考代理数据源问题,简单直接一些,反正最终拿到的都是TragetConnection)
  4. 根据Server端发过来的xid和branchId查找对应的undo_log并解析其rollback_info属性,每条undo_log可能会解析出多条SQLUndoLog,每个SQLUndoLog可以理解成是一个操作。比如一个分支事务先更新A表,再更新B表,这时候针对该分支事务生成的undo_log就包含两个SQLUndoLog:第一个SQLUndoLog对应的是更新A表的前后快照;第二个SQLUndoLog对应的是更新B表的前后快照
  5. 针对每条SQLUndoLog执行对应的回滚操作,比如一个SQLUndoLog对应的操作是INSERT,则其对应的回滚操作就是DELETE
  6. 根据xid和branchId删除该undo_log
// AbstractUndoLogManager#undo   删除了部分非关键代码

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;

for (; ; ) {
try {
// 获取原生数据源的Connection, 回滚阶段我们不管代理数据源问题,最终拿到的都是 TargetConnection
conn = dataSourceProxy.getPlainConnection();

// 将回滚操作放在一个本地事务中,手动提交,确保最终业务SQL操作和undo_log删除操作一起提交
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}

// 根据xid 和 branchId 查询 undo_log,注意此时的SQL语句 SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();

boolean exists = false;
while (rs.next()) {
exists = true;
// status == 1 undo_log不处理,和防悬挂相关
if (!canUndo(state)) {
return;
}

// 解析undo_log
byte[] rollbackInfo = getRollbackInfo(rs);
BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance(serializer).decode(rollbackInfo);
try {
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
// 执行对应的回滚操作
undoExecutor.executeOn(conn);
}
}
}

//
if (exists) {
LOGGER.error("\n delete from undo_log where xid={} AND branchId={} \n", xid, branchId);
deleteUndoLog(xid, branchId, conn);
conn.commit();
// 和防悬挂相关 如果根据 xid和branchId 没有查到undo_log,说明这个分支事务有异常:例如业务处理超时,导致全局事务回滚,但这时候业务undo_log并没有插入
} else {
LOGGER.error("\n insert into undo_log xid={},branchId={} \n", xid, branchId);
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
}
return;
} catch (Throwable e) {
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,branchId, e.getMessage()), e);
}
}
}

有以下几个注意点:

  1. 回滚时不考虑数据源代理问题,最终都是使用TargetConnection
  2. 设置atuoCommit为false,即需要手动提交事务
  3. 根据xid和branchId查询undo_log时加了for update,也就是说,这个事务会持有这条undo_log的锁直到所有回滚操作都完成,因为完成之后才会

多层代理问题

数据源多层代理会导致的几个问题在文章开头的时候已经提到过了,重点分析一下为什么会造成以上问题:

对分支事务提交的影响

先分析一下,如果使用双层代理会发生什么?我们从两个方面来分析:业务SQLundo_log

  1. 业务SQL
PreparedStatementProxy1.executeUpdate => 
statementCallback#executeUpdate(PreparedStatementProxy2#executeUpdate) =>
PreparedStatement#executeUpdate

好像没啥影响,就是多绕了一圈,最终还是通过PreparedStatement执行

  1. undo_log
ConnectionProxy1#getTargetConnection -> 
ConnectionProxy2#prepareStatement ->
PreparedStatementProxy2#executeUpdate ->
PreparedStatement#executeUpdate(原生的undo_log写入,在此之前会对为该 undo_log 生成 undo_log2(即 undo_log 的 undo_log)) ->
ConnectionProxy2#commit ->
ConnectionProxy2#processGlobalTransactionCommit(写入undo_log2) ->
ConnectionProxy2#getTargetConnection ->
TargetConnection#prepareStatement ->
PreparedStatement#executeUpdate

对分支事务回滚的影响

在事务回滚之后,为何undo_log没有被删除呢?

其实并不是没有被删除。前面已经说过,双层代理会导致undo_log被当作分支事务来处理,所以也会为该 undo_log生成一个undo_log(假设为undo_log2),而undo_log2生成的有问题(其实也没问题,就应该这样生成),从而导致回滚时会将业务表关联的undo_log也一起删除,最终导致业务表对应的事务分支回滚时发现undo_log不存在,从而又多生成一条状态为为1的undo_log

回滚之前

// undo_log
84 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.1KB 0
85 59734075254222849 172.16.120.59:23004:59734061438185472 serializer=jackson 4.0KB 0

// branch_table
59734070967644161 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware
59734075254222849 172.16.120.59:23004:59734061438185472 jdbc:mysql://172.16.248.10:3306/tuya_middleware

// lock_table
jdbc:mysql://xx^^^seata_storage^^^1 59734070967644161 jdbc:mysql://172.16.248.10:3306/tuya_middleware seata_storage 1
jdbc:mysql://xx^^^undo_log^^^84 59734075254222849 jdbc:mysql://172.16.248.10:3306/tuya_middleware undo_log 84

回滚之后

// 生成了一条状态为1的undo_log,对应的日志为: undo_log added with GlobalFinished
86 59734070967644161 172.16.120.59:23004:59734061438185472 serializer=jackson 1.0Byte 1

问题分析

  1. 根据xid和branchId找到对应的undo_log日志
  2. 对undo_log进行解析,主要就是解析它的rollback_info字段,rollback_info解析出来就是一个SQLUndoLog集合,每条SQLUndoLog对应着一个操作,里面包含了该操作的前后的快照,然后执行对应的回滚
  3. 根据xid和branchId删除undo_log日志

因为双层代理问题,导致一条undo_log变成了一个分支事务,所以发生回滚时,我们也需要对undo_log分支事务进行回滚: 1、首先根据xid和branchId找到对应的undo_log并解析其rollback_info属性,这里解析出来的rollback_info包含了两条SQLUndoLog。为什么有两条?

仔细想想也可以可以理解,第一层代理针对seata_storage的操作,放到缓存中,本来执行完之后是需要清掉的,但因为这里是双层代理,所以这时候这个流程并没有结束。轮到第二层代理对undo_log操作时,将该操作放到缓存中,此时缓存中有两个操作,分别为seata_storage的UPDATEundo_log的INSERT。所以这也就很好理解为什么针对undo_log操作的那条undo_log格外大(4KB),因为它的rollback_info包含了两个操作。

有一点需要注意的是,第一条SQLUndoLog对应的after快照,里面的branchId=59734070967644161 pk=84, 即 seata_storage分支对应的branchIdseata_storage对应的undo_log PK。也就是说,undo_log回滚时候 把seata_storage对应的undo_log删掉了。 那undo_log本身对应的undo_log 如何删除呢?在接下来的逻辑中会根据xid和branchId删除

2、解析第一条SQLUndoLog,此时对应的是undo_log的INSERT操作,所以其对应的回滚操作是DELETE。因为undo_log此时被当作了业务表。所以这一步会将59734075254222849对应的undo_log删除,但这个其实是业务表对应的对应的undo_log

3、解析第二条SQLUndoLog,此时对应的是seata_storage的UPDATE操作,这时会通过快照将seata_storage对应的记录恢复

4、根据xid和branchId删除undo_log日志,这里删除的是undo_log 的 undo_log , 即 undo_log2。所以,执行到这里,两条undo_log就已经被删除了

5、接下来回滚seata_storage,因为这时候它对应的undo_log已经在步骤2删掉了,所以此时查不到undo_log,然后重新生成一条status == 1 的 undo_log

案例分析

背景

1、配置了三个数据源: 两个物理数据源、一个逻辑数据源,但是两个物理数据源对应的连接地址是一样的。这样做有意思吗?

@Bean("dsMaster")
DynamicDataSource dsMaster() {
return new DynamicDataSource(masterDsRoute);
}

@Bean("dsSlave")
DynamicDataSource dsSlave() {
return new DynamicDataSource(slaveDsRoute);
}

@Primary
@Bean("masterSlave")
DataSource masterSlave(@Qualifier("dsMaster") DataSource dataSourceMaster,
@Qualifier("dsSlave") DataSource dataSourceSlave) throws SQLException {
Map<String, DataSource> dataSourceMap = new HashMap<>(2);
//主库
dataSourceMap.put("dsMaster", dataSourceMaster);
//从库
dataSourceMap.put("dsSlave", dataSourceSlave);
// 配置读写分离规则
MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(
"masterSlave", "dsMaster", Lists.newArrayList("dsSlave")
);
Properties shardingProperties = new Properties();
shardingProperties.setProperty("sql.show", "true");
shardingProperties.setProperty("sql.simple", "true");
// 获取数据源对象
DataSource dataSource = MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveRuleConfig, shardingProperties);
log.info("datasource initialized!");
return dataSource;˚
}

2、开启seata的数据源动态代理,根据seata的数据源代理逻辑可以知道,最终会生成三个代理数据源,原生数据源和代理数据源的关系缓存在DataSourceProxyHolder.dataSourceProxyMap中,假如原生数据源和代理数据源对应的关系如下:

dsMaster(DynamicDataSource)           =>       dsMasterProxy(DataSourceProxy)
dsSlave(DynamicDataSource) => dsSlaveProxy(DataSourceProxy)
masterSlave(MasterSlaveDataSource) => masterSlaveProxy(DataSourceProxy)

所以,最终在IOC容器中存在的数据源是这三个: dsMasterProxy 、 dsSlaveProxy 、 masterSlaveProxy 。根据@Primary的特性可以知道,当我们从容器中获取一个DataSource的时候,默认返回的就是代理数据源 masterSlaveProxy

对shardingjdbc没有具体的研究过,只是根据debug时看到的代码猜测它的工作机制,又不对的地方,还请大佬指出来

masterSlaveProxy可以看成是被 DataSourceProxy 包装后的 MasterSlaveDataSource。我们可以大胆的猜测MasterSlaveDataSource并不是一个物理数据源,而是一个逻辑数据源,可以简单的认为里面包含了路由的逻辑。当我们获取一个连接时,会通过里面的路由规则选择到具体的物理数据源,然后通过该物理数据源获取一个真实的连接。 路由规则应该可以自己定义,根据debug时观察到的现象,默认的路由规则应该是:

  1. 针对select 读操作,会路由到从库,即我们的 dsSlave
  2. 针对update 写操作,会路由到主库,即我们的 dsMaster

3、每个DataSourceProxy在初始化的时候,会解析该真实DataSource的连接地址,然后将该连接地址和DataSourceProxy本身维护DataSourceManager.dataSourceCache中。DataSourceManager.dataSourceCache有一个作用是用于回滚:回滚时根据连接地址找到对应的DataSourceProxy,然后基于该DataSourceProxy做回滚操作。 但我们可以发现这个问题,这三个数据源解析出来的连接地址是一样的,也就是key重复了,所以在DataSourceManager.dataSourceCache中中,当连接地相同时,后注册的数据源会覆盖已存在的。即: DataSourceManager.dataSourceCache最终存在的是masterSlaveProxy,也就是说,最终会通过masterSlaveProxy进行回滚,这点很重要。

4、涉及到的表:很简单,我们期待的就一个业务表seata_account,但因为重复代理问题,导致seata将undo_log也当成了一个业务表

  1. seata_account
  2. undo_log

好了,这里简单介绍一下背景,接下来进入Seata环节

需求

我们的需求很简单,就是在分支事务里面执行一条简单的update操作,更新seata_account的count值。在更新完之后,手动抛出一个异常,触发全局事务的回滚。 为了更便于排查问题,减少干扰,我们全局事务中就使用一个分支事务,没有其它分支事务了。SQL如下:

update seata_account set count = count - 1 where id = 100;

问题现象

Client:在控制台日志中,不断重复打印以下日志

  1. 以上日志打印的间隔为20s,而我查看了数据库的innodb_lock_wait_timeout属性值,刚好就是20,说明每次回滚请求过来的时候,都因为获取锁超时(20)而回滚失败
  2. 为什么会没过20s打印一次?因为Server端会有定时处理回滚请求
// 分支事务开始回滚
Branch rollback start: 172.16.120.59:23004:59991911632711680 59991915571163137 jdbc:mysql://172.16.248.10:3306/tuya_middleware

// undo_log事务分支 原始操作对应是 insert, 所以其回滚为 delete
undoSQL undoSQL=DELETE FROM undo_log WHERE id = ? , PK=[[id,139]]
// 因为第一层代理对应的操作也在上下文中,undo_log分支事务 提交时候, 对应的undo_log包含两个操作
undoSQL undoSQL=UPDATE seata_account SET money = ? WHERE id = ? , PK=[[id,1]]

// 该分支事务回滚完成之后,再删除该分支事务的对应的 undo_log
delete from undo_log where xid=172.16.120.59:23004:59991911632711680 AND branchId=59991915571163137

// 抛出异常,提示回滚失败,失败原因是`Lock wait timeout exceeded`, 在根据xid和branchId删除undo_log时失败,失败原因是获取锁超时,说明此时有另一个操作持有该记录的锁没有释放
branchRollback failed. branchType:[AT], xid:[172.16.120.59:23004:59991911632711680], branchId:[59991915571163137], resourceId:[jdbc:mysql://172.16.248.10:3306/tuya_middleware], applicationData:[null]. reason:[Branch session rollback failed and try again later xid = 172.16.120.59:23004:59991911632711680 branchId = 59991915571163137 Lock wait timeout exceeded; try restarting transaction]

Server:每20s打印以下日志,说明server在不断的重试发送回滚请求

Rollback branch transaction fail and will retry, xid = 172.16.120.59:23004:59991911632711680 branchId = 59991915571163137

在该过程中,涉及到的SQL大概如下:

1. SELECT * FROM undo_log WHERE branch_id = ? AND xid = ? FOR UPDATE							slaveDS
2. SELECT * FROM undo_log WHERE (id ) in ( (?) ) slaveDS
3. DELETE FROM undo_log WHERE id = ? masterDS
4. SELECT * FROM seata_account WHERE (id ) in ( (?) ) masterDS
5. UPDATE seata_account SET money = ? WHERE id = ? masterDS
6. DELETE FROM undo_log WHERE branch_id = ? AND xid = ? masterDS

此时查看数据库的 事务情况、锁情况 、锁等待关系 1、查当前正在执行的事务

SELECT * FROM information_schema.INNODB_TRX;

2、查当前锁情况

SELECT * FROM information_schema.INNODB_LOCKs;

3、查当前锁等待关系

SELECT * FROM information_schema.INNODB_LOCK_waits;

SELECT
block_trx.trx_mysql_thread_id AS 已经持有锁的sessionID,
request_trx.trx_mysql_thread_id AS 正在申请锁的sessionID,
block_trx.trx_query AS 已经持有锁的SQL语句,
request_trx.trx_query AS 正在申请锁的SQL语句,
waits.blocking_trx_id AS 已经持有锁的事务ID,
waits.requesting_trx_id AS 正在申请锁的事务ID,
waits.requested_lock_id AS 锁对象的ID,
locks.lock_table AS lock_table, -- 锁对象所锁定的表
locks.lock_type AS lock_type, -- 锁类型
locks.lock_mode AS lock_mode -- 锁模式
FROM
information_schema.innodb_lock_waits AS waits
INNER JOIN information_schema.innodb_trx AS block_trx ON waits.blocking_trx_id = block_trx.trx_id
INNER JOIN information_schema.innodb_trx AS request_trx ON waits.requesting_trx_id = request_trx.trx_id
INNER JOIN information_schema.innodb_locks AS locks ON waits.requested_lock_id = locks.lock_id;

  1. 涉及到到记录为 branch_id = 59991915571163137 AND xid = 172.16.120.59:23004:59991911632711680
  2. 事务ID1539483284持有该记录的锁,但是它对应的SQL为空,那应该是在等待commit
  3. 事务ID1539483286在尝试获取该记录的锁,但从日志可以发现,它一直锁等待超时

大概可以猜测是 select for updatedelete from undo ... 发生了冲突。根据代码中的逻辑,这两个操作应该是放在一个事务中提交了,为什么被分开到两个事务了?

问题分析

结合上面的介绍的回滚流程看看我们这个例子在回滚时会发生什么

  1. 先获取数据源,此时dataSourceProxy.getPlainConnection()获取到的是MasterSlaveDataSource数据源
  2. select for update操作的时候,通过MasterSlaveDataSource获取一个Connection,前面说到过MasterSlaveDataSource是一个逻辑数据源,里面有路由逻辑,根据上面介绍的,这时候拿到的是dsSlaveConnection
  3. 在执行delete from undo ...操作的时候,这时候拿到的是dsMasterConnection
  4. 虽然dsSlavedsMaster对应的是相同的地址,但此时获取到的肯定是不同的连接,所以此时两个操作肯定是分布在两个事务中
  5. 执行select for update的事务,会一直等待直到删除undo_log完成才会提交
  6. 执行delete from undo ...的事务,会一直等待select for update的事务释放锁
  7. 典型的死锁问题

验证猜想

我尝试用了两个方法验证这个问题:

  1. 修改Seata代码,将select for update改成select,此时在查询undo_log就不需要持有该记录的锁,也就不会造成死锁

  2. 修改数据源代理逻辑,这才是问题的关键,该问题主要原因不是select for update。在此之前多层代理问题已经产生,然后才会造成死锁问题。从头到尾我们就不应该对masterSlave数据源进行代理。它只是一个逻辑数据源,为什么要对它进行代理呢?如果代理masterSlave,就不会造成多层代理问题,也就不会造成删除undo_log时的死锁问题

最终实现

masterSlave也是一个DataSource类型,该如何仅仅对dsMasterdsSlave 代理而不对masterSlave代理呢?观察SeataAutoDataSourceProxyCreator#shouldSkip方法,我们可以通过EnableAutoDataSourceProxy注解的excludes属性解决这个问题

@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}

即: 将数据源自动代理关闭,然后在启动类加上这个注解

@EnableAutoDataSourceProxy(excludes = {"org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource"})

自动代理在新版本中的优化

因为Seata 1.4.0还没有正式发布,我目前看的是1.4.0-SNAPSHOT版本的代码,即当前时间ddevelop分支最新的代码

代码改动

主要改动如下,一些小的细节就不过多说明了:

  1. DataSourceProxyHolder调整
  2. DataSourceProxy调整
  3. SeataDataSourceBeanPostProcessor新增

DataSourceProxyHolder

在这个类改动中,最主要是其putDataSource方法的改动

public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
DataSource originalDataSource;
if (dataSource instanceof SeataDataSourceProxy) {
SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;
// 如果是代理数据源,并且和当前应用配置的数据源代理模式(AT/XA)一样, 则直接返回
if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
return (SeataDataSourceProxy)dataSource;
}

// 如果是代理数据源,和当前应用配置的数据源代理模式(AT/XA)不一样,则需要获取其TargetDataSource,然后为其创建一个代理数据源
originalDataSource = dataSourceProxy.getTargetDataSource();
} else {
originalDataSource = dataSource;
}

// 如果有必要,基于 TargetDataSource 创建 代理数据源
return this.dataSourceProxyMap.computeIfAbsent(originalDataSource,
BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
}

DataSourceProxyHolder#putDataSource方法主要在两个地方被用到:一个是在SeataAutoDataSourceProxyAdvice切面中;一个是在SeataDataSourceBeanPostProcessor中。 这段判断为我们解决了什么问题?数据源多层代理问题。在开启了数据源自动代理的前提下,思考以下场景:

  1. 如果我们在项目中手动注入了一个DataSourceProxy,这时候在切面调用DataSourceProxyHolder#putDataSource方法时会直接返回该DataSourceProxy本身,而不会为其再创建一个DataSourceProxy
  2. 如果我们在项目中手动注入了一个DruidSource,这时候在切面调用DataSourceProxyHolder#putDataSource方法时会为其再创建一个DataSourceProxy并返回

这样看好像问题已经解决了,有没有可能会有其它的问题呢?看看下面的代码

@Bean
public DataSourceProxy dsA(){
return new DataSourceProxy(druidA)
}

@Bean
public DataSourceProxy dsB(DataSourceProxy dsA){
return new DataSourceProxy(dsA)
}
  1. 这样写肯定是不对,但如果他就要这样写你也没办法
  2. dsA没什么问题,但dsB还是会产生双层代理的问题,因为此时dsB 的 TargetDataSourcedsA
  3. 这就涉及到DataSourceProxy的改动

DataSourceProxy

public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
// 下面这个判断,保证了在我们传入一个DataSourceProxy的时候,也不会产生双层代理问题
if (targetDataSource instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
}
this.targetDataSource = targetDataSource;
init(targetDataSource, resourceGroupId);
}

SeataDataSourceBeanPostProcessor

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

......

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}

//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the bean of the data source," +
" and return the original data source to replace the data source proxy.");
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean;
}
}
  1. SeataDataSourceBeanPostProcessor实现了BeanPostProcessor接口,在一个bean初始化后,会执行BeanPostProcessor#postProcessAfterInitialization方法。也就是说,在postProcessAfterInitialization方法中,这时候的bean已经是可用状态了
  2. 为什么要提供这么一个类呢?从它的代码上来看,仅仅是为了再bean初始化之后,为数据源初始化对应的DataSourceProxy,但为什么要这样做呢?

因为有些数据源在应用启动之后,可能并不会初始化(即不会调用数据源的相关方法)。如果没有提供SeataDataSourceBeanPostProcessor类,那么就只有在SeataAutoDataSourceProxyAdvice切面中才会触发DataSourceProxyHolder#putDataSource方法。假如有一个客户端在回滚的时候宕机了,在重启之后,Server端通过定时任务向其派发回滚请求,这时候客户端需要先根据rsourceId(连接地址)找到对应的DatasourceProxy。但如果在此之前客户端还没有主动触发数据源的相关方法,就不会进入SeataAutoDataSourceProxyAdvice切面逻辑,也就不会为该数据源初始化对应的DataSourceProxy,从而导致回滚失败

多层代理总结

通过上面的分析,我们大概已经知道了seata在避免多层代理上的一些优化,但其实还有一个问题需要注意:逻辑数据源的代理

这时候的调用关系为: masterSlaveProxy -> masterSlave -> masterproxy/slaveProxy -> master/slave

此时可以通过excludes属性排除逻辑数据源,从而不为其创建数据源代理。

总结一下:

  1. 在为数据源初始化对应的DataSourceProxy时,判断是否有必要为其创建对应的DataSourceProxy,如果本身就是DataSourceProxy,就直接返回
  2. 针对一些数据源手动注入的情况,为了避免一些人为误操作的导致的多层代理问题,在DataSourceProxy构造函数中添加了判断,如果入参TragetDatasource本身就是一个DataSourceProxy, 则获取其target属性作为新DataSourceProxy的tragetDatasource
  3. 针对一些其它情况,比如逻辑数据源代理问题,通过excludes属性添加排除项,这样可以避免为逻辑数据源创建DataSourceProxy

全局事务和本地事务使用建议

有一个问题,如果在一个方法里涉及到多个DB操作,比如涉及到3条update操作,我们需不需在这个方法使用spring中的@Transactional注解?针对这个问题,我们分别从两个角度考虑:不使用@Transactional注解 和 使用@Transactional注解

不使用@Transactional注解

  1. 在提交阶段,因为该分支事务有3条update操作,每次执行update操作的时候,都会通过数据代理向TC注册一个分支事务,并为其生成对应的undo_log,最终3个update操作被当作3个分支事务来处理
  2. 在回滚阶段,需要回滚3个分支事务
  3. 数据的一致性通过seata全局事务来保证

使用@Transactional注解

  1. 在提交阶段,3个update操作被当作一个分支事务来提交,所以最终只会注册一个分支事务
  2. 在回滚阶段,需要回滚1个分支事务
  3. 数据的一致性:这3个update的操作通过本地事务的一致性保证;全局一致性由seata全局事务来保证。此时3个update仅仅是一个分支事务而已

结论

通过上面的对比,答案是显而易见的,合理的使用本地事务,可以大大的提升全局事务的处理速度。上面仅仅是3个DB操作,如果一个方法里面涉及到的DB操作更多呢,这时候两种方式的差别是不是更大呢?

最后,感谢@FUNKYE大佬为我解答了很多问题并提供了宝贵建议!

· 阅读需 12 分钟

【分布式事务Seata源码解读二】Client端启动流程

本文从源码的角度分析一下AT模式下Client端启动流程,所谓的Client端,即业务应用方。分布式事务分为三个模块:TC、TM、RM。其中TC位于seata-server端,而TM、RM通过SDK的方式运行在client端。

下图展示了Seata官方Demo的一个分布式事务场景,分为如下几个微服务,共同实现了一个下订单、扣库存、扣余额的分布式事务。

  • BusinessService: 业务服务,下单服务的入口
  • StorageService: 库存微服务,用于扣减商品库存
  • OrderService: 订单微服务,创建订单
  • AccountService: 账户微服务,扣减用户账户的余额

在这里插入图片描述

从上图也可以看出,在AT模式下Seata Client端主要通过如下三个模块来实现分布式事务:

  • GlobalTransactionScanner: GlobalTransactionScanner负责初始TM、RM模块,并为添加分布式事务注解的方法添加拦截器,拦截器负责全局事务的开启、提交或回滚
  • DatasourceProxy: DatasourceProxy为DataSource添加拦截,拦截器会拦截所有SQL执行,并作为RM事务参与方的角色参与分布式事务执行。
  • Rpc Interceptor: 在上一篇分布式事务Seata源码解读一中有提到分布式事务的几个核心要点,其中有一个是分布式事务的跨服务实例传播。Rpc Interceptor的职责就是负责在多个微服务之间传播事务。

seata-spring-boot-starter

引用seata分布式事务SDK有两种方式,依赖seata-all或者seata-spring-boot-starter,推荐使用seata-spring-boot-starter,因为该starter已经自动注入了上面提到的三个模块,用户只要添加相应的配置,在业务代码添加全局分布式事务注解即可。下面从seata-spring-boot-starter项目中的代码入手:

如下图所示是seata-spring-boot-starter的项目结构: 在这里插入图片描述 主要分为以下几个模块:

  • properties: properties目录下都是Springboot 适配seata的相关配置类,即可以通过SpringBoot的配置方式来Seata的相关参数
  • provider: provider目录下的类负责把Springboot、SpringCloud的配置适配到Seata配置中
  • resources: resources目录下主要有两个文件,spring.factories用于注册Springboot的自动装配类,ExtConfigurationProvider用于注册SpringbootConfigurationProvider类,该Provider类负责把SpringBoot的相关配置类适配到Seata中。

对于springboot-starter项目,我们先查看resources/META-INF/spring.factories文件:

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration

可以看到在spring.factories中配置了自动装配类:SeataAutoConfiguration,在该装配类中主要注入了GlobalTransactionScanner和seataAutoDataSourceProxyCreator两个实例。代码如下:

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled",
havingValue = "true",
matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {

...

// GlobalTransactionScanner负责为添加GlobalTransaction注解的方法添加拦截器,
// 并且负责初始化RM、TM
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties,
FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(),
seataProperties.getTxServiceGroup(),
failureHandler);
}

// SeataAutoDataSourceProxyCreator负责为Spring中的所有DataSource生成代理对象,
// 从而实现拦截所有SQL的执行
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {
"enableAutoDataSourceProxy", "enable-auto" +
"-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
seataProperties.getExcludesForAutoProxying());
}
}

GlobalTransactionScanner

GlobalTransactionScanner继承于AutoProxyCreator,AutoProxyCreator是Spring中实现AOP的一种方式,可以拦截Spring中的所有实例,判断是否需要进行代理。下面列出了GlobalTransactionScanner中一些比较重要的字段和拦截代理的核心方法:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,
DisposableBean {
...
// interceptor字段是对应一个代理对象的拦截器,
// 可以认为是一个临时变量,有效期是一个被代理对象
private MethodInterceptor interceptor;

// globalTransactionalInterceptor是通用的Interceptor,
// 非TCC事务方式的都使用该Interceptor
private MethodInterceptor globalTransactionalInterceptor;

// PROXYED_SET存储已经代理过的实例,防止重复处理
private static final Set<String> PROXYED_SET = new HashSet<>();

// applicationId是一个服务的唯一标识,
// 对应springcloud项目中的spring.application.name
private final String applicationId;
// 事务的分组标识,参考文章wiki:https://seata.apache.org/zh-cn/docs/user/txgroup/transaction-group/
private final String txServiceGroup;

...

// 判断是否需要代理目标对象,如果需要代理,则生成拦截器赋值到类变量interceptor中
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 判断是否禁用分布式事务
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}

// 每次处理一个被代理对象时先把interceptor置为null,所以interceptor的
// 生命周期是一个被代理对象,由于是在另外一个方法getAdvicesAndAdvisorsForBean
// 中使用interceptor,所以该interceptor要定义为一个类变量
interceptor = null;

// 判断是否是TCC事务模式,判断的主要依据是方法上是否有TwoPhaseBusinessAction注解
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName,
applicationContext)) {
// 创建一个TCC事务的拦截器
interceptor =
new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
// 获取待处理对象的class类型
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
// 获取待处理对象继承的所有接口
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

// 如果待处理对象的class或者继承的接口上有GlobalTransactional注解,
// 或者待处理对象的class的任一个方法上有GlobalTransactional或者
// GlobalLock注解则返回true,即需要被代理
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

// 如果interceptor为null,即不是TCC模式,
// 则使用globalTransactionalInterceptor作为拦截器
if (interceptor == null) {
// globalTransactionalInterceptor只会被创建一次
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor =
new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener) globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}

if (!AopUtils.isAopProxy(bean)) {
// 如果bean本身不是Proxy对象,则直接调用父类的wrapIfNecessary生成代理对象即可
// 在父类中会调用getAdvicesAndAdvisorsForBean获取到上面定义的interceptor
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// 如果该bean已经是代理对象了,则直接在代理对象的拦截调用链AdvisedSupport
// 上直接添加新的interceptor即可。
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName,
getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
// 标识该beanName已经处理过了
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

// 返回wrapIfNecessary方法中计算出的interceptor对象
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName,
TargetSource customTargetSource)
throws BeansException {
return new Object[]{interceptor};
}
}

上面介绍了GlobalTransactionScanner是如何通过注解拦截全局事务的,具体拦截器实现为TccActionInterceptor和GlobalTransactionalInterceptor,对于AT模式来说我们主要关心GlobalTransactionalInterceptor,在后续的文章中会介绍GlobalTransactionalInterceptor的具体实现。

另外GloabalTransactionScanner还负责TM、RM的初始化工作,是在initClient方法中实现的:

private void initClient() {
...

//初始化TM
TMClient.init(applicationId, txServiceGroup);
...

//初始化RM
RMClient.init(applicationId, txServiceGroup);
...

// 注册Spring shutdown的回调,用来释放资源
registerSpringShutdownHook();

}

TMClient、RMClient都是Seata基于Netty实现的Rpc框架的客户端类,只是业务逻辑不同,由于TMClient相对来说更简单一些,我们以RMClient为例看一下源码:

public class RMClient {
// RMClient的init是一个static方法,创建了一个RmNettyRemotingClient实例,并调用init方法
public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient =
RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
}

RmNettyRemotingClient的实现如下:

@Sharable
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
// ResourceManager负责处理事务参与方,支持AT、TCC、Saga三种模式
private ResourceManager resourceManager;
// RmNettyRemotingClient单例
private static volatile RmNettyRemotingClient instance;
private final AtomicBoolean initialized = new AtomicBoolean(false);
// 微服务的唯一标识
private String applicationId;
// 分布式事务分组名称
private String transactionServiceGroup;

// RMClient中init方法会调用该init方法
public void init() {
// 注册Seata自定义Rpc的Processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
// 调用父类的init方法,在父类中负责Netty的初始化,与Seata-Server建立连接
super.init();
}
}

// 注册Seata自定义Rpc的Processor
private void registerProcessor() {
// 1.注册Seata-Server发起branchCommit的处理Processor
RmBranchCommitProcessor rmBranchCommitProcessor =
new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor,
messageExecutor);

// 2.注册Seata-Server发起branchRollback的处理Processor
RmBranchRollbackProcessor rmBranchRollbackProcessor =
new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor
, messageExecutor);

// 3.注册Seata-Server发起删除undoLog的处理Processor
RmUndoLogProcessor rmUndoLogProcessor =
new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor,
messageExecutor);

// 4.注册Seata-Server返回Response的处理Processor,ClientOnResponseProcessor
// 用于处理由Client主动发起Request,Seata-Server返回的Response。
// ClientOnResponseProcessor负责把Client发送的Request和Seata-Server
// 返回的Response对应起来,从而实现Rpc
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(),
getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor,
null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT,
onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);

// 5. 处理Seata-Server返回的Pong消息
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor,
null);
}
}

上面的逻辑看起来比较复杂,相关类也比较多,如:各种Processor、各种MessageType、TransactionMessageHandler、ResourceManager。其实本质上就是Rpc调用,分为Rm主动调用和Seata主动调用。

  • Rm主动调用方法: 如:注册分支、汇报分支状态、申请全局锁等。Rm主动调用的方法都需要在ClientOnResponseProcessor中处理Seata-Server返回的Response
  • Seata-Server主动调用方法: 如:提交分支事务、回滚分支事务、删除undolog日志。Seata-Server主动调用的方法,Client端分别对应不同的Processor来处理,并且处理结束后要返回给Seata-Server处理结果Response。而事务提交、回滚的核心实现逻辑都在TransactionMessageHandler、ResourceManager中。

关于TransactionMessageHandler、ResourceManager的具体实现也会在后续的章节中详细描述。

下一篇会介绍一下SeataAutoDataSourceProxyCreator、Rpc Interceptor是如何初始化以及拦截的。

· 阅读需 9 分钟

前言

最近因为工作需要,研究学习了Seata分布式事务框架,本文把自己学习的知识记录一下

Seata总览

cloc代码统计

先看一下seata项目cloc代码统计(截止到2020-07-20)

cloc-seata

Java代码行数大约是 97K

代码质量

单元测试覆盖率50%

cloc-seata

Demo代码

本文讲的Demo代码是seata-samples项目下的seata-samples-dubbo模块,地址如下:

https://github.com/apache/incubator-seata-samples/tree/master/dubbo

解决的核心问题

AT模式的Demo例子给出了一个典型的分布式事务场景:

  • 在一个采购交易中,需要:
  1. 扣减商品库存
  2. 扣减用户账号余额
  3. 生成采购订单
  • 很明显,以上3个步骤必须:要么全部成功,要么全部失败,否则系统的数据会错乱
  • 而现在流行的微服务架构,一般来说,库存,账号余额,订单是3个独立的系统
  • 每个微服务有自己的数据库,相互独立

这里就是分布式事务的场景。

设计图

解决方案

AT模式解决这个问题的思路其实很简单,一句话概括就是:

在分布式事务过程中,记录待修改的数据修改前和修改后的值到undo_log表,万一交易中出现异常,通过这个里的数据做回滚

当然,具体代码实现起来,我相信很多细节远没这么简单。

Demo代码结构

从github上clone最新的代码

git clone git@github.com:apache/incubator-seata-samples.git

阅读Demo代码结构

$ cd seata-samples/dubbo/
$ tree -C -I 'target' .
.
├── README.md
├── pom.xml
├── seata-samples-dubbo.iml
└── src
└── main
├── java
│ └── io
│ └── seata
│ └── samples
│ └── dubbo
│ ├── ApplicationKeeper.java
│ ├── Order.java
│ ├── service
│ │ ├── AccountService.java
│ │ ├── BusinessService.java
│ │ ├── OrderService.java
│ │ ├── StorageService.java
│ │ └── impl
│ │ ├── AccountServiceImpl.java
│ │ ├── BusinessServiceImpl.java
│ │ ├── OrderServiceImpl.java
│ │ └── StorageServiceImpl.java
│ └── starter
│ ├── DubboAccountServiceStarter.java
│ ├── DubboBusinessTester.java
│ ├── DubboOrderServiceStarter.java
│ └── DubboStorageServiceStarter.java
└── resources
├── file.conf
├── jdbc.properties
├── log4j.properties
├── registry.conf
├── spring
│ ├── dubbo-account-service.xml
│ ├── dubbo-business.xml
│ ├── dubbo-order-service.xml
│ └── dubbo-storage-service.xml
└── sql
├── dubbo_biz.sql
└── undo_log.sql

13 directories, 27 files
  • 在io.seata.samples.dubbo.starter包下的4个*Starter类,分别模拟上面所述的4个微服务

    • Account
    • Business
    • Order
    • Storage
  • 4个服务都是标准的dubbo服务,配置文件在seata-samples/dubbo/src/main/resources/spring目录下

  • 运行demo需要把这4个服务都启动起来,Business最后启动

  • 主要的逻辑在io.seata.samples.dubbo.service,4个实现类分别对应4个微服务的业务逻辑

  • 数据库信息的配置文件:src/main/resources/jdbc.properties

时序图

cloc-seata

Ok, 赶紧动手, Make It Happen!

运行Demo

MySQL

建表

执行seata-samples/dubbo/src/main/resources/sql的脚本dubbo_biz.sql和undo_log.sql

mysql> show tables;
+-----------------+
| Tables_in_seata |
+-----------------+
| account_tbl |
| order_tbl |
| storage_tbl |
| undo_log |
+-----------------+
4 rows in set (0.01 sec)

执行完之后,数据库里应该有4个表

修改seata-samples/dubbo/src/main/resources/jdbc.properties文件

根据你MySQL运行的环境修改变量的值

jdbc.account.url=jdbc:mysql://localhost:3306/seata
jdbc.account.username=your_username
jdbc.account.password=your_password
jdbc.account.driver=com.mysql.jdbc.Driver
# storage db config
jdbc.storage.url=jdbc:mysql://localhost:3306/seata
jdbc.storage.username=your_username
jdbc.storage.password=your_password
jdbc.storage.driver=com.mysql.jdbc.Driver
# order db config
jdbc.order.url=jdbc:mysql://localhost:3306/seata
jdbc.order.username=your_username
jdbc.order.password=your_password
jdbc.order.driver=com.mysql.jdbc.Driver

ZooKeeper

启动ZooKeeper,我的本地的Mac是使用Homebrew安装启动的

$ brew services start zookeeper 
==> Successfully started `zookeeper` (label: homebrew.m

$ brew services list
Name Status User Plist
docker-machine stopped
elasticsearch stopped
kafka stopped
kibana stopped
mysql started portman /Users/portman/Librar
y/LaunchAgents/homebrew.mxcl.mysql.plist
nginx stopped
postgresql stopped
redis stopped
zookeeper started portman /Users/portman/Librar
y/LaunchAgents/homebrew.mxcl.zookeeper.plist

启动TC事务协调器

在这个链接里页面中,下载对应版本的seata-server程序,我本地下载的是1.2.0版本

  1. 进入文件所在目录并解压文件
  2. 进入seata目录
  3. 执行启动脚本
$ tar -zxvf seata-server-1.2.0.tar.gz
$ cd seata
$ bin/seata-server.sh

观察启动日志是否有报错信息,如果一切正常,并看到了以下的Server started的信息,说明启动成功了。

2020-07-23 13:45:13.810 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...

IDE中启动模拟的微服务

  1. 首先要把seata-samples项目导入到本地IDE中,这里我用的是IntelliJ IDEA
  2. 刷新Maven的工程依赖
  3. 先启动Account,Order,Storage这个3个服务,然后Business才能去调用,对应的启动类分别是:
io.seata.samples.dubbo.starter.DubboStorageServiceStarter
io.seata.samples.dubbo.starter.DubboOrderServiceStarter
io.seata.samples.dubbo.starter.DubboStorageServiceStarter

每个服务启动完之后,看到这句提示信息,说明服务启动成功了

Application is keep running ...

cloc-seata

启动成功后,account_tbl,storage_tbl表会有两条初始化的数据,分别是账户余额和商品库存

mysql> SELECT * FROM account_tbl; SELECT * FROM storage_tbl;
+----+---------+-------+
| id | user_id | money |
+----+---------+-------+
| 1 | U100001 | 999 |
+----+---------+-------+
1 row in set (0.00 sec)

+----+----------------+-------+
| id | commodity_code | count |
+----+----------------+-------+
| 1 | C00321 | 100 |
+----+----------------+-------+
1 row in set (0.00 sec)

使用Business验证效果

正常情况

还是在IDE中执行DubboBusinessTester类的主函数,程序跑完会自动退出

在程序一切正常的情况下,每个微服务的事物都应该是提交了的,数据保持一致

我们来看一下MySQL中数据的变化

mysql> SELECT * FROM account_tbl; SELECT * FROM order_tbl; SELECT * FROM storage_tbl;
+----+---------+-------+
| id | user_id | money |
+----+---------+-------+
| 1 | U100001 | 599 |
+----+---------+-------+
1 row in set (0.00 sec)

+----+---------+----------------+-------+-------+
| id | user_id | commodity_code | count | money |
+----+---------+----------------+-------+-------+
| 1 | U100001 | C00321 | 2 | 400 |
+----+---------+----------------+-------+-------+
1 row in set (0.00 sec)

+----+----------------+-------+
| id | commodity_code | count |
+----+----------------+-------+
| 1 | C00321 | 98 |
+----+----------------+-------+
1 row in set (0.00 sec)

从3个表的数据可以看到:账户余额扣减了400块;订单表增加了1条记录;商品库存扣减了2个

这个结果是程序的逻辑是一致的,说明事务没有问题

异常情况

其实即使不加入分布式事务的控制,一切都正常情况下,事务本身就不会有问题的

所以我们来重点关注,当程序出现异常时的情况

现在我把BusinessServiceImpl的抛异常的代码注释放开,然后再执行一次DubboBusinessTester,来看看有什么情况发生

		@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);

//放开这句抛异常的注释,模拟程序出现异常
throw new RuntimeException("portman's foooooobar error.");

}

接着,我再一次执行DubboBusinessTester,执行过程中在控制台可以看到异常报错信息

Exception in thread "main" java.lang.RuntimeException: portman's foooooobar error.

现在我们再看一下MySQL里的数据变化,发现数据没有任何变化,说明分布式事务的控制已经起作用了

待思考问题

上面的步骤只是演示了seata最简单的demo程序,更多更复杂的情况后续大家可以一起讨论和验证

学习过程中还有一些问题和疑惑,后续进一步学习

  • 全局锁对性能的影响程度
  • undo_log日志可以回滚到原来状态,但是如果数据状态已经发生变化如何处理(比如增加的用户积分已经被别的本地事务花掉了)

参考文献

作者信息

许晓加,金蝶软件架构师

Github