
· 6 min read

Seata supports several third-party configuration centres. How does it stay compatible with so many at once? This post walks through how Seata's configuration centre is implemented.

Configuration Centre Property Loading

In Seata Configuration Centre, there are two default configuration files:

!

file.conf holds the default configuration properties, while registry.conf mainly stores the third-party registry and configuration centre information. registry.conf has two main blocks:

registry {
# file, nacos, eureka, redis, zk, consul, etcd3, sofa
# ...
}

config {
# file, nacos , apollo, zk, consul, etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
}
file {
name = "file.conf"
}
# ...
}

The registry block holds the registry settings and is not covered here. The config block holds the configuration centre settings. Its type is file by default, meaning Seata loads properties from the local file.conf; with any other type, Seata loads the property values from the corresponding third-party configuration centre.

In the core directory of the config module, there is a configuration factory class ConfigurationFactory, which has the following structure:

!

You can see that there are some static constants for configuration:

REGISTRY_CONF_PREFIX, REGISTRY_CONF_SUFFIX: the name of the configuration file, the default configuration file type;

SYSTEM_PROPERTY_SEATA_CONFIG_NAME, ENV_SEATA_CONFIG_NAME, ENV_SYSTEM_KEY, ENV_PROPERTY_KEY: custom filename configuration variables, which also indicates that we can customise the configuration centre's property files.

There is a static code block inside ConfigurationFactory as follows:

io.seata.config.ConfigurationFactory:

!

The static block uses the custom file-name variables to determine the name and type of the configuration file, falling back to registry.conf if none is set. FileConfiguration is Seata's default configuration implementation class, so by default a FileConfiguration object is built from registry.conf. The SPI mechanism can also be used here to plug in a third-party extended configuration implementation: implement the ExtConfigurationProvider interface, create a file under META-INF/services/, and put the fully qualified name of the implementation class in it, as shown below:

!
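The lookup order described above (custom name first, default otherwise) can be sketched roughly as follows. This is only an illustration, not Seata's actual code: the class ConfigNameResolver, the property key seata.config.name, and the environment key SEATA_CONFIG_NAME are assumptions made for the example, so check ConfigurationFactory for the real constants.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigNameResolver {
    // Assumed keys for illustration only; the real constants live in ConfigurationFactory.
    static final String SYS_KEY = "seata.config.name";
    static final String ENV_KEY = "SEATA_CONFIG_NAME";
    static final String DEFAULT_NAME = "registry";

    /** Returns the first non-blank value: system property, then environment variable, then the default. */
    static String resolve(Map<String, String> sysProps, Map<String, String> env) {
        String name = sysProps.get(SYS_KEY);
        if (name == null || name.trim().isEmpty()) {
            name = env.get(ENV_KEY);
        }
        if (name == null || name.trim().isEmpty()) {
            name = DEFAULT_NAME;
        }
        return name;
    }

    public static void main(String[] args) {
        // Copy the one system property we care about into a map so resolve() stays easy to test.
        Map<String, String> sys = new HashMap<>();
        String fromSys = System.getProperty(SYS_KEY);
        if (fromSys != null) {
            sys.put(SYS_KEY, fromSys);
        }
        System.out.println(resolve(sys, System.getenv()));
    }
}
```

Passing the property and environment maps in as parameters keeps the fallback logic separate from the JVM's global state.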

Third-party configuration centre implementation class loading

After the static code block logic loads the configuration centre properties, how does Seata select the configuration centre and get the configuration centre property values?

As mentioned, FileConfiguration is Seata's default configuration implementation class. It extends AbstractConfiguration, which in turn implements the Configuration interface; that interface provides the methods for reading parameter values:

short getShort(String dataId, int defaultValue, long timeoutMills);
int getInt(String dataId, int defaultValue, long timeoutMills);
long getLong(String dataId, long defaultValue, long timeoutMills);
// ....

This means a third-party configuration centre only needs to implement this interface to be integrated into Seata's configuration centre. I'll use zk as an example below:

First, the third-party configuration centre needs to implement a Provider class:

!

The provider method, as its name suggests, mainly outputs a specific Configuration implementation class.
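To make the provider idea concrete, here is a minimal sketch under simplifying assumptions. SimpleConfiguration and SimpleConfigurationProvider are hypothetical, cut-down stand-ins for Seata's Configuration and ConfigurationProvider, and the in-memory map replaces a real remote client such as zk.

```java
import java.util.HashMap;
import java.util.Map;

class ProviderSketch {
    /** Hypothetical, reduced stand-in for Seata's Configuration interface. */
    interface SimpleConfiguration {
        String getConfig(String dataId, String defaultValue);

        // Typed getters can be derived from the single string lookup.
        default int getInt(String dataId, int defaultValue) {
            String value = getConfig(dataId, null);
            return value == null ? defaultValue : Integer.parseInt(value.trim());
        }
    }

    /** Hypothetical stand-in for a ConfigurationProvider: it only hands out an implementation. */
    interface SimpleConfigurationProvider {
        SimpleConfiguration provide();
    }

    /** Toy backend that keeps values in memory instead of asking a remote server. */
    static class InMemoryConfiguration implements SimpleConfiguration {
        private final Map<String, String> store = new HashMap<>();

        InMemoryConfiguration put(String key, String value) {
            store.put(key, value);
            return this;
        }

        @Override
        public String getConfig(String dataId, String defaultValue) {
            return store.getOrDefault(dataId, defaultValue);
        }
    }

    static class InMemoryConfigurationProvider implements SimpleConfigurationProvider {
        @Override
        public SimpleConfiguration provide() {
            return new InMemoryConfiguration().put("store.db.maxConn", "30");
        }
    }

    public static void main(String[] args) {
        SimpleConfiguration config = new InMemoryConfigurationProvider().provide();
        System.out.println(config.getInt("store.db.maxConn", 1));
        System.out.println(config.getInt("missing.key", 7));
    }
}
```

The calling code only ever sees the interface, which is what lets different backends be swapped in without changing the callers.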

So how do we get the corresponding third-party Configuration Centre implementation class based on the configuration?

In the Seata project, this is how to get a third-party Configuration Centre implementation:

Configuration CONFIG = ConfigurationFactory.getInstance();

In the getInstance() method the singleton pattern is mainly used to construct the configuration implementation class, which is constructed as follows:

io.seata.config.ConfigurationFactory#buildConfiguration:

!
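Before going through how the instance is built, the singleton part of getInstance() can be sketched as below. This is an illustration of lazy initialisation with double-checked locking, which is an assumption about the style used rather than a copy of Seata's code; LazySingletonSketch, BUILD_COUNT, and the Object placeholder are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazySingletonSketch {
    // Counts how often the (expensive) build step runs; only here to make the behaviour visible.
    static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    // volatile makes the fully constructed instance visible to other threads.
    private static volatile Object instance;

    /** Stand-in for buildConfiguration(); a plain Object replaces the real Configuration. */
    static Object buildConfiguration() {
        BUILD_COUNT.incrementAndGet();
        return new Object();
    }

    static Object getInstance() {
        if (instance == null) {                      // first check without locking
            synchronized (LazySingletonSketch.class) {
                if (instance == null) {              // second check under the lock
                    instance = buildConfiguration();
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        Object first = getInstance();
        Object second = getInstance();
        System.out.println(first == second);
        System.out.println(BUILD_COUNT.get());
    }
}
```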

First, the method reads the configuration centre type for the current environment from CURRENT_FILE_INSTANCE, the object that the static code block in ConfigurationFactory created from registry.conf. The type is File by default, but other third-party configuration centres can be configured in registry.conf. Here too, the SPI mechanism is used to load the third-party configuration centre implementation class; the implementation is as follows:

!

This is the ZookeeperConfigurationProvider mentioned above, the class that outputs the configuration implementation. Let's take a look at this line of code:

EnhancedServiceLoader.load(ConfigurationProvider.class,Objects.requireNonNull(configType).name()).provide();

EnhancedServiceLoader is the core class of Seata's SPI implementation. This line of code loads the class names listed in the files under the `META-INF/services/` and `META-INF/seata/` directories. So what happens if more than one configuration centre implementation class is loaded?

We notice that the ZookeeperConfigurationProvider class has an annotation above it:

@LoadLevel(name = "ZK", order = 1)

When loading multiple Configuration Centre implementation classes, they are sorted according to order:

io.seata.common.loader.EnhancedServiceLoader#findAllExtensionClass:

!

io.seata.common.loader.EnhancedServiceLoader#loadFile:

!

In this way, there is no conflict.

Seata can also select a specific implementation with this method: it passes an extra parameter when calling load:

Objects.requireNonNull(configType).name()

ConfigType is the configuration centre type, which is an enumerated class:

public enum ConfigType {
File, ZK, Nacos, Apollo, Consul, Etcd3, SpringCloudConfig, Custom;
}
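As a rough illustration of how the type string from registry.conf (for example "nacos" or "zk") could be mapped onto this enum, here is a small sketch. The case-insensitive matching and the helper name parse are assumptions for the example, not a statement of how Seata's own lookup is written.

```java
class ConfigTypeSketch {
    enum ConfigType {
        File, ZK, Nacos, Apollo, Consul, Etcd3, SpringCloudConfig, Custom
    }

    /** Hypothetical helper: maps a config type string to the enum, ignoring case. */
    static ConfigType parse(String name) {
        for (ConfigType type : ConfigType.values()) {
            if (type.name().equalsIgnoreCase(name)) {
                return type;
            }
        }
        throw new IllegalArgumentException("unsupported config type: " + name);
    }

    public static void main(String[] args) {
        // The enum constant's name() is what would later be handed to the SPI loader.
        System.out.println(parse("zk").name());
        System.out.println(parse("file").name());
    }
}
```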

The LoadLevel annotation also has a name attribute, which Seata checks when filtering implementation classes:

!

If the name is equal to LoadLevel's name attribute, then it is the currently configured third-party configuration centre implementation class.
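The name-based selection can be sketched with a small annotation of our own. Everything here is hypothetical: Level stands in for LoadLevel, the providers are toys, and picking the highest order among same-named candidates is an assumed tie-break, not something taken from EnhancedServiceLoader.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

class LoadLevelSketch {
    /** Hypothetical stand-in for Seata's LoadLevel annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Level {
        String name();
        int order() default 0;
    }

    interface Provider {
        String provide();
    }

    @Level(name = "File", order = 1)
    static class FileProvider implements Provider {
        public String provide() { return "file-config"; }
    }

    @Level(name = "ZK", order = 1)
    static class ZkProvider implements Provider {
        public String provide() { return "zk-config"; }
    }

    static List<Class<? extends Provider>> candidates() {
        return Arrays.asList(FileProvider.class, ZkProvider.class);
    }

    /** Picks the candidate whose annotation name matches; the highest order wins among duplicates (assumption). */
    static Provider load(List<Class<? extends Provider>> candidates, String name) {
        Class<? extends Provider> best = null;
        int bestOrder = Integer.MIN_VALUE;
        for (Class<? extends Provider> candidate : candidates) {
            Level level = candidate.getAnnotation(Level.class);
            if (level == null || !level.name().equalsIgnoreCase(name)) {
                continue;
            }
            if (best == null || level.order() > bestOrder) {
                best = candidate;
                bestOrder = level.order();
            }
        }
        if (best == null) {
            throw new IllegalStateException("no provider named " + name);
        }
        try {
            return best.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + best.getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(load(candidates(), "ZK").provide());
    }
}
```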

Third-party configuration centre implementation class

ZookeeperConfiguration inherits AbstractConfiguration and has the following constructor:

!

The constructor creates a zkClient object. What is FILE_CONFIG here?

private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;

It turns out to be the registry.conf configuration implementation class created in the static code block, from which you get the properties of the third-party Configuration Centre, construct the third-party Configuration Centre client, and then implement the Configuration interface:

!

Then you can use the relevant methods of the client to get the corresponding parameter values from the third-party configuration.

Third-party configuration centre configuration synchronization script

I wrote it last weekend and submitted it as a PR. It is still under review and is expected to be available in Seata 1.0, so please look forward to it.

It's located in the script directory of the Seata project:

!

config.txt holds the locally configured values. Once the third-party configuration centre is set up, running the script syncs the contents of config.txt to it.
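The first half of such a sync, reading key=value lines from config.txt, can be sketched in Java as follows. This only covers parsing; pushing each pair to a configuration centre is left out, and ConfigTxtParser, along with the choice to skip blank lines and lines starting with #, is a hypothetical illustration rather than the script's actual behaviour.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConfigTxtParser {
    /** Splits each line at the FIRST '=' so that values (e.g. JDBC URLs) may contain '=' themselves. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> result = new LinkedHashMap<>(); // keeps the file order
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // assumption: blank lines and comments are ignored
            }
            int idx = trimmed.indexOf('=');
            if (idx <= 0) {
                continue; // no key present, skip the line
            }
            result.put(trimmed.substring(0, idx).trim(), trimmed.substring(idx + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "store.mode=file",
            "store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true",
            "store.publicKey=");
        parse(lines).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Splitting at the first = matters because values such as the JDBC URL contain further = characters.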

Author's Bio

Zhang Chenghui currently works as a Java engineer in the technology platform department of the information centre at Zhongtong Technology, where he is mainly responsible for developing the Zhongtong messaging platform and the full-link stress testing project. He loves sharing technology and is the author of the WeChat public account "Backend Advancement", the blogger behind the technology blog (https://objcoding.com/), and a Seata contributor. GitHub ID: objcoding.

· 13 min read

Project address of the demo used in this post

Author: FUNKYE (Chen Jianbin), lead programmer at an Internet company in Hangzhou.

Preface

Seata configuration for direct connection blog

Seata Integration with Nacos Configuration blog

Building on the previous posts, we now configure nacos as the configuration centre and the dubbo registry.

Preparation

  1. Install docker
yum -y install docker
  1. Create the nacos and seata databases.
/******************************************/
/* Full database name = nacos */
/* Table name = config_info */
/******************************************/
CREATE TABLE `config_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) DEFAULT NULL,
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Creation time',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Modified',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant field',
`c_desc` varchar(256) DEFAULT NULL,
`c_use` varchar(64) DEFAULT NULL,
`effect` varchar(64) DEFAULT NULL,
`type` varchar(64) DEFAULT NULL,
`c_schema` text,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfo_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info';

/******************************************/
/* Full database name = nacos_config */
/* Table name = config_info_aggr */
/******************************************/
CREATE TABLE `config_info_aggr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) NOT NULL COMMENT 'group_id',
`datum_id` varchar(255) NOT NULL COMMENT 'datum_id',
`content` longtext NOT NULL COMMENT 'content',
`gmt_modified` datetime NOT NULL COMMENT 'modification time',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT 'Tenant field',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfoaggr_datagrouptenantdatum` (`data_id`,`group_id`,`tenant_id`,`datum_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='Add tenant field';


/******************************************/
/* Full database name = nacos_config */
/* Table name = config_info_beta */
/******************************************/
CREATE TABLE `config_info_beta` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`beta_ips` varchar(1024) DEFAULT NULL COMMENT 'betaIps',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Creation Time',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Modified',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant field',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfobeta_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_beta';

/******************************************/
/* Full database name = nacos_config */
/* Table name = config_info_tag */
/******************************************/
CREATE TABLE `config_info_tag` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`tag_id` varchar(128) NOT NULL COMMENT 'tag_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Creation time',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Modified',
`src_user` text COMMENT 'source user',
`src_ip` varchar(20) DEFAULT NULL COMMENT 'source ip',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfotag_datagrouptenanttag` (`data_id`,`group_id`,`tenant_id`,`tag_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_tag';

/******************************************/
/* Full database name = nacos_config */
/* Table name = config_tags_relation */
/******************************************/
CREATE TABLE `config_tags_relation` (
`id` bigint(20) NOT NULL COMMENT 'id',
`tag_name` varchar(128) NOT NULL COMMENT 'tag_name',
`tag_type` varchar(64) DEFAULT NULL COMMENT 'tag_type',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`nid` bigint(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`nid`),
UNIQUE KEY `uk_configtagrelation_configidtag` (`id`,`tag_name`,`tag_type`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_tag_relation';

/******************************************/
/* Full database name = nacos_config */
/* Table name = group_capacity */
/******************************************/
CREATE TABLE `group_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary key ID',
`group_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Group ID, an empty string indicates the entire cluster',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Quota, 0 means use the default value',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Usage',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Individual configuration size limit in bytes, 0 means use the default',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Maximum number of aggregated sub-configurations, 0 means use default',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Maximum subconfiguration size in bytes for a single aggregation data, 0 means use default',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Maximum number of change history counts',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Creation Time',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Modified time',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_id` (`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='Cluster, Capacity Information Table by Group';

/******************************************/
/* Full database name = nacos_config */
/* Table name = his_config_info */
/******************************************/
CREATE TABLE `his_config_info` (
`id` bigint(64) unsigned NOT NULL,
`nid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00',
`src_user` text,
`src_ip` varchar(20) DEFAULT NULL,
`op_type` char(10) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT 'Tenant field',
PRIMARY KEY (`nid`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_did` (`data_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='Multi-Tenant Transformation';


/******************************************/
/* Full database name = nacos_config */
/* Table name = tenant_capacity */
/******************************************/
CREATE TABLE `tenant_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary key ID',
`tenant_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Tenant ID',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Quota, 0 means use the default value',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Usage',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Individual configuration size limit in bytes, 0 means use the default',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Maximum number of aggregated sub-configurations',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Maximum subconfiguration size in bytes for a single aggregation data, 0 means use default',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT 'Maximum number of change history counts',
`gmt_create` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Creation time',
`gmt_modified` datetime NOT NULL DEFAULT '2010-05-05 00:00:00' COMMENT 'Modified time',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='Tenant capacity information table';


CREATE TABLE `tenant_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`kp` varchar(128) NOT NULL COMMENT 'kp',
`tenant_id` varchar(128) default '' COMMENT 'tenant_id',
`tenant_name` varchar(128) default '' COMMENT 'tenant_name',
`tenant_desc` varchar(256) DEFAULT NULL COMMENT 'tenant_desc',
`create_source` varchar(32) DEFAULT NULL COMMENT 'create_source',
`gmt_create` bigint(20) NOT NULL COMMENT 'create_time',
`gmt_modified` bigint(20) NOT NULL COMMENT 'modified_time',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_info_kptenantid` (`kp`,`tenant_id`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='tenant_info';

CREATE TABLE users (
username varchar(50) NOT NULL PRIMARY KEY,
password varchar(500) NOT NULL,
enabled boolean NOT NULL
);

CREATE TABLE roles (
username varchar(50) NOT NULL,
role varchar(50) NOT NULL
);

INSERT INTO users (username, password, enabled) VALUES ('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);

INSERT INTO roles (username, role) VALUES ('nacos', 'ROLE_ADMIN');

-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

  1. Pull the nacos and seata images and run them.
docker run -d --name nacos -p 8848:8848 -e MODE=standalone -e MYSQL_MASTER_SERVICE_HOST=your mysql ip -e MYSQL_MASTER_SERVICE_DB_NAME=nacos -e MYSQL_MASTER_SERVICE_USER=root -e MYSQL_MASTER_SERVICE_PASSWORD=mysql password -e MYSQL_SLAVE_SERVICE_HOST=your mysql ip -e SPRING_DATASOURCE_PLATFORM=mysql -e MYSQL_DATABASE_NUM=1 nacos/nacos-server:latest
docker run -d --name seata -p 8091:8091 -e SEATA_IP=the ip you want to specify -e SEATA_PORT=8091 seataio/seata-server:1.4.2

Seata Configuration

  1. The seata container has no built-in vim, so copy the folder to the host, edit it there, and then copy it back.
docker cp container id:seata-server/resources directory on the host where you want to place the folder
  1. Get the ip addresses of the two containers using the following command
docker inspect --format='{{.NetworkSettings.IPAddress}}' ID/NAMES
  1. nacos-config.txt is edited as follows
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.Your transaction group name=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://your mysql host ip:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=mysql account
store.db.password=mysql password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

Click here for detailed parameter configurations.

  1. registry.conf is edited as follows
registry {
# file, nacos, eureka, redis, zk, consul, etcd3, sofa
type = "nacos"

nacos {
serverAddr = "nacos container ip:8848"
namespace = ""
cluster = "default"
}
}

config {
# file, nacos, apollo, zk, consul, etcd3
type = "nacos"

nacos {
serverAddr = "nacos container ip:8848"
namespace = ""
}
}
  1. After the configuration is complete, use the following commands to copy the modified registry.conf into the container, restart it, and follow the logs
docker cp /home/seata/resources/registry.conf seata:seata-server/resources/
docker restart seata
docker logs -f seata
  1. Run nacos-config.sh to import the Nacos configuration.

e.g.: sh ${SEATAPATH}/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u username -w password

Refer to Configuration Import Instructions for specific parameter definitions.

  1. Log in to the nacos console to check

! 20191202205912

If the configuration list looks like the picture, the import succeeded.

Debugging

  1. Pull the project shown in the blog post and modify the application.yml and registry.conf of test-service.
registry {
type = "nacos"
nacos {
serverAddr = "host ip:8848"
namespace = ""
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "host ip:8848"
namespace = ""
cluster = "default"
}
}

server:
  port: 38888
spring:
  application:
    name: test-service
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://mysqlip:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: mysql password
dubbo:
  protocol:
    threadpool: cached
  scan:
    base-packages: com.example
  application:
    qos-enable: false
    name: testserver
  registry:
    id: my-registry
    address: nacos://host ip:8848
mybatis-plus:
  mapper-locations: classpath:/mapper/*Mapper.xml
  typeAliasesPackage: org.test.entity
  global-config:
    db-config:
      field-strategy: not-empty
      db-type: mysql
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    auto-mapping-unknown-column-behavior: none
  1. Copy the modified registry.conf to the resources directory of test-client, and modify its application.yml
spring:
  application:
    name: test
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://mysqlIp:3306/test?userSSL=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai
    username: root
    password: 123456
  mvc:
    servlet:
      load-on-startup: 1
  http:
    encoding:
      force: true
      charset: utf-8
      enabled: true
    multipart:
      max-file-size: 10MB
      max-request-size: 10MB
dubbo:
  registry:
    id: my-registry
    address: nacos://host ip:8848
  application:
    name: dubbo-demo-client
    qos-enable: false
server:
  port: 28888
  max-http-header-size: 8192
  address: 0.0.0.0
  tomcat:
    max-http-post-size: 104857600
  1. Execute the undo_log script on each db involved.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialisation',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defence status',
`log_created` DATETIME(6) NOT NULL COMMENT 'creation datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
  1. Run test-service,test-client in that order.

  2. See if the list of services in nacos is as shown below.

! 20191203132351

Summary

The docker deployment of nacos and seata is now complete. For more details, please read the detailed documentation at the following addresses:

nacos official website

dubbo official website

seata website

docker official website

· 6 min read

Project address

This article was written by FUNKYE (Chen Jianbin), lead programmer at an Internet company in Hangzhou.

Preface

The previous post covered the direct-connection way of configuring seata; see that blog post for details.

Building on that article, we now configure nacos as the configuration centre and the dubbo registry.

Preparation

  1. First of all, go to the nacos github to download the latest version

!

  1. After downloading, it is very simple: unzip it, go to the bin directory, and start it. If you see what is shown in the picture, it is running:

!

  1. Once it has started, visit: http://127.0.0.1:8848/nacos/#/login

!

Do you see this page? Log in with nacos (the account and password are the same) and take a look around.

At this point you will find that no service is registered yet

! 20191202204147

Don't worry, let's get the seata service connected.

Seata configuration

  1. Go to seata's conf folder, find the file shown in the screenshots below, and edit it:

! 20191202204353

! 20191202204437

  1. Then remember to save it! Next we open the registry.conf file to edit it:
registry {
# file, nacos, eureka, redis, zk, consul, etcd3, sofa
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}

config {
# file, nacos, apollo, zk, consul, etcd3
type = "nacos"

nacos {
serverAddr = "localhost"
namespace = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}

After all the editing, we run nacos-config.sh, and the content of our configured nacos-config.txt is sent to nacos as shown in the figure:

! 20191202205743

Output similar to the above indicates success. Next, log in to the Nacos configuration centre and view the configuration list; a list like the one in the figure shows that the configuration was pushed successfully:

! 20191202205912

As you can see, all of your configuration has been submitted. If running the sh script from the Git tool does not work, edit the sh file and try changing the operation to the following:

# "error" is assumed to be initialised to 0 earlier in the script
for line in $(cat nacos-config.txt)
do
# split each line on the first "=": key before it, value after it
key=${line%%=*}
value=${line#*=}
echo "\r\n set "${key}" = "${value}

# publish the entry to Nacos; the API answers "true" on success
result=`curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=$key&group=SEATA_GROUP&content=$value"`

if [ "$result"x == "true"x ]; then
echo "\033[42;37m $result \033[0m"
else
echo "\033[41;37m $result \033[0m"
let error++
fi
done

# summary: green on success, red if any entry failed
if [ $error -eq 0 ]; then
echo "\r\n\033[42;37m init nacos config finished, please start seata-server. \033[0m"
else
echo "\r\n\033[41;33m init nacos config fail. \033[0m"
fi
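The script places each key and value into the URL unescaped, so a value that contains characters such as `?` or `&` could be misread by the server. The following is a minimal Java sketch of the same split-and-publish step with URL encoding added. It only builds the URL and does not send the request. The class name `NacosConfigUrl` and the sample line in `main` are assumptions made for illustration; the endpoint and group are the ones used in the script.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Sketch: turn one "key=value" line into a Nacos publish-config URL. */
public final class NacosConfigUrl {

    // Endpoint and group copied from the shell script in this article.
    private static final String ENDPOINT = "http://127.0.0.1:8848/nacos/v1/cs/configs";
    private static final String GROUP = "SEATA_GROUP";

    private NacosConfigUrl() {
    }

    /** Text before the first '=', like ${line%%=*} in the script. */
    public static String keyOf(String line) {
        int idx = line.indexOf('=');
        return idx < 0 ? line : line.substring(0, idx);
    }

    /** Text after the first '=', like ${line#*=} in the script. */
    public static String valueOf(String line) {
        int idx = line.indexOf('=');
        return idx < 0 ? line : line.substring(idx + 1);
    }

    /** Builds the publish URL with both key and value URL-encoded. */
    public static String publishUrl(String line) {
        return ENDPOINT
                + "?dataId=" + encode(keyOf(line))
                + "&group=" + GROUP
                + "&content=" + encode(valueOf(line));
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample line, not taken from nacos-config.txt.
        System.out.println(publishUrl("store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true"));
    }
}
```

Splitting on the first `=` only matters for values that themselves contain `=`, such as JDBC URLs with query parameters.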
  1. Our preparations are now complete. Go to seata-service/bin and start the Seata service; output like that in the figure means it started successfully!

! 20191202210112

Debugging

  1. First, change the pom dependencies of the springboot-dubbo-mybatisplus-seata project: remove the zk-related configuration, because we now use Nacos as the registry.
   <properties>
<webVersion>3.1</webVersion>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<HikariCP.version>3.2.0</HikariCP.version>
<mybatis-plus-boot-starter.version>3.2.0</mybatis-plus-boot-starter.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-nacos</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<!-- <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId>
<version>7.0</version> <scope>provided</scope> </dependency> -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>

<!-- mybatis-plus begin -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-boot-starter.version}</version>
</dependency>
<!-- mybatis-plus end -->
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0.1</version>
</dependency>
<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>2.5.4</version> </dependency> -->

<!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId>
<version>3.1.0</version> </dependency> -->
<!-- https://mvnrepository.com/artifact/org.freemarker/freemarker -->
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.20</version>
</dependency>
<!-- Required so that the log4j2.yml file can be recognised -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency> <!-- log4j2 dependency -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>
<version>2.11.0</version> </dependency> -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

  1. Then change test-service: delete the zk configuration and update the application.yml file. The configuration, directory structure and code are as follows:
server:
  port: 38888
spring:
  application:
    name: test-service
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
dubbo:
  protocol:
    loadbalance: leastactive
    threadpool: cached
  scan:
    base-packages: org.test.service
  application:
    qos-enable: false
    name: testserver
  registry:
    id: my-registry
    address: nacos://127.0.0.1:8848
mybatis-plus:
  mapper-locations: classpath:/mapper/*Mapper.xml
  typeAliasesPackage: org.test.entity
  global-config:
    db-config:
      field-strategy: not-empty
      id-type: auto
      db-type: mysql
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: true
    auto-mapping-unknown-column-behavior: none
20191202211833

  3. Then change the registry.conf file. If your Nacos runs on another server, change the address to the corresponding IP and port.

 registry {
type = "nacos"
file {
name = "file.conf"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}
config {
type = "nacos"
file {
name = "file.conf"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}
  1. Next, we run provideApplication

! 20191202212000

The startup succeeded. Now look at the Seata logs:

! 20191202212028

Success. Now modify test-client in the same way: first update application.yml, replacing zk with nacos (not described in detail here), then copy the registry.conf from test-service into the client project's resources, overwriting the original registry.conf.

Then we can run clientApplication:

! 20191202212114

  1. Confirm that the service has been published and test that the transaction is running correctly

! 20191202212203

The service is successfully published and consumed. Now let's go back to swagger and test the rollback to see if everything is ok, visit http://127.0.0.1:28888/swagger-ui.html

! 20191202212240

Congratulations! If you see this, it worked for you just as it did for me.

Summary

This simple setup of Nacos with Seata is now complete. For more details, see the official documentation listed below:

nacos official website

dubbo official website

seata official website

· One min read

Event Introduction

Highlight Interpretation

Guest Speakers

  • Ji Min (Qing Ming) "Seata Past, Present, and Future" slides

  • Wu Jiangke "My Open Source Journey with SEATA and SEATA's Application in Internet Healthcare Systems" slides

    1577282651

  • Shen Haiqiang (Xuan Yi) "Essence of Seata AT Mode" slides

    1577282652

  • Zhang Sen "Detailed Explanation of TCC Mode in Distributed Transaction Seata"

    1577282653

  • Chen Long (Yiyuan) "Seata Long Transaction Solution Saga Mode"

    1577282654

  • Chen Pengzhi "Seata Practice in Didi Chuxing's Motorcycle Business" slides

    1577282655

Special Awards

· 11 min read

Project address: https://gitee.com/itCjb/springboot-dubbo-mybatisplus-seata

Author: FUNKYE (Chen Jianbin), a programmer at an Internet company in Hangzhou.

Introduction

MyBatis-Plus: MyBatis-Plus (MP for short) is an enhancement tool for MyBatis. It only enhances MyBatis without changing it, and was created to simplify development and improve efficiency.

MP configuration:

<bean id="sqlSessionFactory" class="com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"/>
</bean>

Seata: Seata is an open source distributed transaction solution committed to providing high-performance, easy-to-use distributed transaction services. Seata offers users the AT, TCC, SAGA and XA transaction modes, creating a one-stop distributed transaction solution.

AT mode mechanism:

  • Phase I: Business data and rollback log records are committed in the same local transaction, releasing local locks and connection resources.
  • Phase II:
    • Commit is asynchronous and completes very quickly.
    • Rollback performs reverse compensation using the rollback log written in phase I.
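The two phases can be pictured with a toy in-memory model. This is only a sketch to make the mechanism concrete, not Seata's implementation: the class `AtModeSketch`, its maps and its method names are all assumptions made for illustration, and locking, the transaction coordinator and the real undo_log table are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy, in-memory model of the AT-mode phases; not Seata's implementation. */
public final class AtModeSketch {

    // Business data: row id -> value.
    private final Map<String, Integer> table = new HashMap<>();
    // Rollback log: row id -> value before the first change (the "before image").
    private final Map<String, Integer> undoLog = new HashMap<>();

    public AtModeSketch(String rowId, int initialValue) {
        table.put(rowId, initialValue);
    }

    /** Phase I: record the before image and apply the business change together. */
    public void phaseOneUpdate(String rowId, int newValue) {
        if (!table.containsKey(rowId)) {
            throw new IllegalArgumentException("unknown row: " + rowId);
        }
        undoLog.putIfAbsent(rowId, table.get(rowId));
        table.put(rowId, newValue);
    }

    /** Phase II commit: the data is already in place, so only the log is dropped. */
    public void phaseTwoCommit() {
        undoLog.clear();
    }

    /** Phase II rollback: restore every before image, then drop the log. */
    public void phaseTwoRollback() {
        table.putAll(undoLog);
        undoLog.clear();
    }

    public int read(String rowId) {
        return table.get(rowId);
    }

    public boolean hasUndoLog() {
        return !undoLog.isEmpty();
    }

    public static void main(String[] args) {
        AtModeSketch branch = new AtModeSketch("stock", 10);
        branch.phaseOneUpdate("stock", 8);
        branch.phaseTwoRollback();
        System.out.println(branch.read("stock"));
    }
}
```

In this model the commit path has almost nothing to do, which is why the second phase can finish quickly, while the rollback path depends entirely on what phase I wrote to the log.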

Analyse the causes

  1. First, the introduction shows that mp needs to register a sqlSessionFactory and inject a data source into it, while Seata relies on a proxied data source to ensure that transactions commit and roll back correctly.

  2. Let's look at the SeataAutoConfig code based on the official Seata demo.

package org.test.config;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

@Bean(name = "dataSource") // Declare it as a bean instance.
@Primary // Among beans of type DataSource, prefer this annotated one.
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}",dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("Loading dataSource ........");
return druidDataSource;
}

/**
* init datasource proxy
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
logger.info("Proxy dataSource ........");
return new DataSourceProxy(dataSource);
}

@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSourceProxy);
factory.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath*:/mapper/*.xml"));
return factory.getObject();
}

/**
* init global transaction scanner
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("Configuring seata........");
return new GlobalTransactionScanner("test-service", "test-group");
}
}

First, notice that in our Seata configuration class we configure a data source bean and then a Seata proxy bean that wraps that data source.

If we start the project that integrates mp with Seata as is, we find that plug-ins such as pagination stop working, and even mapper scanning has to be written in code. Why is that?

Reading the code above gives the answer: because we configure another sqlSessionFactory ourselves, mp's own sqlSessionFactory is no longer used. That is the problem. And even if we did not configure a sqlSessionFactory, the data source used by mp would not be the one proxied by Seata, so distributed transactions would fail. How do we solve this?

We need to read the source code of mp and find its startup class.

/*
* Copyright (c) 2011-2020, baomidou (jobob@qq.com).
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.baomidou.mybatisplus.autoconfigure;


import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.baomidou.mybatisplus.core.incrementer.IKeyGenerator;
import com.baomidou.mybatisplus.core.injector.ISqlInjector;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.scripting.LanguageDriver;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.TypeHandler;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.mapper.MapperFactoryBean;
import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

/**
* {@link EnableAutoConfiguration Auto-Configuration} for Mybatis. Contributes a
* {@link SqlSessionFactory} and a {@link SqlSessionTemplate}.
* <p>
* If {@link org.mybatis.spring.annotation.MapperScan} is used, or a
* configuration file is specified as a property, those will be considered,
* otherwise this auto-configuration will attempt to register mappers based on
* the interface definitions in or under the root auto-configuration package.
* </p>
* <p> copy from {@link org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration}</p>
*
* @author Eddú Meléndez
* @author Josh Long
* @author Kazuki Shimizu
* @author Eduardo Macarrón
*/
@Configuration
@ConditionalOnClass({SqlSessionFactory.class, SqlSessionFactoryBean.class})
@ConditionalOnSingleCandidate(DataSource.class)
@EnableConfigurationProperties(MybatisPlusProperties.class)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
public class MybatisPlusAutoConfiguration implements InitializingBean {

private static final Logger logger = LoggerFactory.getLogger(MybatisPlusAutoConfiguration.class);

private final MybatisPlusProperties properties;

private final Interceptor[] interceptors;

private final TypeHandler[] typeHandlers;

private final LanguageDriver[] languageDrivers;

private final ResourceLoader resourceLoader;

private final DatabaseIdProvider databaseIdProvider;

private final List<ConfigurationCustomizer> configurationCustomizers;

private final List<MybatisPlusPropertiesCustomizer> mybatisPlusPropertiesCustomizers;

private final ApplicationContext applicationContext;


public MybatisPlusAutoConfiguration(MybatisPlusProperties properties,
ObjectProvider<Interceptor[]> interceptorsProvider,
ObjectProvider<TypeHandler[]> typeHandlersProvider,
ObjectProvider<LanguageDriver[]> languageDriversProvider,
ResourceLoader resourceLoader,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider,
ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mybatisPlusPropertiesCustomizerProvider,
ApplicationContext applicationContext) {
this.properties = properties;
this.interceptors = interceptorsProvider.getIfAvailable();
this.typeHandlers = typeHandlersProvider.getIfAvailable();
this.languageDrivers = languageDriversProvider.getIfAvailable();
this.resourceLoader = resourceLoader;
this.databaseIdProvider = databaseIdProvider.getIfAvailable();
this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
this.mybatisPlusPropertiesCustomizers = mybatisPlusPropertiesCustomizerProvider.getIfAvailable();
this.applicationContext = applicationContext;
}

@Override
public void afterPropertiesSet() {
if (!CollectionUtils.isEmpty(mybatisPlusPropertiesCustomizers)) {
mybatisPlusPropertiesCustomizers.forEach(i -> i.customize(properties));
}
checkConfigFileExists();
}

private void checkConfigFileExists() {
if (this.properties.isCheckConfigLocation() && StringUtils.hasText(this.properties.getConfigLocation())) {
Resource resource = this.resourceLoader.getResource(this.properties.getConfigLocation());
Assert.state(resource.exists(),
"Cannot find config location: " + resource + " (please add config file or check your Mybatis configuration)");
}
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
// TODO use MybatisSqlSessionFactoryBean instead of SqlSessionFactoryBean
MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
applyConfiguration(factory);
if (this.properties.getConfigurationProperties() != null) {
factory.setConfigurationProperties(this.properties.getConfigurationProperties());
}
if (!ObjectUtils.isEmpty(this.interceptors)) {
factory.setPlugins(this.interceptors);
}
if (this.databaseIdProvider != null) {
factory.setDatabaseIdProvider(this.databaseIdProvider);
}
if (StringUtils.hasLength(this.properties.getTypeAliasesPackage())) {
factory.setTypeAliasesPackage(this.properties.getTypeAliasesPackage());
}
if (this.properties.getTypeAliasesSuperType() != null) {
factory.setTypeAliasesSuperType(this.properties.getTypeAliasesSuperType());
}
if (StringUtils.hasLength(this.properties.getTypeHandlersPackage())) {
factory.setTypeHandlersPackage(this.properties.getTypeHandlersPackage());
}
if (!ObjectUtils.isEmpty(this.typeHandlers)) {
factory.setTypeHandlers(this.typeHandlers);
}
if (!ObjectUtils.isEmpty(this.properties.resolveMapperLocations())) {
factory.setMapperLocations(this.properties.resolveMapperLocations());
}

// TODO the source was modified here (the original adapts to an older version of mybatis, which we do not need)
Class<? extends LanguageDriver> defaultLanguageDriver = this.properties.getDefaultScriptingLanguageDriver();
if (!ObjectUtils.isEmpty(this.languageDrivers)) {
factory.setScriptingLanguageDrivers(this.languageDrivers);
}
Optional.ofNullable(defaultLanguageDriver).ifPresent(factory::setDefaultScriptingLanguageDriver);

// TODO custom enum package
if (StringUtils.hasLength(this.properties.getTypeEnumsPackage())) {
factory.setTypeEnumsPackage(this.properties.getTypeEnumsPackage());
}
// TODO this must be non-null
GlobalConfig globalConfig = this.properties.getGlobalConfig();
// TODO inject the filler
if (this.applicationContext.getBeanNamesForType(MetaObjectHandler.class,
false, false).length > 0) {
MetaObjectHandler metaObjectHandler = this.applicationContext.getBean(MetaObjectHandler.class);
globalConfig.setMetaObjectHandler(metaObjectHandler);
}
// TODO inject the primary key generator
if (this.applicationContext.getBeanNamesForType(IKeyGenerator.class, false,
false).length > 0) {
IKeyGenerator keyGenerator = this.applicationContext.getBean(IKeyGenerator.class);
globalConfig.getDbConfig().setKeyGenerator(keyGenerator);
}
// TODO inject the sql injector
if (this.applicationContext.getBeanNamesForType(ISqlInjector.class, false,
false).length > 0) {
ISqlInjector iSqlInjector = this.applicationContext.getBean(ISqlInjector.class);
globalConfig.setSqlInjector(iSqlInjector);
}
// TODO set GlobalConfig on the MybatisSqlSessionFactoryBean
factory.setGlobalConfig(globalConfig);
return factory.getObject();
}

// TODO the parameter type is MybatisSqlSessionFactoryBean
private void applyConfiguration(MybatisSqlSessionFactoryBean factory) {
// TODO use MybatisConfiguration
MybatisConfiguration configuration = this.properties.getConfiguration();
if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
configuration = new MybatisConfiguration();
}
if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
}

@Bean
@ConditionalOnMissingBean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
ExecutorType executorType = this.properties.getExecutorType();
if (executorType != null) {
return new SqlSessionTemplate(sqlSessionFactory, executorType);
} else {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

/**
* This will just scan the same base package as Spring Boot does. If you want more power, you can explicitly use
* {@link org.mybatis.spring.annotation.MapperScan} but this will get typed mappers working correctly, out-of-the-box,
* similar to using Spring Data JPA repositories.
*/
public static class AutoConfiguredMapperScannerRegistrar implements BeanFactoryAware, ImportBeanDefinitionRegistrar {

private BeanFactory beanFactory;

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

if (!AutoConfigurationPackages.has(this.beanFactory)) {
logger.debug("Could not determine auto-configuration package, automatic mapper scanning disabled.");
return;
}

logger.debug("Searching for mappers annotated with @Mapper");

List<String> packages = AutoConfigurationPackages.get(this.beanFactory);
if (logger.isDebugEnabled()) {
packages.forEach(pkg -> logger.debug("Using auto-configuration base package '{}'", pkg));
}

BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MapperScannerConfigurer.class);
builder.addPropertyValue("processPropertyPlaceHolders", true);
builder.addPropertyValue("annotationClass", Mapper.class);
builder.addPropertyValue("basePackage", StringUtils.collectionToCommaDelimitedString(packages));
BeanWrapper beanWrapper = new BeanWrapperImpl(MapperScannerConfigurer.class);
Stream.of(beanWrapper.getPropertyDescriptors())
// Need to mybatis-spring 2.0.2+
.filter(x -> x.getName().equals("lazyInitialization")).findAny()
.ifPresent(x -> builder.addPropertyValue("lazyInitialization", "${mybatis.lazy-initialization:false}"));
registry.registerBeanDefinition(MapperScannerConfigurer.class.getName(), builder.getBeanDefinition());
}

@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
}

/**
* If mapper registering configuration or mapper scanning configuration not present, this configuration allow to scan
* mappers based on the same component-scanning path as Spring Boot itself.
*/
@Configuration
@Import(AutoConfiguredMapperScannerRegistrar.class)
@ConditionalOnMissingBean({MapperFactoryBean.class, MapperScannerConfigurer.class})
public static class MapperScannerRegistrarNotFoundConfiguration implements InitializingBean {

@Override
public void afterPropertiesSet() {
logger.debug(
"Not found configuration for registering mapper bean using @MapperScan, MapperFactoryBean and MapperScannerConfigurer.");
}
}
}

Look at the sqlSessionFactory method in the mp startup class: it has a data source injected in the same way. At this point you can probably guess the solution.

That's right: hand the proxied data source to mp's sqlSessionFactory.

It's very simple. We only need to change our Seata configuration class slightly:

package org.test.config;

import javax.sql.DataSource;

import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
@MapperScan("com.baomidou.springboot.mapper*")
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);
private DataSourceProxy dataSourceProxy;

@Bean(name = "dataSource") // Declare it as a bean instance.
@Primary // In the same DataSource, the labelled DataSource is used first
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("Loading dataSource ........");
dataSourceProxy = new DataSourceProxy(druidDataSource);
return dataSourceProxy;
}

/**
* init datasource proxy
*
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean
public DataSourceProxy dataSourceProxy() {
logger.info("Proxy dataSource ........") ;
return dataSourceProxy;
}

/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("Configuring seata........") ;
return new GlobalTransactionScanner("test-service", "test-group");
}
}

Looking at the code, we removed our own sqlSessionFactory configuration and let the DataSource bean itself return the proxied data source. We also added @Primary, so mp uses the data source we configured in preference to any other. This fixes the bug in which proxying the data source for Seata meant creating a new sqlSessionFactory, which in turn caused mp's plug-ins and components to stop working.
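To see why handing out a single proxied data source is enough, here is a minimal plain-Java sketch of the wrapping idea. It is not Seata's DataSourceProxy: SimpleDataSource, PooledDataSource and RecordingProxy are hypothetical stand-ins invented for illustration, and a connection is reduced to a string.

```java
import java.util.ArrayList;
import java.util.List;

final class ProxyDataSourceSketch {

    // Hypothetical stand-in for javax.sql.DataSource, reduced to one method.
    interface SimpleDataSource {
        String getConnection();
    }

    // Plays the role of the Druid pool: hands out "raw" connections.
    static class PooledDataSource implements SimpleDataSource {
        @Override
        public String getConnection() {
            return "raw-connection";
        }
    }

    // Plays the role of the proxy: wraps the pool and sees every request.
    static class RecordingProxy implements SimpleDataSource {
        private final SimpleDataSource target;
        private final List<String> intercepted = new ArrayList<>();

        RecordingProxy(SimpleDataSource target) {
            this.target = target;
        }

        @Override
        public String getConnection() {
            String connection = target.getConnection();
            intercepted.add(connection); // a real proxy would hook its transaction logic in here
            return "proxied(" + connection + ")";
        }

        int interceptedCount() {
            return intercepted.size();
        }
    }

    // A consumer that only knows the interface goes through the proxy unchanged,
    // much as mp's sqlSessionFactory only knows about DataSource.
    static String consume(SimpleDataSource dataSource) {
        return dataSource.getConnection();
    }

    public static void main(String[] args) {
        RecordingProxy proxy = new RecordingProxy(new PooledDataSource());
        System.out.println(consume(proxy));
        System.out.println(proxy.interceptedCount());
    }
}
```

Because the consumer depends only on the interface, nothing on the mp side has to be rebuilt; it simply receives the wrapped instance.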

Summary

Running into a pitfall like this is nothing to be afraid of. Patiently work through how each component is implemented, think it over, and look for the code where the two conflict; you will then be able to find a way to make them compatible.

· 21 min read

Project address

This article was written by FUNKYE (Chen Jianbin), lead programmer at an Internet company in Hangzhou.

Preface

Transaction: a reliable, independent unit of work made up of a group of operations. Transactions have the ACID properties: atomicity, consistency, isolation and durability.

Distributed transaction: when one operation needs several services or several databases to cooperate (for example after splitting databases and tables, or after splitting a business into multiple services), local transactions can no longer cope, and distributed transactions are needed to keep the data consistent.

Seata: an open source distributed transaction solution, committed to providing high-performance, easy-to-use distributed transaction services under a microservices architecture.

Purpose of this article: microservices are becoming more and more popular, and the market offers many distributed transaction solutions of uneven quality. The more popular ones are MQ-based message solutions that guarantee eventual consistency (consumption confirmation, message check-back, message compensation and so on) and TX-LCN, whose LCN mode coordinates local transactions so that they commit or roll back together (it is no longer updated and is incompatible with Dubbo 2.7). MQ-based distributed transactions are too complex and TX-LCN is no longer maintained, so an efficient, reliable and easy-to-learn solution is needed. Seata stands out here, and this article shows how to quickly build a demo project that integrates it.

Preparation

  1. First, install MySQL, Eclipse and the other usual tools; that is not covered here.

  2. Visit the Seata download centre. We use version 0.9.0.

  3. Download and unzip seata-server.

Build the library and table

  1. First, connect to MySQL and create a database named seata, then run the table-creation SQL. The SQL we need is db_store.sql in the conf folder of seata-server.

/*
Navicat MySQL Data Transfer
Source Server : mysql
Source Server Version : 50721
Source Host : localhost:3306
Source Database : seata
Target Server Type : MYSQL
Target Server Version : 50721
File Encoding : 65001
Date: 2019-11-23 22:03:18
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------

-- Table structure for branch_table

-- ----------------------------

DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`lock_key` varchar(128) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of branch_table

-- ----------------------------

-- ----------------------------

-- Table structure for global_table

-- ----------------------------

DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of global_table

-- ----------------------------

-- ----------------------------

-- Table structure for lock_table

-- ----------------------------

DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(96) DEFAULT NULL,
`transaction_id` mediumtext,
`branch_id` mediumtext,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of lock_table

-- ----------------------------

-- ----------------------------

-- Table structure for undo_log

-- ----------------------------

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------

-- Records of undo_log

  2. With the tables Seata needs in place, set up the database for the demo itself: create a database named test, then run the following SQL.

/*
Navicat MySQL Data Transfer
Source Server : mysql
Source Server Version : 50721
Source Host : localhost:3306
Source Database : test
Target Server Type : MYSQL
Target Server Version : 50721
File Encoding : 65001
Date: 2019-11-23 22:03:24
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------

-- Table structure for test

-- ----------------------------

DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`one` varchar(255) DEFAULT NULL,
`two` varchar(255) DEFAULT NULL,
`createTime` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;

-- ----------------------------

-- Records of test

-- ----------------------------

INSERT INTO `test` VALUES ('1', '1', '2', '2019-11-23 16:07:34');

-- ----------------------------

-- Table structure for undo_log

-- ----------------------------

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------

-- Records of undo_log

  3. Find file.conf in the seata-server/conf folder and edit it.

  4. Then find the db configuration block in that file and change it so that it points at the seata database created above.
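
The screenshots for these two steps are not reproduced here. As a rough sketch, assuming the layout of the store block in the file.conf shipped with seata-server 0.9.0 (check the key names against your own copy), the change amounts to switching the store mode to db and pointing the db block at the seata database. The user and password values are placeholders.

```
store {
  # switch from the default "file" to "db"
  mode = "db"
  db {
    # placeholder connection details for the seata database created earlier
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "123456"
    # keep the other keys in this block as shipped
  }
}
```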

Now go to the bin directory and run ./seata-server.bat to check that the server starts.

Create a project

First of all, we use Eclipse here; you can of course use IDEA or another tool. Follow the steps below in order.

  1. Create a new Maven project and delete the extra folders.

  2. Open the project's pom.xml and add the following dependencies.

<properties>
    <webVersion>3.1</webVersion>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <HikariCP.version>3.2.0</HikariCP.version>
    <mybatis-plus-boot-starter.version>3.2.0</mybatis-plus-boot-starter.version>
</properties>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.8.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>2.7.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.60</version>
    </dependency>
    <!-- <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId>
        <version>7.0</version> <scope>provided</scope> </dependency> -->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>2.9.2</version>
    </dependency>

    <!-- mybatis-plus begin -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>${mybatis-plus-boot-starter.version}</version>
    </dependency>
    <!-- mybatis-plus end -->
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-all</artifactId>
        <version>0.9.0.1</version>
    </dependency>
    <!-- Zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
        <version>2.5.4</version> </dependency> -->

    <!-- <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId>
        <version>3.1.0</version> </dependency> -->
    <!-- https://mvnrepository.com/artifact/org.freemarker/freemarker -->
    <dependency>
        <groupId>org.freemarker</groupId>
        <artifactId>freemarker</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.20</version>
    </dependency>
    <!-- Add this to recognise the log4j2.yml file -->
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-yaml</artifactId>
    </dependency>
    <dependency> <!-- Introducing the log4j2 dependency -->
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>
        <version>2.11.0</version> </dependency> -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

  3. Then switch the parent project to pom packaging: open the pom file, switch to the Overview tab and set the packaging to pom.

  4. Create our demo sub-project, test-service.

The directory is as follows.

[Screenshot: directory structure of the test-service project]

Create EmbeddedZooKeeper.java file, along with ProviderApplication.java, with the following code.

package org.test;

import java.io.File;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.UUID;

import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.ErrorHandler;
import org.springframework.util.SocketUtils;

/**
* from:
* https://github.com/spring-projects/spring-xd/blob/v1.3.1.RELEASE/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/zookeeper/ZooKeeperUtils.java
*
* Helper class to start an embedded instance of standalone (non clustered) ZooKeeper.
*
* NOTE: at least an external standalone server (if not an ensemble) are recommended, even for
* {@link org.springframework.xd.dirt.server.singlenode.SingleNodeApplication}
*
* @author Patrick Peralta
* @author Mark Fisher
* @author David Turanski
*/
public class EmbeddedZooKeeper implements SmartLifecycle {

/**
* Logger.
*/
private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class);

/**
* ZooKeeper client port. This will be determined dynamically upon startup.
*/
private final int clientPort;

/**
* Whether to auto-start. Default is true.
*/
private boolean autoStartup = true;

/**
* Lifecycle phase. Default is 0.
*/
private int phase = 0;

/**
* Thread for running the ZooKeeper server.
*/
private volatile Thread zkServerThread;

/**
* ZooKeeper server.
*/
private volatile ZooKeeperServerMain zkServer;

/**
* {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread.
*/
private ErrorHandler errorHandler;

private boolean daemon = true;

/**
* Construct an EmbeddedZooKeeper with a random port.
*/
public EmbeddedZooKeeper() {
clientPort = SocketUtils.findAvailableTcpPort();
}

/**
* Construct an EmbeddedZooKeeper with the provided port.
*
* @param clientPort
* port for ZooKeeper server to bind to
*/
public EmbeddedZooKeeper(int clientPort, boolean daemon) {
this.clientPort = clientPort;
this.daemon = daemon;
}

/**
* Returns the port that clients should use to connect to this embedded server.
*
* @return dynamically determined client port
*/
public int getClientPort() {
return this.clientPort;
}

/**
* Specify whether to start automatically. Default is true.
*
* @param autoStartup
* whether to start automatically
*/
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

/**
* {@inheritDoc}
*/
public boolean isAutoStartup() {
return this.autoStartup;
}

/**
* Specify the lifecycle phase for the embedded server.
*
* @param phase
* the lifecycle phase
*/
public void setPhase(int phase) {
this.phase = phase;
}

/**
* {@inheritDoc}
*/
public int getPhase() {
return this.phase;
}

/**
* {@inheritDoc}
*/
public boolean isRunning() {
return (zkServerThread != null);
}

/**
* Start the ZooKeeper server in a background thread.
* <p>
* Register an error handler via {@link #setErrorHandler} in order to handle any exceptions thrown during startup or
* execution.
*/
public synchronized void start() {
if (zkServerThread == null) {
zkServerThread = new Thread(new ServerRunnable(), "ZooKeeper Server Starter");
zkServerThread.setDaemon(daemon);
zkServerThread.start();
}
}

/**
* Shutdown the ZooKeeper server.
*/
public synchronized void stop() {
if (zkServerThread != null) {
// The shutdown method is protected...thus this hack to invoke it.
// This will log an exception on shutdown; see
// https://issues.apache.org/jira/browse/ZOOKEEPER-1873 for details.
try {
Method shutdown = ZooKeeperServerMain.class.getDeclaredMethod("shutdown");
shutdown.setAccessible(true);
shutdown.invoke(zkServer);
}

catch (Exception e) {
throw new RuntimeException(e);
}

// It is expected that the thread will exit after
// the server is shutdown; this will block until
// the shutdown is complete.
try {
zkServerThread.join(5000);
zkServerThread = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for embedded ZooKeeper to exit");
// abandoning zk thread
zkServerThread = null;
}
}
}

/**
* Stop the server if running and invoke the callback when complete.
*/
public void stop(Runnable callback) {
stop();
callback.run();
}

/**
* Provide an {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. If none
* is provided, only error-level logging will occur.
*
* @param errorHandler
* the {@link ErrorHandler} to be invoked
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Runnable implementation that starts the ZooKeeper server.
*/
private class ServerRunnable implements Runnable {

public void run() {
try {
Properties properties = new Properties();
File file = new File(System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID());
file.deleteOnExit();
properties.setProperty("dataDir", file.getAbsolutePath());
properties.setProperty("clientPort", String.valueOf(clientPort));

QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
quorumPeerConfig.parseProperties(properties);

zkServer = new ZooKeeperServerMain();
ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumPeerConfig);

zkServer.runFromConfig(configuration);
} catch (Exception e) {
if (errorHandler != null) {
errorHandler.handleError(e);
} else {
logger.error("Exception running embedded ZooKeeper", e);
}
}
}
}

}
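
The stop() method above uses reflection to call the protected shutdown() method of ZooKeeperServerMain. The same technique can be sketched in isolation. GuardedServer below is a hypothetical class standing in for ZooKeeperServerMain, so the example runs without ZooKeeper on the classpath.

```java
import java.lang.reflect.Method;

final class ReflectiveShutdownSketch {

    // Hypothetical stand-in for ZooKeeperServerMain: shutdown() is not public.
    static class GuardedServer {
        private boolean running = true;

        protected void shutdown() {
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }

    // Looks the method up by name, makes it accessible, and invokes it on the instance.
    static void forceShutdown(GuardedServer server) {
        try {
            Method shutdown = GuardedServer.class.getDeclaredMethod("shutdown");
            shutdown.setAccessible(true);
            shutdown.invoke(server);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not invoke shutdown()", e);
        }
    }

    public static void main(String[] args) {
        GuardedServer server = new GuardedServer();
        forceShutdown(server);
        System.out.println(server.isRunning());
    }
}
```

In this sketch the caller sits in the same package, so a direct call would also compile; the reflective route only becomes necessary from another package, which is the situation EmbeddedZooKeeper is in.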
package org.test;

import org.apache.dubbo.config.spring.context.annotation.DubboComponentScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
* @author cjb
* @date 2019/10/24
*/
@EnableTransactionManagement
@ComponentScan(basePackages = {"org.test.config", "org.test.service.impl"})
@DubboComponentScan(basePackages = "org.test.service.impl")
@SpringBootApplication
public class ProviderApplication {

public static void main(String[] args) {
new EmbeddedZooKeeper(2181, false).start();
SpringApplication app = new SpringApplication(ProviderApplication.class);
app.run(args);
}

}

Create the entity package org.test.entity and the entity class Test. The class uses Lombok; search online for the details if needed, and install the Lombok plug-in in Eclipse.

package org.test.entity;

import java.io.Serializable;
import java.time.LocalDateTime;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

/**
* <p>
* Functions
* </p>
*
* @author Funkye
* @since 2019-04-23
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value = "test object", description = "Functions")
public class Test implements Serializable {

private static final long serialVersionUID = 1L;

@ApiModelProperty(value = "primary key")
@TableId(value = "id", type = IdType.AUTO)
private Integer id;

@ApiModelProperty(value = "one")
@TableField("one")
private String one;

@ApiModelProperty(value = "two")
@TableField("two")
private String two;

@ApiModelProperty(value = "createTime")
@TableField("createTime")
private LocalDateTime createTime;

}
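
For readers who have not used Lombok: @Data generates getters, setters, equals, hashCode and toString, and @Accessors(chain = true) makes each setter return this so that calls can be chained. Below is a hand-written sketch of roughly what that means for two of the fields. TestByHand is a hypothetical name, equals, hashCode and toString are omitted, and this is not Lombok's literal output.

```java
final class LombokChainSketch {

    // Hand-written approximation of the accessors Lombok would generate.
    static class TestByHand {
        private Integer id;
        private String one;

        public Integer getId() {
            return id;
        }

        public String getOne() {
            return one;
        }

        // chain = true: the setter returns the instance instead of void
        public TestByHand setId(Integer id) {
            this.id = id;
            return this;
        }

        public TestByHand setOne(String one) {
            this.one = one;
            return this;
        }
    }

    public static void main(String[] args) {
        // Chained setters allow an entity to be filled in a single expression.
        TestByHand row = new TestByHand().setId(1).setOne("1");
        System.out.println(row.getId() + " " + row.getOne());
    }
}
```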

Create the service, service.impl and mapper packages, then create ITestService, its implementation class and the mapper in turn.

package org.test.service;

import org.test.entity.Test;

import com.baomidou.mybatisplus.extension.service.IService;

/**
* <p>
* Function Service class
* </p>
*
* @author Funkye
* @since 2019-04-10
*/
public interface ITestService extends IService<Test> {

}

package org.test.service.impl;

import org.apache.dubbo.config.annotation.Service;
import org.test.entity.Test;
import org.test.mapper.TestMapper;
import org.test.service.ITestService;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

@Service(version = "1.0.0", interfaceClass = ITestService.class)
public class TestServiceImpl extends ServiceImpl<TestMapper, Test> implements ITestService {

}

package org.test.mapper;

import org.test.entity.Test;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
* <p>
* Functional Mapper interface
* </p>
*
* @author Funkye
* @since 2019-04-10
*/
public interface TestMapper extends BaseMapper<Test> {

}

Create the org.test.config package and, in it, SeataAutoConfig.java. All of the Seata configuration lives here; its main job is to proxy the data source and connect to the transaction service group.

package org.test.config;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.alibaba.druid.pool.DruidDataSource;

import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SeataAutoConfig {
@Autowired(required = true)
private DataSourceProperties dataSourceProperties;
private final static Logger logger = LoggerFactory.getLogger(SeataAutoConfig.class);

@Bean(name = "druidDataSource")
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
logger.info("dataSourceProperties.getUrl():{}", dataSourceProperties.getUrl());
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
logger.info("load dataSource........");
return druidDataSource;
}

/**
* init datasource proxy
*
* @Param: druidDataSource datasource bean instance
* @Return: DataSourceProxy datasource proxy
*/
@Bean(name = "dataSource")
@Primary // In the same DataSource, first use the labelled DataSource
public DataSourceProxy dataSourceProxy(@Qualifier(value = "druidDataSource") DruidDataSource druidDataSource) {
logger.info("Proxy dataSource ........") ;
return new DataSourceProxy(druidDataSource);
}

/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
logger.info("Configuring seata........") ;
return new GlobalTransactionScanner("test-service", "test-group");
}
}

Then create the configuration class MybatisPlusConfig, which MyBatis-Plus requires.

package org.test.config;

import java.util.ArrayList;
import java.util.List;

import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.baomidou.mybatisplus.core.parser.ISqlParser;
import com.baomidou.mybatisplus.extension.parsers.BlockAttackSqlParser;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;

@Configuration
// @MapperScan("com.baomidou.springboot.mapper*")
// The annotation above does the same job as the MapperScannerConfigurer @Bean below; configure only one of the two.
public class MybatisPlusConfig {

/**
* mybatis-plus paging plugin<br>
* Documentation: http://mp.baomidou.com<br>
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
List<ISqlParser> sqlParserList = new ArrayList<ISqlParser>();
// Add the parser that blocks dangerous SQL (such as full-table updates and deletes) to the parser chain.
sqlParserList.add(new BlockAttackSqlParser());
paginationInterceptor.setSqlParserList(sqlParserList);
return paginationInterceptor;
}

/**
* Equivalent to the {@code @MapperScan("com.baomidou.springboot.mapper*")} annotation at the top. It can be extended here,
* for example by reading the Mapper scan path from a configuration file.
*/
@Bean
public MapperScannerConfigurer mapperScannerConfigurer() {
MapperScannerConfigurer scannerConfigurer = new MapperScannerConfigurer();
scannerConfigurer.setBasePackage("org.test.mapper");
return scannerConfigurer;
}

}

Create the resources directory, create the mapper folder, application.yml and other files.

server:
  port: 38888
spring:
  application:
    name: test-service
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
dubbo:
  protocol:
    loadbalance: leastactive
    threadpool: cached
  scan:
    base-packages: org.test.service
  application:
    qos-enable: false
    name: testserver
  registry:
    id: my-registry
    address: zookeeper://127.0.0.1:2181?client=curator
mybatis-plus:
  mapper-locations: classpath:/mapper/*Mapper.xml
  typeAliasesPackage: org.test.entity
  global-config:
    db-config:
      field-strategy: not-empty
      id-type: auto
      db-type: mysql
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: true
    auto-mapping-unknown-column-behavior: none

Create file.conf. The vgroup_mapping key inside the service block must end with your transaction group: for example, SeataAutoConfig above is configured with test-group, so the key here must also be vgroup_mapping.test-group. The IP and port in grouplist are the IP and port on which the Seata server is running.

transport {
type = "TCP"
server = "NIO"
heartbeat = true
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
boss-thread-size = 1
worker-thread-size = 8
}
shutdown {
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
vgroup_mapping.test-group = "default"
default.grouplist = "127.0.0.1:8091"
enableDegrade = false
disable = false
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}

client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
tm.commit.retry.count = 1
tm.rollback.retry.count = 1
undo.log.table = "undo_log"
}

recovery {
committing-retry-period = 1000
asyn-committing-retry-period = 1000
rollbacking-retry-period = 1000
timeout-retry-period = 1000
}

transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
undo.log.save.days = 7
undo.log.delete.period = 86400000
undo.log.table = "undo_log"
}

metrics {
enabled = false
registry-type = "compact"
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}

support {
spring {
datasource.autoproxy = false
}
}
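
To make the relationship between the transaction group and the server address more concrete, here is a minimal sketch of the two-step lookup in plain Java. It is not Seata's code: the class VgroupLookup and the in-memory map are hypothetical stand-ins for the parsed file.conf, and only the key names service.vgroup_mapping.test-group and service.default.grouplist are taken from the configuration above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Seata: it mimics how a transaction group
// could be resolved to a server address using the keys from file.conf.
final class VgroupLookup {

    // Stand-in for the parsed "service" block of file.conf.
    static Map<String, String> sampleConfig() {
        Map<String, String> conf = new HashMap<>();
        conf.put("service.vgroup_mapping.test-group", "default");
        conf.put("service.default.grouplist", "127.0.0.1:8091");
        return conf;
    }

    // Step 1: transaction group -> cluster name.
    // Step 2: cluster name -> server address list.
    static String resolve(Map<String, String> conf, String txServiceGroup) {
        String cluster = conf.get("service.vgroup_mapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroup_mapping entry for group " + txServiceGroup);
        }
        String servers = conf.get("service." + cluster + ".grouplist");
        if (servers == null) {
            throw new IllegalStateException("no grouplist for cluster " + cluster);
        }
        return servers;
    }

    public static void main(String[] args) {
        System.out.println(resolve(sampleConfig(), "test-group"));
    }
}
```

In this sketch a group name that has no vgroup_mapping entry fails immediately, which is why the group passed to GlobalTransactionScanner and the key in file.conf have to match.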

Create registry.conf to specify the registry and configuration type (file, zk and so on) and, where needed, the server IP and port.

registry {
type = "file"
file {
name = "file.conf"
}
}
config {
type = "file"
file {
name = "file.conf"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
}

That completes the setup. You can now run the service directly and watch the seata-server log:

20191129142115

Next, create the test-client project in the same way as test-service above; the steps are not repeated here.

Then copy the service interfaces and entities from test-service into it. If that is too much trouble, you can put the shared services, entities and utilities into a separate sub-project; I chose copy and paste to build this demo quickly.

Directory structure:

Then we create ClientApplication.

package org.test;

import java.util.TimeZone;
import java.util.concurrent.Executor;

import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, MybatisPlusAutoConfiguration.class})
@EnableScheduling
@EnableAsync
@Configuration
@EnableDubbo(scanBasePackages = {"org.test.service"})
@ComponentScan(basePackages = {"org.test.service", "org.test.controller", "org.test.config"})
public class ClientApplication {
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
SpringApplication app = new SpringApplication(ClientApplication.class);
app.run(args);
}

@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
return new ThreadPoolTaskExecutor();
}
}

Then go to the config package and create SwaggerConfig :

package org.test.config;

import java.util.ArrayList;
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
public class SwaggerConfig {
// Swagger2 configuration: basic settings such as the package to scan are configured here
@Bean
public Docket createRestApi() {
List<Parameter> pars = new ArrayList<Parameter>();
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
// Path to the current package
.apis(RequestHandlerSelectors.basePackage("org.test.controller")).paths(PathSelectors.any()).build()
.globalOperationParameters(pars);
}

// Build the API document details; take care which classes are imported here
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
// The title of the page
.title("Project Interface")
// Creator
.contact(new Contact("FUNKYE", "", ""))
// Version number
.version("1.0")
// Description
.description("API description").build();
}
}

Then create SpringMvcConfigure and put the Seata configuration inside it. To save effort I integrated it directly into the MVC configuration class; for a cleaner layout you can create a separate Seata configuration class. Note that a group name appears here as well: I assigned both projects to the same group, although using a different one also seems to work.

package org.test.config;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.dubbo.config.annotation.Reference;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.view.InternalResourceViewResolver;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter;
import com.google.common.collect.Maps;

import io.seata.spring.annotation.GlobalTransactionScanner;

@Configuration
public class SpringMvcConfigure implements WebMvcConfigurer {

@Bean
public FilterRegistrationBean corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedOrigin("*");
config.addAllowedHeader(CorsConfiguration.ALL);
config.addAllowedMethod(CorsConfiguration.ALL);
source.registerCorsConfiguration("/**", config);
FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new CorsFilter(source));
filterRegistrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE);
filterRegistrationBean.setOrder(1);
filterRegistrationBean.setEnabled(true);
filterRegistrationBean.addUrlPatterns("/**");
Map<String, String> initParameters = Maps.newHashMap();
initParameters.put("excludes", "/favicon.ico,/img/*,/js/*,/css/*");
initParameters.put("isIncludeRichText", "true");
filterRegistrationBean.setInitParameters(initParameters);
return filterRegistrationBean;
}

@Bean
public InternalResourceViewResolver viewResolver() {
InternalResourceViewResolver viewResolver = new InternalResourceViewResolver();
viewResolver.setPrefix("/WEB-INF/jsp/");
viewResolver.setSuffix(".jsp");
// viewResolver.setViewClass(JstlView.class);
// This property does not usually need to be configured manually, as higher versions of Spring will automatically detect it.
return viewResolver;
}



/**
* Replacing frame json with fastjson
*/
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
FastJsonHttpMessageConverter fastConverter = new FastJsonHttpMessageConverter();
FastJsonConfig fastJsonConfig = new FastJsonConfig();
fastJsonConfig.setSerializerFeatures(SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue,
SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.DisableCircularReferenceDetect);
// Handle garbled Chinese characters
List<MediaType> fastMediaTypes = new ArrayList<>();
fastMediaTypes.add(MediaType.APPLICATION_JSON_UTF8);
fastConverter.setSupportedMediaTypes(fastMediaTypes);
fastConverter.setFastJsonConfig(fastJsonConfig);
// Handle strings, avoiding quotes when returning strings directly.
StringHttpMessageConverter smc = new StringHttpMessageConverter(Charset.forName("UTF-8"));
converters.add(smc);
converters.add(fastConverter);
}

@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("test-client", "test-group");
}

}

Create the controller package, and then create the TestController under the package.

package org.test.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.test.service.DemoService;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

/**
* <p>
* Documentation table Front-end controller
* </p>
*
* @author funkye
* @since 2019-03-20
*/
@RestController
@RequestMapping("/test")
@Api(tags = "test interface")
public class TestController {

private final static Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
@Lazy
DemoService demoService;

@GetMapping(value = "testSeataOne")
@ApiOperation(value = "Test the manual rollback distributed transaction interface")
public Object testSeataOne() {
return demoService.One();
}

@GetMapping(value = "testSeataTwo")
@ApiOperation(value = "Test Exception Rollback Distributed Transaction Interface")
public Object testSeataTwo() {
return demoService.Two();
}

}

Then go to service and create the demoService you need to depend on.

package org.test.service;

import java.time.LocalDateTime;

import org.apache.dubbo.config.annotation.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.test.controller.TestController;
import org.test.entity.Test;

import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.GlobalTransactionContext;

@Service
public class DemoService {
@Reference(version = "1.0.0", timeout = 60000)
private ITestService testService;
private final static Logger logger = LoggerFactory.getLogger(DemoService.class);

/**
* manual rollback example
*
* @return
*/
@GlobalTransactional
public Object One() {
logger.info("seata distribute transaction Id:{}", RootContext.getXID());
Test t = new Test();
t.setOne("1");
t.setTwo("2");
t.setCreateTime(LocalDateTime.now());
testService.save(t);
try {
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
try {
logger.info("load transaction id for rollback");
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
} catch (TransactionException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
return false;
}

/**
* throw exception and rollback
*
* @return
*/
@GlobalTransactional
public Object Two() {
logger.info("seata distribute transaction Id:{}", RootContext.getXID());
Test t = new Test();
t.setOne("1");
t.setTwo("2");
t.setCreateTime(LocalDateTime.now());
testService.save(t);
try {
int i = 1 / 0;
return true;
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException();
}
}
}

Create the resources folder as usual, starting with the common application.yml.

spring:
  application:
    name: test
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/test?userSSL=true&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai
    username: root
    password: 123456
  mvc:
    servlet:
      load-on-startup: 1
  http:
    encoding:
      force: true
      charset: utf-8
      enabled: true
    multipart:
      max-file-size: 10MB
      max-request-size: 10MB
dubbo:
  registry:
    id: my-registry
    address: zookeeper://127.0.0.1:2181?client=curator
  application:
    name: dubbo-demo-client
    qos-enable: false
server:
  port: 28888
  max-http-header-size: 8192
  address: 0.0.0.0
  tomcat:
    max-http-post-size: 104857600

Copy file.conf and registry.conf from test-service. If you changed the client's group name in the configuration class, the group name in file.conf must be changed as well.

With the complete directory structure in place, start test-service first, then test-client, and test it through swagger.

  1. Visit 127.0.0.1:28888/swagger-ui.html for the final check.

20191129143124

I have already saved one record in the table; let's see whether the rollback succeeds:

20191129143252

Refresh the database: there is still only one record:

20191129143124

And then check the log.

20191129143407

It shows that the transaction has been rolled back. Now look at the log from seata-server:

It shows that the rollback succeeded and the transaction id is consistent, so our distributed transaction runs end to end. By setting a breakpoint you can inspect undo_log: before the transaction is committed, a record with the transaction information is inserted, and it is deleted once the rollback succeeds.

Summary

Integrating seata is relatively simple and easy to get started with; with a little more care you will surely write it better than I did!

You are welcome to read more of the seata, dubbo and other source code; it helps you avoid many pitfalls encountered in business development!

· 3 min read

When analysing the source code of the startup section, I found that GlobalTransactionScanner starts both the RM and the TM client. According to Seata's design, however, the TM is responsible for global transaction operations. If a service never opens a global transaction, that is, if the project contains no global transaction annotation, there is no need to initialise the TM client: not every microservice needs GlobalTransactional, and such a service acts only as an RM client.

So I changed the initialisation rules of GlobalTransactionScanner slightly. Previously, GlobalTransactionScanner called the initialisation method in the afterPropertiesSet() method of InitializingBean. afterPropertiesSet() is called as soon as the current bean has been initialised, at which point there is no way to know whether any bean in the Spring container uses the global transaction annotation.

Therefore, I removed InitializingBean and implemented ApplicationListener instead. The scanner checks for the GlobalTransactional annotation while beans are being instantiated, and calls the RM and TM client initialisation methods only after the Spring container has finished initialising. At that point it can decide whether to start the TM client, depending on whether the GlobalTransactional annotation is used in the project.
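
The idea can be illustrated with a small, framework-free sketch. It is only an illustration under stated assumptions, not the code from the PR: the annotation GlobalTx, the Scanner class and the two sample services are hypothetical, inspect() stands in for the per-bean check during instantiation, and onContainerReady() stands in for the callback fired once the Spring container has finished initialising.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class OnDemandTmSketch {

    // Hypothetical stand-in for @GlobalTransactional.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface GlobalTx { }

    // A service that opens global transactions.
    static class OrderService {
        @GlobalTx
        public void placeOrder() { }
    }

    // A service that only participates as a branch.
    static class StockService {
        public void deduct() { }
    }

    static class Scanner {
        private boolean globalTxFound = false;

        // Called once per bean: remember whether any method carries the annotation.
        void inspect(Object bean) {
            for (Method method : bean.getClass().getDeclaredMethods()) {
                if (method.isAnnotationPresent(GlobalTx.class)) {
                    globalTxFound = true;
                }
            }
        }

        // Called after all beans were inspected: RM always starts, TM only on demand.
        List<String> onContainerReady() {
            List<String> started = new ArrayList<>();
            started.add("RM");
            if (globalTxFound) {
                started.add("TM");
            }
            return started;
        }
    }

    static List<String> start(Object... beans) {
        Scanner scanner = new Scanner();
        for (Object bean : beans) {
            scanner.inspect(bean);
        }
        return scanner.onContainerReady();
    }

    public static void main(String[] args) {
        System.out.println(start(new StockService()));                     // prints [RM]
        System.out.println(start(new StockService(), new OrderService())); // prints [RM, TM]
    }
}
```

The decision has to be deferred to the end because a single bean cannot tell, at its own initialisation time, whether some later bean will carry the annotation.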

Here is the PR address: https://github.com/apache/incubator-seata/pull/1936

As we discussed in the PR, in Seata's current design only the TM at the initiator can send a GlobalRollbackRequest; the RM can only send BranchReport(false) to report the branch status to the TC server and cannot send a GlobalRollbackRequest directly to perform a global rollback. The interaction logic is as follows:

!

According to the above design model, the TM client can be started on demand.

However, in the later optimisation iterations of Seata, there is one more point that needs to be considered:

When an exception occurs in a participant, is it possible to initiate a global rollback directly from the participant's TM client? This also means that the cycle time of distributed transactions can be shortened, and global locks can be released as soon as possible so that other transactions with conflicting data can acquire locks and execute as soon as possible.

!

That is to say, in a global transaction, as soon as one RM client fails to execute its local transaction, the TM client of that service directly initiates a global rollback, so there is no need to wait for the initiator's TM to send the rollback decision. To achieve this optimisation, each service needs to start both the TM client and the RM client.

Zhang Chenghui currently works as a Java engineer in the Technology Platform Department of the Information Centre of China Communication Technology, where he is mainly responsible for developing the China Communication messaging platform and the full-link stress testing project. He loves sharing technology and is the author of the WeChat official account "back-end advanced", the blogger of the technology blog https://objcoding.com/, and a Seata Contributor. GitHub ID: objcoding.

· 13 min read

In the previous article, "Design Principles of Distributed Transaction Middleware Seata", we covered some design principles of the Seata AT mode and learned about its three roles (RM, TM, TC). I will keep updating this Seata source code analysis series. Today, we analyse what Seata AT mode does at startup.

Client Startup Logic

TM is the manager of the whole global transaction, so a global transaction is started by TM. TM has a global management class GlobalTransaction, structured as follows:

io.seata.tm.api.GlobalTransaction

public interface GlobalTransaction {

void begin() throws TransactionException;

void begin(int timeout) throws TransactionException;

void begin(int timeout, String name) throws TransactionException;

void commit() throws TransactionException;

void rollback() throws TransactionException;

GlobalStatus getStatus() throws TransactionException;

// ...
}

You can create a GlobalTransaction via GlobalTransactionContext and then use it to begin, commit or roll back a global transaction. Using the Seata AT mode directly through the API therefore looks like this:

// init seata
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
// trx
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
tx.begin(60000, "testBiz");
// Transaction
// ...
tx.commit();
} catch (Exception exx) {
tx.rollback();
throw exx;
}

Writing this every time you use a global transaction inevitably leads to redundant code. Our projects are based on the Spring container, so we can use Spring AOP together with the template pattern to encapsulate this redundant code in a template; Mybatis-spring does the same thing. So let's analyse what a Spring-based project does when it starts Seata and registers a global transaction.

We enable a global transaction by adding the @GlobalTransactional annotation to the method. Seata's Spring module has a GlobalTransactionScanner, which has the following inheritance relationship:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean, ApplicationContextAware, DisposableBean {
// ...
}

During the startup of a Spring-based project, the following initialisation process occurs for this class:

! image-20191124155455309

The afterPropertiesSet() method of InitializingBean calls the initClient() method:

io.seata.spring.annotation.GlobalTransactionScanner#initClient

TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);

Initialisation operations are done for TM and RM.

  • TM initialisation

io.seata.tm.TMClient#init

public static void init(String applicationId, String transactionServiceGroup) {
// Get the TmRpcClient instance
TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
// Initialise the TM Client
tmRpcClient.init();
}

Calling TmRpcClient.getInstance() obtains the TM client instance. While obtaining it, the Netty client configuration object and the messageExecutor thread pool are created; the thread pool handles the various message interactions with the server. While the TmRpcClient instance is created, a ClientBootstrap, which manages the start and stop of the Netty service, and a ClientChannelManager, which manages the Netty client object pool, are created as well. These belong to the Netty part of Seata and will be discussed later when analysing the network module.

io.seata.core.rpc.netty.AbstractRpcRemotingClient#init

public void init() {
clientBootstrap.start();
// Timer to try to connect to the server
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
super.init();
}

Calling the TM client's init() method eventually starts the Netty client (it is not really started yet; it starts when the object pool is first used). A timed task is started that resends the RegisterTMRequest (the RM client sends a RegisterRMRequest) to try to connect to the server. The logic is that client channels are cached in channels inside NettyClientChannelManager; if a channel does not exist or has expired, the client tries to connect to the server again, fetches a new channel and caches it in channels. A separate thread is also started to handle asynchronous request sending. This is a clever use of the network module, which will be analysed in a later article.
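
The cache-then-reconnect check performed on each timer tick can be sketched in plain Java as follows. This is a simplified illustration, not Seata's NettyClientChannelManager: FakeChannel, ReconnectSketch and the connect counter are hypothetical, and "connecting" is reduced to creating a new object.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ReconnectSketch {

    // Hypothetical stand-in for a Netty channel.
    static final class FakeChannel {
        final String address;
        volatile boolean active = true;

        FakeChannel(String address) {
            this.address = address;
        }
    }

    // Cache of channels keyed by server address.
    private final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();
    private final AtomicInteger connectCount = new AtomicInteger();

    // One tick of the timed task: connect only where no usable channel is cached.
    void reconnect(List<String> serverAddresses) {
        for (String address : serverAddresses) {
            FakeChannel cached = channels.get(address);
            if (cached == null || !cached.active) {
                channels.put(address, new FakeChannel(address));
                connectCount.incrementAndGet();
            }
        }
    }

    // Runs reconnect() periodically, in the spirit of the timerExecutor above.
    ScheduledFuture<?> schedule(ScheduledExecutorService timer, List<String> serverAddresses, long periodSeconds) {
        return timer.scheduleAtFixedRate(() -> reconnect(serverAddresses),
                periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    FakeChannel channel(String address) {
        return channels.get(address);
    }

    int connectCount() {
        return connectCount.get();
    }

    public static void main(String[] args) {
        ReconnectSketch manager = new ReconnectSketch();
        List<String> servers = Arrays.asList("127.0.0.1:8091");
        manager.reconnect(servers);                       // first tick: connects
        manager.reconnect(servers);                       // second tick: channel is cached, nothing to do
        manager.channel("127.0.0.1:8091").active = false; // simulate a dropped connection
        manager.reconnect(servers);                       // third tick: reconnects
        System.out.println("connections made: " + manager.connectCount()); // prints "connections made: 2"
    }
}
```

Because a healthy cached channel is skipped, running the task at a fixed rate is cheap in the normal case and only does real work after a connection has been lost.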

io.seata.core.rpc.netty.AbstractRpcRemoting#init

public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}

nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

The init method of AbstractRpcRemoting starts a timer task that clears expired entries from futures. futures stores the future objects that hold the results of sent requests; each future has a timeout, after which an exception is thrown, so expired futures need to be cleared regularly.
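
A stripped-down version of this sweep, written against hypothetical types, looks roughly like the sketch below. PendingFuture is an invented stand-in for Seata's MessageFuture, and the current time is passed in as a parameter so that the behaviour is easy to follow; the real task reads the clock itself.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class FutureSweepSketch {

    // Hypothetical stand-in for MessageFuture: knows when it started and how long it may wait.
    static final class PendingFuture {
        private final long startMillis;
        private final long timeoutMillis;
        private volatile boolean done;
        private volatile Object result;

        PendingFuture(long startMillis, long timeoutMillis) {
            this.startMillis = startMillis;
            this.timeoutMillis = timeoutMillis;
        }

        boolean isTimeout(long nowMillis) {
            return nowMillis - startMillis > timeoutMillis;
        }

        // Completing with null lets any waiter stop waiting.
        void setResultMessage(Object result) {
            this.result = result;
            this.done = true;
        }

        boolean isDone() {
            return done;
        }
    }

    // One run of the timer task: drop every expired future and complete it with null.
    static int sweep(Map<Integer, PendingFuture> futures, long nowMillis) {
        int removed = 0;
        for (Map.Entry<Integer, PendingFuture> entry : futures.entrySet()) {
            if (entry.getValue().isTimeout(nowMillis)) {
                futures.remove(entry.getKey());
                entry.getValue().setResultMessage(null);
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        // ConcurrentHashMap tolerates removal while iterating over entrySet().
        Map<Integer, PendingFuture> futures = new ConcurrentHashMap<>();
        futures.put(1, new PendingFuture(0L, 100L));    // expires after 100 ms
        futures.put(2, new PendingFuture(0L, 10_000L)); // still valid at 500 ms
        System.out.println("removed: " + sweep(futures, 500L)); // prints "removed: 1"
        System.out.println("remaining ids: " + futures.keySet()); // prints "remaining ids: [2]"
    }
}
```

Without such a sweep, futures whose responses never arrive would stay in the map indefinitely.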

  • RM Initialisation
io.seata.rm.RMClient#init
public static void init(String applicationId, String transactionServiceGroup) {
RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
rmRpcClient.setResourceManager(DefaultResourceManager.get());
rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
rmRpcClient.init();
}

RmRpcClient.getInstance follows the same logic as the TM. ResourceManager is the RM resource manager, responsible for branch transaction registration, commit, report and rollback as well as global lock queries. DefaultResourceManager holds all current RM resource managers for unified call processing; its get() method loads the current resource managers using a mechanism similar to SPI for flexible loading: Seata scans the META-INF/services/ directory for the configured classes and loads them dynamically.

ClientMessageListener is the RM message listener, responsible for processing commands sent from the TC and performing branch commit, branch rollback and undo log deletion on the branch. DefaultRMHandler encapsulates some of the concrete operation logic of RM branch transactions. Finally, the init method follows the same logic as the TM.

Let's take a look at what the wrapIfNecessary method does.

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// Determine if global transactions are disabled
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
// Check the TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// TCC interceptor, proxy bean for sofa:reference/dubbo:reference and LocalTCC.
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

// Determine if the bean has the GlobalTransactional and GlobalLock annotations.
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

if (interceptor == null) {
// Create the proxy class
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}

LOGGER.info("Bean [{}] with name [{}] would use interceptor [{}]",
bean.getClass().getName(),beanName,interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// Perform wrapping the target object to the proxy object
Advisor[] advisor = super.buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

GlobalTransactionScanner extends AbstractAutoProxyCreator for Spring AOP support. As the code shows, methods annotated with GlobalTransactional or GlobalLock are proxied by GlobalTransactionalInterceptor.

GlobalTransactionalInterceptor implements MethodInterceptor: method interceptor.


io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
// Global transaction annotation
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
// Global lock annotation
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
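
For readers less familiar with method interception, the dispatch above can be reproduced in miniature with a JDK dynamic proxy. The sketch below is an illustration only: it uses java.lang.reflect.Proxy rather than Spring AOP, and the annotation GlobalTx, the AccountService interface and the EVENTS list are hypothetical names introduced here to make the interception visible.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class InterceptorSketch {

    // Hypothetical stand-in for @GlobalTransactional.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface GlobalTx { }

    interface AccountService {
        @GlobalTx
        String transfer();

        String query();
    }

    static final class AccountServiceImpl implements AccountService {
        public String transfer() {
            return "transferred";
        }

        public String query() {
            return "balance";
        }
    }

    // Records what the interceptor did, so the effect can be inspected.
    static final List<String> EVENTS = new ArrayList<>();

    static AccountService proxy(AccountService target) {
        InvocationHandler handler = (proxyObject, method, args) -> {
            // Annotated methods are wrapped; all others are passed straight through.
            boolean transactional = method.isAnnotationPresent(GlobalTx.class);
            if (transactional) {
                EVENTS.add("begin " + method.getName());
            }
            try {
                Object result = method.invoke(target, args);
                if (transactional) {
                    EVENTS.add("commit " + method.getName());
                }
                return result;
            } catch (InvocationTargetException e) {
                if (transactional) {
                    EVENTS.add("rollback " + method.getName());
                }
                throw e.getCause();
            }
        };
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                handler);
    }

    public static void main(String[] args) {
        AccountService service = proxy(new AccountServiceImpl());
        service.transfer(); // intercepted: begin and commit are recorded
        service.query();    // not annotated: nothing is recorded
        System.out.println(EVENTS);
    }
}
```

The caller only sees AccountService; whether a call runs inside the transactional wrapper is decided per method by the annotation, which mirrors the branch on globalTransactionalAnnotation in invoke().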

The above is the logic executed by the proxy method. Its handleGlobalTransaction() method calls the TransactionalTemplate template:

io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public TransactionInfo getTransactionInfo() {
// ...
}
});
} catch (TransactionalExecutor.ExecutionException e) {
// ...
}
}

The handleGlobalTransaction() method executes the execute method of the TransactionalTemplate template class:

io.seata.tm.api.TransactionalTemplate#execute

public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {

// 2. begin transaction
beginTransaction(txInfo, tx);

Object rs = null;
try {

// Do Your Business
rs = business.execute();

} catch (Throwable ex) {

// 3. the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}

// 4. everything is fine, commit.
commitTransaction(tx);

return rs;
} finally {
// 5. clear
triggerAfterCompletion();
cleanUp();
}
}

Doesn't the above give you a sense of déjà vu? That's right: it is the boilerplate we often write by hand when using the API directly. Here the Spring proxy mechanism and the template encapsulate that redundant code in one unified process, so you no longer need to write it out explicitly. If you are interested, the source code of MyBatis-Spring is also worth reading; it is very well written.
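The template idea can be sketched in a few lines of plain Java. This is a minimal illustration, not Seata's TransactionalTemplate: the class name TxTemplateSketch and its log list are hypothetical, and "begin", "commit" and "rollback" are only recorded as strings rather than sent to a transaction coordinator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Toy template: illustrates the pattern only, it is not Seata's TransactionalTemplate. */
final class TxTemplateSketch {

    /** Lifecycle calls are recorded here so the flow can be inspected. */
    final List<String> log = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        log.add("begin");                  // begin the transaction
        try {
            T result;
            try {
                result = business.get();   // run the caller's business logic
            } catch (RuntimeException ex) {
                log.add("rollback");       // business failure: roll back, then rethrow
                throw ex;
            }
            log.add("commit");             // everything is fine: commit
            return result;
        } finally {
            log.add("cleanup");            // always clean up
        }
    }

    public static void main(String[] args) {
        TxTemplateSketch ok = new TxTemplateSketch();
        String value = ok.execute(() -> "done");
        System.out.println(value + " " + ok.log);

        TxTemplateSketch failing = new TxTemplateSketch();
        try {
            failing.execute(() -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            System.out.println(failing.log);
        }
    }
}
```

The caller supplies only the business lambda; the begin, commit, rollback and cleanup steps live in one place, which is the point of the template.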

Server-Side Processing Logic

When the server accepts a client connection, it also caches the channel. As mentioned earlier, the client then sends a RegisterRMRequest/RegisterTMRequest to the server; when the server receives it, the ServerMessageListener is called to handle it:

io.seata.core.rpc.ServerMessageListener

public interface ServerMessageListener {
    // Handles transaction messages, such as branch registration, branch commit, branch report, branch rollback, etc.
    void onTrxMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender);
    // Handles the registration of an RM client's connection
    void onRegRmMessage(RpcMessage request, ChannelHandlerContext ctx,
                        ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);
    // Handles the registration of a TM client's connection
    void onRegTmMessage(RpcMessage request, ChannelHandlerContext ctx,
                        ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);
    // The server maintains a heartbeat with the client
    void onCheckMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender);
}

ChannelManager is the manager of the server-side channels. Every time the server communicates with a client, it obtains that client's channel from ChannelManager. The cache structures that hold the TM and RM client channels are as follows:

/**
* resourceId -> applicationId -> ip -> port -> RpcContext
*/
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>
RM_CHANNELS = new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>();

/**
* ip+appname,port
*/
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS
= new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>();

The above Map structure is a bit complicated:

RM_CHANNELS:

  1. resourceId refers to the database address of the RM client;
  2. applicationId refers to the service ID of the RM client; for example, with spring.application.name=account-service in a Spring Boot configuration, account-service is the applicationId;
  3. ip refers to the IP address of the RM client service;
  4. port refers to the port of the RM client service;
  5. RpcContext saves the information of this registration request.

TM_CHANNELS:

  1. ip+appname: the comment in the source appears to be wrong; it should be appname+ip, that is, the first key of the TM_CHANNELS map structure is appname+ip;
  2. port: the port number of the client.
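The two cache shapes described above can be tried out with a small self-contained model. This is a sketch under assumptions, not Seata code: ChannelCacheSketch, registerRm, registerTm and findRm are hypothetical names, a plain String stands in for RpcContext, and the ":" separator in the TM key simply follows the appname+ip example in the source comment.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy model of the two channel caches; a String stands in for RpcContext. */
final class ChannelCacheSketch {

    // resourceId -> applicationId -> ip -> port -> context
    static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String,
        ConcurrentMap<Integer, String>>>> RM = new ConcurrentHashMap<>();

    // applicationId:ip -> port -> context
    static final ConcurrentMap<String, ConcurrentMap<Integer, String>> TM = new ConcurrentHashMap<>();

    static void registerRm(String resourceId, String appId, String ip, int port, String ctx) {
        // Each level is created on first use, then the context is stored under the port.
        RM.computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
          .computeIfAbsent(appId, k -> new ConcurrentHashMap<>())
          .computeIfAbsent(ip, k -> new ConcurrentHashMap<>())
          .put(port, ctx);
    }

    static void registerTm(String appId, String ip, int port, String ctx) {
        TM.computeIfAbsent(appId + ":" + ip, k -> new ConcurrentHashMap<>()).put(port, ctx);
    }

    /** Walks the four levels; returns null when any level is missing. */
    static String findRm(String resourceId, String appId, String ip, int port) {
        ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>> byApp = RM.get(resourceId);
        if (byApp == null) return null;
        ConcurrentMap<String, ConcurrentMap<Integer, String>> byIp = byApp.get(appId);
        if (byIp == null) return null;
        ConcurrentMap<Integer, String> byPort = byIp.get(ip);
        return byPort == null ? null : byPort.get(port);
    }

    public static void main(String[] args) {
        registerRm("jdbc:mysql://localhost:3306/account", "account-service", "127.0.0.1", 63353, "rm-ctx");
        registerTm("account-service", "127.0.0.1", 63353, "tm-ctx");
        System.out.println(findRm("jdbc:mysql://localhost:3306/account", "account-service", "127.0.0.1", 63353));
        System.out.println(TM.get("account-service:127.0.0.1").get(63353));
    }
}
```

Keying the RM cache by resourceId first makes it cheap to find every client that can handle a given resource, which is presumably why the real structure is nested in that order.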

The following is the RM Client registration logic:

io.seata.core.rpc.ChannelManager#registerRMChannel

public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
    throws IncompatibleVersionException {
    Version.checkVersion(resourceManagerRequest.getVersion());
    // Put the ResourceIds (database connection information) into a set
    Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
    RpcContext rpcContext;
    // Check whether the channel is already in the cache
    if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
        // Build the rpcContext based on the request registration information
        rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
            resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
            resourceManagerRequest.getResourceIds(), channel);
        // Put the rpcContext into the cache
        rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    } else {
        rpcContext = IDENTIFIED_CHANNELS.get(channel);
        rpcContext.addResources(dbkeySet);
    }
    if (null == dbkeySet || dbkeySet.isEmpty()) { return; }
    for (String resourceId : dbkeySet) {
        String clientIp;
        // Store the request information into RM_CHANNELS, using Java 8's computeIfAbsent method
        ConcurrentMap<Integer, RpcContext> portMap = RM_CHANNELS.computeIfAbsent(resourceId, resourceIdKey -> new ConcurrentHashMap<>())
            .computeIfAbsent(resourceManagerRequest.getApplicationId(), applicationId -> new ConcurrentHashMap<>())
            .computeIfAbsent(clientIp = getClientIpFromChannel(channel), clientIpKey -> new ConcurrentHashMap<>());
        // Put the current rpcContext into the portMap
        rpcContext.holdInResourceManagerChannels(resourceId, portMap);
        updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
    }
}

From the code above, we can see that registering an RM client mainly means putting the registration request information into the RM_CHANNELS cache. The method also checks IDENTIFIED_CHANNELS to determine whether the channel of this request has already been identified. The structure of IDENTIFIED_CHANNELS is as follows:

private static final ConcurrentMap<Channel, RpcContext> IDENTIFIED_CHANNELS
= new ConcurrentHashMap<>();

IDENTIFIED_CHANNELS contains all TM and RM registered channels.

The following is the TM registration logic:

io.seata.core.rpc.ChannelManager#registerTMChannel

public static void registerTMChannel(RegisterTMRequest request, Channel channel)
    throws IncompatibleVersionException {
    Version.checkVersion(request.getVersion());
    // Build the RpcContext based on the request registration information
    RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
        request.getApplicationId(),
        request.getTransactionServiceGroup(),
        null, channel);
    // Put the RpcContext into the IDENTIFIED_CHANNELS cache
    rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    // account-service:127.0.0.1:63353
    String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
        + getClientIpFromChannel(channel);
    // Store the request information in the TM_CHANNELS cache
    TM_CHANNELS.putIfAbsent(clientIdentified, new ConcurrentHashMap<Integer, RpcContext>());
    // Get the map created in the previous step, then put the rpcContext into it as a value
    ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
    rpcContext.holdInClientChannels(clientIdentifiedMap);
}

TM client registration is similar: the registration information is put into the corresponding cache. The logic is simpler than for the RM client, mainly because an RM client also carries branch transaction resource information, so it has more to register than a TM client.

The above source code analysis is based on version 0.9.0.

About the Author

Zhang Chenghui currently works as a Java engineer in the Technology Platform Department of the Information Centre of China Communication Technology, where he is mainly responsible for developing the China Communication messaging platform and the full-link stress testing project. He loves sharing technology and is the author of the WeChat official account "back-end advanced", the blogger behind the technology blog https://objcoding.com/, and a Seata contributor. GitHub ID: objcoding.

· 20 min read

Seata, short for Simple Extensible Autonomous Transaction Architecture, is an all-in-one distributed transaction solution. It provides AT, TCC, Saga, and XA transaction modes. This article provides a detailed explanation of the Saga mode within Seata, with the project hosted on GitHub.

Author: Yiyuan (Chen Long), Core Developer of Distributed Transactions at Ant Financial.

Pain Points in Financial Distributed Application Development

Distributed systems face a prominent challenge where a business process requires a composition of various services. This challenge becomes even more pronounced in a microservices architecture, as it necessitates consistency guarantees at the business level. In other words, if a step fails, it either needs to roll back to the previous service invocation or continuously retry to ensure the success of all steps. - From "Left Ear Wind - Resilient Design: Compensation Transaction"

In the domain of financial microservices architecture, business processes are often more complex. Processes are lengthy, such as a typical internet microloan business process involving calls to more than ten services. When combined with exception handling processes, the complexity increases further. Developers with experience in financial business development can relate to these challenges.

During the development of financial distributed applications, we encounter several pain points:

  • Difficulty Ensuring Business Consistency

    In many of the systems we encounter (e.g., in channel layers, product layers, and integration layers), ensuring eventual business consistency often involves adopting a "compensation" approach. Without a coordinator to support this, the development difficulty is significant. Each step requires handling "rollback" operations in catch blocks, resulting in a code structure resembling an "arrow," with poor readability and maintainability. Alternatively, retrying exceptional operations, if unsuccessful, might lead to asynchronous retries or even manual intervention. These challenges impose a significant burden on developers, reducing development efficiency and increasing the likelihood of errors.

  • Difficulty Managing Business State

    With numerous business entities and their corresponding states, developers often update the entity's state in the database after completing a business activity. Lack of a state machine to manage the entire state transition process results in a lack of intuitiveness, increases the likelihood of errors, and causes the business to enter an incorrect state.

  • Difficulty Ensuring Idempotence

    Idempotence of services is a fundamental requirement in a distributed environment. Ensuring the idempotence of services often requires developers to design each service individually, using unique keys in databases or distributed caches. There is no unified solution, creating a significant burden on developers and increasing the chances of oversight, leading to financial losses.

  • Challenges in Business Monitoring and Operations; Lack of Unified Error Guardian Capability

    Monitoring the execution of business operations is usually done by logging, and monitoring platforms are based on log analysis. While this is generally sufficient, in the case of business errors, these monitors lack immediate access to the business context and require additional database queries. Additionally, the reliance on developers for log printing makes it prone to omissions. For compensatory transactions, there is often a need for "error guardian triggering compensation" and "worker-triggered compensation" operations. The lack of a unified error guardian and processing standard requires developers to implement these individually, resulting in a heavy development burden.
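To make the idempotence pain point above concrete, the following is a minimal sketch of the "unique key" technique the text mentions, with an in-memory map standing in for the database or distributed cache. The names IdempotentExecutorSketch and executeOnce are hypothetical; a real service would persist the keys and results.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** In-memory stand-in for a unique-key table; a real service would persist the keys. */
final class IdempotentExecutorSketch {

    private final ConcurrentMap<String, Object> results = new ConcurrentHashMap<>();

    /**
     * Runs the action at most once per business key and replays the stored result afterwards.
     * The action must return a non-null value, because computeIfAbsent records no mapping for null.
     */
    @SuppressWarnings("unchecked")
    <T> T executeOnce(String businessKey, Supplier<T> action) {
        return (T) results.computeIfAbsent(businessKey, key -> action.get());
    }

    public static void main(String[] args) {
        IdempotentExecutorSketch executor = new IdempotentExecutorSketch();
        int[] calls = {0};
        String first = executor.executeOnce("order-1", () -> "paid#" + (++calls[0]));
        String retry = executor.executeOnce("order-1", () -> "paid#" + (++calls[0]));
        System.out.println(first + " " + retry + " calls=" + calls[0]);
    }
}
```

Every service that needs this guarantee has to repeat some version of this logic, which is the burden the article describes.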

Theoretical Foundation

In certain scenarios where strong consistency is required for data, we may adopt distributed transaction schemes like "Two-Phase Commit" at the business layer. However, in other scenarios, where such strong consistency is not necessary, ensuring eventual consistency is sufficient.

For example, Ant Financial currently employs the TCC (Try, Confirm, Cancel) pattern in its financial core systems. The characteristics of financial core systems include high consistency requirements (business isolation), short processes, and high concurrency.

On the other hand, in many business systems above the financial core (e.g., systems in the channel layer, product layer, and integration layer), the emphasis is on achieving eventual consistency. These systems typically have complex processes, long flows, and may need to call services from other companies (such as financial networks). Developing Try, Confirm, Cancel methods for each service in these scenarios incurs high costs. Additionally, when there are services from other companies in the transaction, it is impractical to require those services to follow the TCC development model. Long processes can negatively impact performance if transaction boundaries are too extensive.

When it comes to transactions, we are familiar with ACID, and we are also acquainted with the CAP theorem, which states that at most two out of three—Consistency (C), Availability (A), and Partition Tolerance (P)—can be achieved simultaneously. To enhance performance, a variant of ACID known as BASE emerged. While ACID emphasizes consistency (C in CAP), BASE emphasizes availability (A in CAP). Achieving strong consistency (ACID) is often challenging, especially when dealing with multiple systems that are not provided by a single company. BASE systems are designed to create more resilient systems. In many situations, particularly when dealing with multiple systems and providers, BASE systems acknowledge the risk of data inconsistency in the short term. This allows new transactions to occur, with potentially problematic transactions addressed later through compensatory means to ensure eventual consistency.

Therefore, in practical development, we make trade-offs. For many business systems above the financial core, compensatory transactions can be adopted. The concept of compensatory transactions was proposed about 30 years ago, when the Saga theory emerged as a solution for long transactions. With the rise of microservices, Saga has gradually gained attention in recent years, and the industry now generally recognizes it as a solution for handling long transactions.

https://github.com/aphyr/dist-sagas/blob/master/sagas.pdf[1]
http://microservices.io/patterns/data/saga.html[2]

Community and Industry Solutions

Apache Camel Saga

Camel is an open-source product that implements Enterprise Integration Patterns (EIP). It is based on an event-driven architecture and offers good performance and throughput. In version 2.21, Camel introduced the Saga EIP.

The Saga EIP provides a way to define a series of related actions through Camel routes. These actions either all succeed or all roll back. Saga can coordinate distributed services or local services using any communication protocol, achieving global eventual consistency. Saga does not require the entire process to be completed in a short time because it does not occupy any database locks. It can support requests that require long processing times, ranging from seconds to days. Camel's Saga EIP is based on MicroProfile's LRA[3] (Long Running Action). It also supports the coordination of distributed services implemented in any language using any communication protocol.

The implementation of Saga does not lock data. Instead, it defines a "compensating operation" for each operation. When an error occurs during normal process execution, the "compensating operations" of the operations that have already been executed are triggered to roll back the process. "Compensating operations" can be defined on Camel routes using the Java or XML DSL (Domain-Specific Language).

Here is an example of Java DSL:

// action
from("direct:reserveCredit")
.bean(idService, "generateCustomId") // generate a custom Id and set it in the body
.to("direct:creditReservation");

// delegate action
from("direct:creditReservation")
.saga()
.propagation(SagaPropagation.SUPPORTS)
.option("CreditId", body()) // mark the current body as needed in the compensating action
.compensation("direct:creditRefund")
.bean(creditService, "reserveCredit")
.log("Credit ${header.amount} reserved. Custom Id used is ${body}");

// called only if the saga is cancelled
from("direct:creditRefund")
.transform(header("CreditId")) // retrieve the CreditId option from headers
.bean(creditService, "refundCredit")
.log("Credit for Custom Id ${body} refunded");

XML DSL sample:

<route>
<from uri="direct:start"/>
<saga>
<compensation uri="direct:compensation" />
<completion uri="direct:completion" />
<option optionName="myOptionKey">
<constant>myOptionValue</constant>
</option>
<option optionName="myOptionKey2">
<constant>myOptionValue2</constant>
</option>
</saga>
<to uri="direct:action1" />
<to uri="direct:action2" />
</route>
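Setting Camel's DSL aside, the mechanism described in this section (no locks, one compensating operation per action, compensation of completed actions on failure) can be sketched in plain Java. This is a toy runner under stated assumptions, not Camel's Saga EIP: SagaSketch, Step and run are hypothetical names, and compensating in reverse order is a choice made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy saga runner in plain Java; it is not Camel's Saga EIP. */
final class SagaSketch {

    /** One step: a forward action plus the operation that undoes it. */
    static final class Step {
        final Runnable action;
        final Runnable compensation;

        Step(Runnable action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Runs the steps in order; on failure, compensates the completed steps in reverse order. */
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                completed.push(step);
            } catch (RuntimeException ex) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean ok = run(Arrays.asList(
            new Step(() -> log.add("reserve credit"), () -> log.add("refund credit")),
            new Step(() -> log.add("create order"), () -> log.add("cancel order")),
            new Step(() -> { throw new IllegalStateException("shipping failed"); }, () -> { })));
        System.out.println(ok + " " + log);
    }
}
```

In the example the third step fails, so the log ends with "cancel order" followed by "refund credit": the two completed steps are undone, most recent first.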

Eventuate Tram Saga

The Eventuate Tram Saga[4] framework is a Saga framework for Java microservices that use JDBC/JPA. Similar to Camel Saga, it also adopts a Java DSL to define compensating operations:

public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {

private SagaDefinition<CreateOrderSagaData> sagaDefinition =
step()
.withCompensation(this::reject)
.step()
.invokeParticipant(this::reserveCredit)
.step()
.invokeParticipant(this::approve)
.build();


@Override
public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {
return this.sagaDefinition;
}


private CommandWithDestination reserveCredit(CreateOrderSagaData data) {
long orderId = data.getOrderId();
Long customerId = data.getOrderDetails().getCustomerId();
Money orderTotal = data.getOrderDetails().getOrderTotal();
return send(new ReserveCreditCommand(customerId, orderId, orderTotal))
.to("customerService")
.build();

...

Apache ServiceComb Saga

ServiceComb Saga[5] is also a solution for achieving eventual data consistency in microservices applications. In contrast to TCC, Saga commits transactions directly in the try phase, and the subsequent rollback phase is completed by running compensating operations in reverse. What sets it apart is the use of Java annotations and interceptors to define "compensating" services.

Architecture:

Saga consists of alpha and omega, where:

  • Alpha acts as the coordinator, primarily responsible for managing and coordinating transactions;
  • Omega is an embedded agent in microservices, responsible for intercepting network requests and reporting transaction events to alpha;

The diagram below illustrates the relationship between alpha, omega, and microservices:

ServiceComb Saga

sample:

public class ServiceA extends AbsService implements IServiceA {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Autowired
private IServiceB serviceB;

@Autowired
private IServiceC serviceC;

@Override
public String getServiceName() {
return "servicea";
}

@Override
public String getTableName() {
return "testa";
}

@Override
@SagaStart
@Compensable(compensationMethod = "cancelRun")
@Transactional(rollbackFor = Exception.class)
public Object run(InvokeContext invokeContext) throws Exception {
LOG.info("A.run called");
doRunBusi();
if (invokeContext.isInvokeB(getServiceName())) {
serviceB.run(invokeContext);
}
if (invokeContext.isInvokeC(getServiceName())) {
serviceC.run(invokeContext);
}
if (invokeContext.isException(getServiceName())) {
LOG.info("A.run exception");
throw new Exception("A.run exception");
}
return null;
}

public void cancelRun(InvokeContext invokeContext) {
LOG.info("A.cancel called");
doCancelBusi();
}
}

Ant Financial's Practice

Ant Financial extensively uses the TCC mode for distributed transactions, mainly in scenarios where high consistency and performance are required, such as in financial core systems. In upper-level business systems with complex and lengthy processes, developing TCC can be costly. In such cases, most businesses opt for the Saga mode to achieve eventual business consistency. Due to historical reasons, different business units have their own set of "compensating" transaction solutions, basically falling into two categories:

  1. When a service needs to "retry" or "compensate" in case of failure, a record is inserted into the database with the status before executing the service. When an exception occurs, a scheduled task queries the database record and performs "retry" or "compensation." If the business process is successful, the record is deleted.

  2. Designing a state machine engine and a simple DSL to orchestrate business processes and record business states. The state machine engine can define "compensating services." In case of an exception, the state machine engine invokes "compensating services" in reverse. There is also an "error guardian" platform that monitors failed or uncompensated business transactions and continuously performs "compensation" or "retry."
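The first category above (insert a record before the call, let a scheduled task retry, delete the record on success) can be sketched as follows. This is an illustration under assumptions, not any Ant Financial implementation: RetryRecordSketch and its methods are hypothetical names, an in-memory map stands in for the database table, and retryPending() is what a scheduled task would call on each tick.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Toy version of "record first, retry later"; the map stands in for a database table. */
final class RetryRecordSketch {

    /** Pending records: businessKey -> the service call to retry. */
    private final Map<String, BooleanSupplier> pending = new LinkedHashMap<>();

    /** Inserts the record before calling the service; the record is deleted on success. */
    void invoke(String businessKey, BooleanSupplier service) {
        pending.put(businessKey, service);
        attempt(businessKey, service);
    }

    /** What the scheduled task would do on each tick: retry every pending record. */
    void retryPending() {
        for (String key : new ArrayList<>(pending.keySet())) {
            attempt(key, pending.get(key));
        }
    }

    int pendingCount() {
        return pending.size();
    }

    private void attempt(String key, BooleanSupplier service) {
        boolean ok;
        try {
            ok = service.getAsBoolean();
        } catch (RuntimeException ex) {
            ok = false;               // an exception leaves the record in place for the next retry
        }
        if (ok) {
            pending.remove(key);      // success: delete the record
        }
    }

    public static void main(String[] args) {
        RetryRecordSketch records = new RetryRecordSketch();
        int[] calls = {0};
        records.invoke("transfer-1", () -> ++calls[0] >= 2);   // fails once, then succeeds
        System.out.println("pending after first call: " + records.pendingCount());
        records.retryPending();
        System.out.println("pending after retry: " + records.pendingCount());
    }
}
```

Because the record is written before the call, a crash between the two still leaves something for the scheduled task to find, which is why the order matters.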

Solution Comparison

Generally, there are two common solutions in the community and industry: one is based on a state machine or a process engine that orchestrates processes and defines compensation through DSL; the other is based on Java annotations and interceptors to implement compensation. What are the advantages and disadvantages of these two approaches?

Approach: State Machine + DSL
Pros:
- Business processes can be defined using visual tools, standardized, readable, and can achieve service orchestration functionality
- Improves communication efficiency between business analysts and developers
- Business state management: Processes are essentially state machines, reflecting the flow of business states
- Enhances flexibility in exception handling: Can implement "forward retry" or "backward compensation" after recovery from a crash
- Naturally supports asynchronous processing engines such as Actor model or SEDA architecture, improving overall throughput
Cons:
- Business processes are composed of Java programs and DSL configurations, making development relatively cumbersome
- High intrusiveness into existing business if it is a transformation
- High implementation cost of the engine

Approach: Interceptor + Java Annotation
Pros:
- Programs and annotations are integrated, simple development, low learning curve
- Easy integration into existing businesses
- Low framework implementation cost
Cons:
- The framework cannot provide asynchronous processing modes such as the Actor model or SEDA architecture to improve system throughput
- The framework cannot provide business state management
- Difficult to achieve "forward retry" after crash recovery due to the inability to restore thread context

Seata Saga Approach

The introduction of Seata Saga can be found in Seata Saga Official Documentation[6].

Seata Saga adopts the state machine + DSL approach for the following reasons:

  • The state machine + DSL approach is more widely used in practical production scenarios.
  • Can use asynchronous processing engines such as the Actor model or SEDA architecture to improve overall throughput.
  • Typically, business systems above the core system have "service orchestration" requirements, and service orchestration has transactional eventual consistency requirements. These two are challenging to separate. The state machine + DSL approach can simultaneously meet these two requirements.
  • Because Saga mode theoretically does not guarantee isolation, in extreme cases a rollback may be impossible to complete because of dirty writes. For example, suppose a distributed transaction first credits user A and then deducts the balance from user B. If user A spends the credited balance before the transaction is committed and the transaction is then rolled back, there is no way to compensate. Some business scenarios allow the business to eventually succeed instead: when rollback is impossible, the subsequent steps can be retried. The state machine + DSL approach can recover the context and continue execution "forward", so that the business eventually succeeds and eventual consistency is achieved.

In cases where isolation is not guaranteed: when designing business processes, follow the principle of "better a surplus than a shortfall." A surplus means the customer ends up with less money and the institution with more; the institution can then refund the customer on the strength of its credibility. Conversely, a shortfall means the institution ends up with less money, and the funds may not be recoverable. Therefore, in business process design, deduction should be done first.
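The effect of step order can be checked with a toy ledger. This is a sketch under assumptions: DeductFirstSketch, creditFirst and debitFirst are hypothetical names, amounts are plain long values, and the spentMeanwhile parameter simulates the dirty write in which the payee spends the credited money before the saga finishes.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy ledger comparing the two step orders. */
final class DeductFirstSketch {

    final Map<String, Long> balances = new HashMap<>();

    /** Debits the account; refuses (returns false) when the balance is insufficient. */
    boolean debit(String account, long amount) {
        long current = balances.getOrDefault(account, 0L);
        if (current < amount) {
            return false;
        }
        balances.put(account, current - amount);
        return true;
    }

    void credit(String account, long amount) {
        balances.merge(account, amount, Long::sum);
    }

    /** Credit first, debit second. 'spentMeanwhile' models the payee spending before the saga ends. */
    String creditFirst(String payer, String payee, long amount, long spentMeanwhile) {
        credit(payee, amount);
        debit(payee, spentMeanwhile);          // dirty write: no isolation between steps
        if (debit(payer, amount)) {
            return "OK";
        }
        // Compensation: take the credited amount back from the payee.
        return debit(payee, amount) ? "COMPENSATED" : "STUCK";
    }

    /** Debit first, credit second: a failed debit leaves nothing to compensate. */
    String debitFirst(String payer, String payee, long amount) {
        if (!debit(payer, amount)) {
            return "FAILED_CLEANLY";
        }
        credit(payee, amount);
        return "OK";
    }

    public static void main(String[] args) {
        DeductFirstSketch creditFirstLedger = new DeductFirstSketch();
        creditFirstLedger.balances.put("B", 50L);
        System.out.println(creditFirstLedger.creditFirst("B", "A", 100L, 100L));

        DeductFirstSketch debitFirstLedger = new DeductFirstSketch();
        debitFirstLedger.balances.put("B", 50L);
        System.out.println(debitFirstLedger.debitFirst("B", "A", 100L));
    }
}
```

With the same starting balances, the credit-first order ends in a state that cannot be compensated, while the debit-first order fails before any money has moved.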

State Definition Language (Seata State Language)

  1. Define the service call process through a state diagram and generate a JSON state language definition file.

  2. In the state diagram, a node can be a service call, and the node can configure its compensating node.

  3. The JSON state diagram is driven by the state machine engine. When an exception occurs, the state engine executes the compensating node corresponding to the successfully executed node to roll back the transaction.

    Note: Whether to compensate when an exception occurs can also be user-defined.

  4. It can meet service orchestration requirements, supporting one-way selection, concurrency, asynchronous, sub-state machine, parameter conversion, parameter mapping, service execution status judgment, exception capture, and other functions.

Assume a business process calls two services, inventory deduction (InventoryService) and balance deduction (BalanceService), and must guarantee that in a distributed scenario either both succeed or both roll back. Both participant services have a reduce method that deducts inventory or balance, and a compensateReduce method that compensates for the deduction. Let's take a look at the interface definition of InventoryService:

public interface InventoryService {

/**
* reduce
* @param businessKey
* @param amount
* @param params
* @return
*/
boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);

/**
* compensateReduce
* @param businessKey
* @param params
* @return
*/
boolean compensateReduce(String businessKey, Map<String, Object> params);
}

This is the state diagram corresponding to the business process:

Example State Diagram
Corresponding JSON

{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": ["$.[businessKey]", "$.[count]"],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
},
"ChoiceState": {
"Type": "Choice",
"Choices": [
{
"Expression": "[reduceInventoryResult] == true",
"Next": "ReduceBalance"
}
],
"Default": "Fail"
},
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException": "$.[mockReduceBalanceFail]"
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": ["java.lang.Throwable"],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"CompensateReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensateReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": ["$.[businessKey]"]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}

The state language is modeled to some extent on AWS Step Functions[7].

Introduction to "State Machine" Attributes:

  • Name: Represents the name of the state machine, must be unique;
  • Comment: Description of the state machine;
  • Version: Version of the state machine definition;
  • StartState: The first "state" to run when starting;
  • States: List of states, a map structure, where the key is the name of the "state," which must be unique within the state machine;

Introduction to "State" Attributes:

  • Type: The type of the "state," such as:
    • ServiceTask: Executes the service task;
    • Choice: Single conditional choice route;
    • CompensationTrigger: Triggers the compensation process;
    • Succeed: Normal end of the state machine;
    • Fail: Exceptional end of the state machine;
    • SubStateMachine: Calls a sub-state machine;
  • ServiceName: Service name, usually the beanId of the service;
  • ServiceMethod: Service method name;
  • CompensateState: Compensatory "state" for this state;
  • Input: List of input parameters for the service call, an array corresponding to the parameter list of the service method, $. represents using an expression to retrieve parameters from the state machine context. The expression uses SpringEL[8], and if it is a constant, write the value directly;
  • Output: Assigns the parameters returned by the service to the state machine context, a map structure, where the key is the key when placing it in the state machine context (the state machine context is also a map), and the value uses $. as a SpringEL expression, indicating the value is taken from the return parameters of the service, #root represents the entire return parameters of the service;
  • Status: Mapping of the service execution status, the framework defines three statuses, SU success, FA failure, UN unknown. We need to map the execution status of the service into these three statuses, helping the framework judge the overall consistency of the transaction. It is a map structure, where the key is a condition expression, usually based on the return value of the service or the exception thrown for judgment. The default is a SpringEL expression to judge the return parameters of the service. Those starting with $Exception{ indicate judging the exception type, and the value is mapped to this value when this condition expression is true;
  • Catch: Route after catching an exception;
  • Next: The next "state" to execute after the service is completed;
  • Choices: List of optional branches in the Choice type "state," where Expression is a SpringEL expression, and Next is the next "state" to execute when the expression is true;
  • ErrorCode: Error code for the Fail type "state";
  • Message: Error message for the Fail type "state";

For more detailed explanations of the state language, please refer to the Seata Saga Official Documentation[6] (http://seata.io/zh-cn/docs/user/saga.html).
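The Status attribute described above maps a service's return value or exception to SU, FA or UN. The following sketch imitates that mapping with plain Java predicates in place of SpringEL expressions. It is not the Seata engine: StatusMappingSketch, onResult, onException and evaluate are hypothetical names, and falling back to UN when no rule matches is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Toy status mapper; Java predicates stand in for the SpringEL condition expressions. */
final class StatusMappingSketch {

    enum Status { SU, FA, UN }

    /** Return-value rules, checked in insertion order (stand-in for "#root == true": "SU"). */
    private final Map<Predicate<Object>, Status> resultRules = new LinkedHashMap<>();

    /** Exception rules (stand-in for "$Exception{java.lang.Throwable}": "UN"). */
    private final Map<Class<? extends Throwable>, Status> exceptionRules = new LinkedHashMap<>();

    StatusMappingSketch onResult(Predicate<Object> condition, Status status) {
        resultRules.put(condition, status);
        return this;
    }

    StatusMappingSketch onException(Class<? extends Throwable> type, Status status) {
        exceptionRules.put(type, status);
        return this;
    }

    /** Calls the service and maps its outcome; UN is this sketch's fallback when no rule matches. */
    Status evaluate(Supplier<Object> serviceCall) {
        Object result;
        try {
            result = serviceCall.get();
        } catch (RuntimeException ex) {
            for (Map.Entry<Class<? extends Throwable>, Status> rule : exceptionRules.entrySet()) {
                if (rule.getKey().isInstance(ex)) {
                    return rule.getValue();
                }
            }
            return Status.UN;
        }
        for (Map.Entry<Predicate<Object>, Status> rule : resultRules.entrySet()) {
            if (rule.getKey().test(result)) {
                return rule.getValue();
            }
        }
        return Status.UN;
    }

    public static void main(String[] args) {
        StatusMappingSketch mapping = new StatusMappingSketch()
            .onResult(Boolean.TRUE::equals, Status.SU)
            .onResult(Boolean.FALSE::equals, Status.FA)
            .onException(Throwable.class, Status.UN);
        System.out.println(mapping.evaluate(() -> true));
        System.out.println(mapping.evaluate(() -> false));
        System.out.println(mapping.evaluate(() -> { throw new IllegalStateException("timeout"); }));
    }
}
```

The three rules in main mirror the Status block of the ReduceInventory state in the JSON above.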

State Machine Engine Principle:

State Machine Engine Principle

  • The state diagram in the image first executes stateA, then executes stateB, and then executes stateC;
  • The execution of "states" is based on an event-driven model. After stateA is executed, a routing message is generated and placed in the EventQueue. The event consumer takes the message from the EventQueue and executes stateB;
  • When the entire state machine is started, Seata Server is called to start a distributed transaction, and the xid is generated. Then, the start event of the "state machine instance" is recorded in the local database;
  • When a "state" is executed, Seata Server is called to register a branch transaction, and the branchId is generated. Then, the start event of the "state instance" is recorded in the local database;
  • After a "state" is executed, the end event of the "state instance" is recorded in the local database, and Seata Server is called to report the status of the branch transaction;
  • When the entire state machine is executed, the completion event of the "state machine instance" is recorded in the local database, and Seata Server is called to commit or roll back the distributed transaction;
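The event-driven flow described above can be sketched roughly as follows. This is a minimal single-threaded illustration, not Seata's engine: the class name EventDrivenSketch, the routing table and the returned log are hypothetical, and the calls to Seata Server and the local database are reduced to log entries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: states are driven by routing messages placed on an event queue.
class EventDrivenSketch {
    // Routing table for the example diagram: stateA -> stateB -> stateC.
    static final Map<String, String> NEXT = new HashMap<>();
    static {
        NEXT.put("stateA", "stateB");
        NEXT.put("stateB", "stateC");
    }

    // Runs the state machine from startState and returns the recorded events.
    static List<String> run(String startState) {
        List<String> log = new ArrayList<>();
        Queue<String> eventQueue = new ArrayDeque<>();
        // Stands in for: start the global transaction, record the machine instance start.
        log.add("machine start");
        eventQueue.add(startState);
        while (!eventQueue.isEmpty()) {
            // The event consumer takes the next routing message from the queue.
            String state = eventQueue.poll();
            // Stands in for: register the branch transaction, record the state start.
            log.add(state + " start");
            // Stands in for: record the state end, report the branch status.
            log.add(state + " end");
            String next = NEXT.get(state);
            if (next != null) {
                // Executing a state produces a routing message for the next one.
                eventQueue.add(next);
            }
        }
        // Stands in for: record machine completion, commit or roll back globally.
        log.add("machine end");
        return log;
    }
}
```

Calling EventDrivenSketch.run("stateA") walks stateA, stateB and stateC in order. A real engine would consume the queue from a thread pool instead of a loop.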

Design of State Machine Engine:

Design of State Machine Engine

The design of the state machine engine is mainly divided into three layers, with the upper layer depending on the lower layer. From bottom to top, they are:

  • Eventing Layer:

    • Implements an event-driven architecture that can push events and be consumed by a consumer. This layer does not care about what the event is or what the consumer executes; it is implemented by the upper layer.
  • ProcessController Layer:

    • Driven by the Eventing layer above to execute an "empty" process: the behavior and routing of "states" are not implemented here but by the upper layer.

      Based on these two layers, any "process" engine can in theory be built as a custom extension. The design of these two layers follows that of the internal financial network platform.

  • StateMachineEngine Layer:

    • Implements the behavior and routing logic of each type of state in the state machine engine;
    • Provides API and state machine language repository;

Practical Experience in Service Design under Saga Mode

Below are some practical experiences summarized in the design of microservices under Saga mode. Of course, these are recommended practices, not necessarily to be followed 100%. There are "workaround" solutions even if not followed.

Good news: Seata Saga mode has no specific requirements for the interface parameters of microservices, making Saga mode suitable for integrating legacy systems or services from external institutions.

Allow Empty Compensation

  • Empty Compensation: The original service was not executed, but the compensation service was executed;
  • Reasons:
    • Timeout (packet loss) of the original service;
    • Saga transaction triggers a rollback;
    • The request of the original service is not received, but the compensation request is received first;

Therefore, when designing services, it is necessary to allow empty compensation, that is, if the business primary key to be compensated is not found, return compensation success and record the original business primary key.

Hang Prevention Control

  • Hang: Compensation service is executed before the original service;
  • Reasons:
    • The original service times out (network congestion);
    • The Saga transaction triggers a rollback, so the compensation service runs;
    • The congested original request finally arrives, after the compensation;

Therefore, before executing the original service, check whether the current business primary key already exists among the business primary keys recorded by empty compensation. If it does, reject the execution of the service.
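Empty compensation and hang prevention can be combined in one service, as in the sketch below. It is only an illustration under stated assumptions: DebitService and its methods are hypothetical names, and the two in-memory collections stand in for records that a real service would keep in its own database.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory service; a real one would persist these records.
class DebitService {
    // Business primary key -> debited amount.
    private final Map<String, Integer> debits = new HashMap<>();
    // Business primary keys recorded by empty compensation.
    private final Set<String> emptyCompensated = new HashSet<>();

    // Original (forward) service. Returns false when rejected by hang prevention.
    boolean debit(String businessKey, int amount) {
        if (emptyCompensated.contains(businessKey)) {
            // Compensation already ran for this key, so the late request is rejected.
            return false;
        }
        debits.put(businessKey, amount);
        return true;
    }

    // Compensation service. It always reports success, even with nothing to undo.
    boolean compensateDebit(String businessKey) {
        if (debits.remove(businessKey) == null) {
            // Empty compensation: no original record was found, so remember the key.
            emptyCompensated.add(businessKey);
        }
        return true;
    }

    boolean hasDebit(String businessKey) {
        return debits.containsKey(businessKey);
    }
}
```

If compensateDebit("k1") arrives first, it succeeds as an empty compensation, and a later debit("k1", 30) is rejected instead of leaving an uncompensated debit behind.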

Idempotent Control

  • Both the original service and the compensation service need to ensure idempotence. Due to possible network timeouts, a retry strategy can be set. When a retry occurs, idempotent control should be used to avoid duplicate updates to business data.

Summary

Many times, we don't need to emphasize strong consistency. We design more resilient systems based on the BASE and Saga theories to achieve better performance and fault tolerance in distributed architecture. There is no silver bullet in distributed architecture, only solutions suitable for specific scenarios. In fact, Seata Saga is a product with the capabilities of "service orchestration" and "Saga distributed transactions." Summarizing, its applicable scenarios are:

  • Suitable for handling "long transactions" in a microservices architecture;
  • Suitable for "service orchestration" requirements in a microservices architecture;
  • Suitable for business systems with a large number of composite services above the financial core system (such as systems in the channel layer, product layer, integration layer);
  • Suitable for scenarios where integration with services provided by legacy systems or external institutions is required (these services are immutable and cannot be required to be modified).

Related Links Mentioned in the Article

[1] https://github.com/aphyr/dist-sagas/blob/master/sagas.pdf
[2] http://microservices.io/patterns/data/saga.html
[3] MicroProfile LRA: https://github.com/eclipse/microprofile-sandbox/tree/master/proposals/0009-LRA
[4] Eventuate Tram Saga: https://github.com/eventuate-tram/eventuate-tram-sagas
[5] ServiceComb Saga: https://github.com/apache/servicecomb-pack
[6] Seata Saga official documentation: http://seata.io/zh-cn/docs/user/saga.html
[7] AWS Step Functions: https://docs.aws.amazon.com/zh_cn/step-functions/latest/dg/tutorial-creating-lambda-state-machine.html
[8] SpringEL: https://docs.spring.io/spring/docs/4.3.10.RELEASE/spring-framework-reference/html/expressions.html

· 19 min read

Author: Yi Yuan (Chen Long), core developer of the distributed transaction framework at Ant Financial.
This article is based on the talk "Distributed Transaction Seata and its Three Patterns" given at SOFA Meetup#3 in Guangzhou on 11 August. It covers the background and theoretical foundations of distributed transactions, the principles of Seata, and how distributed transactions are implemented in its three modes (AT, TCC, and Saga).

The video and PPT are at the end of this article.

3 Distributed Transaction Seata Three Modes Explained-Eiyuan.jpg

I. Background of the emergence of distributed transactions

1.1 Distributed Architecture Evolution - Horizontal Splitting of Database

Ant Financial's business database was initially a single database with a single table. As business data grew rapidly, the single database and single table gradually became a bottleneck, so we split the database horizontally, sharding the original single database and single table into multiple databases and tables.

As shown in the figure below, after sharding, a write operation that could previously be completed on one database may now span multiple databases, which gives rise to the cross-database transaction problem.

image.png

1.2 Distributed Architecture Evolution - Business Service Splitting

In the early stage of business development, a monolithic ("one big block") business system architecture can meet basic business needs. However, as the business develops rapidly, system traffic and business complexity grow quickly, and the monolithic architecture gradually becomes a bottleneck for business development. The need to reduce coupling and improve the scalability of the business system becomes stronger and stronger.

As shown in the figure below, Ant Financial Services splits the single business system into multiple business systems in accordance with the design principles of Service Oriented Architecture (SOA), which reduces the coupling between the systems and enables different business systems to focus on their own business, which is more conducive to the development of the business and the scaling of the system capacity.

image.png

After the business system is split into services, a complete business operation often needs to call multiple services, and ensuring data consistency across them becomes a difficult problem.

II. Theoretical foundation of distributed transaction

2.1 Two-phase commit protocol

16_16_18__08_13_2019.jpg

Two-phase commit protocol: the transaction manager coordinates the resource managers in two phases. In the first phase, resources are prepared, that is, each resource manager reserves the resources the transaction needs. If every resource manager reserves its resources successfully, the second phase commits the resources; otherwise, the transaction manager coordinates the resource managers to roll back the resources.
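The two phases can be sketched as follows. This is a simplified illustration, not any specific transaction manager: PreparableResource and TwoPhaseCoordinator are hypothetical names, failures are simulated with a flag, and for brevity the rollback is sent to every resource, including ones that were never asked to prepare.

```java
import java.util.List;

// Hypothetical resource manager: votes in phase 1, then commits or rolls back in phase 2.
class PreparableResource {
    final boolean canPrepare; // simulates whether resource reservation succeeds
    String state = "INIT";

    PreparableResource(boolean canPrepare) {
        this.canPrepare = canPrepare;
    }

    boolean prepare() {
        state = canPrepare ? "PREPARED" : "FAILED";
        return canPrepare;
    }

    void commit() {
        state = "COMMITTED";
    }

    void rollback() {
        state = "ROLLED_BACK";
    }
}

// Hypothetical transaction manager coordinating the two phases.
class TwoPhaseCoordinator {
    // Returns true if the transaction committed, false if it was rolled back.
    static boolean execute(List<PreparableResource> resources) {
        // Phase 1: ask every resource to prepare; stop at the first failure.
        boolean allPrepared = true;
        for (PreparableResource r : resources) {
            if (!r.prepare()) {
                allPrepared = false;
                break;
            }
        }
        // Phase 2: commit everywhere only if every prepare succeeded.
        for (PreparableResource r : resources) {
            if (allPrepared) {
                r.commit();
            } else {
                r.rollback();
            }
        }
        return allPrepared;
    }
}
```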

2.2 TCC

16_16_51__08_13_2019.jpg

TCC (Try-Confirm-Cancel) is in effect a service-oriented two-phase commit protocol. Business developers need to implement three service interfaces. In the first phase, the business code orchestrates calls to the Try interface of each participant to reserve resources. If the Try interfaces of all participants succeed, the transaction manager commits the transaction and calls the Confirm interface of each participant to actually perform the business operation; otherwise it calls the Cancel interface of each participant to roll back the transaction.

2.3 Saga

3 Distributed Transactions Seata Three Patterns Explained - Yi Yuan-9.jpg

Saga is a compensation protocol. In Saga mode, a distributed transaction has multiple participants, and each participant is a compensable service for which the user implements a forward operation and a reverse (compensating) operation according to the business scenario.

During the execution of a distributed transaction, the forward operations of the participants are executed in sequence. If all forward operations succeed, the distributed transaction commits. If any forward operation fails, the distributed transaction goes back and executes the reverse operations of the preceding participants, rolling back the participants that have already committed and returning the distributed transaction to its initial state.

Saga theory comes from the paper "Sagas" published by Hector Garcia-Molina and Kenneth Salem in 1987.

The Saga forward service and compensation service also need to be implemented by business developers.
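The forward-then-compensate behaviour of a Saga can be sketched as below. The names SagaStep, FixedStep and SagaRunner are hypothetical and exist only for this illustration. Real participants would be remote services, and this sketch runs them synchronously in one thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical participant: a forward operation plus its compensating operation.
interface SagaStep {
    String name();
    boolean forward(); // returns false when the forward operation fails
    void compensate();
}

// Test double whose forward result is fixed up front.
class FixedStep implements SagaStep {
    private final String name;
    private final boolean succeeds;

    FixedStep(String name, boolean succeeds) {
        this.name = name;
        this.succeeds = succeeds;
    }

    public String name() { return name; }
    public boolean forward() { return succeeds; }
    public void compensate() { }
}

class SagaRunner {
    // Runs steps in order; on failure, compensates completed steps in reverse order.
    // Returns a trace of what happened.
    static List<String> run(List<SagaStep> steps) {
        List<String> trace = new ArrayList<>();
        Deque<SagaStep> completed = new ArrayDeque<>();
        for (SagaStep step : steps) {
            if (step.forward()) {
                trace.add("forward " + step.name());
                completed.push(step);
            } else {
                trace.add("failed " + step.name());
                // Undo the already committed participants, most recent first.
                while (!completed.isEmpty()) {
                    SagaStep done = completed.pop();
                    done.compensate();
                    trace.add("compensate " + done.name());
                }
                trace.add("rolled back");
                return trace;
            }
        }
        trace.add("committed");
        return trace;
    }
}
```

With three steps where the third fails, the trace shows T1 and T2 going forward, T3 failing, and then T2 and T1 being compensated in that order.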

III. Seata and its three patterns explained in detail

3.1 Distributed transaction Seata introduction

Seata (Simple Extensible Autonomous Transaction Architecture) is a distributed transaction solution jointly open-sourced by Ant Financial Services and Alibaba in January 2019. Seata has been open source for about half a year and now has more than 11,000 stars and a very active community. We warmly welcome you to join in building the Seata community and help make Seata the benchmark open source distributed transaction product.

Seata: https://github.com/apache/incubator-seata

image.png

3.2 Distributed Transactions Seata Product Module

As shown in the figure below, there are three major modules in Seata, namely TM, RM and TC. TM and RM are integrated with the business system as clients of Seata, and TC is deployed independently as the server of Seata.

image.png

The execution flow of a distributed transaction in Seata:

  • TM opens a distributed transaction (TM registers the global transaction record with TC);
  • The resources in the transaction, such as databases and services, are orchestrated according to the business scenario (RM reports resource readiness status to TC);
  • TM ends the distributed transaction, and phase 1 of the transaction ends (TM notifies TC to commit/roll back the distributed transaction);
  • TC aggregates the transaction information and decides whether the distributed transaction should be committed or rolled back;
  • TC notifies all RMs to commit/roll back resources, and phase 2 of the transaction ends;

3.3 Distributed Transactions Seata Solution

Seata has four distributed transaction solutions, AT mode, TCC mode, Saga mode and XA mode.

15_49_23__08_13_2019.jpg

3.3.1 AT Mode

In January 2019, Seata open-sourced AT mode, a non-intrusive distributed transaction solution. In AT mode, users only need to focus on their own "business SQL". The user's "business SQL" serves as phase 1, and the Seata framework automatically generates the phase 2 commit and rollback operations.

image.png

How AT mode achieves non-intrusion into the business:
  • Phase 1:

In phase 1, Seata intercepts the "business SQL". It first parses the SQL semantics and finds the business data that the "business SQL" will update. Before the business data is updated, Seata saves it as the "before image"; it then executes the "business SQL" to update the business data, saves the updated data as the "after image", and finally generates row locks. All of these operations are performed within a single database transaction, which ensures the atomicity of the phase 1 operations.

image3.png

  • Phase 2 commit:

If the second phase is a commit, since the "business SQL" has already been committed to the database in the first phase, the Seata framework only needs to delete the snapshot data and row locks saved in the first phase to complete the data cleanup.

image 4.png

  • Phase 2 rollback:

If phase 2 is a rollback, Seata needs to roll back the "business SQL" executed in phase 1 and restore the business data. The rollback uses the "before image" to restore the business data. Before restoring, however, Seata must first check for dirty writes by comparing the "current business data in the database" with the "after image". If the two are exactly the same, there has been no dirty write and the business data can be restored; if they differ, a dirty write has occurred and must be handed over to manual processing.

image 5.png

In AT mode, phase 1 and the phase 2 commit and rollback are all generated automatically by the Seata framework. Users only need to write "business SQL" to adopt distributed transactions easily, so AT mode is a distributed transaction solution with no intrusion into the business.
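The before image, after image and dirty-write check can be illustrated with a small sketch. It makes strong simplifying assumptions: a Map stands in for a database table, a row is a single integer, and row locks and the surrounding database transaction are omitted. UndoRecord and AtModeSketch are hypothetical names and are not Seata classes.

```java
import java.util.Map;

// Hypothetical undo record for one row: the images saved in phase 1.
class UndoRecord {
    final String rowKey;
    final int beforeImage;
    final int afterImage;

    UndoRecord(String rowKey, int beforeImage, int afterImage) {
        this.rowKey = rowKey;
        this.beforeImage = beforeImage;
        this.afterImage = afterImage;
    }
}

class AtModeSketch {
    // Phase 1: save the before image, apply the update, save the after image.
    // Assumes the row already exists in the table.
    static UndoRecord phaseOne(Map<String, Integer> table, String rowKey, int newValue) {
        int before = table.get(rowKey);
        table.put(rowKey, newValue); // stands in for executing the "business SQL"
        int after = table.get(rowKey);
        return new UndoRecord(rowKey, before, after);
    }

    // Phase 2 rollback: restore the before image unless a dirty write is detected.
    // Returns false when the row needs manual processing.
    static boolean phaseTwoRollback(Map<String, Integer> table, UndoRecord undo) {
        int current = table.get(undo.rowKey);
        if (current != undo.afterImage) {
            return false; // current data differs from the after image: dirty write
        }
        table.put(undo.rowKey, undo.beforeImage);
        return true;
    }
}
```

A phase 2 commit would simply discard the UndoRecord, since the update is already in place.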

3.3.2 TCC Mode

In March 2019, Seata open-sourced TCC mode, which was contributed by Ant Financial. TCC mode requires users to implement Try, Confirm and Cancel operations according to their business scenarios. The transaction initiator executes the Try method in phase 1, the Confirm method on a phase 2 commit, and the Cancel method on a phase 2 rollback.

image 6.png

The three TCC methods:

  • Try: checks and reserves resources;
  • Confirm: performs the business operation and commits; if Try succeeds, Confirm must succeed;
  • Cancel: releases the reserved resources;

Ant Financial's practical experience with TCC
16_48_02__08_13_2019.jpg

1 TCC Design - Business model is designed in 2 phases:

The most important thing for users to consider when adopting TCC is how to split their business model into two phases.

Take a "debit" scenario as an example. Before adopting TCC, debiting account A can be done with a single SQL statement that updates the account balance. After adopting TCC, the user needs to consider how to split the original one-step debit into two phases implemented as three methods, and to ensure that if the phase 1 Try succeeds, the phase 2 Confirm will definitely succeed.

image 7.png

As shown above, the Try method, as the phase 1 preparation method, needs to check and reserve resources. In the debit scenario, Try must check whether the account balance is sufficient and reserve the funds to be transferred, which it does by freezing those funds in account A. After the Try method executes, the balance of account A is still $100, but $30 of it is frozen and cannot be used by other transactions.

The second stage, the Confirm method, performs the real debit operation; Confirm will use the funds frozen in the Try stage to perform the debit operation; after the Confirm method is executed, the $30 frozen in the first stage has been deducted from account A, and the balance of account A becomes $70.

If the second stage is a rollback, you need to release the $30 frozen in the first stage of Try in the Cancel method, so that account A is back to the initial state, and all $100 is available.

The most important thing for users adopting TCC mode is to consider how to split the business model into two phases, implement it as the three TCC methods, and ensure that if Try succeeds, Confirm will succeed. Compared with AT mode, TCC mode is somewhat intrusive to business code, but it has no global row locks as AT mode does, so its performance is much higher than that of AT mode.
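The debit example can be written down as a minimal sketch. TccAccount and its method names are hypothetical and chosen only for illustration. A real participant would persist the balance and the frozen amount in its database, and would tie each reservation to a transaction xid.

```java
// Hypothetical account that reserves funds by freezing them.
class TccAccount {
    private int balance;
    private int frozen;

    TccAccount(int balance) {
        this.balance = balance;
    }

    // Try: check that enough unfrozen funds exist, then freeze (reserve) the amount.
    boolean tryDebit(int amount) {
        if (balance - frozen < amount) {
            return false;
        }
        frozen += amount;
        return true;
    }

    // Confirm: perform the real debit using the funds frozen by Try.
    void confirmDebit(int amount) {
        frozen -= amount;
        balance -= amount;
    }

    // Cancel: release the funds frozen by Try.
    void cancelDebit(int amount) {
        frozen -= amount;
    }

    int balance() {
        return balance;
    }

    // Funds that other transactions may still use.
    int available() {
        return balance - frozen;
    }
}
```

Starting from a balance of 100, tryDebit(30) leaves the balance at 100 with 70 available. confirmDebit(30) then brings the balance to 70, while cancelDebit(30) would instead make all 100 available again.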

2 TCC Design - Allow Null Rollback:
16_51_44__08_13_2019.jpg

The Cancel interface must be designed to allow null rollbacks. When the Try request is lost (packet loss) and never received, the transaction manager triggers a rollback and calls the Cancel interface. If Cancel finds no corresponding transaction xid or business primary key, it must still return rollback success, so that the transaction manager considers the transaction rolled back. Otherwise the transaction manager keeps retrying, while Cancel has no business data to roll back.

3 TCC Design - Anti-Suspension Control:
16_51_56__08_13_2019.jpg

Suspension means that Cancel is executed before Try. It occurs when the Try call times out because of network congestion: the transaction manager initiates a rollback and calls the Cancel interface, and the Try call eventually arrives, but only after Cancel. Under the null rollback logic above, Cancel returns success and the transaction manager considers the transaction rolled back, so the Try interface must not execute at this point, otherwise data becomes inconsistent. Therefore, before a null rollback returns success, Cancel records the transaction xid or business primary key to mark the record as rolled back. The Try interface first checks the transaction xid or business primary key, and if it is marked as rolled back, Try does not perform its business operation.

4 TCC Design - Idempotent Control:
16_52_07__08_13_2019.jpg

Idempotence means that, for the same system under the same conditions, a single request and repeated requests have the same effect on system resources. Because network jitter or congestion may cause timeouts, the transaction manager retries operations on resources, so a business operation is quite likely to be called repeatedly. To avoid occupying resources more than once because of repeated calls, the service must be designed with idempotent control, usually by using the transaction xid or the business primary key to detect duplicates.
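Deduplicating by transaction xid can be sketched as follows. IdempotentConfirm is a hypothetical name, and the in-memory set stands in for a persistent record of processed xids. In a real service that record would be written in the same local transaction as the business update.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical participant that applies each Confirm at most once per xid.
class IdempotentConfirm {
    private final Set<String> confirmedXids = new HashSet<>();
    private int balance;

    IdempotentConfirm(int balance) {
        this.balance = balance;
    }

    // Retries with an already processed xid return success without side effects.
    boolean confirm(String xid, int amount) {
        if (!confirmedXids.add(xid)) {
            return true; // duplicate call: the debit was already applied
        }
        balance -= amount;
        return true;
    }

    int balance() {
        return balance;
    }
}
```

Calling confirm("xid-1", 30) twice on a balance of 100 leaves the balance at 70, because the second call is recognised as a retry.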

3.3.3 Saga Mode

Saga mode is Seata's upcoming open source solution for long transactions, to be contributed mainly by Ant Financial. In Saga mode, a distributed transaction has multiple participants, and each participant is a compensable service for which the user implements a forward operation and a reverse (compensating) operation according to the business scenario.

During the execution of a distributed transaction, the forward operations of the participants are executed in sequence. If all forward operations succeed, the distributed transaction commits. If any forward operation fails, the distributed transaction goes back and executes the reverse operations of the preceding participants, rolling back the participants that have already committed and bringing the distributed transaction back to its initial state.

image 8.png

Distributed transactions in Saga mode are usually event-driven, with the participants executing asynchronously. Saga mode is a solution for long transactions.

1 Saga pattern usage scenario
16_44_58__08_13_2019.jpg

Saga mode is suitable for business systems with long business processes that need to guarantee eventual consistency of transactions. Saga mode commits local transactions in phase 1, so it is lock-free and performance can be guaranteed even for long processes.

Transaction participants may be services from other companies or legacy systems that cannot be modified to provide the interfaces TCC requires; Saga mode can be used in such cases.

The advantages of the Saga pattern are:

  • Local database transactions are committed in phase 1, lock-free, high performance;
  • Participants can execute asynchronously in an event-driven way, high throughput;
  • The compensation service is the "reverse" of the forward service, which is easy to understand and implement;

Disadvantages: Saga mode does not guarantee isolation, because the local database transaction is already committed in phase 1 and no "reservation" action is performed. Countermeasures for the lack of isolation are discussed later.

2 Saga implementation based on a state machine engine

17_13_19__08_13_2019.jpg

There are currently two common types of Saga implementation: one based on an event-driven architecture, and one based on annotations plus interceptors that intercept the forward services of the business. Seata currently uses the event-driven mechanism. Seata implements a state machine that can orchestrate the call flow of services and the compensation services of the forward services, producing a state diagram defined in a JSON file. The state machine engine drives the execution of this diagram; when an exception occurs, the state machine triggers a rollback and executes the compensation services one by one. When to trigger a rollback is, of course, up to the user. The state machine also meets service orchestration needs: it supports choice (single selection), concurrency, asynchrony, sub-state machine calls, parameter conversion, parameter mapping, service execution status judgment, exception catching and other functions.

3 State Machine Engine Principles

16_45_32__08_13_2019.jpg

The state machine engine is based on an event-driven architecture: each step is executed asynchronously, and steps are connected through an event queue, which greatly improves system throughput. A transaction log is recorded when each step executes, for use in rolling back when an exception occurs. Transaction logs are stored in the database where the business tables are located, to improve performance.

4 State Machine Engine Design

16_45_46__08_13_2019.jpg

The state machine engine has a three-layer architecture. The bottom layer is the "event-driven" layer, which implements the EventBus and a thread pool that consumes events; it is a Pub-Sub architecture. The second layer is the "process controller" layer, which implements a minimalist process engine framework that drives the execution of an "empty" process. "Empty" means it does not care what each node does: it just executes the process method of each node and then executes the route method to flow to the next node. This is a generic framework; based on these two layers, developers can implement any process engine. The top layer is the "state machine engine" layer, which implements the "behaviour" and "route" logic of each type of state node and provides APIs and a statechart repository. There are also a number of other components, such as the expression language, logic calculator, flow generator, interceptors, configuration management and transaction logging.
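The "empty" process idea of the process controller layer can be sketched as below. Only the method names process and route come from the text; the interface ProcessNode, the classes NamedNode and ProcessControllerSketch, and their signatures are assumptions made for this illustration. The event-driven layer is left out, so nodes run in a plain loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical node contract: the controller knows nothing else about a node.
interface ProcessNode {
    void process(List<String> context); // behaviour, supplied by the upper layer
    String route(List<String> context); // name of the next node, or null to stop
}

// Example upper-layer node: records its name and routes to a fixed successor.
class NamedNode implements ProcessNode {
    private final String name;
    private final String next;

    NamedNode(String name, String next) {
        this.name = name;
        this.next = next;
    }

    public void process(List<String> context) {
        context.add(name);
    }

    public String route(List<String> context) {
        return next;
    }
}

class ProcessControllerSketch {
    // Drives the process: call process on the node, then route to the next one.
    // Assumes every routed name exists in the nodes map.
    static List<String> run(Map<String, ProcessNode> nodes, String start) {
        List<String> context = new ArrayList<>();
        String current = start;
        while (current != null) {
            ProcessNode node = nodes.get(current);
            node.process(context);
            current = node.route(context);
        }
        return context;
    }
}
```

Because the controller depends only on the two methods, the state machine engine layer can plug in any node behaviour and routing rule without changing the loop.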

5 Saga Service Design Experience

Similar to TCC, Saga's forward and reverse services need to follow these design principles:

1) Saga Service Design - Allow Null Compensation
16_52_22__08_13_2019.jpg

2) Saga Service Design - Anti-Suspension Control
16_52_52__08_13_2019.jpg

3) Saga Service Design - Idempotent Control
3 Distributed Transactions Seata Three Patterns Explained - Yi Yuan-31.jpg

4) Saga Design - Custom Transaction Recovery Strategies
16_53_07__08_13_2019.jpg

As mentioned earlier, Saga mode does not guarantee transaction isolation, and dirty writes can occur in extreme cases. For example, before a distributed transaction has committed, the data written by an earlier service may be modified; if a later service then fails and a rollback is required, the earlier service may be impossible to compensate because its data has changed. One way to handle this is to "retry" and continue forward until the distributed transaction completes. Because the whole business process is orchestrated by the state machine, it can continue retrying even when recovery happens afterwards. You can therefore configure the transaction recovery policy of a process according to its business characteristics, giving priority to either "rollback" or "retry"; when the transaction times out, the Server side keeps retrying according to this policy.

Since Saga does not guarantee isolation, business design should follow the principle of "better a surplus than a shortfall". A surplus means that, when an error occurs, we end up with too much money from our point of view; a shortfall means we end up with too little. With a surplus we can refund the customer, but with a shortfall the money may never be recovered. In business design, therefore, the customer's account must be debited before the other account is credited, so that even if an isolation problem causes an overwriting update, there is never a shortfall.

6 Annotation and Interceptor Based Saga Implementation
17_13_37__08_13_2019.jpg

There is another way to implement Saga, based on annotations plus interceptors, which Seata does not currently implement. The pseudo-code above illustrates it: the @SagaCompensable annotation is placed on the one method and specifies that its compensation method is compensateOne. The @SagaTransactional annotation is placed on the processA method of the business process code and starts a Saga distributed transaction. An interceptor intercepts each forward method, and when an exception occurs it triggers a rollback and calls the compensation methods of the forward methods.

7 Comparison of Advantages and Disadvantages of the Two Saga Implementations

The following table compares the advantages and disadvantages of the two Saga implementations:

17_13_49__08_13_2019.jpg

The biggest advantages of the state machine engine are that it executes asynchronously in an event-driven way, which improves system throughput; it can meet service orchestration needs; and, given the lack of isolation in Saga mode, it offers an additional "retry forward" strategy for recovering transactions. The biggest advantages of annotations plus interceptors are that development is simple and the learning cost is low.

Summary

This article first reviewed the background and theoretical basis of distributed transactions, and then focused on the principles of Seata distributed transactions and three patterns (AT, TCC, Saga) of distributed transaction implementation.

Seata is positioned as a full-scenario distributed transaction solution, and an XA mode implementation will also be added in the future. Each mode has its own applicable scenarios. AT mode is a non-intrusive distributed transaction solution for scenarios where the business should not be modified, with almost zero learning cost. TCC mode is a high-performance distributed transaction solution for core systems and other scenarios with high performance requirements. Saga mode is a long transaction solution for business systems that have long business processes and need to guarantee eventual consistency. Saga mode commits local transactions in phase 1, is lock-free, and can guarantee performance for long processes; it is mostly used in the channel layer and integration layer of business systems. It can also be used when transaction participants are services from other companies or legacy systems that cannot be modified to provide the interfaces TCC requires.

The video review and PPT of this talk can be viewed at: https://tech.antfin.com/community/activities/779/review