跳到主要内容

· 阅读需 21 分钟

作者简介:煊檍,GitHub ID:sharajava,阿里巴巴中件间 GTS 研发团队负责人,SEATA 开源项目发起人,曾在 Oracle 北京研发中心多年,从事 WebLogic 核心研发工作。长期专注于中间件,尤其是分布式事务领域的技术实践。

Seata 1.2.0 版本重磅发布新的事务模式:XA 模式,实现对 XA 协议的支持。

这里,我们从三个方面来深入解读这个新的特性:

  • 是什么(What):XA 模式是什么?
  • 为什么(Why):为什么支持 XA?
  • 怎么做(How):XA 模式是如何实现的,以及怎样使用?

1. XA 模式是什么?

这里有两个基本的前置概念:

  1. 什么是 XA?
  2. 什么是 Seata 定义的所谓 事务模式?

基于这两点,再来理解 XA 模式就很自然了。

1.1 什么是 XA?

XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准。

XA 规范 描述了全局的事务管理器与局部的资源管理器之间的接口。 XA规范 的目的是允许的多个资源(如数据库,应用服务器,消息队列等)在同一事务中访问,这样可以使 ACID 属性跨越应用程序而保持有效。

XA 规范 使用两阶段提交(2PC,Two-Phase Commit)来保证所有资源同时提交或回滚任何特定的事务。

XA 规范 在上世纪 90 年代初就被提出。目前,几乎所有主流的数据库都对 XA 规范 提供了支持。

1.2 什么是 Seata 的事务模式?

Seata 定义了全局事务的框架。

全局事务 定义为若干 分支事务 的整体协调:

  1. TM 向 TC 请求发起(Begin)、提交(Commit)、回滚(Rollback)全局事务。
  2. TM 把代表全局事务的 XID 绑定到分支事务上。
  3. RM 向 TC 注册,把分支事务关联到 XID 代表的全局事务中。
  4. RM 把分支事务的执行结果上报给 TC。(可选)
  5. TC 发送分支提交(Branch Commit)或分支回滚(Branch Rollback)命令给 RM。
seata-mod

Seata 的 全局事务 处理过程,分为两个阶段:

  • 执行阶段 :执行 分支事务,并 保证 执行结果满足是 可回滚的(Rollbackable)持久化的(Durable)
  • 完成阶段: 根据 执行阶段 结果形成的决议,应用通过 TM 发出的全局提交或回滚的请求给 TC,TC 命令 RM 驱动 分支事务 进行 Commit 或 Rollback。

Seata 的所谓 事务模式 是指:运行在 Seata 全局事务框架下的 分支事务 的行为模式。准确地讲,应该叫作 分支事务模式。

不同的 事务模式 区别在于 分支事务 使用不同的方式达到全局事务两个阶段的目标。即,回答以下两个问题:

  • 执行阶段 :如何执行并 保证 执行结果满足是 可回滚的(Rollbackable)持久化的(Durable)
  • 完成阶段: 收到 TC 的命令后,如何做到分支的提交或回滚?

以我们 Seata 的 AT 模式和 TCC 模式为例来理解:

AT 模式

at-mod
  • 执行阶段:

    • 可回滚:根据 SQL 解析结果,记录回滚日志
    • 持久化:回滚日志和业务 SQL 在同一个本地事务中提交到数据库
  • 完成阶段:

    • 分支提交:异步删除回滚日志记录
    • 分支回滚:依据回滚日志进行反向补偿更新

TCC 模式

tcc-mod
  • 执行阶段:

    • 调用业务定义的 Try 方法(完全由业务层面保证 可回滚持久化
  • 完成阶段:

    • 分支提交:调用各事务分支定义的 Confirm 方法
    • 分支回滚:调用各事务分支定义的 Cancel 方法

1.3 什么是 Seata 的 XA 模式?

XA 模式:

在 Seata 定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式。

xa-mod
  • 执行阶段:

    • 可回滚:业务 SQL 操作放在 XA 分支中进行,由资源对 XA 协议的支持来保证 可回滚
    • 持久化:XA 分支完成后,执行 XA prepare,同样,由资源对 XA 协议的支持来保证 持久化(即,之后任何意外都不会造成无法回滚的情况)
  • 完成阶段:

    • 分支提交:执行 XA 分支的 commit
    • 分支回滚:执行 XA 分支的 rollback

2. 为什么支持 XA?

为什么要在 Seata 中增加 XA 模式呢?支持 XA 的意义在哪里呢?

2.1 补偿型事务模式的问题

本质上,Seata 已经支持的 3 大事务模式:AT、TCC、Saga 都是 补偿型 的。

补偿型 事务处理机制构建在 事务资源 之上(要么在中间件层面,要么在应用层面),事务资源 本身对分布式事务是无感知的。

img

事务资源 对分布式事务的无感知存在一个根本性的问题:无法做到真正的 全局一致性 。

比如,一条库存记录,处在 补偿型 事务处理过程中,由 100 扣减为 50。此时,仓库管理员连接数据库,查询统计库存,就看到当前的 50。之后,事务因为异外回滚,库存会被补偿回滚为 100。显然,仓库管理员查询统计到的 50 就是 脏 数据。

可以看到,补偿型 分布式事务机制因为不要求 事务资源 本身(如数据库)的机制参与,所以无法保证从事务框架之外的全局视角的数据一致性。

2.2 XA 的价值

与 补偿型 不同,XA 协议 要求 事务资源 本身提供对规范和协议的支持。

nct

因为 事务资源 感知并参与分布式事务处理过程,所以 事务资源(如数据库)可以保障从任意视角对数据的访问有效隔离,满足全局数据一致性。

比如,上一节提到的库存更新场景,XA 事务处理过程中,中间态数据库存 50 由数据库本身保证,是不会仓库管理员的查询统计 到的。(当然隔离级别需要 读已提交 以上)

除了 全局一致性 这个根本性的价值外,支持 XA 还有如下几个方面的好处:

  1. 业务无侵入:和 AT 一样,XA 模式将是业务无侵入的,不给应用设计和开发带来额外负担。
  2. 数据库的支持广泛:XA 协议被主流关系型数据库广泛支持,不需要额外的适配即可使用。
  3. 多语言支持容易:因为不涉及 SQL 解析,XA 模式对 Seata 的 RM 的要求比较少,为不同语言开发 SDK 较之 AT 模式将更 ,更容易。
  4. 传统基于 XA 应用的迁移:传统的,基于 XA 协议的应用,迁移到 Seata 平台,使用 XA 模式将更平滑。

2.3 XA 广泛被质疑的问题

不存在某一种分布式事务机制可以完美适应所有场景,满足所有需求。

XA 规范早在上世纪 90 年代初就被提出,用以解决分布式事务处理这个领域的问题。

现在,无论 AT 模式、TCC 模式还是 Saga 模式,这些模式的提出,本质上都源自 XA 规范对某些场景需求的无法满足。

XA 规范定义的分布式事务处理机制存在一些被广泛质疑的问题,针对这些问题,我们是如何思考的呢?

  1. 数据锁定:数据在整个事务处理过程结束前,都被锁定,读写都按隔离级别的定义约束起来。

思考:

数据锁定是获得更高隔离性和全局一致性所要付出的代价。

补偿型 的事务处理机制,在 执行阶段 即完成分支(本地)事务的提交,(资源层面)不锁定数据。而这是以牺牲 隔离性 为代价的。

另外,AT 模式使用 全局锁 保障基本的 写隔离,实际上也是锁定数据的,只不过锁在 TC 侧集中管理,解锁效率高且没有阻塞的问题。

  1. 协议阻塞:XA prepare 后,分支事务进入阻塞阶段,收到 XA commit 或 XA rollback 前必须阻塞等待。

思考:

协议的阻塞机制本身并不是问题,关键问题在于 协议阻塞 遇上 数据锁定。

如果一个参与全局事务的资源 “失联” 了(收不到分支事务结束的命令),那么它锁定的数据,将一直被锁定。进而,甚至可能因此产生死锁。

这是 XA 协议的核心痛点,也是 Seata 引入 XA 模式要重点解决的问题。

基本思路是两个方面:避免 “失联” 和 增加 “自解锁” 机制。(这里涉及非常多技术细节,暂时不展开,在后续 XA 模式演进过程中,会专门拿出来讨论)

  1. 性能差:性能的损耗主要来自两个方面:一方面,事务协调过程,增加单个事务的 RT;另一方面,并发事务数据的锁冲突,降低吞吐。

思考:

和不使用分布式事务支持的运行场景比较,性能肯定是下降的,这点毫无疑问。

本质上,事务(无论是本地事务还是分布式事务)机制就是拿部分 性能的牺牲 ,换来 编程模型的简单 。

与同为 业务无侵入 的 AT 模式比较:

首先,因为同样运行在 Seata 定义的分布式事务框架下,XA 模式并没有产生更多事务协调的通信开销。

其次,并发事务间,如果数据存在热点,产生锁冲突,这种情况,在 AT 模式(默认使用全局锁)下同样存在的。

所以,在影响性能的两个主要方面,XA 模式并不比 AT 模式有非常明显的劣势。

AT 模式性能优势主要在于:集中管理全局数据锁,锁的释放不需要 RM 参与,释放锁非常快;另外,全局提交的事务,完成阶段 异步化。

3. XA 模式如何实现以及怎样用?

3.1 XA 模式的设计

3.1.1 设计目标

XA 模式的基本设计目标,两个主要方面:

  1. 从 场景 上,满足 全局一致性 的需求。
  2. 从 应用上,保持与 AT 模式一致的无侵入。
  3. 从 机制 上,适应分布式微服务架构的特点。

整体思路:

  1. 与 AT 模式相同的:以应用程序中 本地事务 的粒度,构建到 XA 模式的 分支事务。
  2. 通过数据源代理,在应用程序本地事务范围外,在框架层面包装 XA 协议的交互机制,把 XA 编程模型 透明化。
  3. 把 XA 的 2PC 拆开,在分支事务 执行阶段 的末尾就进行 XA prepare,把 XA 协议完美融合到 Seata 的事务框架,减少一轮 RPC 交互。

3.1.2 核心设计

1. 整体运行机制

XA 模式 运行在 Seata 定义的事务框架内:

xa-fw
  • 执行阶段(E xecute):

    • XA start/XA end/XA prepare + SQL + 注册分支
  • 完成阶段(F inish):

    • XA commit/XA rollback

2. 数据源代理

XA 模式需要 XAConnection。

获取 XAConnection 两种方式:

  • 方式一:要求开发者配置 XADataSource
  • 方式二:根据开发者的普通 DataSource 来创建

第一种方式,给开发者增加了认知负担,需要为 XA 模式专门去学习和使用 XA 数据源,与 透明化 XA 编程模型的设计目标相违背。

第二种方式,对开发者比较友好,和 AT 模式使用一样,开发者完全不必关心 XA 层面的任何问题,保持本地编程模型即可。

我们优先设计实现第二种方式:数据源代理根据普通数据源中获取的普通 JDBC 连接创建出相应的 XAConnection。

类比 AT 模式的数据源代理机制,如下:

img

但是,第二种方法有局限:无法保证兼容的正确性。

实际上,这种方法是在做数据库驱动程序要做的事情。不同的厂商、不同版本的数据库驱动实现机制是厂商私有的,我们只能保证在充分测试过的驱动程序上是正确的,开发者使用的驱动程序版本差异很可能造成机制的失效。

这点在 Oracle 上体现非常明显。参见 Druid issue:https://github.com/alibaba/druid/issues/3707

综合考虑,XA 模式的数据源代理设计需要同时支持第一种方式:基于 XA 数据源进行代理。

类比 AT 模式的数据源代理机制,如下:

img

3. 分支注册

XA start 需要 Xid 参数。

这个 Xid 需要和 Seata 全局事务的 XID 和 BranchId 关联起来,以便由 TC 驱动 XA 分支的提交或回滚。

目前 Seata 的 BranchId 是在分支注册过程,由 TC 统一生成的,所以 XA 模式分支注册的时机需要在 XA start 之前。

将来一个可能的优化方向:

把分支注册尽量延后。类似 AT 模式在本地事务提交之前才注册分支,避免分支执行失败情况下,没有意义的分支注册。

这个优化方向需要 BranchId 生成机制的变化来配合。BranchId 不通过分支注册过程生成,而是生成后再带着 BranchId 去注册分支。

4. 小结

这里只通过几个重要的核心设计,说明 XA 模式的基本工作机制。

此外,还有包括 连接保持异常处理 等重要方面,有兴趣可以从项目代码中进一步了解。

以后会陆续写出来和大家交流。

3.1.3 演进规划

XA 模式总体的演进规划如下:

  1. 第 1 步(已经完成):首个版本(1.2.0),把 XA 模式原型机制跑通。确保只增加,不修改,不给其他模式引入的新问题。
  2. 第 2 步(计划 5 月完成):与 AT 模式必要的融合、重构。
  3. 第 3 步(计划 7 月完成):完善异常处理机制,进行上生产所必需的打磨。
  4. 第 4 步(计划 8 月完成):性能优化。
  5. 第 5 步(计划 2020 年内完成):结合 Seata 项目正在进行的面向云原生的 Transaction Mesh 设计,打造云原生能力。

3.2 XA 模式的使用

从编程模型上,XA 模式与 AT 模式保持完全一致。

可以参考 Seata 官网的样例:seata-xa

样例场景是 Seata 经典的,涉及库存、订单、账户 3 个微服务的商品订购业务。

在样例中,上层编程模型与 AT 模式完全相同。只需要修改数据源代理,即可实现 XA 模式与 AT 模式之间的切换。

    @Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
// DataSourceProxy for AT mode
// return new DataSourceProxy(druidDataSource);

// DataSourceProxyXA for XA mode
return new DataSourceProxyXA(druidDataSource);
}

4. 总结

在当前的技术发展阶段,不存一个分布式事务处理机制可以完美满足所有场景的需求。

一致性、可靠性、易用性、性能等诸多方面的系统设计约束,需要用不同的事务处理机制去满足。

Seata 项目最核心的价值在于:构建一个全面解决分布式事务问题的 标准化 平台。

基于 Seata,上层应用架构可以根据实际场景的需求,灵活选择合适的分布式事务解决方案。

img

XA 模式的加入,补齐了 Seata 在 全局一致性 场景下的缺口,形成 AT、TCC、Saga、XA 四大 事务模式 的版图,基本可以满足所有场景的分布式事务处理诉求。

当然 XA 模式和 Seata 项目本身都还不尽完美,有很多需要改进和完善的地方。非常欢迎大家参与到项目的建设中,共同打造一个标准化的分布式事务平台。

· 阅读需 14 分钟

Seata阿里开源的一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

1.1 四种事务模式

Seata 目标打造一站式的分布事务的解决方案,最终会提供四种事务模式:

目前使用的流行度情况是:AT > TCC > Saga。因此,我们在学习 Seata 的时候,可以花更多精力在 AT 模式上,最好搞懂背后的实现原理,毕竟分布式事务涉及到数据的正确性,出问题需要快速排查定位并解决。

友情提示:具体的流行度,朋友可以选择看看 Wanted: who's using Seata 每个公司登记的使用方式。

1.2 三种角色

在 Seata 的架构中,一共有三个角色:

三个角色

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围,开始全局事务、提交或回滚全局事务。
  • RM ( Resource Manager ) - 资源管理器:管理分支事务处理的资源( Resource ),与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

其中,TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端。

在 Seata 中,一个分布式事务的生命周期如下:

架构图

友情提示:看下艿艿添加的红色小勾。

  • TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。

    XID,会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。

  • RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。

  • TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。

  • TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。

1.3 框架支持情况

Seata 目前提供了对主流的微服务框架的支持:

同时方便我们集成到 Java 项目当中,Seata 也提供了相应的 Starter 库:

因为 Seata 是基于 DataSource 数据源进行代理来拓展,所以天然对主流的 ORM 框架提供了非常好的支持:

  • MyBatis、MyBatis-Plus
  • JPA、Hibernate

1.4 案例情况

Wanted: who's using Seata 的登记情况,Seata 已经在国内很多团队开始落地,其中不乏有滴滴、韵达等大型公司。可汇总如下图:

汇总图

另外,在 awesome-seata 仓库中,艿艿看到了滴滴等等公司的落地时的技术分享,还是非常真实可靠的。如下图所示:awesome-seata 滴滴

从案例的情况来说,Seata 可能给是目前已知最可靠的分布式事务解决方案,至少对它进行技术投入是非常不错的选择。

2. 部署单机 TC Server

本小节,我们来学习部署单机 Seata TC Server,常用于学习或测试使用,不建议在生产环境中部署单机。

因为 TC 需要进行全局事务和分支事务的记录,所以需要对应的存储。目前,TC 有两种存储模式( store.mode ):

  • file 模式:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高。
  • db 模式:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点。

显然,我们将采用 file 模式,最终我们部署单机 TC Server 如下图所示:单机 TC Server

哔哔完这么多,我们开始正式部署单机 TC Server,这里艿艿使用 macOS 系统,和 Linux、Windows 是差不多的,朋友脑补翻译。

2.1 下载 Seata 软件包

打开 Seata 下载页面,选择想要的 Seata 版本。这里,我们选择 v1.1.0 最新版本。

# 创建目录
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# 下载
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# 解压
$ tar -zxvf seata-server-1.1.0.tar.gz

# 查看目录
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # 执行脚本
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # 配置文件
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + 依赖库

2.2 启动 TC Server

执行 nohup sh bin/seata-server.sh & 命令,启动 TC Server 在后台。在 nohup.out 文件中,我们看到如下日志,说明启动成功:

# 使用 File 存储器
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[FILE] extension by class[io.seata.server.store.file.FileTransactionStoreManager]
2020-04-02 08:36:01.302 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[FILE] extension by class[io.seata.server.session.file.FileBasedSessionManager]
# 启动成功
2020-04-02 08:36:01.597 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
  • 默认配置下,Seata TC Server 启动在 8091 端点。

因为我们使用 file 模式,所以可以看到用于持久化的本地文件 root.data。操作命令如下:

$ ls -ls sessionStore/
total 0
0 -rw-r--r-- 1 yunai staff 0 Apr 2 08:36 root.data

后续,朋友可以阅读「4. 接入 Java 应用」小节,开始使用 Seata 实现分布式事务。

3. 部署集群 TC Server

本小节,我们来学习部署集群 Seata TC Server,实现高可用,生产环境下必备。在集群时,多个 Seata TC Server 通过 db 数据库,实现全局事务会话信息的共享。

同时,每个 Seata TC Server 可以注册自己到注册中心上,方便应用从注册中心获得到他们。最终我们部署 集群 TC Server 如下图所示:集群 TC Server

Seata TC Server 对主流的注册中心都提供了集成,具体可见 discovery 目录。考虑到国内使用 Nacos 作为注册中心越来越流行,这里我们就采用它。

友情提示:如果对 Nacos 不了解的朋友,可以参考《Nacos 安装部署》文章。

哔哔完这么多,我们开始正式部署单机 TC Server,这里艿艿使用 macOS 系统,和 Linux、Windows 是差不多的,朋友脑补翻译。

3.1 下载 Seata 软件包

打开 Seata 下载页面,选择想要的 Seata 版本。这里,我们选择 v1.1.0 最新版本。

# 创建目录
$ mkdir -p /Users/yunai/Seata
$ cd /Users/yunai/Seata

# 下载
$ wget https://github.com/apache/incubator-seata/releases/download/v1.1.0/seata-server-1.1.0.tar.gz

# 解压
$ tar -zxvf seata-server-1.1.0.tar.gz

# 查看目录
$ cd seata
$ ls -ls
24 -rw-r--r-- 1 yunai staff 11365 May 13 2019 LICENSE
0 drwxr-xr-x 4 yunai staff 128 Apr 2 07:46 bin # 执行脚本
0 drwxr-xr-x 9 yunai staff 288 Feb 19 23:49 conf # 配置文件
0 drwxr-xr-x 138 yunai staff 4416 Apr 2 07:46 lib # seata-*.jar + 依赖库

3.2 初始化数据库

① 使用 mysql.sql 脚本,初始化 Seata TC Server 的 db 数据库。脚本内容如下:

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

在 MySQL 中,创建 seata 数据库,并在该库下执行该脚本。最终结果如下图:seata 数据库 - MySQL 5.X

② 修改 conf/file 配置文件,修改使用 db 数据库,实现 Seata TC Server 的全局事务会话信息的共享。如下图所示:conf/file 配置文件

③ MySQL8 的支持

如果朋友使用的 MySQL 是 8.X 版本,则需要看该步骤。否则,可以直接跳过。

首先,需要下载 MySQL 8.X JDBC 驱动,命令行操作如下:

$ cd lib
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

然后,修改 conf/file 配置文件,使用该 MySQL 8.X JDBC 驱动。如下图所示:seata 数据库 - MySQL 8.X

3.3 设置使用 Nacos 注册中心

修改 conf/registry.conf 配置文件,设置使用 Nacos 注册中心。如下图所示:conf/registry.conf 配置文件

3.4 启动 TC Server

① 执行 nohup sh bin/seata-server.sh -p 18091 -n 1 & 命令,启动第一个 TC Server 在后台。

  • -p:Seata TC Server 监听的端口。
  • -n:Server node。在多个 TC Server 时,需区分各自节点,用于生成不同区间的 transactionId 事务编号,以免冲突。

nohup.out 文件中,我们看到如下日志,说明启动成功:

# 使用 DB 存储器
2020-04-05 16:54:12.793 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load DataSourceGenerator[dbcp] extension by class[io.seata.server.store.db.DbcpDataSourceGenerator]
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load LogStore[DB] extension by class[io.seata.core.store.db.LogStoreDataBaseDAO]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load TransactionStoreManager[DB] extension by class[io.seata.server.store.db.DatabaseTransactionStoreManager]
2020-04-05 16:54:13.442 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load SessionManager[DB] extension by class[io.seata.server.session.db.DataBaseSessionManager]
# 启动成功
2020-04-05 16:54:13.779 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...
# 使用 Nacos 注册中心
2020-04-05 16:54:13.788 INFO [main]io.seata.common.loader.EnhancedServiceLoader.loadFile:247 -load RegistryProvider[Nacos] extension by class[io.seata.discovery.registry.nacos.NacosRegistryProvider]

② 执行 nohup sh bin/seata-server.sh -p 28091 -n 2 & 命令,启动第二个 TC Server 在后台。

③ 打开 Nacos 注册中心的控制台,我们可以看到有两个 Seata TC Server 示例。如下图所示:Nacos 控制台

4. 接入 Java 应用

4.1 AT 模式

① Spring Boot

1、《芋道 Spring Boot 分布式事务 Seata 入门》「2. AT 模式 + 多数据源」小节,实现单体 Spring Boot 项目在多数据源下的分布式事务。

整体图

2、《芋道 Spring Boot 分布式事务 Seata 入门》「AT 模式 + HttpClient 远程调用」小节,实现多个 Spring Boot 项目的分布事务。

整体图

② Dubbo

《Dubbo 分布式事务 Seata 入门》「2. AT 模式」小节,实现多个 Dubbo 服务下的分布式事务。

整体图

③ Spring Cloud

《芋道 Spring Cloud Alibaba 分布式事务 Seata 入门》「3. AT 模式 + Feign」小节,实现多个 Spring Cloud 服务下的分布式事务。

整体图

4.2 TCC 模式

4.3 Saga 模式

4.4 XA 模式

Seata 正在开发中...

· 阅读需 7 分钟

使用配置中心和数据库来实现 Seata 的高可用,以 Nacos 和 MySQL 为例,将cloud-seata-nacos应用部署到 Kubernetes 集群中

该应用使用 Nacos 作为配置和注册中心,总共有三个服务: order-service, pay-service, storage-service, 其中 order-service 对外提供下单接口,当余额和库存充足时,下单成功,会提交事务,当不足时会抛出异常,下单失败,回滚事务

准备工作

需要准备可用的注册中心、配置中心 Nacos 和 MySQL,通常情况下,注册中心、配置中心和数据库都是已有的,不需要特别配置,在这个实践中,为了简单,只部署单机的注册中心、配置中心和数据库,假设他们是可靠的

  • 部署 Nacos

在服务器部署 Nacos,开放 8848 端口,用于 seata-server 注册,服务器地址为 192.168.199.2

docker run --name nacos -p 8848:8848 -e MODE=standalone nacos/nacos-server
  • 部署 MySQL

部署一台MySQL 数据库,用于保存事务数据,服务器地址为 192.168.199.2

docker run --name mysql -p 30060:3306-e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17

部署 seata-server

  • 创建seata-server需要的表

具体的 SQL 参考 script/server/db,这里使用的是 MySQL 的脚本,数据库名称为 seata

同时,也需要创建 undo_log 表, 可以参考 script/client/at/db/

  • 修改seata-server配置

将以下配置添加到 Nacos 配置中心,具体添加方法可以参考 script/config-center

service.vgroupMapping.my_test_tx_group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true
store.db.user=root
store.db.password=123456

部署 seata-server 到 Kubernetes

  • seata-server.yaml

需要将 ConfigMap 的注册中心和配置中心地址改成相应的地址

apiVersion: v1
kind: Service
metadata:
name: seata-ha-server
namespace: default
labels:
app.kubernetes.io/name: seata-ha-server
spec:
type: ClusterIP
ports:
- port: 8091
protocol: TCP
name: http
selector:
app.kubernetes.io/name: seata-ha-server

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
name: seata-ha-server
namespace: default
labels:
app.kubernetes.io/name: seata-ha-server
spec:
serviceName: seata-ha-server
replicas: 3
selector:
matchLabels:
app.kubernetes.io/name: seata-ha-server
template:
metadata:
labels:
app.kubernetes.io/name: seata-ha-server
spec:
containers:
- name: seata-ha-server
image: docker.io/seataio/seata-server:latest
imagePullPolicy: IfNotPresent
env:
- name: SEATA_CONFIG_NAME
value: file:/root/seata-config/registry
ports:
- name: http
containerPort: 8091
protocol: TCP
volumeMounts:
- name: seata-config
mountPath: /root/seata-config
volumes:
- name: seata-config
configMap:
name: seata-ha-server-config


---
apiVersion: v1
kind: ConfigMap
metadata:
name: seata-ha-server-config
data:
registry.conf: |
registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "192.168.199.2"
}
}
config {
type = "nacos"
nacos {
serverAddr = "192.168.199.2"
group = "SEATA_GROUP"
}
}
  • 部署
kubectl apply -f seata-server.yaml

部署完成后,会有三个 pod

kubectl get pod | grep seata-ha-server

seata-ha-server-645844b8b6-9qh5j 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-pzczs 1/1 Running 0 3m14s
seata-ha-server-645844b8b6-wkpw8 1/1 Running 0 3m14s

待启动完成后,可以在 Nacos 的服务列表中发现三个 seata-server 的实例,至此,已经完成 seata-server 的高可用部署

  • 查看服务日志
kubelet logs -f seata-ha-server-645844b8b6-9qh5j
[0.012s][info   ][gc] Using Serial
2020-04-15 00:55:09.880 INFO [main]io.seata.server.ParameterParser.init:90 -The server is running in container.
2020-04-15 00:55:10.013 INFO [main]io.seata.config.FileConfiguration.<init>:110 -The configuration file used is file:/root/seata-config/registry.conf
2020-04-15 00:55:12.426 INFO [main]com.alibaba.druid.pool.DruidDataSource.init:947 -{dataSource-1} inited
2020-04-15 00:55:13.127 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started

其中{dataSource-1} 说明使用了数据库,并正常初始化完成

  • 查看注册中心,此时seata-serve 这个服务会有三个实例

seata-ha-nacos-list.png

部署业务服务

  • 创建业务表并初始化数据

具体的业务表可以参考 cloud-seata-nacos/README.md

  • 添加 Nacos 配置

在 public 的命名空间下,分别创建 data-id 为 order-service.properties, pay-service.properties, storage-service.properties 的配置,内容相同,需要修改数据库的地址、用户名和密码

# MySQL
spring.datasource.url=jdbc:mysql://192.168.199.2:30060/seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Seata
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
  • 部署服务

通过 application.yaml 配置文件部署服务,需要注意的是修改 ConfigMap 的 NACOS_ADDR为自己的 Nacos 地址

apiVersion: v1
kind: Service
metadata:
namespace: default
name: seata-ha-service
labels:
app.kubernetes.io/name: seata-ha-service
spec:
type: NodePort
ports:
- port: 8081
nodePort: 30081
protocol: TCP
name: http
selector:
app.kubernetes.io/name: seata-ha-service

---
apiVersion: v1
kind: ConfigMap
metadata:
name: seata-ha-service-config
data:
NACOS_ADDR: 192.168.199.2:8848

---
apiVersion: v1
kind: ServiceAccount
metadata:
name: seata-ha-account
namespace: default

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: seata-ha-account
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: seata-ha-account
namespace: default

---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: default
name: seata-ha-service
labels:
app.kubernetes.io/name: seata-ha-service
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: seata-ha-service
template:
metadata:
labels:
app.kubernetes.io/name: seata-ha-service
spec:
serviceAccountName: seata-ha-account
containers:
- name: seata-ha-order-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-order-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8081
protocol: TCP
- name: seata-ha-pay-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-pay-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8082
protocol: TCP
- name: seata-ha-storage-service
image: "registry.cn-qingdao.aliyuncs.com/hellowoodes/seata-ha-storage-service:1.1"
imagePullPolicy: IfNotPresent
env:
- name: NACOS_ADDR
valueFrom:
configMapKeyRef:
key: NACOS_ADDR
name: seata-ha-service-config
ports:
- name: http
containerPort: 8083
protocol: TCP

通过以下命令,将应用部署到集群中

kubectl apply -f application.yaml

然后查看创建的 pod,seata-ha-service 这个服务下有三个 pod

kubectl get pod | grep seata-ha-service

seata-ha-service-7dbdc6894b-5r8q4 3/3 Running 0 12m

待应用启动后,在 Nacos 的服务列表中,会有相应的服务

seata-ha-service-list.png

此时查看服务的日志,会看到服务向每一个 TC 都注册了

kubectl logs -f seata-ha-service-7dbdc6894b-5r8q4 seata-ha-order-service

seata-ha-service-register.png

查看任意的 TC 日志,会发现每一个服务都向 TC 注册了

kubelet logs -f seata-ha-server-645844b8b6-9qh5j

seata-ha-tc-register.png

测试

测试成功场景

调用下单接口,将 price 设置为 1,因为初始化的余额为 10,可以下单成功

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 1
}'

此时返回结果为:

{"success":true,"message":null,"data":null}

查看TC 的日志,事务成功提交:

seata-ha-commit-tc-success.png

查看 order-service 服务日志 seata-ha-commit-success.png

测试失败场景

设置 price 为 100,此时余额不足,会下单失败抛出异常,事务会回滚

curl -X POST \
http://192.168.199.2:30081/order/placeOrder \
-H 'Content-Type: application/json' \
-d '{
"userId": 1,
"productId": 1,
"price": 100
}'

查看 TC 的日志: seata-ha-commit-tc-rollback.png

查看服务的日志 : seata-ha-commit-service-rollback.png

多次调用查看服务日志,发现会随机的向其中某台TC发起事务注册,当扩容或缩容后,有相应的 TC 参与或退出,证明高可用部署生效

· 阅读需 8 分钟

一 . 导读

根据大佬定义的分类,配置可以有三种:环境配置、描述配置、扩展配置。

环境配置:像一些组件启动时的参数等,通常是离散的简单值,多是 key-value 型数据。

描述配置:与业务逻辑相关,比如:事务发起方和参与方,通常会嵌到业务的生命周期管理中。描述配置信息较多,甚至有层次关系。

扩展配置:产品需要发现第三方实现,对配置的聚合要求比较高,比如:各种配置中心和注册中心,通常做法是在 jar 包的 META-INF/services 下放置接口类全名文件,内容为每行一个实现类类名。

二. 环境配置

seata server 在加载的时候,会使用 resources/registry.conf 来确定配置中心和注册中心的类型。而 seata client 在 1.0 版本后,不仅能使用 conf 文件进行配置的加载,也可以在 springboot 的 yml 配置文件中,使用 seata.config.{type} 来进行配置中心的选择,注册中心与之类似。通过 yml 加载配置的源码在 io.seata.spring.boot.autoconfigure.properties.registry 包下。

如果 seata 客户端的使用者既在 resources 下放了 conf 配置文件又在 yml 文件中配置,那么会优先使用 yml 中配置的。代码:

CURRENT_FILE_INSTANCE = null == extConfiguration ? configuration : extConfiguration;

这里 extConfiguration 是外部配置实例,即 ExtConfigurationProvider#provide() 外部配置提供类提供的,而 configuration 是另一个配置提供类提供的 ConfigurationProvider#provide(),这两个配置提供类是在 config 模块 ConfigurationFactory 静态块中,通过 SPI 的方式加载。

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);

上面说的是配置中心类型的选择,而配置环境的加载,是在确定了使用什么配置中心类型后,再通过相应的配置中心加载环境配置。File 即文本方式配置也是一种配置中心。

client 和 server 获取配置参数,是通过 ConfigurationFactory#getInstance() 获取配置类实例,再使用配置类实例获取配置参数,配置的 key 这些常量的定义,主要在 core 模块下 config 文件中。

一些重要的环境配置属性的意义,官网都有介绍

在实例化的时候通过 ConfigurationFactory 获取后注入构造函数中的,需要重启才能生效,而在使用时通过 ConfigurationFactory 实时获取的,配置改了就可以生效。

但是 config 模块提供了 ConfigurationChangeListener#onChangeEvent 接口方法来修改实例内部的属性。即在这个方法中,监听动态变化的属性,如果检测到自身使用的属性和刚开始注入时不一样了,就修改实例中保存的属性,和配置中心保持一致,这样就实现了动态配置。

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener {
private volatile boolean disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,false);
@Override public Object invoke(Param param) {
if(disable){//事务业务处理}
}
@Override public void onChangeEvent(Param param) {
disable = param;
}}

上面是 spring 模块下的 GlobalTransactionalInterceptor 与降级属性相关的伪代码。 GlobalTrarnsactionalScanner 在上面的 interceptor 类被实例化时,把 interceptor 注册到了配置变化监听列表中,当配置被改变的时候,会调用监听器:

ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);

降级的意思是,当服务某一项功能不可用的时候,通过动态配置的属性,把某一项功能给关了,这样就可以避免一直尝试失败的处理。interceptor#invoke() 只有当这个 disable 属性为 true 时,才会执行 seata 事务相关业务。

三. 描述配置

一般性框架描述性配置通常信息比较多,甚至有层次关系,用 xml 配置比较方便,因为树结构描述性更强。而现在的习惯都在提倡去繁琐的约束性配置,采用约定的方式。

seata AT 模式是通过代理数据源的方式来进行事务处理,对业务方入侵较小,只需让 seata 在启动时,识别哪些业务方需要开启全局事务,所以用注解就可以实现描述性配置。

@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
public String doBiz(String msg) {}

如果是 tcc 模式,事务参与方还需使用注解标识:

@TwoPhaseBusinessAction(name = "tccActionForSpringTest" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int i);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);

四 .扩展配置

扩展配置,通常对产品的聚合要求比较高,因为产品需要发现第三方实现,将其加入产品内部。

在这里插入图片描述 这是一个自定义配置中心提供类的例子,在 META-INF/services 下放置一个接口同名的文本文件,文件的内容为接口的实现类。这是标准的 spi 方式。然后修改配置文件 registry.conf 中的 config.type=test 。

但是如果你认为这样就可以被 seata 识别到,并且替换掉配置中心,那你就错了。seata 在加载配置中心的时候,使用 enum ConfigType 包裹了一下配置文件中配置的配置中心的类型的值:

private static Configuration buildConfiguration() {
configTypeName = "test";//registry.conf中配置的config.type
configType = ConfigType.getType(configTypeName);//ConfigType获取不到会抛异常
}

如果在 ConfigType 中没有定义 test 这种配置中心类型,那么会抛异常。所以单纯的修改配置文件而不改变源码是无法使用 ConfigType 中定义的配置中心提供类以外的配置中心提供类。

目前 1.0 版本在 ConfigType 中定义的配置中心类型有:File,ZK,Nacos,Apollo,Consul,Etcd3,SpringCloudConfig,Custom。如果用户想使用自定义的配置中心类型,可以使用 Custom 这种类型。

在这里插入图片描述 这里可以使用不优雅的方式,即提供一个指定名称 ZK 但是级别 order=3 更高的实现类(ZK 默认 order=1),就可以让 ConfigurationFactory 使用 TestConfigurationProvider 作为配置中心提供类。

通过上面的步骤,就可以让 seata 使用我们自己提供的代码。seata 中 codec、compressor、discovery、integration 等模块,都是使用 spi 机制加载功能类,符合微内核 + 插件化,平等对待第三方的设计思想。

五 . seata 源码分析系列地址

作者:赵润泽,系列地址

· 阅读需 4 分钟

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

​ 1.首先来看下包结构,在 seata-dubbo 和 seata-dubbo-alibaba 下有统一由 TransactionPropagationFilter 这个类,分别对应 apache-dubbo 跟 alibaba-dubbo.

20200101203229

分析源码

package io.seata.integration.dubbo;

import io.seata.core.context.RootContext;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
public class TransactionPropagationFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//获取本地XID
String xid = RootContext.getXID();
String xidInterceptorType = RootContext.getXIDInterceptorType();
//获取Dubbo隐式传参中的XID
String rpcXid = getRpcXid();
String rpcXidInterceptorType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
//传递XID
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_INTERCEPTOR_TYPE, xidInterceptorType);
} else {
if (rpcXid != null) {
//绑定XID
RootContext.bind(rpcXid);
RootContext.bindInterceptorType(rpcXidInterceptorType);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] interceptorType[{}] to RootContext", rpcXid, rpcXidInterceptorType);
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
//进行剔除已完成事务的XID
String unbindInterceptorType = RootContext.unbindInterceptorType();
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[{}] interceptorType[{}] from RootContext", unbindXid, unbindInterceptorType);
}
//如果发现解绑的XID并不是当前接收到的XID
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}, xidInterceptorType from {} to {} ", rpcXid, unbindXid, rpcXidInterceptorType, unbindInterceptorType);
if (unbindXid != null) {
//重新绑定XID
RootContext.bind(unbindXid);
RootContext.bindInterceptorType(unbindInterceptorType);
LOGGER.warn("bind [{}] interceptorType[{}] back to RootContext", unbindXid, unbindInterceptorType);
}
}
}
}
}

/**
* get rpc xid
* @return
*/
private String getRpcXid() {
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
if (rpcXid == null) {
rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());
}
return rpcXid;
}

}

​ 1.根据源码,我们可以推出相应的逻辑处理

20200101213336

要点知识

​ 1.Dubbo @Activate 注解:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* Group过滤条件。
* <br />
* 包含{@link ExtensionLoader#getActivateExtension}的group参数给的值,则返回扩展。
* <br />
* 如没有Group设置,则不过滤。
*/
String[] group() default {};

/**
* Key过滤条件。包含{@link ExtensionLoader#getActivateExtension}的URL的参数Key中有,则返回扩展。
* <p/>
* 示例:<br/>
* 注解的值 <code>@Activate("cache,validatioin")</code>,
* 则{@link ExtensionLoader#getActivateExtension}的URL的参数有<code>cache</code>Key,或是<code>validatioin</code>则返回扩展。
* <br/>
* 如没有设置,则不过滤。
*/
String[] value() default {};

/**
* 排序信息,可以不提供。
*/
String[] before() default {};

/**
* 排序信息,可以不提供。
*/
String[] after() default {};

/**
* 排序信息,可以不提供。
*/
int order() default 0;
}

可以分析得知,Seata 的 dubbo 过滤器上的注解@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100),表示 dubbo 的服务提供方跟消费方都会触发到这个过滤器,所以我们的 Seata 发起者会产生一个 XID 的传递,上述流程图跟代码已经很清晰的表示了.

​ 2.Dubbo 隐式传参可以通过 RpcContext 上的 setAttachmentgetAttachment 在服务消费方和提供方之间进行参数的隐式传递。

获取:RpcContext.getContext().getAttachment(RootContext.KEY_XID);

传递:RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);

总结

更多源码阅读请访问Seata 官网

· 阅读需 11 分钟

一 .导读

spring 模块分析中讲到,Seata 的 spring 模块会对涉及到分布式业务的 bean 进行处理。项目启动时,当 GlobalTransactionalScanner 扫描到 TCC 服务的 reference 时(即tcc事务参与方),会对其进行动态代理,即给 bean 织入 TCC 模式下的 MethodInterceptor 的实现类。tcc 事务发起方依然使用 @GlobalTransactional 注解开启,织入的是通用的 MethodInterceptor 的实现类。

TCC 模式下的 MethodInterceptor 实现类即 TccActionInterceptor(spring模块) ,这个类中调用了 ActionInterceptorHandler(tcc模块) 进行 TCC 模式下事务流程的处理。

TCC 动态代理的主要功能是:生成TCC运行时上下文、透传业务参数、注册分支事务记录。

二 .TCC模式介绍

在2PC(两阶段提交)协议中,事务管理器分两阶段协调资源管理,资源管理器对外提供三个操作,分别是一阶段的准备操作,和二阶段的提交操作和回滚操作。

public interface TccAction {

@TwoPhaseBusinessAction(name = "tccActionForTest" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "a") int a,
@BusinessActionContextParameter(paramName = "b", index = 0) List b,
@BusinessActionContextParameter(isParamInProperty = true) TccParam tccParam);

public boolean commit(BusinessActionContext actionContext);

public boolean rollback(BusinessActionContext actionContext);
}

这是 TCC 参与者实例,参与者需要实现三个方法,第一个参数必须是 BusinessActionContext ,方法返回类型固定,对外发布成微服务,供事务管理器调用。

prepare:资源的检查和预留。例:扣减账户的余额,并增加相同的冻结余额。

commit:使用预留的资源,完成真正的业务操作。例:减少冻结余额,扣减资金业务完成。

cancel:释放预留资源。例:冻结余额加回账户的余额。

其中 BusinessActionContext 封装了本次事务的上下文环境:xid、branchId、actionName 和被 @BusinessActionContextParam 注解的参数等。

参与方业务有几个需要注意的地方: 1.控制业务幂等性,需要支持同一笔事务的重复提交和重复回滚。 2.防悬挂,即二阶段的回滚,比一阶段的 try 先执行。 3.放宽一致性协议,最终一致,所以是读已修改

三 . remoting 包解析

在这里插入图片描述

包中所有的类都是为包中的 DefaultRemotingParser 服务,Dubbo、LocalTCC、SofaRpc 分别负责解析各自RPC协议下的类。

