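Seata Transaction Isolation
Goal of this article: help users understand how to achieve correct transaction isolation in Seata's AT mode, so that dirty reads and dirty writes are prevented.
Readers are expected to have read the introduction to AT mode on the Seata website and to have some understanding of local database locks
(for example, when two transactions update the same record at the same time, only the transaction holding the record lock can update it; the other must wait until the record lock is released or until its own transaction times out).
First, take a look at the following code. It may look "elementary", but it is essentially what a persistence framework does for us.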
@Service
public class StorageService {
@Autowired
private DataSource dataSource;
@GlobalTransactional
public void batchUpdate() throws SQLException {
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
String sql = "update storage_tbl set count = ?" +
" where id = ? and commodity_code = ?";
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, 100);
preparedStatement.setLong(2, 1);
preparedStatement.setString(3, "2001");
preparedStatement.executeUpdate();
connection.commit();
} catch (Exception e) {
throw e;
} finally {
IOutils.close(preparedStatement);
IOutils.close(connection);
}
}
}
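Starting with the proxied data source
The most important step in using AT mode is proxying the data source. So what does proxying the data source with DataSourceProxy achieve?
DataSourceProxy gives us several important proxy objects:
- DataSourceProxy.getConnection() returns a ConnectionProxy
- ConnectionProxy.prepareStatement(...) returns a StatementProxy
How Seata implements transaction isolation is hidden in these two proxies. Here is an overview of the logic first.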
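How StatementProxy.executeXXX() works
- A call to io.seata.rm.datasource.StatementProxy.executeXXX() hands the SQL to io.seata.rm.datasource.exec.ExecuteTemplate.execute(...). Inside ExecuteTemplate.execute(...), Seata picks an Executor according to the dbType and the type of SQL statement, then calls execute(Object... args) on that io.seata.rm.datasource.exec.Executor.
  - If a DML Executor is chosen, it mainly does the following:
    - Query the before image (with select for update, so the local lock is acquired at this point)
    - Execute the business SQL
    - Query the after image
    - Prepare the undoLog
  - If the SQL is a select for update, SelectForUpdateExecutor is used (Seata proxies select for update). The proxied logic is:
    - Execute the select for update first (acquiring the local database lock)
    - If running under @GlobalTransactional or @GlobalLock, check whether a global lock exists
    - If a global lock exists, roll back the local transaction (in the case where the caller has not opened a local transaction itself), then compete again for the local lock and the global lock, and repeat until the global lock is obtained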
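The retry loop described for SelectForUpdateExecutor can be sketched in memory. This is only an illustration under stated assumptions: `SelectForUpdateSketch`, `localLock`, `globalLocks` and `betweenRetries` are hypothetical names, a `ReentrantLock` stands in for the database row lock, and a `Set` stands in for the global locks kept by the TC. None of it is Seata's actual implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory stand-ins; none of these names are Seata APIs. */
class SelectForUpdateSketch {
    // Stand-in for the database row lock (the "local lock").
    static final ReentrantLock localLock = new ReentrantLock();
    // Stand-in for the global locks held at the TC, keyed by row.
    static final Set<String> globalLocks = new HashSet<>();

    /**
     * Returns the attempt number on which the row could be read safely,
     * or throws once maxRetries attempts have all met a global lock.
     */
    static int selectForUpdate(String rowKey, int maxRetries, Runnable betweenRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            localLock.lock();                  // 1. select for update: take the local lock
            try {
                if (!globalLocks.contains(rowKey)) {
                    return attempt;            // 2. no global lock: the read is safe
                }
            } finally {
                // 3. On conflict this plays the role of the local rollback.
                //    The sketch also releases on success; a real caller would
                //    keep the row lock until its local transaction ends.
                localLock.unlock();
            }
            betweenRetries.run();              // 4. pause, then compete for both locks again
        }
        throw new IllegalStateException("global lock still held on " + rowKey);
    }

    public static void main(String[] args) {
        globalLocks.add("storage_tbl:1");
        // The callback simulates the other global transaction finishing.
        int attempts = selectForUpdate("storage_tbl:1", 5,
                () -> globalLocks.remove("storage_tbl:1"));
        System.out.println("succeeded after " + attempts + " attempt(s)");
    }
}
```

The point of the design is that the local lock is given up before each retry, so the transaction holding the global lock is never blocked by a waiter that holds the row lock.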
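How ConnectionProxy.commit() works
- Inside a global transaction (that is, the persistence method is annotated with @GlobalTransactional)
  - Register the branch transaction and acquire the global lock
  - Write the undoLog to the database
  - Have the database commit the transaction
- Inside @GlobalLock (that is, the persistence method is annotated with @GlobalLock)
  - Ask the TC whether a global lock exists; if it does, throw an exception
  - Have the database commit the transaction
- In all other cases (the else branch)
  - Have the database commit the transaction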
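The three branches above can be condensed into a small decision sketch. It is a simplification, not Seata's code: `CommitFlowSketch`, `Mode` and `lockHeldByOther` are hypothetical names, the steps are recorded as strings instead of being executed, and lock conflicts while registering a branch are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; the names below are illustrative and not Seata APIs. */
class CommitFlowSketch {
    /** The three situations the list above distinguishes. */
    enum Mode { GLOBAL_TRANSACTION, GLOBAL_LOCK, PLAIN }

    /**
     * Returns the ordered steps a commit would take.
     * lockHeldByOther simulates the TC reporting that another global
     * transaction holds the global lock; only the GLOBAL_LOCK case reads it here.
     */
    static List<String> commit(Mode mode, boolean lockHeldByOther) {
        List<String> steps = new ArrayList<>();
        switch (mode) {
            case GLOBAL_TRANSACTION:
                steps.add("register branch and acquire global lock");
                steps.add("write undo log");
                break;
            case GLOBAL_LOCK:
                if (lockHeldByOther) {
                    throw new IllegalStateException("global lock exists, commit refused");
                }
                steps.add("check global lock");
                break;
            default:
                // Neither annotation is present: no Seata work before the commit.
                break;
        }
        steps.add("database commit");   // every successful path ends with the local commit
        return steps;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + commit(mode, false));
        }
    }
}
```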
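What @GlobalTransactional does
It marks a global transaction.
What @GlobalLock + select for update does
If a method such as updateA() uses @GlobalLock + select for update, Seata first acquires the local database lock and then checks whether a global lock exists on the record; if one does, it throws LockConflictException.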
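A dirty-write example first, then how Seata prevents it
Suppose your business code looks like this:
- updateAll() updates records in tables A and B together, while updateA() and updateB() update table A and table B respectively
- updateAll() is already annotated with @GlobalTransactional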
class YourBussinessService {
DbServiceA serviceA;
DbServiceB serviceB;
@GlobalTransactional
public boolean updateAll(DTO dto) {
serviceA.update(dto.getA());
serviceB.update(dto.getB());
}
public boolean updateA(DTO dto) {
serviceA.update(dto.getA());
}
}
class DbServiceA {
@Transactional
public boolean update(A a) {
}
}
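To make the problem concrete, the following sketch replays one possible interleaving of updateAll() and the unannotated updateA() against an in-memory map. It is a simplified model under stated assumptions: `DirtyWriteSketch`, `tableA` and the values 1, 2 and 3 are hypothetical, and no Seata API or real database is involved.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory replay of the interleaving; no Seata API is used. */
class DirtyWriteSketch {
    static String replay() {
        Map<String, Integer> tableA = new HashMap<>();
        tableA.put("row1", 1);

        // Business one: updateAll() updates A inside a global transaction.
        int beforeImage = tableA.get("row1");   // 1, kept so the change can be undone
        tableA.put("row1", 2);
        int afterImage = tableA.get("row1");    // 2; the local commit releases the row lock

        // Business two: updateA() carries no Seata annotation, so nothing stops its write.
        tableA.put("row1", 3);

        // Business one: updating B fails, so the global transaction has to roll A back.
        int current = tableA.get("row1");
        if (current != afterImage) {
            return "dirty write: A is " + current + ", updateAll() left " + afterImage
                    + ", restoring " + beforeImage + " would discard updateA()'s change";
        }
        tableA.put("row1", beforeImage);
        return "rolled back cleanly";
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

By the time the global transaction needs to roll back, the row no longer matches what updateAll() wrote, so the before image cannot be restored without losing a change that another business call has already committed.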
How does Seata prevent dirty writes?
Option 1: also annotate updateA() with @GlobalTransactional. How does Seata guarantee transaction isolation in this case?
class YourBussinessService {
DbServiceA serviceA;
@GlobalTransactional
@Transactional
public boolean updateA(DTO dto) {
serviceA.update(dto.getA());
}
}
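- updateAll() is called first (not yet finished), then updateA() is called. When updateA() commits, it must register its branch and acquire the global lock on the record. updateAll() holds that lock until its global transaction ends, so updateA() cannot commit a write to A in the meantime.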
Option 2: @GlobalLock + select for update
class YourBussinessService {
DbServiceA serviceA;
@GlobalLock
@Transactional
public boolean updateA(DTO dto) {
serviceA.selectForUpdate(dto.getA());
serviceA.update(dto.getA());
}
}
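- updateAll() is called first (not yet finished), then updateA() is called. updateA() takes the local lock, finds that a global lock exists on the record, and retries until that lock is released, so it never writes over a change that updateAll() has not finished.
- What if updateA() is called first (not yet finished) and updateAll() is called afterwards?
  Both operations must acquire the local lock first, so a dirty write cannot happen in this case either.
- Some will surely ask: "Why add select for update here? Can @GlobalLock alone prevent dirty writes?" It can. But recall the flow above; select for update brings a few benefits:
  - Lock conflicts are handled more "gently". With @GlobalLock alone, an exception is thrown as soon as a global lock is detected. Had it "held on" a little longer, the global lock might have been released, so throwing right away would be a pity.
  - Inside updateA(), select for update fetches the latest A before the update is made.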
How to prevent dirty reads?
Scenario: one business call invokes updateAll() first, and before updateAll() has finished, another business call invokes queryA().
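As a hedged illustration of this scenario, the sketch below contrasts a plain read with a read that first checks for a global lock, which is the behaviour described earlier for @GlobalLock + select for update. Applying that approach to queryA() is an assumption made for the example. `DirtyReadSketch`, `queryPlain` and `queryChecked` are hypothetical names, and the maps stand in for table A and for the TC's global locks.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model; none of these names are Seata APIs. */
class DirtyReadSketch {
    static final Map<String, Integer> tableA = new HashMap<>();
    static final Set<String> globalLocks = new HashSet<>();

    /** Plain read: returns whatever has been committed locally so far. */
    static int queryPlain(String rowKey) {
        return tableA.get(rowKey);
    }

    /** Checked read: refuses a row whose global lock is still held. */
    static int queryChecked(String rowKey) {
        if (globalLocks.contains(rowKey)) {
            throw new IllegalStateException("global lock held on " + rowKey);
        }
        return tableA.get(rowKey);
    }

    public static void main(String[] args) {
        tableA.put("row1", 2);       // written by updateAll() and committed locally
        globalLocks.add("row1");     // updateAll() has not finished globally yet

        System.out.println("plain read sees " + queryPlain("row1"));
        try {
            queryChecked("row1");
        } catch (IllegalStateException e) {
            System.out.println("checked read refused: " + e.getMessage());
        }
    }
}
```

The plain read returns a value that a later global rollback may undo, which is exactly the dirty read; the checked read only returns data once no global transaction still owns the row.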
Source code walkthrough
@Service
public class StorageService {
@Autowired
private DataSource dataSource;
@GlobalTransactional
public void update() throws SQLException {
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
String sql = "update storage_tbl set count = ?" +
" where id = ? and commodity_code = ?";
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, 100);
preparedStatement.setLong(2, 1);
preparedStatement.setString(3, "2001");
preparedStatement.execute();
connection.commit();
} catch (Exception e) {
throw e;
} finally {
IOutils.close(preparedStatement);
IOutils.close(connection);
}
}
}
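This code looks elementary and uses no persistence framework, but if you abstract away what a framework does for us, it comes down to the code above.
A brief outline of the source walkthrough that follows (focusing on the code related to transaction isolation):
- The purpose of proxying the data source
  - What DataSourceProxy does (returns ConnectionProxy)
    - A small feature of ConnectionProxy (storing the undoLog)
  - What ConnectionProxy does (returns StatementProxy)
  - How StatementProxy.execute() works
    - How io.seata.rm.datasource.exec.UpdateExecutor executes (query the before image, execute the SQL, query the after image, prepare the undoLog)
    - How SelectForUpdateExecutor executes (compete for the local lock, check the global lock; if a global lock exists, roll back and compete again...)
  - How ConnectionProxy.commit() works (register the branch transaction (competing for the global lock), write the undoLog, commit to the database)
- Introducing RootContext
- The different proxy logic in GlobalTransactionalInterceptor
  - How a method with @GlobalTransactional is handled
  - How a method with @GlobalLock is handled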
What DataSourceProxy does
DataSourceProxy gives us several important proxy objects:
- DataSourceProxy.getConnection() returns a ConnectionProxy
package io.seata.rm.datasource;
import java.sql.Connection;
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
}
- First, a look at the ConnectionContext inside ConnectionProxy. One of its jobs is to store the undoLog.
package io.seata.rm.datasource;
import io.seata.rm.datasource.undo.SQLUndoLog;
public class ConnectionProxy extends AbstractConnectionProxy {
private ConnectionContext context = new ConnectionContext();
public void appendUndoLog(SQLUndoLog sqlUndoLog) {
context.appendUndoItem(sqlUndoLog);
}
}
package io.seata.rm.datasource;
public class ConnectionContext {
private static final Savepoint DEFAULT_SAVEPOINT = new Savepoint() {
@Override
public int getSavepointId() throws SQLException {
return 0;
}
@Override
public String getSavepointName() throws SQLException {
return "DEFAULT_SEATA_SAVEPOINT";
}
};
private final Map<Savepoint, List<SQLUndoLog>> sqlUndoItemsBuffer = new LinkedHashMap<>();
private Savepoint currentSavepoint = DEFAULT_SAVEPOINT;
void appendUndoItem(SQLUndoLog sqlUndoLog) {
sqlUndoItemsBuffer.computeIfAbsent(currentSavepoint, k -> new ArrayList<>()).add(sqlUndoLog);
}
}
- ConnectionProxy.prepareStatement(...) returns a StatementProxy
package io.seata.rm.datasource;
public class ConnectionProxy extends AbstractConnectionProxy {
public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
super(dataSourceProxy, targetConnection);
}
}
package io.seata.rm.datasource;
import java.sql.Connection;
public abstract class AbstractConnectionProxy implements Connection {
protected Connection targetConnection;
public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
this.dataSourceProxy = dataSourceProxy;
this.targetConnection = targetConnection;
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
if (BranchType.AT == RootContext.getBranchType()) { // why would this return AT?
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
// for an INSERT, the PreparedStatement created here must be able to return auto-generated keys, hence this prepareStatement() overload
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
public Connection getTargetConnection() {
return targetConnection;
}
}
Let's park a question here and answer it later:
why would RootContext.getBranchType() return AT?
How StatementProxy.execute() works
- A call to io.seata.rm.datasource.StatementProxy.execute() hands the SQL to io.seata.rm.datasource.exec.ExecuteTemplate.execute(...).
package io.seata.rm.datasource;
public class PreparedStatementProxy extends AbstractPreparedStatementProxy
implements PreparedStatement, ParametersHolder {
@Override
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}
}
- Inside ExecuteTemplate.execute(...), Seata picks an Executor according to the dbType and the type of SQL statement, then calls execute(Object... args) on that io.seata.rm.datasource.exec.Executor.
package io.seata.rm.datasource.exec;
public class ExecuteTemplate {
public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
return execute(null, statementProxy, statementCallback, args);
}
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
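}
Another question to park here and answer later:
how does RootContext.requireGlobalLock() decide whether a global lock is currently required?
Take io.seata.rm.datasource.exec.UpdateExecutor as the first example. UpdateExecutor extends AbstractDMLBaseExecutor, which extends BaseTransactionalExecutor. Let's look at what the execute() method does.
package io.seata.rm.datasource.exec;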
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
protected StatementProxy<S> statementProxy;
protected StatementCallback<T, S> statementCallback;
protected SQLRecognizer sqlRecognizer;
public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
SQLRecognizer sqlRecognizer) {
this.statementProxy = statementProxy;
this.statementCallback = statementCallback;
this.sqlRecognizer = sqlRecognizer;
}
@Override
public T execute(Object... args) throws Throwable {
String xid = RootContext.getXID();
if (xid != null) {
statementProxy.getConnectionProxy().bind(xid);
}
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args);
}
}
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
connectionProxy.changeAutoCommit(); // note: if you have not started a transaction, Seata starts one for you
return new LockRetryPolicy(connectionProxy).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit(); // the transaction Seata started for you is committed through connectionProxy
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
}
package io.seata.rm.datasource.exec;
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
}
- If a DML Executor is chosen, executeAutoCommitFalse() above shows that it mainly does the following:
  - Query the before image (with select for update, so the local lock is acquired at this point)
package io.seata.rm.datasource.exec;
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(
ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS); // defaults to true
@Override
protected TableRecords beforeImage() throws SQLException {
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
TableMeta tmeta = getTableMeta();
String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
// SELECT id, count FROM storage_tbl WHERE id = ? FOR UPDATE
return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}
private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(WHERE).append(whereCondition);
}
String orderBy = recognizer.getOrderBy();
if (StringUtils.isNotBlank(orderBy)) {
suffix.append(orderBy);
}
ParametersHolder parametersHolder = statementProxy instanceof ParametersHolder ? (ParametersHolder)statementProxy : null;
String limit = recognizer.getLimit(parametersHolder, paramAppenderList);
if (StringUtils.isNotBlank(limit)) {
suffix.append(limit);
}
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
if (ONLY_CARE_UPDATE_COLUMNS) {
if (!containsPK(updateColumns)) {// if the updated columns do not include the primary key, add the primary key to the select for update
selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
}
for (String columnName : updateColumns) {
selectSQLJoin.add(columnName);
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return selectSQLJoin.toString();
}
protected TableRecords buildTableRecords(TableMeta tableMeta, String selectSQL, ArrayList<List<Object>> paramAppenderList) throws SQLException {
ResultSet rs = null;
try (PreparedStatement ps = statementProxy.getConnection().prepareStatement(selectSQL)) { // executing the select for update acquires the local lock
if (CollectionUtils.isNotEmpty(paramAppenderList)) {
for (int i = 0, ts = paramAppenderList.size(); i < ts; i++) {
List<Object> paramAppender = paramAppenderList.get(i);
for (int j = 0, ds = paramAppender.size(); j < ds; j++) {
ps.setObject(i * ds + j + 1, paramAppender.get(j));
}
}
}
rs = ps.executeQuery();
return TableRecords.buildRecords(tableMeta, rs);
} finally {
IOUtil.close(rs);
}
}
}
  - Execute the business SQL
  - Query the after image
package io.seata.rm.datasource.exec;
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
TableMeta tmeta = getTableMeta();
if (beforeImage == null || beforeImage.size() == 0) {
return TableRecords.empty(getTableMeta());
}
String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
//SELECT id, count FROM storage_tbl WHERE (id) in ( (?) )
ResultSet rs = null;
try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
rs = pst.executeQuery();
return TableRecords.buildRecords(tmeta, rs);
} finally {
IOUtil.close(rs);
}
}
}
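The two SQL strings shown in the comments of beforeImage() and afterImage() can be reproduced with a small standalone sketch that uses the same StringJoiner prefix/suffix idea as buildBeforeImageSQL(). This is an illustration only: `ImageSqlSketch` and its two methods are hypothetical, the where clause is passed in as a ready-made string, and escaping, order by, limit and composite keys are ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper that only builds SQL text; it is not Seata code. */
class ImageSqlSketch {
    /** Before image: the columns are joined between "SELECT " and a locking suffix. */
    static String beforeImageSql(String table, List<String> columns, String where) {
        StringJoiner sql = new StringJoiner(", ", "SELECT ",
                " FROM " + table + " WHERE " + where + " FOR UPDATE");
        for (String column : columns) {
            sql.add(column);
        }
        return sql.toString();
    }

    /** After image: the same columns, looked up by primary key, without a lock. */
    static String afterImageSql(String table, List<String> columns, String pk, int rowCount) {
        StringJoiner placeholders = new StringJoiner(", ", " WHERE (" + pk + ") in ( ", " )");
        for (int i = 0; i < rowCount; i++) {
            placeholders.add("(?)");   // one placeholder group per row found in the before image
        }
        return "SELECT " + String.join(", ", columns) + " FROM " + table + placeholders;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "count");
        System.out.println(beforeImageSql("storage_tbl", columns, "id = ?"));
        System.out.println(afterImageSql("storage_tbl", columns, "id", 1));
    }
}
```

Only the before-image query carries FOR UPDATE: it is the statement that takes the local lock, while the after-image query runs inside the same local transaction on rows that are already locked.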
-
-