跳到主要内容

· 阅读需 15 分钟

在微服务架构体系下,我们可以按照业务模块分层设计,单独部署,减轻了服务部署压力,也解耦了业务的耦合,避免了应用逐渐变成一个庞然怪物,从而可以轻松扩展,在某些服务出现故障时也不会影响其它服务的正常运行。总之,微服务在业务的高速发展中带给我们越来越多的优势,但是微服务并不是十全十美,因此不能盲目过度滥用,它有很多不足,而且会给系统带来一定的复杂度,其中伴随而来的分布式事务问题,是微服务架构体系下必然需要处理的一个痛点,也是业界一直关注的一个领域,因此也出现了诸如 CAP 和 BASE 等理论。

在今年年初,阿里开源了一个分布式事务中间件,起初起名为 Fescar,后改名为 Seata,在它开源之初,我就知道它肯定要火,因为这是一个解决痛点的开源项目,Seata 一开始就是冲着对业务无侵入与高性能方向走,这正是我们对解决分布式事务问题迫切的需求。因为待过的几家公司,用的都是微服务架构,但是在解决分布式事务的问题上都不太优雅,所以我也在一直关注 Seata 的发展,今天就简要说说它的一些设计上的原理,后续我将会对它的各个模块进行深入源码分析,感兴趣的可以持续关注我的公众号或者博客,不要跟丢。

分布式事务解决的方案有哪些?

目前分布式事务解决的方案主要有对业务无入侵和有入侵的方案,无入侵方案主要有基于数据库 XA 协议的两段式提交(2PC)方案,它的优点是对业务代码无入侵,但是它的缺点也是很明显:必须要求数据库对 XA 协议的支持,且由于 XA 协议自身的特点,它会造成事务资源长时间得不到释放,锁定周期长,而且在应用层上面无法干预,因此它性能很差,它的存在相当于七伤拳那样“伤人七分,损己三分”,因此在互联网项目中并不是很流行这种解决方案。

为了这个弥补这种方案带来性能低的问题,大佬们又想出了很多种方案来解决,但这无一例外都需要通过在应用层做手脚,即入侵业务的方式,比如很出名的 TCC 方案,基于 TCC 也有很多成熟的框架,如 ByteTCC、tcc-transaction 等。以及基于可靠消息的最终一致性来实现,如 RocketMQ 的事务消息。

入侵代码的方案是基于现有情形“迫不得已”才推出的解决方案,实际上它们实现起来非常不优雅,一个事务的调用通常伴随而来的是对该事务接口增加一系列的反向操作,比如 TCC 三段式提交,提交逻辑必然伴随着回滚的逻辑,这样的代码会使得项目非常臃肿,维护成本高。

Seata 各模块之间的关系

针对上面所说的分布式事务解决方案的痛点,那很显然,我们理想的分布式事务解决方案肯定是性能要好而且要对业务无入侵,业务层上无需关心分布式事务机制的约束,Seata 正是往这个方向发展的,因此它非常值得期待,它将给我们的微服务架构带来质的提升。

那 Seata 是怎么做到的呢?下面说说它的各个模块之间的关系。

Seata 的设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样。

Seata 内部定义了 3个模块来处理全局事务和分支事务的关系和处理过程,这三个组件分别是:

  • Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
  • Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

简要说说整个全局事务的执行步骤:

  1. TM 向 TC 申请开启一个全局事务,TC 创建全局事务后返回全局唯一的 XID,XID 会在全局事务的上下文中传播;
  2. RM 向 TC 注册分支事务,该分支事务归属于拥有相同 XID 的全局事务;
  3. TM 向 TC 发起全局提交或回滚;
  4. TC 调度 XID 下的分支事务完成提交或者回滚。

与 XA 方案有什么不同?

Seata 的事务提交方式跟 XA 协议的两段式提交在总体上来说基本是一致的,那它们之间有什么不同呢?

我们都知道 XA 协议它依赖的是数据库层面来保障事务的一致性,也即是说 XA 的各个分支事务是在数据库层面上驱动的,由于 XA 的各个分支事务需要有 XA 的驱动程序,一方面会导致数据库与 XA 驱动耦合,另一方面它会导致各个分支的事务资源锁定周期长,这也是它没有在互联网公司流行的重要因素。

基于 XA 协议以上的问题,Seata 另辟蹊径,既然在依赖数据库层会导致这么多问题,那我就从应用层做手脚,这还得从 Seata 的 RM 模块说起,前面也说过 RM 的主要作用了,其实 RM 在内部做了对数据库操作的代理层,如下:

Seata 在数据源做了一层代理层,所以我们使用 Seata 时,我们使用的数据源实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,主要是解析 SQL,把业务数据在更新前后的数据镜像组织成回滚日志,并将 undo log 日志插入 undo_log 表中,保证每条更新数据的业务 sql 都有对应的回滚日志存在。

这样做的好处就是,本地事务执行完可以立即释放本地事务锁定的资源,然后向 TC 上报分支状态。当 TM 决议全局提交时,就不需要同步协调处理了,TC 会异步调度各个 RM 分支事务删除对应的 undo log 日志即可,这个步骤非常快速地可以完成;当 TM 决议全局回滚时,RM 收到 TC 发送的回滚请求,RM 通过 XID 找到对应的 undo log 回滚日志,然后执行回滚日志完成回滚操作。

如上图所示,XA 方案的 RM 是放在数据库层的,它依赖了数据库的 XA 驱动程序。

如上图所示,Seata 的 RM 实际上是已中间件的形式放在应用层,不用依赖数据库对协议的支持,完全剥离了分布式事务方案对数据库在协议支持上的要求。

分支事务如何提交和回滚?

下面详细说说分支事务是如何提交和回滚的:

  • 第一阶段:

分支事务利用 RM 模块中对 JDBC 数据源代理,加入了若干流程,对业务 SQL 进行解释,把业务数据在更新前后的数据镜像组织成回滚日志,并生成 undo log 日志,对全局事务锁的检查以及分支事务的注册等,利用本地事务 ACID 特性,将业务 SQL 和 undo log 写入同一个事物中一同提交到数据库中,保证业务 SQL 必定存在相应的回滚日志,最后对分支事务状态向 TC 进行上报。

  • 第二阶段:

TM决议全局提交:

当 TM 决议提交时,就不需要同步协调处理了,TC 会异步调度各个 RM 分支事务删除对应的 undo log 日志即可,这个步骤非常快速地可以完成。这个机制对于性能提升非常关键,我们知道正常的业务运行过程中,事务执行的成功率是非常高的,因此可以直接在本地事务中提交,这步对于提升性能非常显著。

TM决议全局回滚:

当 TM 决议回滚时,RM 收到 TC 发送的回滚请求,RM 通过 XID 找到对应的 undo log 回滚日志,然后利用本地事务 ACID 特性,执行回滚日志完成回滚操作并删除 undo log 日志,最后向 TC 进行回滚结果上报。

业务对以上所有的流程都无感知,业务完全不关心全局事务的具体提交和回滚,而且最重要的一点是 Seata 将两段式提交的同步协调分解到各个分支事务中了,分支事务与普通的本地事务无任何差异,这意味着我们使用 Seata 后,分布式事务就像使用本地事务一样,完全将数据库层的事务协调机制交给了中间件层 Seata 去做了,这样虽然事务协调搬到应用层了,但是依然可以做到对业务的零侵入,从而剥离了分布式事务方案对数据库在协议支持上的要求,且 Seata 在分支事务完成之后直接释放资源,极大减少了分支事务对资源的锁定时间,完美避免了 XA 协议需要同步协调导致资源锁定时间过长的问题。

其它方案的补充

上面说的其实是 Seata 的默认模式,也叫 AT 模式,它是类似于 XA 方案的两段式提交方案,并且是对业务无侵入,但是这种机制依然是需要依赖数据库本地事务的 ACID 特性,有没有发现,我在上面的图中都强调了必须是支持 ACID 特性的关系型数据库,那么问题就来了,非关系型或者不支持 ACID 的数据库就无法使用 Seata 了,别慌,Seata 现阶段为我们准备了另外一种模式,叫 MT 模式,它是一种对业务有入侵的方案,提交回滚等操作需要我们自行定义,业务逻辑需要被分解为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务,它存在的意义是为 Seata 触达更多的场景。

只不过,它不是 Seata “主打”的模式,它的存在仅仅作为补充的方案,从以上官方的发展远景就可以看出来,Seata 的目标是始终是对业务无入侵的方案。

注:本文图片设计参考Seata官方图

作者简介:

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。

· 阅读需 7 分钟

前言

TaaS 是 Seata 服务端(TC, Transaction Coordinator)的一种高可用实现,使用 Golang 编写。Taas 由InfiniVision (http://infinivision.cn) 贡献给Seata开源社区。现已正式开源,并贡献给 Seata 社区。

在Seata开源之前,我们内部开始借鉴GTS以及一些开源项目来实现分布式事务的解决方案TaaS(Transaction as a Service)。

在我们完成TaaS的服务端的开发工作后,Seata(当时还叫Fescar)开源了,并且引起了开源社区的广泛关注,加上阿里巴巴的平台影响力以及社区活跃度,我们认为Seata会成为今后开源分布式事务的标准,我们决定TaaS兼容Seata。

在发现Seata的服务端的实现是单机的,高可用等并没有实现,于是我们与Seata社区负责人取得联系,并且决定把TaaS开源,回馈开源社区。 同时,我们会长期维护,并且和Seata版本保持同步。

目前,Seata官方的Java高可用版本也在开发中,TaaS和该高可用版本的设计思想不同,在今后会长期共存。

TaaS已经开源, github (https://github.com/apache/incubator-seata-go-server),欢迎大家试用。

设计原则

  1. 高性能,性能和机器数量成正比,即通过加入新机器到集群中,就可以提升性能
  2. 高可用,一台机器出现故障,系统能依旧可以对外提供服务,或者在较短的时间内恢复对外服务(Leader切换的时间)
  3. Auto-Rebalance,集群中增加新的机器,或者有机器下线,系统能够自动的做负载均衡
  4. 强一致,系统的元数据强一致在多个副本中存储

设计

高性能

TaaS的性能和机器数量成正比,为了支持这个特性,在TaaS中处理全局事务的最小单元称为Fragment,系统在启动的时候会设定每个Fragment支持的活跃全局事务的并发数,同时系统会对每个Fragment进行采样,一旦发现Fragment超负荷,会生成新的Fragment来处理更多的并发。

高可用

每个Fragment有多个副本和一个Leader,由Leader来处理请求。当Leader出现故障,系统会产生一个新的Leader来处理请求,在新Leader的选举过程中,这个Fragment对外不提供服务,通常这个间隔时间是几秒钟。

强一致

TaaS本身不存储全局事务的元数据,元数据存储在Elasticell (https://github.com/deepfabric/elasticell) 中,Elasticell是一个兼容redis协议的分布式的KV存储,它基于Raft协议来保证数据的一致性。

Auto-Rebalance

随着系统的运行,在系统中会存在许多Fragment以及它们的副本,这样会导致在每个机器上,Fragment的分布不均匀,特别是当旧的机器下线或者新的机器上线的时候。TaaS在启动的时候,会选择3个节点作为调度器的角色,调度器负责调度这些Fragment,用来保证每个机器上的Fragment的数量以及Leader个数大致相等,同时还会保证每个Fragment的副本数维持在指定的副本个数。

Fragment副本创建

  1. t0时间点,Fragment1在Seata-TC1机器上创建
  2. t1时间点,Fragment1的副本Fragment1'在Seata-TC2机器上创建
  3. t2时间点,Fragment1的副本Fragment1"在Seata-TC3机器上创建

在t2时间点,Fragment1的三个副本创建完毕。

Fragment副本迁移

  1. t0时刻点,系统一个存在4个Fragment,分别存在于Seata-TC1,Seata-TC2,Seata-TC3三台机器上
  2. t1时刻,加入新机器Seata-TC4
  3. t2时刻,有3个Fragment的副本被迁移到了Seata-TC4这台机器上

在线快速体验

我们在公网搭建了一个体验的环境:

本地快速体验

使用docker-compose快速体验TaaS的功能。

git clone https://github.com/seata/taas.git
docker-compse up -d

由于组件依赖较多,docker-compose启动30秒后,可以对外服务

Seata服务地址

服务默认监听在8091端口,修改Seata对应的服务端地址体验

Seata UI

访问WEB UI http://127.0.0.1:8084/ui/index.html

关于InfiniVision

深见网络是一家技术驱动的企业级服务提供商,致力于利用人工智能、云计算、区块链、大数据,以及物联网边缘计算技术助力传统企业的数字化转型和升级。深见网络积极拥抱开源文化并将核心算法和架构开源,知名人脸识别软件 InsightFace (https://github.com/deepinsight/insightface) (曾多次获得大规模人脸识别挑战冠军),以及分布式存储引擎 Elasticell (https://github.com/deepfabric/elasticell) 等均是深见网络的开源产品。

关于作者

作者张旭,开源网关Gateway (https://github.com/fagongzi/gateway) 作者,目前就职于InfiniVision,负责基础架构相关的研发工作。

· 阅读需 21 分钟

Fescar 简介

常见的分布式事务方式有基于 2PC 的 XA (e.g. atomikos),从业务层入手的 TCC( e.g. byteTCC)、事务消息 ( e.g. RocketMQ Half Message) 等等。XA 是需要本地数据库支持的分布式事务的协议,资源锁在数据库层面导致性能较差,而支付宝作为布道师引入的 TCC 模式需要大量的业务代码保证,开发维护成本较高。

分布式事务是业界比较关注的领域,这也是短短时间 Fescar 能收获 6k Star 的原因之一。Fescar 名字取自 Fast & Easy Commit And Rollback ,简单来说 Fescar 通过对本地 RDBMS 分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是相对于 XA 模式是性能较好不长时间占用连接资源,相对于 TCC 方式开发成本和业务侵入性较低。

类似于 XA,Fescar 将角色分为 TC、RM、TM,事务整体过程模型如下:

Fescar事务过程

1. TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
2. XID 在微服务调用链路的上下文中传播。
3. RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
4. TM 向 TC 发起针对 XID 的全局提交或回滚决议。
5. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

其中在目前的实现版本中 TC 是独立部署的进程,维护全局事务的操作记录和全局锁记录,负责协调并驱动全局事务的提交或回滚。TM RM 则与应用程序工作在同一应用进程。RM 对 JDBC 数据源采用代理的方式对底层数据库做管理,利用语法解析,在执行事务时保留快照,并生成 undo log。大概的流程和模型划分就介绍到这里,下面开始对 Fescar 事务传播机制的分析。

Fescar 事务传播机制

Fescar 事务传播包括应用内事务嵌套调用和跨服务调用的事务传播。Fescar 事务是怎么在微服务调用链中传播的呢?Fescar 提供了事务 API 允许用户手动绑定事务的 XID 并加入到全局事务中,所以我们根据不同的服务框架机制,将 XID 在链路中传递即可实现事务的传播。

RPC 请求过程分为调用方与被调用方两部分,我们需要对 XID 在请求与响应时做相应的处理。大致过程为:调用方即请求方将当前事务上下文中的 XID 取出,通过 RPC 协议传递给被调用方;被调用方从请求中的将 XID 取出,并绑定到自己的事务上下文中,纳入全局事务。微服务框架一般都有相应的 Filter 和 Interceptor 机制,我们来具体分析下 Spring Cloud 与 Fescar 的整合过程。

Fescar 与 Spring Cloud Alibaba 集成部分源码解析

本部分源码全部来自于 spring-cloud-alibaba-fescar. 源码解析部分主要包括 AutoConfiguration、微服务被调用方和微服务调用方三大部分。对于微服务调用方方式具体分为 RestTemplate 和 Feign,其中对于 Feign 请求方式又进一步细分为结合 Hystrix 和 Sentinel 的使用模式。

Fescar AutoConfiguration

对于 AutoConfiguration 的解析此处只介绍与 Fescar 启动相关的部分,其他部分的解析将穿插于【微服务被调用方】和 【微服务调用方】章节进行介绍。

Fescar 的启动需要配置 GlobalTransactionScanner,GlobalTransactionScanner 负责初始化 Fescar 的 RM client、TM client 和 自动代理标注 GlobalTransactional 注解的类。GlobalTransactionScanner bean 的启动通过 GlobalTransactionAutoConfiguration 加载并注入 FescarProperties。
FescarProperties 包含了 Fescar 的重要属性 txServiceGroup ,此属性的可通过 application.properties 文件中的 key: spring.cloud.alibaba.fescar.txServiceGroup 读取,默认值为 ${spring.application.name}-fescar-service-group 。txServiceGroup 表示 Fescar 的逻辑事务分组名,此分组名通过配置中心(目前支持文件、Apollo)获取逻辑事务分组名对应的 TC 集群名称,进一步通过集群名称构造出 TC 集群的服务名,通过注册中心(目前支持 nacos、redis、zk 和 eureka)和服务名找到可用的 TC 服务节点,然后 RM client、TM client 与 TC 进行 rpc 交互。

微服务被调用方

由于调用方的逻辑比较多一点,我们先分析被调用方的逻辑。针对于 Spring Cloud 项目,默认采用的 RPC 传输协议是 HTTP 协议,所以使用了 HandlerInterceptor 机制来对 HTTP 的请求做拦截。

HandlerInterceptor 是 Spring 提供的接口, 它有以下三个方法可以被覆写。

    /**
* Intercept the execution of a handler. Called after HandlerMapping determined
* an appropriate handler object, but before HandlerAdapter invokes the handler.
*/
default boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {

return true;
}

/**
* Intercept the execution of a handler. Called after HandlerAdapter actually
* invoked the handler, but before the DispatcherServlet renders the view.
* Can expose additional model objects to the view via the given ModelAndView.
*/
default void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
@Nullable ModelAndView modelAndView) throws Exception {
}

/**
* Callback after completion of request processing, that is, after rendering
* the view. Will be called on any outcome of handler execution, thus allows
* for proper resource cleanup.
*/
default void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
@Nullable Exception ex) throws Exception {
}

根据注释,我们可以很明确的看到各个方法的作用时间和常用用途。对于 Fescar 集成来讲,它根据需要重写了 preHandle、afterCompletion 方法。

FescarHandlerInterceptor 的作用是将服务链路传递过来的 XID,绑定到服务节点的事务上下文中,并且在请求完成后清理相关资源。FescarHandlerInterceptorConfiguration 中配置了所有的 url 均进行拦截,对所有的请求过来均会执行该拦截器,进行 XID 的转换与事务绑定。

/**
* @author xiaojing
*
* Fescar HandlerInterceptor, Convert Fescar information into
* @see com.alibaba.fescar.core.context.RootContext from http request's header in
* {@link org.springframework.web.servlet.HandlerInterceptor#preHandle(HttpServletRequest , HttpServletResponse , Object )},
* And clean up Fescar information after servlet method invocation in
* {@link org.springframework.web.servlet.HandlerInterceptor#afterCompletion(HttpServletRequest, HttpServletResponse, Object, Exception)}
*/
public class FescarHandlerInterceptor implements HandlerInterceptor {

private static final Logger log = LoggerFactory
.getLogger(FescarHandlerInterceptor.class);

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {

String xid = RootContext.getXID();
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (log.isDebugEnabled()) {
log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
}

if (xid == null && rpcXid != null) {
RootContext.bind(rpcXid);
if (log.isDebugEnabled()) {
log.debug("bind {} to RootContext", rpcXid);
}
}
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception e) throws Exception {

String rpcXid = request.getHeader(RootContext.KEY_XID);

if (StringUtils.isEmpty(rpcXid)) {
return;
}

String unbindXid = RootContext.unbind();
if (log.isDebugEnabled()) {
log.debug("unbind {} from RootContext", unbindXid);
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
log.warn("bind {} back to RootContext", unbindXid);
}
}
}

}

preHandle 在请求执行前被调用,xid 为当前事务上下文已经绑定的全局事务的唯一标识,rpcXid 为请求通过 HTTP Header 传递过来需要绑定的全局事务标识。preHandle 方法中判断如果当前事务上下文中没有 XID,且 rpcXid 不为空,那么就将 rpcXid 绑定到当前的事务上下文。

afterCompletion 在请求完成后被调用,该方法用来执行资源的相关清理动作。Fescar 通过 RootContext.unbind() 方法对事务上下文涉及到的 XID 进行解绑。下面 if 中的逻辑是为了代码的健壮性考虑,如果遇到 rpcXid 和 unbindXid 不相等的情况,再将 unbindXid 重新绑定回去。

对于 Spring Cloud 来讲,默认采用的 RPC 方式是 HTTP 的方式,所以对被调用方来讲,它的请求拦截方式不用做任何区分,只需要从 Header 中将 XID 就可以取出绑定到自己的事务上下文中即可。但是对于调用方由于请求组件的多样化,包括熔断隔离机制,所以要区分不同的情况做处理,后面我们来具体分析一下。

微服务调用方

Fescar 将请求方式分为:RestTemplate、Feign、Feign+Hystrix 和 Feign+Sentinel 。不同的组件通过 Spring Boot 的 Auto Configuration 来完成自动的配置,具体的配置类清单可以看 spring.factories ,下文也会介绍相关的配置类。

RestTemplate

先来看下如果调用方如果是是基于 RestTemplate 的请求,Fescar 是怎么传递 XID 的。

public class FescarRestTemplateInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);

String xid = RootContext.getXID();

if (!StringUtils.isEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}
}

