Commit db94ff19 by SunWei峰

划水的一天,事务部分初步完成

parent f1543be3
......@@ -264,6 +264,7 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
Connection con = null;
try {
// 如果当前线程中的数据库连接不存在,或者事务同步为true的情况下需要重新获取数据库连接,进行同步
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
......@@ -272,10 +273,10 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 将同步标识设置为 true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 设置事务隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
......@@ -283,6 +284,7 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 更改自动提交,将数据库的自动提交改为 Spring 来控制,否则数据库执行结束sql后自动提交了
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
......@@ -290,16 +292,19 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
}
con.setAutoCommit(false);
}
// 准备事务连接,这里实际上执行了 SET TRANSACTION READ ONLY 的 sql 语句
prepareTransactionalConnection(con, definition);
// 设置当前线程已经存在事务,这个 transactionActive 属性是判断是否当前线程存在事务的依据
txObject.getConnectionHolder().setTransactionActive(true);
// 设置超时时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
// 如果是新的连接,则绑定数据库连接到当前线程
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
......@@ -334,6 +339,7 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
// 调用commit 方法
con.commit();
}
catch (SQLException ex) {
......@@ -349,6 +355,7 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
// 直接调用rollback方法 回滚
con.rollback();
}
catch (SQLException ex) {
......@@ -372,15 +379,19 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
// Remove the connection holder from the thread, if exposed.
if (txObject.isNewConnectionHolder()) {
// 将数据库连接从当前线程中解除绑定
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
// Reset connection.
// 释放连接
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
// 恢复数据库连接自动提交属性
con.setAutoCommit(true);
}
// 重置数据连接
DataSourceUtils.resetConnectionAfterTransaction(
con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
}
......@@ -392,6 +403,7 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
// 如果当前事务是独立的新创建的事务则在事务完成时释放数据库连接。
DataSourceUtils.releaseConnection(con, this.dataSource);
}
......@@ -418,6 +430,7 @@ public class DataSourceTransactionManager extends AbstractPlatformTransactionMan
if (isEnforceReadOnly() && definition.isReadOnly()) {
try (Statement stmt = con.createStatement()) {
// 设置事务为只读
stmt.executeUpdate("SET TRANSACTION READ ONLY");
}
}
......
......@@ -181,6 +181,7 @@ public abstract class DataSourceUtils {
boolean debugEnabled = logger.isDebugEnabled();
// Set read-only flag.
// 设置属性只读
if (definition != null && definition.isReadOnly()) {
try {
if (debugEnabled) {
......@@ -203,6 +204,7 @@ public abstract class DataSourceUtils {
}
// Apply specific isolation level, if any.
// 设置数据库隔离级别
Integer previousIsolationLevel = null;
if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
if (debugEnabled) {
......
......@@ -172,6 +172,7 @@ public abstract class JdbcTransactionObjectSupport implements SavepointManager,
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
// 直接调用 Connection的rollback 方法
conHolder.getConnection().rollback((Savepoint) savepoint);
conHolder.resetRollbackOnly();
}
......@@ -188,6 +189,7 @@ public abstract class JdbcTransactionObjectSupport implements SavepointManager,
public void releaseSavepoint(Object savepoint) throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
// 直接调用 Connection的releaseSavepoint方法
conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
}
catch (Throwable ex) {
......
......@@ -644,6 +644,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
// 封装成 TransactionInfo 实例
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
......@@ -651,6 +652,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
// 记录事务状态
txInfo.newTransactionStatus(status);
}
else {
......@@ -665,6 +667,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
// 绑定到线程当前上
txInfo.bindToThread();
return txInfo;
}
......@@ -679,6 +682,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 进行事务提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
......@@ -690,13 +694,17 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
* @param ex throwable encountered
*/
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 判断当前线程中是否存在事务
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 判断是否触发回滚操作,这里的条件是异常是否是 RuntimeException 或 error类型
// 即 return (ex instanceof RuntimeException || ex instanceof Error);
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 执行回滚操作
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
......@@ -713,6 +721,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
// 如果不满足回滚条件,则还是会提交,也就是说如果抛出异常不是 RuntimeException 或 error类型,则不会触发事务的回滚。
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
......
......@@ -342,6 +342,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
throws TransactionException {
// Use defaults if no transaction definition given.
// TransactionAttribute
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 1. 获取事务
......@@ -357,6 +358,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
}
// 到这里就表明当前线程没有事务存在了,即不会出现嵌套事务的情况了
// Check definition settings for new transaction.
// 事务超时验证
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
......@@ -408,9 +410,13 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建一个默认的 DefaultTransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 构造 Transaction,包括设置 ConnectionHolder、隔离级别、timeout。
// 并且如果是新连接,则绑定当当前线程。
doBegin(transaction, definition);
// 新同步事务的设置,针对于当前线程的设置
prepareSynchronization(status, definition);
return status;
}
......@@ -427,7 +433,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 如果传播属性是 PROPAGATION_NOT_SUPPORTED,则需要挂起当前事务。以不使用事务的形式调用
// 如果传播属性是 PROPAGATION_NOT_SUPPORTED,则需要挂起当前事务。以事务的形式调用
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
......@@ -589,13 +595,17 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
*/
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
// 如果当前事务处于激活状态
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 这里主要将当前事务的数据源(ConnectionHolder)解绑
suspendedResources = doSuspend(transaction);
}
// 获取当前事务 name、readOnly、isolationLevel 、wasActive 等属性,封装成 SuspendedResourcesHolder 返回
// 同时将当前事务的各种信息重置
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
......@@ -613,6 +623,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
throw ex;
}
}
// 如果当前事务并未激活且存在transaction
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
......@@ -638,11 +649,14 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
throws TransactionException {
if (resourcesHolder != null) {
// 获取挂起的事务
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
// 恢复事务
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
// 设置事务属性
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
......@@ -715,6 +729,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚。
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
......@@ -731,6 +746,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
return;
}
// 处理事务提交
processCommit(defStatus);
}
......@@ -746,25 +762,32 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
try {
boolean unexpectedRollback = false;
// 预留操作
prepareForCommit(status);
// 调用自定义触发器的方法
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 如果设置了保存点信息
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
// 清除保存点信息
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
// 如果是新事物
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 如果是独立事务则直接提交
doCommit(status);
}
// 不是新事物并不会直接提交,而是等最外围事务进行提交。
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
......@@ -795,6 +818,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// 提交过程中会出现异常则回滚
doRollbackOnCommitException(status, ex);
throw ex;
}
......@@ -805,6 +829,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
triggerAfterCommit(status);
}
finally {
// 清理事务信息
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
......@@ -843,14 +868,17 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
boolean unexpectedRollback = unexpected;
try {
// 激活所有 TransactionSynchronization 中对应的方法 beforeCompletion() 方法
triggerBeforeCompletion(status);
// 如果当前事务有保存点,也就是当前事务为单独的线程则会退到保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
// 如果当前事务为独立的新事务,则直接回退
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
......@@ -858,6 +886,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
doRollback(status);
}
else {
// 如果当前事务不是独立的事务,那么只能标记状态,等到事务链执行完毕后统一回滚
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
......@@ -886,6 +915,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
throw ex;
}
// 激活所有 TransactionSynchronization 中对应的方法 afterCompletion() 方法
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
......@@ -895,6 +925,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
}
}
finally {
// 清空记录的资源并将挂起的资源恢复
cleanupAfterCompletion(status);
}
}
......@@ -1009,9 +1040,11 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
// 清除当前线程的中关于该事务的信息
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
// 清理事务信息
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
......@@ -1019,6 +1052,7 @@ public abstract class AbstractPlatformTransactionManager implements PlatformTran
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
// 结束之前事务的挂起状态。
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
......
......@@ -150,7 +150,9 @@ public abstract class AbstractTransactionStatus implements TransactionStatus {
throw new TransactionUsageException(
"Cannot roll back to savepoint - no savepoint associated with current transaction");
}
// 回滚到 savepoint
getSavepointManager().rollbackToSavepoint(savepoint);
// 释放保存点
getSavepointManager().releaseSavepoint(savepoint);
setSavepoint(null);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment