//第一次进来connectionHolder为空的,所以不存在事务 // 如果当前已经存在事务 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. // 根据不同传播机制不同处理 return handleExistingTransaction(def, transaction, debugEnabled); }
// Check definition settings for new transaction. // 超时不能小于默认值 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { thrownewInvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); }
// No existing transaction found -> check propagation behavior to find out how to proceed. // 当前不存在事务,传播机制=MANDATORY(支持当前事务,没事务报错),报错 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { thrownewIllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } //第一次进来大部分会走这里 // 当前不存在事务,传播机制=REQUIRED/REQUIRED_NEW/NESTED,这三种情况,需要新开启事务,且加上事务同步 elseif (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //先挂起 SuspendedResourcesHoldersuspendedResources= suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { //创建事务状态对象,其实就是封装了事务对象的一些信息,记录事务状态的 //是否需要新开启同步// 开启// 开启 booleannewSynchronization= (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatusstatus= newTransactionStatus( def, transaction, true, newSynchronization, debugEnabled, suspendedResources); //开启事务,重点看看 DataSourceTransactionObject doBegin(transaction, def); //开启事务后,改变事务状态 prepareSynchronization(status, def); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // 当前不存在事务当前不存在事务,且传播机制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,这三种情况, // 创建“空”事务:没有实际事务,但可能是同步。警告:定义了隔离级别,但并没有真实的事务初始化, // 隔离级别被忽略有隔离级别但是并没有定义实际的事务初始化,有隔离级别但是并没有定义实际的事务初始化, // Create "empty" transaction: no actual transaction, but potentially synchronization. 
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } booleannewSynchronization= (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
// 1.NERVER(不支持当前事务;如果当前事务存在,抛出异常)报错 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { thrownewIllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); }
//2.NOT_SUPPORTED(不支持当前事务,现有同步将被挂起)挂起当前事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); }
//挂起当前事务 ObjectsuspendedResources= suspend(transaction); booleannewSynchronization= (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); //修改事务状态信息,把事务的一些信息存储到当前线程中,ThreadLocal中 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // 3.REQUIRES_NEW挂起当前事务,创建新事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } //挂起 SuspendedResourcesHoldersuspendedResources= suspend(transaction); try { // 创建新事务 booleannewSynchronization= (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatusstatus= newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } //4.NESTED嵌套事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { thrownewNestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // 是否支持保存点:非JTA事务走这个分支。AbstractPlatformTransactionManager默认是true, // JtaTransactionManager复写了该方法false,DataSourceTransactionmanager没有复写,还是true, //默认是可以嵌套事务的 if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. 
DefaultTransactionStatusstatus= prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); //创建回滚点 status.createAndHoldSavepoint(); return status; } else { // JTA事务走这个分支,创建新事务 // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. booleannewSynchronization= (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatusstatus= newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } }
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { IntegercurrentIsolationLevel= TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { ConstantsisoConstants= DefaultTransactionDefinition.constants; thrownewIllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { thrownewIllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } // 到这里PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY, // 存在事务加入事务即可,prepareTransactionStatus第三个参数就是是否需要新事务。false代表不需要新事物 booleannewSynchronization= (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
当前线程已存在事务的情况下,不同传播机制(propagation behavior)的处理方式:
NEVER:不支持当前事务;如果当前事务存在,抛出异常:"Existing transaction found for transaction marked with propagation 'never'"
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 如果是自动提交切换到手动提交 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } //关闭连接的自动提交,其实这步就是开启了事务 con.setAutoCommit(false); }
inttimeout= determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); }
// 绑定connection持有者到当前线程 // Bind the connection holder to the thread. if (txObject.isNewConnectionHolder()) { //如果是新创建的事务,则建立当前线程和数据库连接的关系 TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } }
catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } thrownewCannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
@Override publicfinalvoidcommit(TransactionStatus status)throws TransactionException { // 如果事务已完结,报错无法再次提交 if (status.isCompleted()) { thrownewIllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); }
DefaultTransactionStatusdefStatus= (DefaultTransactionStatus) status; // 如果事务明确标记为回滚 if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } //执行回滚 processRollback(defStatus, false); return; } //如果不需要全局回滚时提交 且 全局回滚 if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } //执行回滚 processRollback(defStatus, true); return; } // 执行提交事务 processCommit(defStatus); }
// Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. // 3.非新事务,且全局回滚失败,但是提交时没有得到异常,抛出异常 if (unexpectedRollback) { thrownewUnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } // 事务异常 catch (TransactionException ex) { // can only be caused by doCommit // 提交失败回滚 if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } // 触发完成后回调,事务同步状态为未知 else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } // 运行时异常或者其它异常 catch (RuntimeException | Error ex) { // 如果3个前置步骤未完成,调用前置的最后一步操作 if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } // 提交异常回滚 doRollbackOnCommitException(status, ex); throw ex; }
// Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); }
privatevoidtriggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) { if (status.isNewSynchronization()) { List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations(); TransactionSynchronizationManager.clearSynchronization(); if (!status.hasTransaction() || status.isNewTransaction()) { if (status.isDebug()) { logger.trace("Triggering afterCompletion synchronization"); } // No transaction or new transaction for the current scope -> // invoke the afterCompletion callbacks immediately invokeAfterCompletion(synchronizations, completionStatus); } elseif (!synchronizations.isEmpty()) { // Existing transaction that we participate in, controlled outside // of the scope of this Spring transaction manager -> try to register // an afterCompletion callback with the existing (JTA) transaction. registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations); } } }
privatevoidtriggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) { if (status.isNewSynchronization()) { List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations(); TransactionSynchronizationManager.clearSynchronization(); if (!status.hasTransaction() || status.isNewTransaction()) { if (status.isDebug()) { logger.trace("Triggering afterCompletion synchronization"); } // No transaction or new transaction for the current scope -> // invoke the afterCompletion callbacks immediately //调用方法 invokeAfterCompletion(synchronizations, completionStatus); } elseif (!synchronizations.isEmpty()) { // Existing transaction that we participate in, controlled outside // of the scope of this Spring transaction manager -> try to register // an afterCompletion callback with the existing (JTA) transaction. registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations); } } }
// 如果是最新的连接持有者,解绑当前线程绑定的<数据库资源,ConnectionHolder> // Remove the connection holder from the thread, if exposed. if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); } // 资源引用计数-1,关闭数据库连接 DataSourceUtils.releaseConnection(con, this.dataSource); } // 重置连接持有者的全部属性 txObject.getConnectionHolder().clear(); }
rollback回滚事务
AbstractPlatformTransactionManager#rollback
1 2 3 4 5 6 7 8 9 10
@Override publicfinalvoidrollback(TransactionStatus status)throws TransactionException { if (status.isCompleted()) { thrownewIllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); }
// Raise UnexpectedRollbackException if we had a global rollback-only marker if (unexpectedRollback) { thrownewUnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { //解绑当前线程 cleanupAfterCompletion(status); } }
有几个公共方法和提交事务时一致,就不再重复。
DataSourceTransactionManager#doRollback()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@Override protectedvoiddoRollback(DefaultTransactionStatus status) { DataSourceTransactionObjecttxObject= (DataSourceTransactionObject) status.getTransaction(); Connectioncon= txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { con.rollback(); } catch (SQLException ex) { thrownewTransactionSystemException("Could not roll back JDBC transaction", ex); } }