FescarRestTemplateInterceptor 实现了 ClientHttpRequestInterceptor 接口的 intercept 方法,对调用的请求做了包装,在发送请求时若存在 Fescar 事务上下文 XID 则取出并放到 HTTP Header 中。

FescarRestTemplateInterceptor 通过 FescarRestTemplateAutoConfiguration 实现将 FescarRestTemplateInterceptor 配置到 RestTemplate 中去。

@Configuration
public class FescarRestTemplateAutoConfiguration {

@Bean
public FescarRestTemplateInterceptor fescarRestTemplateInterceptor() {
return new FescarRestTemplateInterceptor();
}

@Autowired(required = false)
private Collection<RestTemplate> restTemplates;

@Autowired
private FescarRestTemplateInterceptor fescarRestTemplateInterceptor;

@PostConstruct
public void init() {
if (this.restTemplates != null) {
for (RestTemplate restTemplate : restTemplates) {
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
restTemplate.getInterceptors());
interceptors.add(this.fescarRestTemplateInterceptor);
restTemplate.setInterceptors(interceptors);
}
}
}

}

init 方法遍历所有的 restTemplate ,并将原来 restTemplate 中的拦截器取出,增加 fescarRestTemplateInterceptor 后置入并重排序。

Feign

Feign 类关系图

接下来看下 Feign 的相关代码,该包下面的类还是比较多的,我们先从其 AutoConfiguration 入手。

@Configuration
@ConditionalOnClass(Client.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class FescarFeignClientAutoConfiguration {

@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")
@ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")
Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
return FescarHystrixFeignBuilder.builder(beanFactory);
}

@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")
@ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")
Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
return FescarSentinelFeignBuilder.builder(beanFactory);
}

@Bean
@ConditionalOnMissingBean
@Scope("prototype")
Feign.Builder feignBuilder(BeanFactory beanFactory) {
return FescarFeignBuilder.builder(beanFactory);
}

@Configuration
protected static class FeignBeanPostProcessorConfiguration {

@Bean
FescarBeanPostProcessor fescarBeanPostProcessor(
FescarFeignObjectWrapper fescarFeignObjectWrapper) {
return new FescarBeanPostProcessor(fescarFeignObjectWrapper);
}

@Bean
FescarContextBeanPostProcessor fescarContextBeanPostProcessor(
BeanFactory beanFactory) {
return new FescarContextBeanPostProcessor(beanFactory);
}

@Bean
FescarFeignObjectWrapper fescarFeignObjectWrapper(BeanFactory beanFactory) {
return new FescarFeignObjectWrapper(beanFactory);
}
}

}

FescarFeignClientAutoConfiguration 在存在 Client.class 时生效,且要求作用在 FeignAutoConfiguration 之前。由于 FeignClientsConfiguration 是在 FeignAutoConfiguration 生成 FeignContext 生效的,所以根据依赖关系, FescarFeignClientAutoConfiguration 同样早于 FeignClientsConfiguration。

FescarFeignClientAutoConfiguration 自定义了 Feign.Builder,针对于 feign.sentinel,feign.hystrix 和 feign 的情况做了适配,目的是自定义 feign 中 Client 的真正实现为 FescarFeignClient。

HystrixFeign.builder().retryer(Retryer.NEVER_RETRY)
.client(new FescarFeignClient(beanFactory))
SentinelFeign.builder().retryer(Retryer.NEVER_RETRY)
.client(new FescarFeignClient(beanFactory));
Feign.builder().client(new FescarFeignClient(beanFactory));

FescarFeignClient 是对原来的 Feign 客户端代理增强,具体代码见下图:

public class FescarFeignClient implements Client {

private final Client delegate;
private final BeanFactory beanFactory;

FescarFeignClient(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
this.delegate = new Client.Default(null, null);
}

FescarFeignClient(BeanFactory beanFactory, Client delegate) {
this.delegate = delegate;
this.beanFactory = beanFactory;
}

@Override
public Response execute(Request request, Request.Options options) throws IOException {

Request modifiedRequest = getModifyRequest(request);

try {
return this.delegate.execute(modifiedRequest, options);
}
finally {

}
}

private Request getModifyRequest(Request request) {

String xid = RootContext.getXID();

if (StringUtils.isEmpty(xid)) {
return request;
}

Map<String, Collection<String>> headers = new HashMap<>();
headers.putAll(request.headers());

List<String> fescarXid = new ArrayList<>();
fescarXid.add(xid);
headers.put(RootContext.KEY_XID, fescarXid);

return Request.create(request.method(), request.url(), headers, request.body(),
request.charset());
}

上面的过程中我们可以看到,FescarFeignClient 对原来的 Request 做了修改,它首先将 XID 从当前的事务上下文中取出,在 XID 不为空的情况下,将 XID 放到了 Header 中。

FeignBeanPostProcessorConfiguration 定义了 3 个 bean:FescarContextBeanPostProcessor、FescarBeanPostProcessor 和 FescarFeignObjectWrapper。其中 FescarContextBeanPostProcessor FescarBeanPostProcessor 实现了 Spring BeanPostProcessor 接口。 以下为 FescarContextBeanPostProcessor 实现。

    @Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
if (bean instanceof FeignContext && !(bean instanceof FescarFeignContext)) {
return new FescarFeignContext(getFescarFeignObjectWrapper(),
(FeignContext) bean);
}
return bean;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}

BeanPostProcessor 中的两个方法可以对 Spring 容器中的 Bean 做前后处理,postProcessBeforeInitialization 处理时机是初始化之前,postProcessAfterInitialization 的处理时机是初始化之后,这 2 个方法的返回值可以是原先生成的实例 bean,或者使用 wrapper 包装后的实例。

FescarContextBeanPostProcessor 将 FeignContext 包装成 FescarFeignContext。
FescarBeanPostProcessor 将 FeignClient 根据是否继承了 LoadBalancerFeignClient 包装成 FescarLoadBalancerFeignClient 和 FescarFeignClient。

FeignAutoConfiguration 中的 FeignContext 并没有加 ConditionalOnXXX 的条件,所以 Fescar 采用预置处理的方式将 FeignContext 包装成 FescarFeignContext。

    @Bean
public FeignContext feignContext() {
FeignContext context = new FeignContext();
context.setConfigurations(this.configurations);
return context;
}

而对于 Feign Client,FeignClientFactoryBean 中会获取 FeignContext 的实例对象。对于开发者采用 @Configuration 注解的自定义配置的 Feign Client 对象,这里会被配置到 builder,导致 FescarFeignBuilder 中增强后的 FescarFeignCliet 失效。FeignClientFactoryBean 中关键代码如下:

	/**
* @param <T> the target type of the Feign client
* @return a {@link Feign} client created with the specified data and the context information
*/
<T> T getTarget() {
FeignContext context = applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);

if (!StringUtils.hasText(this.url)) {
if (!this.name.startsWith("http")) {
url = "http://" + this.name;
}
else {
url = this.name;
}
url += cleanPath();
return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
this.name, url));
}
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
String url = this.url + cleanPath();
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not load balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient)client).getDelegate();
}
builder.client(client);
}
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
this.type, this.name, url));
}

上述代码根据是否指定了注解参数中的 URL 来选择直接调用 URL 还是走负载均衡,targeter.target 通过动态代理创建对象。大致过程为:将解析出的 feign 方法放入 map ,再通过将其作为参数传入生成 InvocationHandler,进而生成动态代理对象。
FescarContextBeanPostProcessor 的存在,即使开发者对 FeignClient 自定义操作,依旧可以完成 Fescar 所需的全局事务的增强。

对于 FescarFeignObjectWrapper,我们重点关注下 Wrapper 方法:

	Object wrap(Object bean) {
if (bean instanceof Client && !(bean instanceof FescarFeignClient)) {
if (bean instanceof LoadBalancerFeignClient) {
LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean);
return new FescarLoadBalancerFeignClient(client.getDelegate(), factory(),
clientFactory(), this.beanFactory);
}
return new FescarFeignClient(this.beanFactory, (Client) bean);
}
return bean;
}

wrap 方法中,如果 bean 是 LoadBalancerFeignClient 的实例对象,那么首先通过 client.getDelegate() 方法将 LoadBalancerFeignClient 代理的实际 Client 对象取出后包装成 FescarFeignClient,再生成 LoadBalancerFeignClient 的子类 FescarLoadBalancerFeignClient 对象。如果 bean 是 Client 的实例对象且不是 FescarFeignClient LoadBalancerFeignClient,那么 bean 会直接包装生成 FescarFeignClient。

上面的流程设计还是比较巧妙的,首先根据 Spring boot 的 Auto Configuration 控制了配置的先后顺序,同时自定义了 Feign Builder 的 Bean,保证了 Client 均是经过增强后的 FescarFeignClient 。再通过 BeanPostProcessor 对 Spring 容器中的 Bean 做了一遍包装,保证容器内的 Bean 均是增强后 FescarFeignClient ,避免 FeignClientFactoryBean getTarget 方法的替换动作。

Hystrix 隔离