DefaultRemotingParser 的主要方法: 1.判断 bean 是否是 remoting bean,代码:

    @Override
public boolean isRemoting(Object bean, String beanName) throws FrameworkException {
//判断是否是服务调用方或者是否是服务提供方
return isReference(bean, beanName) || isService(bean, beanName);
}

2.远程 bean 解析,把 rpc类 解析成 RemotingDesc,,代码:

@Override
public boolean isRemoting(Object bean, String beanName) throws FrameworkException {
//判断是否是服务调用方或者是否是服务提供方
return isReference(bean, beanName) || isService(bean, beanName);
}

利用 allRemotingParsers 来解析远程 bean 。allRemotingParsers是在:initRemotingParser() 中调用EnhancedServiceLoader.loadAll(RemotingParser.class) 动态进行 RemotingParser 子类的加载,即 SPI 加载机制。

如果想扩展,比如实现一个feign远程调用的解析类,只要把RemotingParser相关实现类写在 SPI 的配置中就可以了,扩展性很强。

RemotingDesc 事务流程需要的远程 bean 的一些具体信息,比如 targetBean、interfaceClass、interfaceClassName、protocol、isReference等等。

3.TCC资源注册

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName) {
RemotingDesc remotingBeanDesc = getServiceDesc(bean, beanName);
if (remotingBeanDesc == null) {
return null;
}
remotingServiceMap.put(beanName, remotingBeanDesc);

Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
if (isService(bean, beanName)) {
try {
//service bean, registry resource
Object targetBean = remotingBeanDesc.getTargetBean();
for (Method m : methods) {
TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
if (twoPhaseBusinessAction != null) {
TCCResource tccResource = new TCCResource();
tccResource.setActionName(twoPhaseBusinessAction.name());
tccResource.setTargetBean(targetBean);
tccResource.setPrepareMethod(m);
tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
tccResource.setCommitMethod(ReflectionUtil
.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
new Class[] {BusinessActionContext.class}));
tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
tccResource.setRollbackMethod(ReflectionUtil
.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
new Class[] {BusinessActionContext.class}));
//registry tcc resource
DefaultResourceManager.get().registerResource(tccResource);
}
}
} catch (Throwable t) {
throw new FrameworkException(t, "parser remoting service error");
}
}
if (isReference(bean, beanName)) {
//reference bean, TCC proxy
remotingBeanDesc.setReference(true);
}
return remotingBeanDesc;
}

首先判断是否是事务参与方,如果是,拿到 RemotingDesc 中的 interfaceClass,遍历接口中的方法,判断方法上是否有@TwoParserBusinessAction 注解,如果有,把参数封装成 TCCRecource,通过 DefaultResourceManager 进行 TCC 资源的注册。

这里 DefaultResourceManager 会根据 Resource 的 BranchType 来寻找对应的资源管理器,TCC 模式下资源管理类,在 tcc 模块中。

这个 rpc 解析类主要提供给 spring 模块进行使用。parserRemotingServiceInfo() 被封装到了 spring 模块的 TCCBeanParserUtils 工具类中。spring 模块的 GlobalTransactionScanner 在项目启动的时候,通过工具类解析 TCC bean,工具类 TCCBeanParserUtils 会调用 TCCResourceManager 进行资源的注册,并且如果是全局事务的服务提供者,会织入 TccActionInterceptor 代理。这些个流程是 spring 模块的功能,tcc 模块是提供功能类给 spring 模块使用。

三 .tcc 资源管理器

TCCResourceManager 负责管理 TCC 模式下资源的注册、分支的注册、提交、和回滚。

1.在项目启动时, spring 模块的 GlobalTransactionScanner 扫描到 bean 是 tcc bean 时,会本地缓存资源,并向 server 注册:

    @Override
public void registerResource(Resource resource) {
TCCResource tccResource = (TCCResource)resource;
tccResourceCache.put(tccResource.getResourceId(), tccResource);
super.registerResource(tccResource);
}

与server通信的逻辑被封装在了父类 AbstractResourceManage 中,这里根据 resourceId 对 TCCResource 进行缓存。父类 AbstractResourceManage 注册资源的时候,使用 resourceGroupId + actionName,actionName 就是 @TwoParseBusinessAction 注解中的 name,resourceGroupId 默认是 DEFAULT。

2.事务分支的注册在 rm-datasource 包下的 AbstractResourceManager 中,注册时参数 lockKeys 为 null,和 AT 模式下事务分支的注册还是有些不一样的。

3.分支的提交或者回滚:

    @Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
}
try {
boolean result = false;
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
LOGGER.error(msg, t);
throw new FrameworkException(t, msg);
}
}

通过参数 xid、branchId、resourceId、applicationData 恢复业务的上下文 businessActionContext。

根据获取到的上下文通过反射执行 commit 方法,并返回执行结果。回滚方法类似。

这里 branchCommit() 和 branchRollback() 提供给 rm 模块资源处理的抽象类 AbstractRMHandler 调用,这个 handler 是 core 模块定义的模板方法的进一步实现类。和 registerResource() 不一样,后者是 spring 扫描时主动注册资源。

四 . tcc 模式事务处理

spring 模块中的 TccActionInterceptor 的 invoke() 方法在被代理的 rpc bean 被调用时执行。该方法先获取 rpc 拦截器透传过来的全局事务 xid ,然后 TCC 模式下全局事务参与者的事务流程还是交给 tcc 模块 ActionInterceptorHandler 处理。

也就是说,事务参与者,在项目启动的时候,被代理。真实的业务方法,在 ActionInterceptorHandler 中,通过回调执行。

    public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
Callback<Object> targetCallback) throws Throwable {
Map<String, Object> ret = new HashMap<String, Object>(4);

//TCC name
String actionName = businessAction.name();
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
//set action anme
actionContext.setActionName(actionName);

//Creating Branch Record
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);

//set the parameter whose type is BusinessActionContext
Class<?>[] types = method.getParameterTypes();
int argIndex = 0;
for (Class<?> cls : types) {
if (cls.getName().equals(BusinessActionContext.class.getName())) {
arguments[argIndex] = actionContext;
break;
}
argIndex++;
}
//the final parameters of the try method
ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
//the final result
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
return ret;
}

这里有两个重要操作:

1.doTccActionLogStore() 这个方法中,调用了两个比较重要的方法: fetchActionRequestContext(method, arguments),这个方法把被 @BusinessActionContextParam 注解的参数取出来,在下面的 init 方法中塞入 BusinessActionComtext ,同时塞入的还有事务相关参数。 DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,applicationContextStr, null),这个方法执行 TCC 模式下事务参与者事务分支的注册。

2.回调执行 targetCallback.execute() ,被代理的 bean 具体的业务,即 prepare() 方法。

五 .总结

tcc模块,主要提供以下功能 :

  1. 定义两阶段协议注解,提供 tcc 模式下事务流程需要的属性。
  2. 提供解析不同 rpc 框架 remoting bean 的 ParserRemoting 实现,供 spring 模块调用。
  3. 提供 TCC 模式下资源管理器,进行资源注册、事务分支注册提交回滚等。
  4. 提供 TCC 模式下事务流程的处理类,让 MethodInterceptor 代理类不执行具体模式的事务流程,而是下放到 tcc 模块。

五 .相关

作者:赵润泽,系列地址

· 阅读需 1 分钟

活动介绍

亮点解读

  • Seata 开源项目发起人带来《Seata 过去、现在和未来》以及 Seata 1.0 的新特性。
  • Seata 核心贡献者详解 Seata AT, TCC, Saga 模式。
  • Seata 落地互联网医疗,滴滴出行实践剖析。

如您不能前来参会

现场福利

  • 讲师 PPT 打包下载
  • 精美茶歇,阿里公仔,天猫精灵等好礼等你来拿

议程

· 阅读需 9 分钟

一 . 导读

core 模块定义了事务的类型、状态,通用的行为,client 和 server 通信时的协议和消息模型,还有异常处理方式,编译、压缩类型方式,配置信息名称,环境 context 等,还基于 netty 封装了 rpc ,供客户端和服务端使用。

按包顺序来分析一下 core 模块主要功能类:

在这里插入图片描述

codec:定义了一个 codec 的工厂类,提供了一个方法,根据序列化类型来找对应的处理类。还提供了一个接口类 Codec ,有两个抽象方法:

<T> byte[] encode(T t);
<T> T decode(byte[] bytes);

目前 1.0 版本在 codec 模块,有三种序列化的实现:SEATA、PROTOBUF、KRYO。

compressor:和 codec 包下面类一样,都是三个类,一个压缩类型类,一个工厂类,一个压缩和解压缩操作的抽象类。1.0 版本就只有一种压缩方式:Gzip