下面我们再来看下 Hystrix 部分,为什么要单独把 Hystrix 拆出来看呢,而且 Fescar 代码也单独实现了个策略类。目前事务上下文 RootContext 的默认实现是基于 ThreadLocal 方式的 ThreadLocalContextCore,也就是上下文其实是和线程绑定的。Hystrix 本身有两种隔离状态的模式,基于信号量或者基于线程池进行隔离。Hystrix 官方建议是采取线程池的方式来充分隔离,也是一般情况下在采用的模式:

Thread or Semaphore
The default, and the recommended setting, is to run HystrixCommands using thread isolation (THREAD) and HystrixObservableCommands using semaphore isolation (SEMAPHORE).

Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.

Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.

service 层的业务代码和请求发出的线程肯定不是同一个,那么 ThreadLocal 的方式就没办法将 XID 传递给 Hystrix 的线程并传递给被调用方的。怎么处理这件事情呢,Hystrix 提供了机制让开发者去自定义并发策略,只需要继承 HystrixConcurrencyStrategy 重写 wrapCallable 方法即可。

public class FescarHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

private HystrixConcurrencyStrategy delegate;

public FescarHystrixConcurrencyStrategy() {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
}

@Override
public <K> Callable<K> wrapCallable(Callable<K> c) {
if (c instanceof FescarContextCallable) {
return c;
}

Callable<K> wrappedCallable;
if (this.delegate != null) {
wrappedCallable = this.delegate.wrapCallable(c);
}
else {
wrappedCallable = c;
}
if (wrappedCallable instanceof FescarContextCallable) {
return wrappedCallable;
}

return new FescarContextCallable<>(wrappedCallable);
}

private static class FescarContextCallable<K> implements Callable<K> {

private final Callable<K> actual;
private final String xid;

FescarContextCallable(Callable<K> actual) {
this.actual = actual;
this.xid = RootContext.getXID();
}

@Override
public K call() throws Exception {
try {
RootContext.bind(xid);
return actual.call();
}
finally {
RootContext.unbind();
}
}

}
}

Fescar 也提供一个 FescarHystrixAutoConfiguration,在存在 HystrixCommand 的时候生成 FescarHystrixConcurrencyStrategy

@Configuration
@ConditionalOnClass(HystrixCommand.class)
public class FescarHystrixAutoConfiguration {

@Bean
FescarHystrixConcurrencyStrategy fescarHystrixConcurrencyStrategy() {
return new FescarHystrixConcurrencyStrategy();
}

}

参考文献

本文作者

郭树抗,社区昵称 ywind,曾就职于华为终端云,现搜狐智能媒体中心 Java 工程师,目前主要负责搜狐号相关开发,对分布式事务、分布式系统和微服务架构有异常浓厚的兴趣。
季敏(清铭),社区昵称 slievrly,Fescar 开源项目负责人,阿里巴巴中间件 TXC/GTS 核心研发成员,长期从事于分布式中间件核心研发工作,在分布式事务领域有着较丰富的技术积累。

· 阅读需 9 分钟

针对Fescar 相信很多开发者已经对他并不陌生,当然Fescar 已经成为了过去时,为什么说它是过去时,因为Fescar 已经华丽的变身为Seata。如果还不知道Seata 的朋友,请登录下面网址查看。

SEATA GITHUB:[https://github.com/apache/incubator-seata]

对于阿里各位同学的前仆后继,给我们广大开发者带来很多开源软件,在这里对他们表示真挚的感谢与问候。

今天在这里和大家分享下Spring Cloud 整合Seata 的相关心得。也让更多的朋友在搭建的道路上少走一些弯路,少踩一些坑。

2.工程内容

本次搭建流程为:client->网关->服务消费者->服务提供者.

                        技术框架:spring cloud gateway

spring cloud fegin

nacos1.0.RC2

fescar-server0.4.1(Seata)

关于nacos的启动方式请参考:Nacos启动参考

首先seata支持很多种注册服务方式,在 fescar-server-0.4.1\conf 目录下

    file.conf
logback.xml
nacos-config.sh
nacos-config.text
registry.conf

总共包含五个文件,其中 file.conf和 registry.conf 分别是我们在 服务消费者 & 服务提供者 代码段需要用到的文件。 注:file.conf和 registry.conf 必须在当前使用的应用程序中,即: 服务消费者 & 服务提供者 两个应用在都需要包含。 如果你采用了配置中心 是nacos 、zk ,file.cnf 是可以忽略的。但是type=“file” 如果是为file 就必须得用file.cnf

下面是registry.conf 文件中的配置信息,其中 registry 是注册服务中心配置。config为配置中心的配置地方。

从下面可知道目前seata支持nacos,file eureka redis zookeeper 等注册配置方式,默认下载的type=“file” 文件方式,当然这里选用什么方式,取决于

每个人项目的实际情况,这里我选用的是nacos,eureka的也是可以的,我这边分别对这两个版本进行整合测试均可以通过。

注:如果整合eureka请选用官方最新版本。

3.核心配置

registry {
# file 、nacos 、eureka、redis、zk
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:1001/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
apollo {
app.id = "fescar-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
file {
name = "file.conf"
}
}

这里要说明的是nacos-config.sh 是针对采用nacos配置中心的话,需要执行的一些默认初始化针对nacos的脚本。

SEATA的启动方式参考官方: 注意,这里需要说明下,命令启动官方是通过 空格区分参数,所以要注意。这里的IP 是可选参数,因为涉及到DNS解析,在部分情况下,有的时候在注册中心fescar 注入nacos的时候会通过获取地址,如果启动报错注册发现是计算机名称,需要指定IP。或者host配置IP指向。不过这个问题,在最新的SEATA中已经进行了修复。

sh fescar-server.sh 8091 /home/admin/fescar/data/ IP(可选)

上面提到过,在我们的代码中也是需要file.conf 和registry.conf 这里着重的地方要说的是file.conf,file.conf只有当registry中 配置file的时候才会进行加载,如果采用ZK、nacos、作为配置中心,可以忽略。因为type指定其他是不加载file.conf的,但是对应的 service.localRgroup.grouplist 和 service.vgroupMapping 需要在支持配置中心 进行指定,这样你的client 在启动后会通过自动从配置中心获取对应的 SEATA 服务 和地址。如果不配置会出现无法连接server的错误。当然如果你采用的eureka在config的地方就需要采用type="file" 目前SEATA config暂时不支持eureka的形势

transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
}
service {
#vgroup->rgroup
vgroup_mapping.service-provider-fescar-service-group = "default"
#only support single node
localRgroup.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
}

client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}

4.服务相关

这里有两个地方需要注意

    grouplist IP,这里是当前fescar-sever的IP端口,
vgroup_mapping的配置。

vgroup_mapping.服务名称-fescar-service-group,这里 要说下服务名称其实是你当前的consumer 或者provider application.properties的配置的应用名称:spring.application.name=service-provider,源代码中是 获取应用名称与 fescar-service-group 进行拼接,做key值。同理value是当前fescar的服务名称, cluster = "default" / application = "default"

     vgroup_mapping.service-provider-fescar-service-group = "default"
#only support single node
localRgroup.grouplist = "127.0.0.1:8091"

同理无论是provider 还是consumer 都需要这两个文件进行配置。

如果你采用nacos做配置中心,需要在nacos通过添加配置方式进行配置添加。

5.事务使用

我这里的代码逻辑是请求通过网关进行负载转发到我的consumer上,在consumer 中通过fegin进行provider请求。官方的例子中是通过fegin进行的,而我们这边直接通过网关转发,所以全局事务同官方的demo一样 也都是在controller层。

@RestController
public class DemoController {
@Autowired
private DemoFeignClient demoFeignClient;

@Autowired
private DemoFeignClient2 demoFeignClient2;
@GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
@GetMapping("/getdemo")
public String demo() {

// 调用A 服务 简单save
ResponseData<Integer> result = demoFeignClient.insertService("test",1);
if(result.getStatus()==400) {
System.out.println(result+"+++++++++++++++++++++++++++++++++++++++");
throw new RuntimeException("this is error1");
}

// 调用B 服务。报错测试A 服务回滚
ResponseData<Integer> result2 = demoFeignClient2.saveService();

if(result2.getStatus()==400) {
System.out.println(result2+"+++++++++++++++++++++++++++++++++++++++");
throw new RuntimeException("this is error2");
}

return "SUCCESS";
}
}

到此为止核心的事务整合基本到此结束了,我这里是针对A,B 两个provider进行调用,当B发生报错后,进行全局事务回滚。当然每个事务内部都可以通过自己的独立本地事务去处理自己本地事务方式。

SEATA是通过全局的XID方式进行事务统一标识方式。这里就不列出SEATA需要用的数据库表。具体参考:spring-cloud-fescar 官方DEMO

5.数据代理

这里还有一个重要的说明就是,在分库服务的情况下,每一个数据库内都需要有一个undo_log的数据库表进行XID统一存储处理。

同事针对每个提供服务的项目,需要进行数据库连接池的代理。也就是:

目前只支持Druid连接池,后续会继续支持。

@Configuration
public class DatabaseConfiguration {


@Bean(destroyMethod = "close", initMethod = "init")
@ConfigurationProperties(prefix="spring.datasource")
public DruidDataSource druidDataSource() {

return new DruidDataSource();
}


@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {

return new DataSourceProxy(druidDataSource);
}


@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSourceProxy);
return factoryBean.getObject();
}
}

大家要注意的就是配置文件和数据代理。如果没有进行数据源代理,undo_log是无数据的,也就是没办进行XID的管理。

本文作者:大菲.Fei

· 阅读需 26 分钟

前言

在分布式系统中,分布式事务是一个必须要解决的问题,目前使用较多的是最终一致性方案。自年初阿里开源了Fescar(四月初更名为Seata)后,该项目受到了极大的关注度,目前已接近8000Star。Seata以高性能和零侵入的方式为目标解决微服务领域的分布式事务难题,目前正处于快速迭代中,近期小目标是生产可用的Mysql版本。关于Seata的总体介绍,可以查看官方WIKI获得更多更全面的内容介绍。

本文主要基于spring cloud+spring jpa+spring cloud alibaba fescar+mysql+seata的结构,搭建一个分布式系统的demo,通过seata的debug日志和源代码,从client端(RM、TM)的角度分析说明其工作流程及原理。

文中代码基于fescar-0.4.1,由于项目刚更名为seata不久,例如一些包名、类名、jar包名称还都是fescar的命名,故下文中仍使用fescar进行表述。

示例项目:https://github.com/fescar-group/fescar-samples/tree/master/springcloud-jpa-seata

相关概念

  • XID:全局事务的唯一标识,由ip:port:sequence组成
  • Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚
  • Transaction Manager (TM ):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议
  • Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚

分布式框架支持

Fescar使用XID表示一个分布式事务,XID需要在一次分布式事务请求所涉的系统中进行传递,从而向feacar-server发送分支事务的处理情况,以及接收feacar-server的commit、rollback指令。 Fescar官方已支持全版本的dubbo协议,而对于spring cloud(spring-boot)的分布式项目社区也提供了相应的实现

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-fescar</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>

该组件实现了基于RestTemplate、Feign通信时的XID传递功能。

业务逻辑

业务逻辑是经典的下订单、扣余额、减库存流程。 根据模块划分为三个独立的服务,且分别连接对应的数据库

  • 订单:order-server
  • 账户:account-server
  • 库存:storage-server

另外还有发起分布式事务的业务系统

  • 业务:business-server

项目结构如下图 在这里插入图片描述

正常业务

  1. business发起购买请求
  2. storage扣减库存
  3. order创建订单
  4. account扣减余额

异常业务

  1. business发起购买请求
  2. storage扣减库存
  3. order创建订单
  4. account扣减余额异常

正常流程下2、3、4步的数据正常更新全局commit,异常流程下的数据则由于第4步的异常报错全局回滚。

配置文件

fescar的配置入口文件是registry.conf,查看代码ConfigurationFactory得知目前还不能指定该配置文件,所以配置文件名称只能为registry.conf

private static final String REGISTRY_CONF = "registry.conf";
public static final Configuration FILE_INSTANCE = new FileConfiguration(REGISTRY_CONF);

registry中可以指定具体配置的形式,默认使用file类型,在file.conf中有3部分配置内容

  1. transport transport部分的配置对应NettyServerConfig类,用于定义Netty相关的参数,TM、RM与fescar-server之间使用Netty进行通信
  2. service
	 service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#配置Client连接TC的地址
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
是否启用seata的分布式事务
disableGlobalTransaction = false
}
  1. client
	client {
#RM接收TC的commit通知后缓冲上限
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}

数据源Proxy

除了前面的配置文件,fescar在AT模式下稍微有点代码量的地方就是对数据源的代理指定,且目前只能基于DruidDataSource的代理。 注:在最新发布的0.4.2版本中已支持任意数据源类型

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}

使用DataSourceProxy的目的是为了引入ConnectionProxy,fescar无侵入的一方面就体现在ConnectionProxy的实现上,即分支事务加入全局事务的切入点是在本地事务的commit阶段,这样设计可以保证业务数据与undo_log是在一个本地事务中。

undo_log是需要在业务库上创建的一个表,fescar依赖该表记录每笔分支事务的状态及二阶段rollback的回放数据。不用担心该表的数据量过大形成单点问题,在全局事务commit的场景下事务对应的undo_log会异步删除。

CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

启动Server

前往https://github.com/apache/incubator-seata/releases 下载与Client版本对应的fescar-server,避免由于版本的不同导致的协议不一致问题 进入解压之后的 bin 目录,执行

./fescar-server.sh 8091 ../data

启动成功输出

2019-04-09 20:27:24.637 INFO [main]c.a.fescar.core.rpc.netty.AbstractRpcRemotingServer.start:152 -Server started ... 

启动Client

fescar的加载入口类位于GlobalTransactionAutoConfiguration,对基于spring boot的项目能够自动加载,当然也可以通过其他方式示例化GlobalTransactionScanner

@Configuration
@EnableConfigurationProperties({FescarProperties.class})
public class GlobalTransactionAutoConfiguration {
private final ApplicationContext applicationContext;
private final FescarProperties fescarProperties;

public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext, FescarProperties fescarProperties) {
this.applicationContext = applicationContext;
this.fescarProperties = fescarProperties;
}

/**
* 示例化GlobalTransactionScanner
* scanner为client初始化的发起类
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
String txServiceGroup = this.fescarProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-fescar-service-group";
this.fescarProperties.setTxServiceGroup(txServiceGroup);
}

return new GlobalTransactionScanner(applicationName, txServiceGroup);
}
}

可以看到支持一个配置项FescarProperties,用于配置事务分组名称

spring.cloud.alibaba.fescar.tx-service-group=my_test_tx_group

如果不指定服务组,则默认使用spring.application.name+ -fescar-service-group生成名称,所以不指定spring.application.name启动会报错

@ConfigurationProperties("spring.cloud.alibaba.fescar")
public class FescarProperties {
private String txServiceGroup;

public FescarProperties() {
}

public String getTxServiceGroup() {
return this.txServiceGroup;
}

public void setTxServiceGroup(String txServiceGroup) {
this.txServiceGroup = txServiceGroup;
}
}

获取applicationId和txServiceGroup后,创建GlobalTransactionScanner对象,主要看类中initClient方法

private void initClient() {
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(
"applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
}
//init TM
TMClient.init(applicationId, txServiceGroup);

//init RM
RMClient.init(applicationId, txServiceGroup);

}

方法中可以看到初始化了TMClientRMClient,对于一个服务既可以是TM角色也可以是RM角色,至于什么时候是TM或者RM则要看在一次全局事务中@GlobalTransactional注解标注在哪。 Client创建的结果是与TC的一个Netty连接,所以在启动日志中可以看到两个Netty Channel,其中标明了transactionRole分别为TMROLERMROLE

2019-04-09 13:42:57.417  INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory   : NettyPool create channel to {"address":"127.0.0.1:8091","message":{"applicationId":"business-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"transactionServiceGroup":"my_test_tx_group","typeCode":101,"version":"0.4.1"},"transactionRole":"TMROLE"}
2019-04-09 13:42:57.505 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : NettyPool create channel to {"address":"127.0.0.1:8091","message":{"applicationId":"business-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"transactionServiceGroup":"my_test_tx_group","typeCode":103,"version":"0.4.1"},"transactionRole":"RMROLE"}
2019-04-09 13:42:57.629 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterTMRequest{applicationId='business-service', transactionServiceGroup='my_test_tx_group'}
2019-04-09 13:42:57.629 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterRMRequest{resourceIds='null', applicationId='business-service', transactionServiceGroup='my_test_tx_group'}
2019-04-09 13:42:57.699 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:1
2019-04-09 13:42:57.699 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:2
2019-04-09 13:42:57.701 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@3b06d101 msgId:1, future :com.alibaba.fescar.core.protocol.MessageFuture@28bb1abd, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 13:42:57.701 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.TmRpcClient@65fc3fb7 msgId:2, future :com.alibaba.fescar.core.protocol.MessageFuture@9a1e3df, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 13:42:57.710 INFO 93715 --- [imeoutChecker_1] c.a.fescar.core.rpc.netty.RmRpcClient : register RM success. server version:0.4.1,channel:[id: 0xe6468995, L:/127.0.0.1:57397 - R:/127.0.0.1:8091]
2019-04-09 13:42:57.710 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 114 ms, version:0.4.1,role:TMROLE,channel:[id: 0xd22fe0c5, L:/127.0.0.1:57398 - R:/127.0.0.1:8091]
2019-04-09 13:42:57.711 INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 125 ms, version:0.4.1,role:RMROLE,channel:[id: 0xe6468995, L:/127.0.0.1:57397 - R:/127.0.0.1:8091]

日志中可以看到

  1. 创建Netty连接
  2. 发送注册请求
  3. 得到响应结果
  4. RmRpcClientTmRpcClient成功实例化

TM处理流程

在本例中,TM的角色是business-service,BusinessService的purchase方法标注了@GlobalTransactional注解

@Service
public class BusinessService {

@Autowired
private StorageFeignClient storageFeignClient;
@Autowired
private OrderFeignClient orderFeignClient;

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount){
storageFeignClient.deduct(commodityCode, orderCount);

orderFeignClient.create(userId, commodityCode, orderCount);
}
}

方法调用后将会创建一个全局事务,首先关注@GlobalTransactional注解的作用,在GlobalTransactionalInterceptor中被拦截处理

/**
* AOP拦截方法调用
*/
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

//获取方法GlobalTransactional注解
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);

//如果方法有GlobalTransactional注解,则拦截到相应方法处理
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}

handleGlobalTransaction方法中对TransactionalTemplate的execute进行了调用,从类名可以看到这是一个标准的模版方法,它定义了TM对全局事务处理的标准步骤,注释已经比较清楚了

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

try {
// 2. begin transaction
try {
triggerBeforeBegin();
tx.begin(business.timeout(), business.name());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}

通过DefaultGlobalTransaction的begin方法开启全局事务

public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
check();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid != null) {
throw new IllegalStateException();
}
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
//具体开启事务的方法,获取TC返回的XID
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Begin a NEW global transaction [" + xid + "]");
}
}

方法开头处if (role != GlobalTransactionRole.Launcher)对role的判断有关键的作用,表明当前是全局事务的发起者(Launcher)还是参与者(Participant)。如果在分布式事务的下游系统方法中也加上@GlobalTransactional注解,那么它的角色就是Participant,会忽略后面的begin直接return,而判断是Launcher还是Participant是根据当前上下文是否已存在XID来判断,没有XID的就是Launcher,已经存在XID的就是Participant. 由此可见,全局事务的创建只能由Launcher执行,而一次分布式事务中也只有一个Launcher存在。

DefaultTransactionManager负责TM与TC通讯,发送begin、commit、rollback指令

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
return response.getXid();
}

至此拿到fescar-server返回的XID表示一个全局事务创建成功,日志中也反应了上述流程

2019-04-09 13:46:57.417 DEBUG 31326 --- [nio-8084-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : offer message: timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int)
2019-04-09 13:46:57.417 DEBUG 31326 --- [geSend_TMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int), channel:[id: 0xa148545e, L:/127.0.0.1:56120 - R:/127.0.0.1:8091],active?true,writable?true,isopen?true
2019-04-09 13:46:57.418 DEBUG 31326 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int)
2019-04-09 13:46:57.421 DEBUG 31326 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage com.alibaba.fescar.core.protocol.transaction.GlobalBeginResponse@2dc480dc,messageId:1196
2019-04-09 13:46:57.421 DEBUG 31326 --- [nio-8084-exec-1] c.a.fescar.core.context.RootContext : bind 192.168.224.93:8091:2008502699
2019-04-09 13:46:57.421 DEBUG 31326 --- [nio-8084-exec-1] c.a.f.tm.api.DefaultGlobalTransaction : Begin a NEW global transaction [192.168.224.93:8091:2008502699]

全局事务创建后,就开始执行business.execute(),即业务代码storageFeignClient.deduct(commodityCode, orderCount)进入RM处理流程,此处的业务逻辑为调用storage-service的扣减库存接口。

RM处理流程

@GetMapping(path = "/deduct")
public Boolean deduct(String commodityCode, Integer count){
storageService.deduct(commodityCode,count);
return true;
}

@Transactional
public void deduct(String commodityCode, int count){
Storage storage = storageDAO.findByCommodityCode(commodityCode);
storage.setCount(storage.getCount()-count);

storageDAO.save(storage);
}

storage的接口和service方法并未出现fescar相关的代码和注解,体现了fescar的无侵入。那它是如何加入到这次全局事务中的呢?答案在ConnectionProxy中,这也是前面说为什么必须要使用DataSourceProxy的原因,通过DataSourceProxy才能在业务代码的本地事务提交时,fescar通过该切入点,向TC注册分支事务并发送RM的处理结果。

由于业务代码本身的事务提交被ConnectionProxy代理实现,所以在提交本地事务时,实际执行的是ConnectionProxy的commit方法

public void commit() throws SQLException {
//如果当前是全局事务,则执行全局事务的提交
//判断是不是全局事务,就是看当前上下文是否存在XID
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}

private void processGlobalTransactionCommit() throws SQLException {
try {
//首先是向TC注册RM,拿到TC分配的branchId
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}

try {
if (context.hasUndoLog()) {
//写入undolog
UndoLogManager.flushUndoLogs(this);
}

//提交本地事务,写入undo_log和业务数据在同一个本地事务中
targetConnection.commit();
} catch (Throwable ex) {
//向TC发送RM的事务处理失败的通知
report(false);
if (ex instanceof SQLException) {
throw new SQLException(ex);
}
}
//向TC发送RM的事务处理成功的通知
report(true);
context.reset();
}

private void register() throws TransactionException {
//注册RM,构建request通过netty向TC发送注册指令
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
//将返回的branchId存在上下文中
context.setBranchId(branchId);
}

通过日志印证一下上面的流程

2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor   : xid in RootContext null xid in RpcContext 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] c.a.fescar.core.context.RootContext : bind 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor : bind 192.168.0.2:8091:2008546211 to RootContext
2019-04-09 21:57:48.386 INFO 38933 --- [nio-8081-exec-1] o.h.h.i.QueryTranslatorFactoryInitiator : HHH000397: Using ASTQueryTranslatorFactory
Hibernate: select storage0_.id as id1_0_, storage0_.commodity_code as commodit2_0_, storage0_.count as count3_0_ from storage_tbl storage0_ where storage0_.commodity_code=?
Hibernate: update storage_tbl set count=? where id=?
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : will connect to 192.168.0.2:8091
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : RM will register :jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false
2019-04-09 21:57:48.673 INFO 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.NettyPoolableFactory : NettyPool create channel to {"address":"192.168.0.2:8091","message":{"applicationId":"storage-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"resourceIds":"jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false","transactionServiceGroup":"hello-service-fescar-service-group","typeCode":103,"version":"0.4.0"},"transactionRole":"RMROLE"}
2019-04-09 21:57:48.677 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false', applicationId='storage-service', transactionServiceGroup='hello-service-fescar-service-group'}
2019-04-09 21:57:48.680 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:9
2019-04-09 21:57:48.680 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@7d61f5d4 msgId:9, future :com.alibaba.fescar.core.protocol.MessageFuture@186cd3e0, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 21:57:48.680 INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient : register RM success. server version:0.4.1,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:48.680 INFO 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.NettyPoolableFactory : register success, cost 3 ms, version:0.4.1,role:RMROLE,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:48.680 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting : offer message: transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1
2019-04-09 21:57:48.681 DEBUG 38933 --- [geSend_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1, channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091],active?true,writable?true,isopen?true
2019-04-09 21:57:48.681 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1
2019-04-09 21:57:48.687 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage BranchRegisterResponse: transactionId=2008546211,branchId=2008546212,result code =Success,getMsg =null,messageId:11
2019-04-09 21:57:48.702 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.rm.datasource.undo.UndoLogManager : Flushing UNDO LOG: {"branchId":2008546212,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"id","type":4,"value":1},{"keyType":"NULL","name":"count","type":4,"value":993}]}],"tableName":"storage_tbl"},"beforeImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"id","type":4,"value":1},{"keyType":"NULL","name":"count","type":4,"value":994}]}],"tableName":"storage_tbl"},"sqlType":"UPDATE","tableName":"storage_tbl"}],"xid":"192.168.0.2:8091:2008546211"}
2019-04-09 21:57:48.755 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting : offer message: transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null
2019-04-09 21:57:48.755 DEBUG 38933 --- [geSend_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : write message:FescarMergeMessage transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null, channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091],active?true,writable?true,isopen?true
2019-04-09 21:57:48.756 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:FescarMergeMessage transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null
2019-04-09 21:57:48.758 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Receive:MergeResultMessage com.alibaba.fescar.core.protocol.transaction.BranchReportResponse@582a08cf,messageId:13
2019-04-09 21:57:48.799 DEBUG 38933 --- [nio-8081-exec-1] c.a.fescar.core.context.RootContext : unbind 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.799 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor : unbind 192.168.0.2:8091:2008546211 from RootContext
  1. 获取business-service传来的XID
  2. 绑定XID到当前上下文中
  3. 执行业务逻辑sql
  4. 向TC创建本次RM的Netty连接
  5. 向TC发送分支事务的相关信息
  6. 获得TC返回的branchId
  7. 记录Undo Log数据
  8. 向TC发送本次事务PhaseOne阶段的处理结果
  9. 从当前上下文中解绑XID

其中第1步和第9步,是在FescarHandlerInterceptor中完成的,该类并不属于fescar,是前面提到的spring-cloud-alibaba-fescar,它实现了基于feign、rest通信时将xid bind和unbind到当前请求上下文中。到这里RM完成了PhaseOne阶段的工作,接着看PhaseTwo阶段的处理逻辑。

事务提交

各分支事务执行完成后,TC对各RM的汇报结果进行汇总,给各RM发送commit或rollback的指令

2019-04-09 21:57:49.813 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null,messageId:1
2019-04-09 21:57:49.813 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting : com.alibaba.fescar.core.rpc.netty.RmRpcClient@7d61f5d4 msgId:1, body:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null
2019-04-09 21:57:49.814 INFO 38933 --- [atch_RMROLE_1_8] c.a.f.core.rpc.netty.RmMessageListener : onMessage:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null
2019-04-09 21:57:49.816 INFO 38933 --- [atch_RMROLE_1_8] com.alibaba.fescar.rm.AbstractRMHandler : Branch committing: 192.168.0.2:8091:2008546211 2008546212 jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false null
2019-04-09 21:57:49.816 INFO 38933 --- [atch_RMROLE_1_8] com.alibaba.fescar.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
2019-04-09 21:57:49.817 INFO 38933 --- [atch_RMROLE_1_8] c.a.fescar.core.rpc.netty.RmRpcClient : RmRpcClient sendResponse branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null
2019-04-09 21:57:49.817 DEBUG 38933 --- [atch_RMROLE_1_8] c.a.f.c.rpc.netty.AbstractRpcRemoting : send response:branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:49.817 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler : Send:branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null

从日志中可以看到

  1. RM收到XID=192.168.0.2:8091:2008546211,branchId=2008546212的commit通知
  2. 执行commit动作
  3. 将commit结果发送给TC,branchStatus为PhaseTwo_Committed

具体看下二阶段commit的执行过程,在AbstractRMHandler类的doBranchCommit方法

/**
* 拿到通知的xid、branchId等关键参数
* 然后调用RM的branchCommit
*/
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
response.setBranchStatus(status);
LOGGER.info("Branch commit result: " + status);
}

最终会将branchCommit的请求调用到AsyncWorker的branchCommit方法。AsyncWorker的处理方式是fescar架构的一个关键部分,因为大部分事务都是会正常提交的,所以在PhaseOne阶段就已经结束了,这样就可以将锁最快的释放。PhaseTwo阶段接收commit的指令后,异步处理即可。将PhaseTwo的时间消耗排除在一次分布式事务之外。

private static final List<Phase2Context> ASYNC_COMMIT_BUFFER = Collections.synchronizedList( new ArrayList<Phase2Context>());

/**
* 将需要提交的XID加入list
*/
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) {
ASYNC_COMMIT_BUFFER.add(new Phase2Context(branchType, xid, branchId, resourceId, applicationData));
} else {
LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
}
return BranchStatus.PhaseTwo_Committed;
}

/**
* 通过定时任务消费list中的XID
*/
public synchronized void init() {
LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
timerExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("AsyncWorker", 1, true));
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
doBranchCommits();
} catch (Throwable e) {
LOGGER.info("Failed at async committing ... " + e.getMessage());
}
}
}, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}

private void doBranchCommits() {
if (ASYNC_COMMIT_BUFFER.size() == 0) {
return;
}
Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();

//一次定时循环取出ASYNC_COMMIT_BUFFER中的所有待办数据
//以resourceId作为key分组待commit数据,resourceId是一个数据库的连接url
//在前面的日志中可以看到,目的是为了覆盖应用的多数据源创建
while (iterator.hasNext()) {
Phase2Context commitContext = iterator.next();
List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
if (contextsGroupedByResourceId == null) {
contextsGroupedByResourceId = new ArrayList<>();
mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
}
contextsGroupedByResourceId.add(commitContext);

iterator.remove();

}

for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
Connection conn = null;
try {
try {
//根据resourceId获取数据源以及连接
DataSourceProxy dataSourceProxy = DataSourceManager.get().get(entry.getKey());
conn = dataSourceProxy.getPlainConnection();
} catch (SQLException sqle) {
LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
continue;
}
List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
for (Phase2Context commitContext : contextsGroupedByResourceId) {
try {
//执行undolog的处理,即删除xid、branchId对应的记录
UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
} catch (Exception ex) {
LOGGER.warn(
"Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
}
}

} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
}
}
}
}
}