constants:两个 ClientTableColumnsName、ServerTableColumnsName 类,分别是 client 端存储事务的表和 server 端存储事务表对应的 model 类。还有定义支持的数据库类型类和一些定义配置信息属性的前缀的类。

context:环境类 RootContext 持有一个 ThreadLocalContextCore 用来存储事务的标识信息。比如 TX_XID 用来唯一的表示一个事务。TX_LOCK 如果存在,则表示本地事务对于 update/delete/insert/selectForUpdate SQL 需要用全局锁控制。

event:这里用到了 guava 中 EventBus 事件总线来进行注册和通知,监听器模式。在 server 模块的 metrics 包中,MetricsManager 在初始化的时候,对 GlobalStatus 即 server 模块处理事务的几个状态变化时,注册了监挺事件,当 server 处理事务时,会回调监听的方法,主要是为了进行统计各种状态事务的数量。

lock: server 在收到 registerBranch 消息进行分支注册的时候,会加锁。1.0 版本有两种锁的实现,DataBaseLocker 和 MemoryLocker,分别是数据库锁和内存锁,数据库锁根据 rowKey = resourceId + tableName + pk 进行加锁,内存锁直接就是根据 primary key。

model:BranchStatus、GlobalStatus、BranchType 用来定义事务的类型和全局、分支状态。还有 TransactionManager 和 ResourceManager,是 rm 和 tm 的抽象类。具体的 rm 和 tm 的实现,因为各种事务类型都不同,所以这里没有具体的实现类。

protocol:定义了 rpc 模块传输用的实体类,即每个事务状态场景下 request 和 response 的 model。

store:定了与数据库打交道的数据模型,和与数据库交互的语句。

二 . exception 包中 handler 类分析

这是 AbstractExceptionHandler 的 UML 图,Callback 、AbstractCallback 是 AbstractExceptionHandler 的内部接口和内部类,AbstractCallback 抽象类实现了接口 Callback 的三个方法,还有一个 execute() 未实现。AbstractExceptionHandler 使用了 AbstractCallback 作为模板方法的参数,并使用了其实现的三个方法,但是 execute() 方法仍留给子类实现。 在这里插入图片描述 从对外暴露的角度看 AbstractExceptionHandler 定义了一个带有异常处理的模板方法,模板中有四个行为,在不同的情况下执行,其中三种行为已经实现,执行的行为交由子类自行实现,详解:

1.使用模板方法模式,在 exceptionHandlerTemplate() 中,定义好了执行的模板

    public void exceptionHandleTemplate(Callback callback, AbstractTransactionRequest request,
AbstractTransactionResponse response) {
try {
callback.execute(request, response); //执行事务业务的方法
callback.onSuccess(request, response); //设置response返回码
} catch (TransactionException tex) {
LOGGER.error("Catch TransactionException while do RPC, request: {}", request, tex);
callback.onTransactionException(request, response, tex); //设置response返回码并设置msg
} catch (RuntimeException rex) {
LOGGER.error("Catch RuntimeException while do RPC, request: {}", request, rex);
callback.onException(request, response, rex); //设置response返回码并设置msg
}
}

onSuccess、onTransactionException、onException 在 AbstarctCallback 中已经被实现,execute 则由 AbstractExceptionHandler 子类即负责不同事务模式的 handler 类进行实现。 AbstractExceptionHandler 目前有两个子类:AbstractTCInboundHandler 负责处理全局事务的业务,AbstractRMHandler 负责处理分支事务的业务。

2.使用回调机制,优点是:允许 AbstractExceptionHandler 把需要调用的类 Callback 作为参数传递进来,handler 不需要知道 callback 的具体执行逻辑,只要知道 callback 的特性原型和限制条件(参数、返回值),就可以使用了。

先使用模板方法,把事务业务流程定下来,再通过回调,把具体执行事务业务的方法,留给子类实现。设计的非常巧妙。

这个 exceptionHandlerTemplate() 应该翻译成带有异常处理的模板方法。异常处理已经被抽象类实现,具体的不同模式下 commit 、rollback 的业务处理则交给子类实现。

三 . rpc 包分析

seata 对于 rpc 的封装,细节不需要纠结,可以研究一下一下对于事务业务的处理。

client 端的 rpc 类是 AbstractRpcRemotingClient: 在这里插入图片描述

重要的属性和方法都在类图中,消息发送和初始化方法没画在类图中,详细分析一下类图:

clientBootstrap:是 netty 启动类 Bootstrap 的封装类,持有了 Bootstrap 的实例,并自定义自己想要的属性。

clientChannelManager:使用 ConcurrentHashMap<serverAddress,channel> 容器维护地址和 channel 的对应关系。

clientMessageListener: 消息的处理类,根据消息的类型的不同有三种具体的处理方法

public void onMessage(RpcMessage request, String serverAddress, ClientMessageSender sender) {
Object msg = request.getBody();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("onMessage:" + msg);
}
if (msg instanceof BranchCommitRequest) {
handleBranchCommit(request, serverAddress, (BranchCommitRequest)msg, sender);
} else if (msg instanceof BranchRollbackRequest) {
handleBranchRollback(request, serverAddress, (BranchRollbackRequest)msg, sender);
} else if (msg instanceof UndoLogDeleteRequest) {
handleUndoLogDelete((UndoLogDeleteRequest)msg);
}
}

消息类中,持有 TransactionMessageHandler 对不同类型消息进行处理,最终会根据事务类型的不同(AT、TCC、SAGE)调用具体的处理类,即第二部分说的 exceptionHandleTemplate() 的实现类。

mergeSendExecutorService:是一个线程池,只有一个线程,负责对不同地址下的消息进行和并发送。在 sendAsyncRequest() 中,会给线程池的队列 LinkedBlockingQueue<> offer 消息,然后这个线程负责 poll 和处理消息。

channelRead():处理服务端的 HeartbeatMessage.PONG 心跳消息。还有消息类型是 MergeResultMessage 即异步消息的响应消息,根据 msgId 找到对应 MessageFuture ,并设置异步消息的 result 结果。

dispatch():调用 clientMessageListener 处理 server 发送过来的消息,不同类型 request 有不同的处理类。

简单点看 netty,只需要关注序列化方式和消息处理 handler 类。seata 的 rpc 序列化方式通过工厂类找 Codec 实现类进行处理,handler 即上文说的 TransactionMessageHandler 。

四 . 总结

core 模块涉及的功能很多,其中的类大多都是其他模块的抽象类。抽象出业务模型,具体的实现分布在不同的模块。core 模块的代码非常的优秀,很多设计都是经典,比如上文分析的基于模板模式改造的,非常实用也非常美,值得仔细研究。

五 . seata 源码分析系列地址

系列地址

· 阅读需 5 分钟

本文作者:FUNKYE(陈健斌),杭州某互联网公司主程。

前言

通过GA大会上滴滴出行的高级研发工程陈鹏志的在滴滴两轮车业务中的实践,发现动态降级的必要性是非常的高,所以这边简单利用spring boot aop来简单的处理降级相关的处理,这边非常感谢陈鹏志的分享!

可利用此demo项目地址

通过以下代码改造实践.

准备工作

​ 1.创建测试用的TestAspect:

package org.test.config;

import java.lang.reflect.Method;

import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;

@Aspect
@Component
public class TestAspect {
private final static Logger logger = LoggerFactory.getLogger(TestAspect.class);

@Before("execution(* org.test.service.*.*(..))")
public void before(JoinPoint joinPoint) throws TransactionException {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
logger.info("拦截到需要分布式事务的方法," + method.getName());
// 此处可用redis或者定时任务来获取一个key判断是否需要关闭分布式事务
// 模拟动态关闭分布式事务
if ((int)(Math.random() * 100) % 2 == 0) {
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
tx.begin(300000, "test-client");
} else {
logger.info("关闭分布式事务");
}
}

@AfterThrowing(throwing = "e", pointcut = "execution(* org.test.service.*.*(..))")
public void doRecoveryActions(Throwable e) throws TransactionException {
logger.info("方法执行异常:{}", e.getMessage());
if (!StringUtils.isBlank(RootContext.getXID()))
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
}

@AfterReturning(value = "execution(* org.test.service.*.*(..))", returning = "result")
public void afterReturning(JoinPoint point, Object result) throws TransactionException {
logger.info("方法执行结束:{}", result);
if ((Boolean)result) {
if (!StringUtils.isBlank(RootContext.getXID())) {
logger.info("分布式事务Id:{}", RootContext.getXID());
GlobalTransactionContext.reload(RootContext.getXID()).commit();
}
}
}

}

请注意上面的包名可改为你自己的service包名:

​ 2.改动service代码:

    public Object seataCommit() {
testService.Commit();
return true;
}