所以对于commit动作的处理,RM只需删除xid、branchId对应的undo_log即可。

事务回滚

对于rollback场景的触发有两种情况

  1. 分支事务处理异常,即ConnectionProxyreport(false)的情况
  2. TM捕获到下游系统上抛的异常,即发起全局事务标有@GlobalTransactional注解的方法捕获到的异常。在前面TransactionalTemplate类的execute模版方法中,对business.execute()的调用进行了catch,catch后会调用rollback,由TM通知TC对应XID需要回滚事务
 public void rollback() throws TransactionException {
//只有Launcher能发起这个rollback
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");
}
return;
}
if (xid == null) {
throw new IllegalStateException();
}

status = transactionManager.rollback(xid);
if (RootContext.getXID() != null) {
if (xid.equals(RootContext.getXID())) {
RootContext.unbind();
}
}
}

TC汇总后向参与者发送rollback指令,RM在AbstractRMHandler类的doBranchRollback方法中接收这个rollback的通知

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
LOGGER.info("Branch rolling back: " + xid + " " + branchId + " " + resourceId);
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
response.setBranchStatus(status);
LOGGER.info("Branch rollback result: " + status);
}

然后将rollback请求传递到DataSourceManager类的branchRollback方法

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
//根据resourceId获取对应的数据源
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}

最终会执行UndoLogManager类的undo方法,因为是纯jdbc操作代码比较长就不贴出来了,可以通过连接到github查看源码,说一下undo的具体流程

  1. 根据xid和branchId查找PhaseOne阶段提交的undo_log
  2. 如果找到了就根据undo_log中记录的数据生成回放sql并执行,即还原PhaseOne阶段修改的数据
  3. 第2步处理完后,删除该条undo_log数据
  4. 如果第1步没有找到对应的undo_log,就插入一条状态为GlobalFinished的undo_log. 出现没找到的原因可能是PhaseOne阶段的本地事务异常了,导致没有正常写入。 因为xid和branchId是唯一索引,所以第4步的插入,可以防止PhaseOne阶段恢复后的成功写入,那么PhaseOne阶段就会异常,这样一来业务数据也就不会提交成功,数据达到了最终回滚了的效果

总结

本地结合分布式业务场景,分析了fescar client侧的主要处理流程,对TM和RM角色的主要源码进行了解析,希望能对大家理解fescar的工作原理有所帮助。

随着fescar的快速迭代以及后期的Roadmap规划,假以时日相信fescar能够成为开源分布式事务的标杆解决方案。

· 阅读需 30 分钟

再前不久,我写了一篇关于分布式事务中间件 Fescar 的解析,没过几天 Fescar 团队对其进行了品牌升级,取名为 Seata(Simpe Extensible Autonomous Transaction Architecture),而以前的 Fescar 的英文全称为 Fast & EaSy Commit And Rollback。可以看见 Fescar 从名字上来看更加局限于 Commit 和 Rollback,而新的品牌名字 Seata 旨在打造一套一站式分布式事务解决方案。更换名字之后,我对其未来的发展更有信心。

这里先大概回忆一下 Seata 的整个过程模型:

  • TM:事务的发起者。用来告诉 TC,全局事务的开始,提交,回滚。
  • RM:具体的事务资源,每一个 RM 都会作为一个分支事务注册在 TC。
  • TC:事务的协调者。也可以看做是 Fescar-server,用于接收我们的事务的注册,提交和回滚。

在之前的文章中对整个角色有个大体的介绍,在这篇文章中我将重点介绍其中的核心角色 TC,也就是事务协调器。

2.Transcation Coordinator

为什么之前一直强调 TC 是核心呢?那因为 TC 这个角色就好像上帝一样,管控着云云众生的 RM 和 TM。如果 TC 一旦不好使,那么 RM 和 TM 一旦出现小问题,那必定会乱的一塌糊涂。所以要想了解 Seata,那么必须要了解它的 TC。

那么一个优秀的事务协调者应该具备哪些能力呢?我觉得应该有以下几个:

  • 正确的协调:能正确的协调 RM 和 TM 接下来应该做什么,做错了应该怎么办,做对了应该怎么办。
  • 高可用: 事务协调器在分布式事务中很重要,如果不能保证高可用,那么它也没有存在的必要了。
  • 高性能:事务协调器的性能一定要高,如果事务协调器性能有瓶颈那么它所管理的 RM 和 TM 那么会经常遇到超时,从而引起回滚频繁。
  • 高扩展性:这个特点是属于代码层面的,如果是一个优秀的框架,那么需要给使用方很多自定义扩展,比如服务注册/发现,读取配置等等。

下面我也将逐步阐述 Seata 是如何做到上面四点。

2.1 Seata-Server 的设计

Seata-Server 整体的模块图如上所示:

  • Coordinator Core: 在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如是否 commit,rollback 等协调活动。
  • Store:存储模块,用来将我们的数据持久化,防止重启或者宕机数据丢失。
  • Discovery: 服务注册/发现模块,用于将 Server 地址暴露给我们 Client。
  • Config: 用来存储和查找我们服务端的配置。
  • Lock: 锁模块,用于给 Seata 提供全局锁的功能。
  • RPC:用于和其它端通信。
  • HA-Cluster:高可用集群,目前还没开源,为 Seata 提供可靠的高可用服务,预计将会在 0.6 版本开源。

2.2 Discovery

首先来讲讲比较基础的 Discovery 模块,又称服务注册/发现模块。我们将 Seata-Sever 启动之后,需要将自己的地址暴露给其它使用者,那么就需要我们这个模块帮忙。

这个模块有个核心接口 RegistryService,如上图所示:

  • register:服务端使用,进行服务注册。
  • unregister:服务端使用,一般在 JVM 关闭钩子,ShutdownHook 中调用。
  • subscribe:客户端使用,注册监听事件,用来监听地址的变化。
  • unsubscribe:客户端使用,取消注册监听事件。
  • lookup:客户端使用,根据 key 查找服务地址列表。
  • close:都可以使用,用于关闭 Registry 资源。

如果需要添加自己定义的服务注册/发现,那么实现这个接口即可。截止目前在社区的不断开发推动下,已经有五种服务注册/发现,分别是 redis、zk、nacos、eruka 和 consul。下面简单介绍下 Nacos 的实现:

2.2.1 register 接口:

step1:校验地址是否合法

step2:获取 Nacos 的 Naming 实例,然后将地址注册到服务名为 serverAddr(固定服务名) 的对应集群分组(registry.conf 文件配置)上面。

unregister 接口类似,这里不做详解。

2.2.2 lookup 接口:

step1:获取当前 clusterName 名字。

step2:判断当前集群名对应的服务是否已经订阅过了,如果是直接从 map 中取订阅返回的数据。

step3:如果没有订阅先主动查询一次服务实例列表,然后添加订阅并将订阅返回的数据存放到 map 中,之后直接从 map 获取最新数据。

2.2.3 subscribe 接口

这个接口比较简单,具体分两步:

step1:对将要订阅的 cluster-> listener 存放到 map 中,此处 nacos 未提交单机已订阅列表,所以需要自己实现。

step2:使用 Nacos api 订阅。

2.3 Config

配置模块也是一个比较基础,比较简单的模块。我们需要配置一些常用的参数比如:Netty 的 select 线程数量,work 线程数量,session 允许最大为多少等等,当然这些参数再 Seata 中都有自己的默认设置。

同样的在 Seata 中也提供了一个接口 Configuration,用来自定义我们需要的获取配置的地方:

  • getInt/Long/Boolean/getConfig():通过 dataId 来获取对应的值,读取不到配置、异常或超时将返回参数中的默认值。
  • putConfig:用于添加配置。
  • removeConfig:删除一个配置。
  • add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更。

目前为止有四种方式获取 Config:File(文件获取)、Nacos、Apollo 和 ZK(不推荐)。在 Seata 中首先需要配置 registry.conf,来配置 config.type 。实现 conf 比较简单这里就不深入分析。

2.4 Store

存储层的实现对于 Seata 是否高性能,是否可靠非常关键。 如果存储层没有实现好,那么如果发生宕机,在 TC 中正在进行分布式事务处理的数据将会被丢失,既然使用了分布式事务,那么其肯定不能容忍丢失。如果存储层实现好了,但是其性能有很大问题,RM 可能会发生频繁回滚那么其完全无法应对高并发的场景。

在 Seata 中默认提供了文件方式的存储,下面我们定义我们存储的数据为 Session,而我们的 TM 创造的全局事务操作数据叫 GloabSession,RM 创造的分支事务操作数据叫 BranchSession,一个 GloabSession 可以拥有多个 BranchSession。我们的目的就是要将这么多 Session 存储下来。

在 FileTransactionStoreManager#writeSession 代码中:

上面的代码主要分为下面几步:

  • step1:生成一个 TransactionWriteFuture。
  • step2:将这个 futureRequest 丢进一个 LinkedBlockingQueue 中。为什么需要将所有数据都丢进队列中呢?当然这里其实也可以用锁来实现,再另外一个阿里开源的 RocketMQ 中,使用的锁。不论是队列还是锁它们的目的是为了保证单线程写,这又是为什么呢?有人会解释说,需要保证顺序写,这样速度就很快,这个理解是错误的,我们的 FileChannel 的写方法是线程安全的,已经能保证顺序写了。保证单线程写其实是为了让我们这个写逻辑都是单线程的,因为可能有些文件写满或者记录写数据位置等等逻辑,当然这些逻辑都可以主动加锁去做,但是为了实现简单方便,直接再整个写逻辑排队处理是最为合适的。
  • step3:调用 future.get,等待我们该条数据写逻辑完成通知。

我们将数据提交到队列之后,我们接下来需要对其进行消费,代码如下:

这里将一个 WriteDataFileRunnable()提交进我们的线程池,这个 Runnable 的 run()方法如下:

分为下面几步:

step1: 判断是否停止,如果 stopping 为 true 则返回 null。

step2:从我们的队列中获取数据。

step3:判断 future 是否已经超时了,如果超时,则设置结果为 false,此时我们生产者 get()方法会接触阻塞。

step4:将我们的数据写进文件,此时数据还在 pageCahce 层并没有刷新到磁盘,如果写成功然后根据条件判断是否进行刷盘操作。

step5:当写入数量到达一定的时候,或者写入时间到达一定的时候,需要将我们当前的文件保存为历史文件,删除以前的历史文件,然后创建新的文件。这一步是为了防止我们文件无限增长,大量无效数据浪费磁盘资源。

在我们的 writeDataFile 中有如下代码:

step1:首先获取我们的 ByteBuffer,如果超出最大循环 BufferSize 就直接创建一个新的,否则就使用我们缓存的 Buffer。这一步可以很大的减少 GC。

step2:然后将数据添加进入 ByteBuffer。

step3:最后将 ByteBuffer 写入我们的 fileChannel,这里会重试三次。此时的数据还在 pageCache 层,受两方面的影响,OS 有自己的刷新策略,但是这个业务程序不能控制,为了防止宕机等事件出现造成大量数据丢失,所以就需要业务自己控制 flush。下面是 flush 的代码:

这里 flush 的条件写入一定数量或者写的时间超过一定时间,这样也会有个小问题如果是停电,那么 pageCache 中有可能还有数据并没有被刷盘,会导致少量的数据丢失。目前还不支持同步模式,也就是每条数据都需要做刷盘操作,这样可以保证每条消息都落盘,但是性能也会受到极大的影响,当然后续会不断的演进支持。

我们的 store 核心流程主要是上面几个方法,当然还有一些比如,session 重建等,这些比较简单,读者可以自行阅读。

2.5 Lock

大家知道数据库实现隔离级别主要是通过锁来实现的,同样的再分布式事务框架 Seata 中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在 Seata 中可以保证隔离级别是读已提交,但是提供了达到读已提交隔离的手段。

Lock 模块也就是 Seata 实现隔离级别的核心模块。在 Lock 模块中提供了一个接口用于管理我们的锁:

其中有三个方法:

  • acquireLock:用于对我们的 BranchSession 加锁,这里虽然是传的分支事务 Session,实际上是对分支事务的资源加锁,成功返回 true。
  • isLockable:根据事务 ID,资源 Id,锁住的 Key 来查询是否已经加锁。
  • cleanAllLocks:清除所有的锁。 对于锁我们可以在本地实现,也可以通过 redis 或者 mysql 来帮助我们实现。官方默认提供了本地全局锁的实现:

在本地锁的实现中有两个常量需要关注:

  • BUCKET_PER_TABLE:用来定义每个 table 有多少个 bucket,目的是为了后续对同一个表加锁的时候减少竞争。
  • LOCK_MAP:这个 map 从定义上来看非常复杂,里里外外套了很多层 Map,这里用个表格具体说明一下:
层数keyvalue
1-LOCK_MAPresourceId(jdbcUrl)dbLockMap
2- dbLockMaptableName (表名)tableLockMap
3- tableLockMapPK.hashcode%Bucket (主键值的 hashcode%bucket)bucketLockMap
4- bucketLockMapPKtrascationId

可以看见实际上的加锁在 bucketLockMap 这个 map 中,这里具体的加锁方法比较简单就不作详细阐述,主要是逐步的找到 bucketLockMap,然后将当前 trascationId 塞进去,如果这个主键当前有 TranscationId,那么比较是否是自己,如果不是则加锁失败。

2.6 RPC

保证 Seata 高性能的关键之一也是使用了 Netty 作为 RPC 框架,采用默认配置的线程模型如下图所示:

如果采用默认的基本配置那么会有一个 Acceptor 线程用于处理客户端的链接,会有 cpu*2 数量的 NIO-Thread,再这个线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和 TM 注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为 100,最大为 500。

Seata 目前允许配置的传输层配置如图所示,用户可根据需要进行 Netty 传输层面的调优,配置通过配置中心配置,首次加载时生效。

这里需要提一下的是 Seata 的心跳机制,这里是使用 Netty 的 IdleStateHandler 完成的,如下:

在 Sever 端对于写没有设置最大空闲时间,对于读设置了最大空闲时间,默认为 15s(客户端默认写空闲为 5s,发送 ping 消息),如果超过 15s 则会将链接断开,关闭资源。

step1:判断是否是读空闲的检测事件。

step2:如果是则断开链接,关闭资源。
另外 Seata 做了内存池、客户端做了批量小包合并发送、Netty 连接池(减少连接创建时的服务不可用时间)等功能,以下为批量小包合并功能。

客户端的消息发送并不是真正的消息发送通过 AbstractRpcRemoting#sendAsyncRequest 包装成 RpcMessage 存储至 basket 中并唤醒合并发送线程。合并发送线程通过 while true 的形式 最长等待 1ms 对 basket 的消息取出包装成 merge 消息进行真正发送,此时若 channel 出现异常则会通过 fail-fast 快速失败返回结果。merge 消息发送前在 map 中标识,收到结果后批量确认(AbstractRpcRemotingClient#channelRead),并通过 dispatch 分发至 messageListener 和 handler 去处理。同时,timerExecutor 定时对已发送消息进行超时检测,若超时置为失败。具体消息协议设计将会在后续的文章中给出,敬请关注。
Seata 的 Netty Client 由 TMClient 和 RMClient 组成,根据事务角色功能区分,都继承 AbstractRpcRemotingClient,AbstractRpcRemotingClient 实现了 RemotingService(服务启停), RegisterMsgListener(netty 连接池连接创建回调)和 ClientMessageSender(消息发送)继承了 AbstractRpcRemoting( Client 和 Server 顶层消息发送和处理的模板)。
RMClient 类关系图如下图所示: TMClient 和 RMClient 又会根据自身的 poolConfig 配置与 NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> 进行 channel 连接的交互,channel 连接池根据角色 key+ip 作为连接池的 key 来定位各个连接池 ,连接池对 channel 进行统一的管理。TMClient 和 RMClient 在发送过程中对于每个 ip 只会使用一个长连接,但连接不可用时,会从连接池中快速取出已经创建好并可用的连接,减少服务的不可用时间。

2.7 HA-Cluster

目前官方没有公布 HA-Cluster,但是通过一些其它中间件和官方的一些透露,可以将 HA-Cluster 用如下方式设计:

具体的流程如下:

step1:客户端发布信息的时候根据 transcationId 保证同一个 transcation 是在同一个 master 上,通过多个 Master 水平扩展,提供并发处理性能。

step2:在 server 端中一个 master 有多个 slave,master 中的数据近实时同步到 slave 上,保证当 master 宕机的时候,还能有其它 slave 顶上来可以用。

当然上述一切都是猜测,具体的设计实现还得等 0.5 版本之后。目前有一个 Go 版本的 Seata-Server 也捐赠给了 Seata(还在流程中),其通过 raft 实现副本一致性,其它细节不是太清楚。

2.8 Metrics

这个模块也是一个没有具体公布实现的模块,当然有可能会提供插件口,让其它第三方 metric 接入进来,最近 Apache SkyWalking 正在和 Seata 小组商讨如何接入进来。

3.Coordinator Core

上面我们讲了很多 Server 基础模块,想必大家对 Seata 的实现已经有个大概,接下来我会讲解事务协调器具体逻辑是如何实现的,让大家更加了解 Seata 的实现内幕。

3.1 启动流程

启动方法在 Server 类有个 main 方法,定义了我们启动流程:

step1:创建一个 RpcServer,再这个里面包含了我们网络的操作,用 Netty 实现了服务端。

step2:解析端口号、本地文件地址(用户 Server 宕机未处理完成事务恢复)、IP(可选,本机只能获取内网 ip,在跨网络时需要一个对外的 vip 注册服务)。

step3:初始化 SessionHoler,其中最重要的重要就是重我们 dataDir 这个文件夹中恢复我们的数据,重建我们的 Session。

step4:创建一个 CoorDinator,这个也是我们事务协调器的逻辑核心代码,然后将其初始化,其内部初始化的逻辑会创建四个定时任务:

  • retryRollbacking:重试 rollback 定时任务,用于将那些失败的 rollback 进行重试的,每隔 5ms 执行一次。
  • retryCommitting:重试 commit 定时任务,用于将那些失败的 commit 进行重试的,每隔 5ms 执行一次。
  • asyncCommitting:异步 commit 定时任务,用于执行异步的 commit,每隔 10ms 一次。
  • timeoutCheck:超时定时任务检测,用于检测超时的任务,然后执行超时的逻辑,每隔 2ms 执行一次。

step5: 初始化 UUIDGenerator 这个也是我们生成各种 ID(transcationId,branchId)的基本类。

step6:将本地 IP 和监听端口设置到 XID 中,初始化 rpcServer 等待客户端的连接。

启动流程比较简单,下面我会介绍分布式事务框架中的常见的一些业务逻辑 Seata 是如何处理的。

3.2 Begin-开启全局事务

一次分布式事务的起始点一定是开启全局事务,首先我们看看全局事务 Seata 是如何实现的:

step1: 根据应用 ID,事务分组,名字,超时时间创建一个 GloabSession,这个在前面也提到过它和 branchSession 分别是什么。

step2:对其添加一个 RootSessionManager 用于监听一些事件,这里要说一下目前在 Seata 里面有四种类型的 Listener(这里要说明的是所有的 sessionManager 都实现了 SessionLifecycleListener):

  • ROOT_SESSION_MANAGER:最全,最大的,拥有所有的 Session。
  • ASYNC_COMMITTING_SESSION_MANAGER:用于管理需要做异步 commit 的 Session。
  • RETRY_COMMITTING_SESSION_MANAGER:用于管理重试 commit 的 Session。
  • RETRY_ROLLBACKING_SESSION_MANAGER:用于管理重试回滚的 Session。 由于这里是开启事务,其它 SessionManager 不需要关注,我们只添加 RootSessionManager 即可。

step3:开启 Globalsession

这一步会把状态变为 Begin,记录开始时间,并且调用 RootSessionManager 的 onBegin 监听方法,将 Session 保存到 map 并写入到我们的文件。

step4:最后返回 XID,这个 XID 是由 ip+port+transactionId 组成的,非常重要,当 TM 申请到之后需要将这个 ID 传到 RM 中,RM 通过 XID 来决定到底应该访问哪一台 Server。

3.3 BranchRegister-分支事务注册

当我们全局事务在 TM 开启之后,我们 RM 的分支事务也需要注册到我们的全局事务之上,这里看看是如何处理的:

step1:通过 transactionId 获取并校验全局事务是否是开启状态。

step2:创建一个新的分支事务,也就是我们的 BranchSession。

step3:对分支事务进行加全局锁,这里的逻辑就是使用的我们锁模块的逻辑。

step4:添加 branchSession,主要是将其添加到 globalSession 对象中,并写入到我们的文件中。

step5:返回 branchId,这个 ID 也很重要,我们后续需要用它来回滚我们的事务,或者对我们分支事务状态更新。

分支事务注册之后,还需要汇报分支事务的本地事务的执行到底是成功还是失败,在 Server 目前只是简单的做一下保存记录,汇报的目的是,就算这个分支事务失败,如果 TM 还是执意要提交全局事务(catch 异常不抛出),那么再遍历提交分支事务的时候,这个失败的分支事务就不需要提交(用户选择性跳过)。

3.4 GlobalCommit - 全局提交

当我们分支事务执行完成之后,就轮到我们的 TM-事务管理器来决定是提交还是回滚,如果是提交,那么就会走到下面的逻辑:

step1:首先找到我们的 globalSession。如果它为 null 证明已经被 commit 过了,那么直接幂等操作,返回成功。

step2:关闭我们的 GloabSession 防止再次有新的 branch 进来(跨服务调用超时回滚,provider 在继续执行)。

step3:如果 status 是等于 Begin,那么久证明还没有提交过,改变其状态为 Committing 也就是正在提交。

step4:判断是否是可以异步提交,目前只有 AT 模式可以异步提交,二阶段全局提交时只是删除 undolog 并无严格顺序,此处使用定时任务,客户端收到后批量合并删除。

step5:如果是异步提交,直接将其放进我们 ASYNC_COMMITTING_SESSION_MANAGER,让其再后台线程异步去做我们的 step6,如果是同步的那么直接执行我们的 step6。

step6:遍历我们的 BranchSession 进行提交,如果某个分支事务失败,根据不同的条件来判断是否进行重试,可异步执行此 branchSession 不成功可以继续执行下一个,因为其本身都在 manager 中,只要没有成功就不会被删除会一直重试,如果是同步提交的会放进重试队列进行定时重试并卡住按照顺序提交。

3.5 GlobalRollback - 全局回滚

如果我们的 TM 决定全局回滚,那么会走到下面的逻辑:

这个逻辑和提交流程基本一致,可以看作是它的反向,这里就不展开讲了。

4.总结

最后在总结一下开始我们提出了分布式事务的关键 4 点,Seata 到底是怎么解决的:

  • 正确的协调:通过后台定时任务各种正确的重试,并且未来会推出监控平台有可能可以手动回滚。
  • 高可用: 通过 HA-Cluster 保证高可用。
  • 高性能:文件顺序写,RPC 通过 netty 实现,Seata 未来可以水平扩展,提高处理性能。
  • 高扩展性:提供给用户可以自由实现的地方,比如配置,服务发现和注册,全局锁等等。

最后希望大家能从这篇文章能了解 Seata-Server 的核心设计原理,当然你也可以想象如果你自己去实现一个分布式事务的 Server 应该怎样去设计?

Seata GitHub 地址:https://github.com/apache/incubator-seata

本文作者:

李钊,GitHub ID @CoffeeLatte007,公众号「咖啡拿铁」作者,Seata 社区 Committer,猿辅导 Java 工程师,曾就职于美团。对分布式中间件,分布式系统有浓厚的兴趣。
季敏(清铭),GitHub ID @slievrly,Seata 开源项目负责人,阿里巴巴中间件 TXC/GTS 核心研发成员,长期从事于分布式中间件核心研发工作,在分布式事务领域有着较丰富的技术积累。

· 阅读需 17 分钟

Fescar 0.4.0 版本发布了 TCC 模式,由蚂蚁金服团队贡献,欢迎大家试用,文末也提供了项目后续的 Roadmap,欢迎关注。

前言:基于 TCC 模型的应用场景

 
1.png

TCC 分布式事务模型直接作用于服务层。不与具体的服务框架耦合,与底层 RPC 协议无关,与底层存储介质无关,可以灵活选择业务资源的锁定粒度,减少资源锁持有时间,可扩展性好,可以说是为独立部署的 SOA 服务而设计的。

一、TCC 模型优势

对于 TCC 分布式事务模型,笔者认为其在业务场景应用上,有两方面的意义。

1.1 跨服务的分布式事务

服务的拆分,也可以认为是资源的横向扩展,只不过方向不同而已。

横向扩展可能沿着两个方向发展:

  1. 功能扩展,根据功能对数据进行分组,并将不同的功能组分布在多个不同的数据库上,这实际上就是 SOA 架构下的服务化。
  2. 数据分片,在功能组内部将数据拆分到多个数据库上,为横向扩展增加一个新的维度。

下图简要阐释了横向数据扩展策略:

2.png

因此,TCC 的其中一个作用就是在按照功能横向扩展资源时,保证多资源访问的事务属性。

1.2 两阶段拆分

TCC 另一个作用就是把两阶段拆分成了两个独立的阶段,通过资源业务锁定的方式进行关联。资源业务锁定方式的好处在于,既不会阻塞其他事务在第一阶段对于相同资源的继续使用,也不会影响本事务第二阶段的正确执行。

传统模型的并发事务:
3.png

TCC 模型的并发事务:
4.png

这对业务有什么好处呢?拿支付宝的担保交易场景来说,简化情况下,只需要涉及两个服务,交易服务和账务服务。交易作为主业务服务,账务作为从业务服务,提供 Try、Commit、Cancel 接口:

  1. Try 接口扣除用户可用资金,转移到预冻结资金。预冻结资金就是业务锁定方案,每个事务第二阶段只能使用本事务的预冻结资金,在第一阶段执行结束后,其他并发事务也可以继续处理用户的可用资金。
  2. Commit 接口扣除预冻结资金,增加中间账户可用资金(担保交易不能立即把钱打给商户,需要有一个中间账户来暂存)。

假设只有一个中间账户的情况下,每次调用支付服务的 Commit 接口,都会锁定中间账户,中间账户存在热点性能问题。 但是,在担保交易场景中,七天以后才需要将资金从中间账户划拨给商户,中间账户并不需要对外展示。因此,在执行完支付服务的第一阶段后,就可以认为本次交易的支付环节已经完成,并向用户和商户返回支付成功的结果,并不需要马上执行支付服务二阶段的 Commit 接口,等到低锋期时,再慢慢消化,异步地执行。
5.png

这就是 TCC 分布式事务模型的二阶段异步化功能,从业务服务的第一阶段执行成功,主业务服务就可以提交完成,然后再由框架异步的执行各从业务服务的第二阶段。

二、通用型 TCC 解决方案

通用型 TCC 解决方案就是最典型的 TCC 分布式事务模型实现,所有从业务服务都需要参与到主业务服务的决策当中。
6.png
 

适用场景

由于从业务服务是同步调用,其结果会影响到主业务服务的决策,因此通用型 TCC 分布式事务解决方案适用于执行时间确定且较短的业务,比如互联网金融企业最核心的三个服务:交易、支付、账务:
7.png
 
当用户发起一笔交易时,首先访问交易服务,创建交易订单;然后交易服务调用支付服务为该交易创建支付订单,执行收款动作,最后支付服务调用账务服务记录账户流水和记账。

为了保证三个服务一起完成一笔交易,要么同时成功,要么同时失败,可以使用通用型 TCC 解决方案,将这三个服务放在一个分布式事务中,交易作为主业务服务,支付作为从业务服务,账务作为支付服务的嵌套从业务服务,由 TCC 模型保证事务的原子性。
8.png

支付服务的 Try 接口创建支付订单,开启嵌套分布式事务,并调用账务服务的 Try 接口;账务服务在 Try 接口中冻结买家资金。一阶段调用完成后,交易完成,提交本地事务,由 TCC 框架完成分布式事务各从业务服务二阶段的调用。

支付服务二阶段先调用账务服务的 Confirm 接口,扣除买家冻结资金;增加卖家可用资金。调用成功后,支付服务修改支付订单为完成状态,完成支付。

当支付和账务服务二阶段都调用完成后,整个分布式事务结束。

三、异步确保型 TCC 解决方案

异步确保型 TCC 解决方案的直接从业务服务是可靠消息服务,而真正的从业务服务则通过消息服务解耦,作为消息服务的消费端,异步地执行。
9.png
 
可靠消息服务需要提供 Try,Confirm,Cancel 三个接口。Try 接口预发送,只负责持久化存储消息数据;Confirm 接口确认发送,这时才开始真正的投递消息;Cancel 接口取消发送,删除消息数据。

消息服务的消息数据独立存储,独立伸缩,降低从业务服务与消息系统间的耦合,在消息服务可靠的前提下,实现分布式事务的最终一致性。

此解决方案虽然增加了消息服务的维护成本,但由于消息服务代替从业务服务实现了 TCC 接口,从业务服务不需要任何改造,接入成本非常低。

适用场景

由于从业务服务消费消息是一个异步的过程,执行时间不确定,可能会导致不一致时间窗口增加。因此,异步确保性 TCC 分布式事务解决方案只适用于对最终一致性时间敏感度较低的一些被动型业务(从业务服务的处理结果不影响主业务服务的决策,只被动的接收主业务服务的决策结果)。比如会员注册服务和邮件发送服务:
10.png
 
当用户注册会员成功,需要给用户发送一封邮件,告诉用户注册成功,并提示用户激活该会员。但要注意两点:

  1. 如果用户注册成功,一定要给用户发送一封邮件;
  2. 如果用户注册失败,一定不能给用户发送邮件。

因此,这同样需要会员服务和邮件服务保证原子性,要么都执行,要么都不执行。不一样的是,邮件服务只是一种被动型的业务,并不影响用户是否能够注册成功,它只需要在用户注册成功以后发送邮件给用户即可,邮件服务不需要参与到会员服务的活动决策中。