因为异常跟返回结果我们都会拦截,所以这边可以trycatch或者直接让他抛异常来拦截也行,或者直接判断返回结果,比如你的业务代码code=200为成功,那么就commit,反之在拦截返回值那段代码加上rollback;

进行调试

​ 1.更改代码主动抛出异常

    public Object seataCommit() {
try {
testService.Commit();
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException();
}
}

​ 查看日志:

2019-12-23 11:57:55.386  INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController       : 拦截到需要分布式事务的方法,seataCommit
2019-12-23 11:57:55.489 INFO 23952 --- [.0-28888-exec-7] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.14.67:8092:2030765910]
2019-12-23 11:57:55.489 INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController : 创建分布式事务完毕192.168.14.67:8092:2030765910
2019-12-23 11:57:55.709 INFO 23952 --- [.0-28888-exec-7] org.test.controller.TestController : 方法执行异常:null
2019-12-23 11:57:55.885 INFO 23952 --- [.0-28888-exec-7] i.seata.tm.api.DefaultGlobalTransaction : [192.168.14.67:8092:2030765910] rollback status: Rollbacked
2019-12-23 11:57:55.888 ERROR 23952 --- [.0-28888-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException] with root cause

​ 可以看到已被拦截也触发了rollback了.

​ 2.恢复代码调试正常情况:

    public Object seataCommit() {
testService.Commit();
return true;
}

​ 查看日志:

2019-12-23 12:00:20.876  INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController       : 拦截到需要分布式事务的方法,seataCommit
2019-12-23 12:00:20.919 INFO 23952 --- [.0-28888-exec-2] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [192.168.14.67:8092:2030765926]
2019-12-23 12:00:20.920 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 创建分布式事务完毕192.168.14.67:8092:2030765926
2019-12-23 12:00:21.078 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 方法执行结束:true
2019-12-23 12:00:21.078 INFO 23952 --- [.0-28888-exec-2] org.test.controller.TestController : 分布式事务Id:192.168.14.67:8092:2030765926
2019-12-23 12:00:21.213 INFO 23952 --- [.0-28888-exec-2] i.seata.tm.api.DefaultGlobalTransaction : [192.168.14.67:8092:2030765926] commit status: Committed

​ 可以看到事务已经被提交了.

总结

更详细的内容希望希望大家访问以下地址阅读详细文档

nacos官网

dubbo官网

seata官网

docker官网

· 阅读需 8 分钟

Seata 的动态降级需要结合配置中心的动态配置订阅功能。动态配置订阅,即通过配置中心监听订阅,根据需要读取已更新的缓存值,ZK、Apollo、Nacos 等第三方配置中心都有现成的监听器可实现动态刷新配置;动态降级,即通过动态更新指定配置参数值,使得 Seata 能够在运行过程中动态控制全局事务失效(目前只有 AT 模式有这个功能)。

那么 Seata 支持的多个配置中心是如何适配不同的动态配置订阅以及如何实现降级的呢?下面从源码的层面详细给大家讲解一番。

动态配置订阅

Seata 配置中心有一个监听器基准接口,它主要有一个抽象方法和 default 方法,如下:

io.seata.config.ConfigurationChangeListener

该监听器基准接口主要有两个实现类型:

  1. 实现注册配置订阅事件监听器:用于实现各种功能的动态配置订阅,比如 GlobalTransactionalInterceptor 实现了 ConfigurationChangeListener,根据动态配置订阅实现的动态降级功能;
  2. 实现配置中心动态订阅功能与适配:对于目前还没有动态订阅功能的 file 类型默认配置中心,可以实现该基准接口来实现动态配置订阅功能;对于阻塞订阅需要另起一个线程去执行,这时候可以实现该基准接口进行适配,还可以复用该基准接口的线程池;以及还有异步订阅,有订阅单个 key,有订阅多个 key 等等,我们都可以实现该基准接口以适配各个配置中心。

Nacos 动态订阅实现

Nacos 有自己内部实现的监听器,因此直接直接继承它内部抽象监听器 AbstractSharedListener,实现如下:

如上,

  • dataId:为订阅的配置属性;
  • listener:配置订阅事件监听器,用于将外部传入的 listener 作为一个 wrapper,执行真正的变更逻辑。

值得一提的是,nacos 并没有使用 ConfigurationChangeListener 实现自己的监听配置,一方面是因为 Nacos 本身已有监听订阅功能,不需要自己再去实现;另一方面因为 nacos 属于非阻塞式订阅,不需要复用 ConfigurationChangeListener 的线程池,即无需进行适配。

添加订阅:

Nacos 配置中心为某个 dataId 添加订阅的逻辑很简单,用 dataId 和 listener 创建一个 NacosListener 调用 configService#addListener 方法,把 NacosListener 作为 dataId 的监听器,dataId 就实现了动态配置订阅功能。

file 动态订阅实现

以它的实现类 FileListener 举例子,它的实现逻辑如下:

如上,

  • dataId:为订阅的配置属性;

  • listener:配置订阅事件监听器,用于将外部传入的 listener 作为一个 wrapper,执行真正的变更逻辑,这里特别需要注意的是,该监听器与 FileListener 同样实现了 ConfigurationChangeListener 接口,只不过 FileListener 是用于给 file 提供动态配置订阅功能,而 listener 用于执行配置订阅事件

  • executor:用于处理配置变更逻辑的线程池,在 ConfigurationChangeListener#onProcessEvent 方法中用到。

FileListener#onChangeEvent 方法的实现让 file 具备了动态配置订阅的功能,它的逻辑如下:

无限循环获取订阅的配置属性当前的值,从缓存中获取旧的值,判断是否有变更,如果有变更就执行外部传入 listener 的逻辑。

ConfigurationChangeEvent 用于保存配置变更的事件类,它的成员属性如下:

这里的 getConfig 方法是如何感知 file 配置的变更呢?我们点进去,发现它最终的逻辑如下:

发现它是创建一个 future 类,然后包装成一个 Runnable 放入线程池中异步执行,最后调用 get 方法阻塞获取值,那么我们继续往下看:

allowDynamicRefresh:动态刷新配置开关;

targetFileLastModified:file 最后更改的时间缓存。

以上逻辑:

获取 file 最后更新的时间值 tempLastModified,然后对比对比缓存值 targetFileLastModified,如果 tempLastModified > targetFileLastModified,说明期间配置有更改过,这时就重新加载 file 实例,替换掉旧的 fileConfig,使得后面的操作能够获取到最新的配置值。

添加一个配置属性监听器的逻辑如下:

configListenersMap 为 FileConfiguration 的配置监听器缓存,它的数据结构如下:

ConcurrentMap<String/*dataId*/, Set<ConfigurationChangeListener>> configListenersMap

从数据结构上可看出,每个配置属性可关联多个事件监听器。

最终执行 onProcessEvent 方法,这个是监听器基准接口里面的 default 方法,它会调用 onChangeEvent 方法,即最终会调用 FileListener 中的实现。

动态降级

有了以上的动态配置订阅功能,我们只需要实现 ConfigurationChangeListener 监听器,就可以做各种各种的功能,目前 Seata 只有动态降级有用到动态配置订阅的功能。

在「Seata AT 模式启动源码分析」这篇文章中讲到,Spring 集成 Seata 的项目中,在 AT 模式启动时,会用 用GlobalTransactionalInterceptor 代替了被 GlobalTransactional 和 GlobalLock 注解的方法,GlobalTransactionalInterceptor 实现了 MethodInterceptor,最终会执行 invoker 方法,那么想要实现动态降级,就可以在这里做手脚。

  • 在 GlobalTransactionalInterceptor 中加入一个成员变量:
private volatile boolean disable; 

在构造函数中进行初始化赋值:

ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION(service.disableGlobalTransaction)这个参数目前有两个功能:

  1. 在启动时决定是否开启全局事务;
  2. 在开启全局事务后,决定是否降级。
  • 实现 ConfigurationChangeListener:

这里的逻辑简单,就是判断监听事件是否属于 ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION 配置属性,如果是,直接更新 disable 值。

  • 接下来在 GlobalTransactionalInterceptor#invoke 中做点手脚

如上,disable = true 时,不执行全局事务与全局锁。

  • 配置中心订阅降级监听器

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

在 Spring AOP 进行 wrap 逻辑过程中,当前配置中心将订阅降级事件监听器。

作者简介

张乘辉,目前就职于中通科技信息中心技术平台部,担任 Java 工程师,主要负责中通消息平台与全链路压测项目的研发,热爱分享技术,微信公众号「后端进阶」作者,技术博客(https://objcoding.com/)博主,Seata Contributor,GitHub ID:objcoding。