对于此种业务场景,可以使用异步确保型TCC分布式事务解决方案,如下:
11.png
 
 
由可靠消息服务来解耦会员和邮件服务,会员服务与消息服务组成 TCC 事务模型,保证事务原子性。然后通过消息服务的可靠特性,确保消息一定能够被邮件服务消费,从而使得会员与邮件服务在同一个分布式事务中。同时,邮件服务也不会影响会员服务的执行过程,只在会员服务执行成功后被动接收发送邮件的请求。

四、补偿型 TCC 解决方案

补偿型 TCC 解决方案与通用型 TCC 解决方案的结构相似,其从业务服务也需要参与到主业务服务的活动决策当中。但不一样的是,前者的从业务服务只需要提供 Do 和 Compensate 两个接口,而后者需要提供三个接口。
12.png
 
Do 接口直接执行真正的完整业务逻辑,完成业务处理,业务执行结果外部可见;Compensate 操作用于业务补偿,抵消或部分抵消正向业务操作的业务结果,Compensate操作需满足幂等性。
与通用型解决方案相比,补偿型解决方案的从业务服务不需要改造原有业务逻辑,只需要额外增加一个补偿回滚逻辑即可,业务改造量较小。但要注意的是,业务在一阶段就执行完整个业务逻辑,无法做到有效的事务隔离,当需要回滚时,可能存在补偿失败的情况,还需要额外的异常处理机制,比如人工介入。

适用场景

由于存在回滚补偿失败的情况,补偿型 TCC 分布式事务解决方案只适用于一些并发冲突较少或者需要与外部交互的业务,这些外部业务不属于被动型业务,其执行结果会影响主业务服务的决策,比如机票代理商的机票预订服务:
13.png
 
该机票服务提供多程机票预订服务,可以同时预订多趟行程航班机票,比如从北京到圣彼得堡,需要第一程从北京到莫斯科,以及第二程从莫斯科到圣彼得堡。

当用户预订机票时,肯定希望能同时预订这两趟航班的机票,只预订一趟航班对用户来说没有意义。因此,对于这样的业务服务同样提出了原子性要求,如果其中一趟航班的机票预订失败,另外一趟需要能够取消预订。

但是,由于航空公司相对于机票代理商来说属于外部业务,只提供订票接口和取消预订接口,想要推动航空公司改造是极其困难的。因此,对于此类业务服务,可以使用补偿型 TCC 分布式事务解决方案,如下:
14.png

网关服务在原有逻辑基础上增加 Compensate 接口,负责调用对应航空公司的取消预订接口。

在用户发起机票预订请求时,机票服务先通过网关 Do 接口,调用各航空公司的预订接口,如果所有航班都预订成功,则整个分布式事务直接执行成功;一旦某趟航班机票预订失败,则分布式事务回滚,由 TCC 事务框架调用各网关的 Compensate 补偿接口,其再调用对应航空公司的取消预订接口。通过这种方式,也可以保证多程机票预订服务的原子性。

五. 总结

对于现在的互联网应用来说,资源横向扩展提供了更多的灵活性,是一种比较容易实现的向外扩展方案,但是同时也明显增加了复杂度,引入一些新的挑战,比如资源之间的数据一致性问题。

横向数据扩展既可以按数据分片扩展,也可以按功能扩展。TCC 模型能在功能横向扩展资源的同时,保证多资源访问的事务属性。

TCC 模型除了跨服务的分布式事务这一层作用之外,还具有两阶段划分的功能,通过业务资源锁定,允许第二阶段的异步执行,而异步化思想正是解决热点数据并发性能问题的利器之一。
 

Roadmap

当前已经发布到 0.4.0,后续我们会发布 0.5 ~ 1.0 版本,继续对 AT、TCC 模式进行功能完善和和丰富,并解决服务端高可用问题,在 1.0 版本之后,本开源产品将达到生产环境使用的标准。


图片1.png

· 阅读需 9 分钟

Fescar 0.4.0 版本发布了 TCC 模式,由蚂蚁金服团队贡献,欢迎大家试用,
Sample 地址:https://github.com/fescar-group/fescar-samples/tree/master/tcc
文末也提供了项目后续的 Roadmap,欢迎关注。

一、TCC 简介

在两阶段提交协议(2PC,Two Phase Commitment Protocol)中,资源管理器(RM, resource manager)需要提供“准备”、“提交”和“回滚” 3 个操作;而事务管理器(TM, transaction manager)分 2 阶段协调所有资源管理器,在第一阶段询问所有资源管理器“准备”是否成功,如果所有资源均“准备”成功则在第二阶段执行所有资源的“提交”操作,否则在第二阶段执行所有资源的“回滚”操作,保证所有资源的最终状态是一致的,要么全部提交要么全部回滚。

资源管理器有很多实现方式,其中 TCC(Try-Confirm-Cancel)是资源管理器的一种服务化的实现;TCC 是一种比较成熟的分布式事务解决方案,可用于解决跨数据库、跨服务业务操作的数据一致性问题;TCC 其 Try、Confirm、Cancel 3 个方法均由业务编码实现,故 TCC 可以被称为是服务化的资源管理器。

TCC 的 Try 操作作为一阶段,负责资源的检查和预留;Confirm 操作作为二阶段提交操作,执行真正的业务;Cancel 是二阶段回滚操作,执行预留资源的取消,使资源回到初始状态。

如下图所示,用户实现 TCC 服务之后,该 TCC 服务将作为分布式事务的其中一个资源,参与到整个分布式事务中;事务管理器分 2 阶段协调 TCC 服务,在第一阶段调用所有 TCC 服务的 Try 方法,在第二阶段执行所有 TCC 服务的 Confirm 或者 Cancel 方法;最终所有 TCC 服务要么全部都是提交的,要么全部都是回滚的。

image.png

二、TCC 设计

用户在接入 TCC 时,大部分工作都集中在如何实现 TCC 服务上,经过蚂蚁金服多年的 TCC 应用,总结如下主要的TCC 设计和实现主要事项:

1、业务操作分两阶段完成

接入 TCC 前,业务操作只需要一步就能完成,但是在接入 TCC 之后,需要考虑如何将其分成 2 阶段完成,把资源的检查和预留放在一阶段的 Try 操作中进行,把真正的业务操作的执行放在二阶段的 Confirm 操作中进行。

以下举例说明业务模式如何分成两阶段进行设计,举例场景:“账户A的余额中有 100 元,需要扣除其中 30 元”;

在接入 TCC 之前,用户编写 SQL:“update 账户表 set 余额 = 余额 - 30 where 账户 = A”,便能一步完成扣款操作。

在接入 TCC 之后,就需要考虑如何将扣款操作分成 2 步完成:

  • Try 操作:资源的检查和预留;

在扣款场景,Try 操作要做的事情就是先检查 A 账户余额是否足够,再冻结要扣款的 30 元(预留资源);此阶段不会发生真正的扣款。

  • Confirm 操作:执行真正业务的提交;

在扣款场景下,Confirm 阶段走的事情就是发生真正的扣款,把A账户中已经冻结的 30 元钱扣掉。

  • Cancel 操作:预留资源的是否释放;

在扣款场景下,扣款取消,Cancel 操作执行的任务是释放 Try 操作冻结的 30 元钱,是 A 账户回到初始状态。

image.png

2、并发控制

用户在实现 TCC 时,应当考虑并发性问题,将锁的粒度降到最低,以最大限度的提高分布式事务的并发性。

以下还是以A账户扣款为例,“账户 A 上有 100 元,事务 T1 要扣除其中的 30 元,事务 T2 也要扣除 30 元,出现并发”。

在一阶段 Try 操作中,分布式事务 T1 和分布式事务 T2 分别冻结资金的那一部分资金,相互之间无干扰;这样在分布式事务的二阶段,无论 T1 是提交还是回滚,都不会对 T2 产生影响,这样 T1 和 T2 在同一笔业务数据上并行执行。

image.png

3、允许空回滚

如下图所示,事务协调器在调用 TCC 服务的一阶段 Try 操作时,可能会出现因为丢包而导致的网络超时,此时事务管理器会触发二阶段回滚,调用 TCC 服务的 Cancel 操作,而 Cancel 操作调用未出现超时。

TCC 服务在未收到 Try 请求的情况下收到 Cancel 请求,这种场景被称为空回滚;空回滚在生产环境经常出现,用户在实现TCC服务时,应允许允许空回滚的执行,即收到空回滚时返回成功。

image.png

4、防悬挂控制

如下图所示,事务协调器在调用 TCC 服务的一阶段 Try 操作时,可能会出现因网络拥堵而导致的超时,此时事务管理器会触发二阶段回滚,调用 TCC 服务的 Cancel 操作,Cancel 调用未超时;在此之后,拥堵在网络上的一阶段 Try 数据包被 TCC 服务收到,出现了二阶段 Cancel 请求比一阶段 Try 请求先执行的情况,此 TCC 服务在执行晚到的 Try 之后,将永远不会再收到二阶段的 Confirm 或者 Cancel ,造成 TCC 服务悬挂。

用户在实现  TCC 服务时,要允许空回滚,但是要拒绝执行空回滚之后 Try 请求,要避免出现悬挂。

image.png

5、幂等控制

无论是网络数据包重传,还是异常事务的补偿执行,都会导致 TCC 服务的 Try、Confirm 或者 Cancel 操作被重复执行;用户在实现 TCC 服务时,需要考虑幂等控制,即 Try、Confirm、Cancel 执行一次和执行多次的业务结果是一样的。
image.png

Roadmap

当前已经发布到 0.4.0 版本,后续我们会发布 0.5 ~ 1.0 版本,继续对 AT、TCC 模式进行功能完善和和丰富,并解决服务端高可用问题,在 1.0 版本之后,本开源产品将达到生产环境使用的标准。

图片1.png

· 阅读需 4 分钟

案例

用户采购商品业务,整个业务包含3个微服务:

  • 库存服务: 扣减给定商品的库存数量。
  • 订单服务: 根据采购请求生成订单。
  • 账户服务: 用户账户金额扣减。

业务结构图

Architecture

StorageService

public interface StorageService {

/**
* deduct storage count
*/
void deduct(String commodityCode, int count);
}

OrderService

public interface OrderService {

/**
* create order
*/
Order create(String userId, String commodityCode, int orderCount);
}

AccountService

public interface AccountService {

/**
* debit balance of user's account
*/
void debit(String userId, int money);
}

主要的业务逻辑:

public class BusinessServiceImpl implements BusinessService {

private StorageService storageService;

private OrderService orderService;

/**
* purchase
*/
public void purchase(String userId, String commodityCode, int orderCount) {

storageService.deduct(commodityCode, orderCount);

orderService.create(userId, commodityCode, orderCount);
}
}
public class StorageServiceImpl implements StorageService {

private StorageDAO storageDAO;

@Override
public void deduct(String commodityCode, int count) {
Storage storage = new Storage();
storage.setCount(count);
storage.setCommodityCode(commodityCode);
storageDAO.update(storage);
}
}
public class OrderServiceImpl implements OrderService {

private OrderDAO orderDAO;

private AccountService accountService;

public Order create(String userId, String commodityCode, int orderCount) {

int orderMoney = calculate(commodityCode, orderCount);

accountService.debit(userId, orderMoney);

Order order = new Order();
order.userId = userId;
order.commodityCode = commodityCode;
order.count = orderCount;
order.money = orderMoney;

return orderDAO.insert(order);
}
}

Seata 分布式事务解决方案

undefined

此处仅仅需要一行注解 @GlobalTransactional 写在业务发起方的方法上:


@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
......
}

Dubbo 与 Seata 结合的例子

Step 1: 安装数据库

  • 要求: MySQL (InnoDB 存储引擎)。

提示: 事实上例子中3个微服务需要3个独立的数据库,但为了方便我们使用同一物理库并配置3个逻辑连接串。

更改以下xml文件中的数据库url、username和password

dubbo-account-service.xml dubbo-order-service.xml dubbo-storage-service.xml

    <property name="url" value="jdbc:mysql://x.x.x.x:3306/xxx" />
<property name="username" value="xxx" />
<property name="password" value="xxx" />

Step 2: 为 Seata 创建 UNDO_LOG 表

UNDO_LOG 此表用于 Seata 的AT模式。

CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_unionkey` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=159 DEFAULT CHARSET=utf8

Step 3: 创建相关业务表


DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Step 4: 启动 Seata-Server 服务

  • 下载Server package, 并解压。
  • 运行bin目录下的启动脚本。
sh seata-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA

e.g.

sh seata-server.sh 8091 /home/admin/seata/data/

Step 5: 运行例子

相关项目

· 阅读需 23 分钟

fescar发布已有时日,分布式事务一直是业界备受关注的领域,fescar发布一个月左右便受到了近5000个star足以说明其热度。当然,在fescar出来之前, 已经有比较成熟的分布式事务的解决方案开源了,比较典型的方案如 LCN 的2pc型无侵入事务, 目前lcn已发展到5.0,已支持和fescar事务模型类似的TCX型事务。还有如TCC型事务实现 hmily tcc-transaction 等。 在微服务架构流行的当下、阿里这种开源大户背景下,fescar的发布无疑又掀起了研究分布式事务的热潮。fescar脱胎于阿里云商业分布式事务服务GTS,在线上环境提供这种公共服务其模式肯定经受了非常严苛的考验。其分布式事务模型TXC又仿于传统事务模型XA方案,主要区别在于资源管理器的定位一个在应用层一个在数据库层。博主觉得fescar的txc模型实现非常有研究的价值,所以今天我们来好好翻一翻fescar项目的代码。本文篇幅较长,浏览并理解本文大概耗时30~60分钟左右。

项目地址

fescar:https://github.com/alibaba/fescar

本博文所述代码为fescar的0.1.2-SNAPSHOT版本,根据fescar后期的迭代计划,其项目结构和模块实现都可能有很大的改变,特此说明。

fescar的TXC模型

上图为fescar官方针对TXC模型制作的示意图。不得不说大厂的图制作的真的不错,结合示意图我们可以看到TXC实现的全貌。TXC的实现通过三个组件来完成。也就是上图的三个深黄色部分,其作用如下:

  1. TM:全局事务管理器,在标注开启fescar分布式事务的服务端开启,并将全局事务发送到TC事务控制端管理
  2. TC:事务控制中心,控制全局事务的提交或者回滚。这个组件需要独立部署维护,目前只支持单机版本,后续迭代计划会有集群版本
  3. RM:资源管理器,主要负责分支事务的上报,本地事务的管理

一段话简述其实现过程:服务起始方发起全局事务并注册到TC。在调用协同服务时,协同服务的事务分支事务会先完成阶段一的事务提交或回滚,并生成事务回滚的undo_log日志,同时注册当前协同服务到TC并上报其事务状态,归并到同一个业务的全局事务中。此时若没有问题继续下一个协同服务的调用,期间任何协同服务的分支事务回滚,都会通知到TC,TC在通知全局事务包含的所有已完成一阶段提交的分支事务回滚。如果所有分支事务都正常,最后回到全局事务发起方时,也会通知到TC,TC在通知全局事务包含的所有分支删除回滚日志。在这个过程中为了解决写隔离和度隔离的问题会涉及到TC管理的全局锁。

本博文的目标是深入代码细节,探究其基本思路是如何实现的。首先会从项目的结构来简述每个模块的作用,继而结合官方自带的examples实例来探究整个分布式事务的实现过程。

项目结构解析

项目拉下来,用IDE打开后的目录结构如下,下面先大致的看下每个模块的实现

  • common :公共组件,提供常用辅助类,静态变量、扩展机制类加载器、以及定义全局的异常等
  • config : 配置加载解析模块,提供了配置的基础接口,目前只有文件配置实现,后续会有nacos等配置中心的实现
  • core : 核心模块主要封装了TM、RM和TC通讯用RPC相关内容
  • dubbo :dubbo模块主要适配dubbo通讯框架,使用dubbo的filter机制来传统全局事务的信息到分支
  • examples :简单的演示实例模块,等下从这个模块入手探索
  • rm-datasource :资源管理模块,比较核心的一个模块,个人认为这个模块命名为core要更合理一点。代理了JDBC的一些类,用来解析sql生成回滚日志、协调管理本地事务
  • server : TC组件所在,主要协调管理全局事务,负责全局事务的提交或者回滚,同时管理维护全局锁。
  • spring :和spring集成的模块,主要是aop逻辑,是整个分布式事务的入口,研究fescar的突破口
  • tm : 全局事务事务管理模块,管理全局事务的边界,全局事务开启回滚点都在这个模块控制

通过【examples】模块的实例看下效果

第一步、先启动TC也就是【Server】模块,main方法直接启动就好,默认服务端口8091

第二步、回到examples模块,将订单,业务,账户、仓库四个服务的配置文件配置好,主要是mysql数据源和zookeeper连接地址,这里要注意下,默认dubbo的zk注册中心依赖没有,启动的时候回抛找不到class的异常,需要添加如下的依赖:

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>

第三步、在BusinessServiceImpl中的模拟抛异常的地方打个断点,依次启动OrderServiceImpl、StorageServiceImpl、AccountServiceImpl、BusinessServiceImpl四个服务、等进断点后,查看数据库account_tbl表,金额已减去400元,变成了599元。然后放开断点、BusinessServiceImpl模块模拟的异常触发,全局事务回滚,account_tbl表的金额就又回滚到999元了

如上,我们已经体验到fescar事务的控制能力了,下面我们具体看下它是怎么控制的。

fescar事务过程分析

首先分析配置文件

这个是一个铁律,任何一个技术或框架要集成,配置文件肯定是一个突破口。从上面的例子我们了解到,实例模块的配置文件中配置了一个全局事务扫描器实例,如:

<bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
<constructor-arg value="dubbo-demo-app"/>
<constructor-arg value="my\_test\_tx_group"/>
</bean>

这个实例在项目启动时会扫描所有实例,具体实现见【spring】模块。并将标注了@GlobalTransactional注解的方法织入GlobalTransactionalInterceptor的invoke方法逻辑。同时应用启动时,会初始化TM(TmRpcClient)和RM(RmRpcClient)的实例,这个时候,服务已经和TC事务控制中心勾搭上了。在往下看就涉及到TM模块的事务模板类TransactionalTemplate。

【TM】模块启动全局事务

全局事务的开启,提交、回滚都被封装在TransactionalTemplate中完成了,代码如:


public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 2. begin transaction
try {
tx.begin(business.timeout(), business.name());
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
tx.rollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
tx.commit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
}

更详细的实现在【TM】模块中被分成了两个Class实现,如下:

DefaultGlobalTransaction :全局事务具体的开启,提交、回滚动作

DefaultTransactionManager :负责使用TmRpcClient向TC控制中心发送指令,如开启全局事务(GlobalBeginRequest)、提交(GlobalCommitRequest)、回滚(GlobalRollbackRequest)、查询状态(GlobalStatusRequest)等。

以上是TM模块核心内容点,TM模块完成全局事务开启后,接下来就开始看看全局事务iD,xid是如何传递、RM组件是如何介入的

【dubbo】全局事务xid的传递

首先是xid的传递,目前已经实现了dubbo框架实现的微服务架构下的传递,其他的像spring cloud和motan等的想要实现也很容易,通过一般RPC通讯框架都有的filter机制,将xid从全局事务的发起节点传递到服务协从节点,从节点接收到后绑定到当前线程上线文环境中,用于在分支事务执行sql时判断是否加入全局事务。fescar的实现见【dubbo】模块如下:

@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext\[" + xid + "\] xid in RpcContext\[" + rpcXid + "\]");
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind\[" + rpcXid + "\] to RootContext");
}
}
}
try {
return invoker.invoke(invocation);

} finally {
if (bind) {
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind\[" + unbindXid + "\] from RootContext");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind \[" + unbindXid + "\] back to RootContext");
}
}
}
}
}
}

上面代码rpcXid不为空时,就加入到了RootContext的ContextCore中,这里稍微深入讲下。ContextCore是一个可扩展实现的接口,目前默认的实现是ThreadLocalContextCore,基于ThreadLocal来保存维护当前的xid。这里fescar提供了可扩展的机制,实现在【common】模块中,通过一个自定义的类加载器EnhancedServiceLoader加载需要扩展的服务类,这样只需要在扩展类加上@LoadLevel注解。标记order属性声明高优先级别,就可以达到扩展实现的目的。

【RM】模块本地资源管理的介入

fescar针对本地事务相关的接口,通过代理机制都实现了一遍代理类,如数据源(DataSourceProxy)、ConnectionProxy、StatementProxy等。这个在配置文件中也可以看出来,也就是说,我们要使用fescar分布式事务,一定要配置fescar提供的代理数据源。如:

配置好代理数据源后,从DataSourceProxy出发,本地针对数据库的所有操作过程我们就可以随意控制了。从上面xid传递,已经知道了xid被保存在RootContext中了,那么请看下面的代码,就非常清楚了:

首先看StatementProxy的一段代码

在看ExecuteTemplate中的代码

和【TM】模块中的事务管理模板类TransactionlTemplate类似,这里非常关键的逻辑代理也被封装在了ExecuteTemplate模板类中。因重写了Statement有了StatementProxy实现,在执行原JDBC的executeUpdate方法时,会调用到ExecuteTemplate的execute逻辑。在sql真正执行前,会判断RootCOntext当前上下文中是否包含xid,也就是判断当前是否是全局分布式事务。如果不是,就直接使用本地事务,如果是,这里RM就会增加一些分布式事务相关的逻辑了。这里根据sql的不同的类型,fescar封装了五个不同的执行器来处理,分别是UpdateExecutor、DeleteExecutor、InsertExecutor、SelectForUpdateExecutor、PlainExecutor,结构如下图:

PlainExecutor:

原生的JDBC接口实现,未做任何处理,提供给全局事务中的普通的select查询使用

UpdateExecutor、DeleteExecutor、InsertExecutor:

三个DML增删改执行器实现,主要在sql执行的前后对sql语句进行了解析,实现了如下两个抽象接口方法:

protected abstract TableRecords beforeImage() throws SQLException;

protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

在这个过程中通过解析sql生成了提供回滚操作的undo_log日志,日志目前是保存在msyql中的,和业务sql操作共用同一个事务。表的结构如下:

rollback_info保存的undo_log详细信息,是longblob类型的,结构如下:

{
    "branchId":3958194,
    "sqlUndoLogs":[
        {
            "afterImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PrimaryKey",
                                "name":"ID",
                                "type":4,
                                "value":10
                            },
                            {
                                "keyType":"NULL",
                                "name":"COUNT",
                                "type":4,
                                "value":98
                            }
                        ]
                    }
                ],
                "tableName":"storage_tbl"
            },
            "beforeImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PrimaryKey",
                                "name":"ID",
                                "type":4,
                                "value":10
                            },
                            {
                                "keyType":"NULL",
                                "name":"COUNT",
                                "type":4,
                                "value":100
                            }
                        ]
                    }
                ],
                "tableName":"storage_tbl"
            },
            "sqlType":"UPDATE",
            "tableName":"storage_tbl"
        }
    ],
    "xid":"192.168.7.77:8091:3958193"
}


这里贴的是一个update的操作,undo_log记录的非常的详细,通过全局事务xid关联branchid,记录数据操作的表名,操作字段名,以及sql执行前后的记录数,如这个记录,表名=storage_tbl,sql执行前ID=10,count=100,sql执行后id=10,count=98。如果整个全局事务失败,需要回滚的时候就可以生成:

update storage_tbl set count = 100 where id = 10;

这样的回滚sql语句执行了。

SelectForUpdateExecutor:

fescar的AT模式在本地事务之上默认支持读未提交的隔离级别,但是通过SelectForUpdateExecutor执行器,可以支持读已提交的隔离级别。代码如:

@Override
public Object doExecute(Object... args) throws Throwable {
SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;

Connection conn = statementProxy.getConnection();
ResultSet rs = null;
Savepoint sp = null;
LockRetryController lockRetryController = new LockRetryController();
boolean originalAutoCommit = conn.getAutoCommit();

StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
selectSQLAppender.append(getTableMeta().getPkName());
selectSQLAppender.append(" FROM " + getTableMeta().getTableName());
String whereCondition = null;
ArrayList<Object> paramAppender = new ArrayList<>();
if (statementProxy instanceof ParametersHolder) {
whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
} else {
whereCondition = recognizer.getWhereCondition();
}
if (!StringUtils.isEmpty(whereCondition)) {
selectSQLAppender.append(" WHERE " + whereCondition);
}
selectSQLAppender.append(" FOR UPDATE");
String selectPKSQL = selectSQLAppender.toString();

try {
if (originalAutoCommit) {
conn.setAutoCommit(false);
}
sp = conn.setSavepoint();
rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

while (true) {
// Try to get global lock of those rows selected
Statement stPK = null;
PreparedStatement pstPK = null;
ResultSet rsPK = null;
try {
if (paramAppender.isEmpty()) {
stPK = statementProxy.getConnection().createStatement();
rsPK = stPK.executeQuery(selectPKSQL);
} else {
pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
for (int i = 0; i < paramAppender.size(); i++) {
pstPK.setObject(i + 1, paramAppender.get(i));
}
rsPK = pstPK.executeQuery();
}

TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);
break;

} catch (LockConflictException lce) {
conn.rollback(sp);
lockRetryController.sleep(lce);

} finally {
if (rsPK != null) {
rsPK.close();
}
if (stPK != null) {
stPK.close();
}
if (pstPK != null) {
pstPK.close();
}
}
}

} finally {
if (sp != null) {
conn.releaseSavepoint(sp);
}
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
}
return rs;
}

关键代码见:

TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);

通过selectPKRows表操作记录拿到lockKeys,然后到TC控制器端查询是否被全局锁定了,如果被锁定了,就重新尝试,直到锁释放返回查询结果。

分支事务的注册和上报

在本地事务提交前,fescar会注册和上报分支事务相关的信息,见ConnectionProxy类的commit部分代码:

@Override
public void commit() throws SQLException {
if (context.inGlobalTransaction()) {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}

try {
if (context.hasUndoLog()) {
UndoLogManager.flushUndoLogs(this);
}
targetConnection.commit();
} catch (Throwable ex) {
report(false);
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
report(true);
context.reset();

} else {
targetConnection.commit();
}
}

从这段代码我们可以看到,首先是判断是了是否是全局事务,如果不是,就直接提交了,如果是,就先向TC控制器注册分支事务,为了写隔离,在TC端会涉及到全局锁的获取。然后保存了用于回滚操作的undo_log日志,继而真正提交本地事务,最后向TC控制器上报事务状态。此时,阶段一的本地事务已完成了。

【server】模块协调全局

关于server模块,我们可以聚焦在DefaultCoordinator这个类,这个是AbstractTCInboundHandler控制处理器默认实现。主要实现了全局事务开启,提交,回滚,状态查询,分支事务注册,上报,锁检查等接口,如:

回到一开始的TransactionlTemplate,如果整个分布式事务失败需要回滚了,首先是TM向TC发起回滚的指令,然后TC接收到后,解析请求后会被路由到默认控制器类的doGlobalRollback方法内,最终在TC控制器端执行的代码如下:

@Override
public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
continue;
}
try {
BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
branchSession.getResourceId(), branchSession.getApplicationData());

switch (branchStatus) {
case PhaseTwo_Rollbacked:
globalSession.removeBranch(branchSession);
LOGGER.error("Successfully rolled back branch " + branchSession);
continue;
case PhaseTwo\_RollbackFailed\_Unretryable:
GlobalStatus currentStatus = globalSession.getStatus();
if (currentStatus.name().startsWith("Timeout")) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
} else {
globalSession.changeStatus(GlobalStatus.RollbackFailed);
}
globalSession.end();
LOGGER.error("Failed to rollback global\[" + globalSession.getTransactionId() + "\] since branch\[" + branchSession.getBranchId() + "\] rollback failed");
return;
default:
LOGGER.info("Failed to rollback branch " + branchSession);
if (!retrying) {
queueToRetryRollback(globalSession);
}
return;

}
} catch (Exception ex) {
LOGGER.info("Exception rollbacking branch " + branchSession, ex);
if (!retrying) {
queueToRetryRollback(globalSession);
if (ex instanceof TransactionException) {
throw (TransactionException) ex;
} else {
throw new TransactionException(ex);
}
}

}

}
GlobalStatus currentStatus = globalSession.getStatus();
if (currentStatus.name().startsWith("Timeout")) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
} else {
globalSession.changeStatus(GlobalStatus.Rollbacked);
}
globalSession.end();
}

如上代码可以看到,回滚时从全局事务会话中迭代每个分支事务,然后通知每个分支事务回滚。分支服务接收到请求后,首先会被路由到RMHandlerAT中的doBranchRollback方法,继而调用了RM中的branchRollback方法,代码如下:

@Override
public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo\_RollbackFailed\_Unretryable;
} else {
return BranchStatus.PhaseTwo\_RollbackFailed\_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}

RM分支事务端最后执行的是UndoLogManager的undo方法,通过xid和branchid从数据库查询出回滚日志,完成数据回滚操作,整个过程都是同步完成的。如果全局事务是成功的,TC也会有类似的上述协调过程,只不过是异步的将本次全局事务相关的undo_log清除了而已。至此,就完成了2阶段的提交或回滚,也就完成了完整的全局事务事务的控制。

结语

如果你看到这里,那么非常感谢你,在繁忙工作之余耐心的花时间来学习。同时,我相信花的时间没白费,完整的浏览理解估计对fescar实现的大致流程了解的十之八九了。本文从构思立题到完成大概耗时1人天左右,博主在这个过程中,对fescar的实现也有了更加深入的了解。由于篇幅原因,并没有面面俱到的对每个实现的细节去深究,如sql是如何解析的等,更多的是在fescar的TXC模型的实现过程的关键点做了详细阐述。本文已校对,但由于个人知识水平及精力有限,文中不免出现错误或理解不当的地方,欢迎指正。

作者简介:

陈凯玲,2016年5月加入凯京科技。曾任职高级研发和项目经理,现任凯京科技研发中心架构&运维部负责人。pmp项目管理认证,阿里云MVP。热爱开源,先后开源过多个热门项目。热爱分享技术点滴,独立博客KL博客(http://www.kailing.pub)博主。