1. 事务管理器抽象
一个事务管理器只需要三个基本的能力:获取一个事务、提交事务、回滚事务。
public interface PlatformTransactionManager extends TransactionManager {
// 获取一个事务
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
// 提交事务
void commit(TransactionStatus status) throws TransactionException;
// 回滚事务
void rollback(TransactionStatus status) throws TransactionException;
}
DataSourceTransactionManagerAutoConfiguration
配置类导入了数据库事务管理器 DataSourceTransactionManager
。
2. 事务同步回调钩子
事务同步回调钩子让我们有机会在事务的各个阶段加入一些协调的动作。
public interface TransactionSynchronization extends Flushable {
/** Completion status in case of proper commit. */
int STATUS_COMMITTED = 0;
/** Completion status in case of proper rollback. */
int STATUS_ROLLED_BACK = 1;
/** Completion status in case of heuristic mixed completion or system errors. */
int STATUS_UNKNOWN = 2;
default void suspend() {}
default void resume() {}
default void flush() {}
default void beforeCommit(boolean readOnly) {}
default void beforeCompletion() {}
default void afterCommit() {}
default void afterCompletion(int status) {}
}
3. TransactionSynchronizationManager
TransactionSynchronizationManager
委托集中管理与每个线程绑定的资源和事务同步钩子。
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
4. 织入事务
spring-boot-autoconfig 模块会导入:TransactionAutoConfiguration -> EnableTransactionManagement -> ProxyTransactionManagementConfiguration
。
ProxyTransactionManagementConfiguration
会注册 Bean : TransactionInterceptor
(要切入的逻辑)、AnnotationTransactionAttributeSource
(切入点)、BeanFactoryTransactionAttributeSourceAdvisor
(切面)。
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource());
advisor.setAdvice(transactionInterceptor());
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor() {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource());
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
AnnotationAwareAspectJAutoProxyCreator
在创建 Bean 的代理对象时,会获取到 BeanFactoryTransactionAttributeSourceAdvisor
判断该 Bean 的哪些方法需要织入事务,从而把 TransactionInterceptor
织入到代理对象。
5. TransactionInterceptor 事务拦截器
TransactionInterceptor
被 AOP 织入到代理对象,拦截对事务方法的调用,然后调用父类 TransactionAspectSupport.invokeWithinTransaction
,该方法调用 TransactionManager
实现类,在执行目标方法前后加入 获取事务、提交事务或回滚事务的控制。
// TransactionInterceptor
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
// TransactionAspectSupport
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 获取类和方法的事务属性。如果不是事务方法, txAttr 是 null 。
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
// 根据事务属性获取事务管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 省略 ReactiveTransactionManager 相关的。。
// 基于数据库的 DataSourceTransactionManager 是 PlatformTransactionManager 的子类
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 标准事务 demarcation with getTransaction and commit/rollback calls.
// 创建一个新的事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 调用目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 处理异常:回滚事务或正常提交事务
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 正常提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 省略 CallbackPreferringPlatformTransactionManager 相关的
}
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
6. AbstractPlatformTransactionManager 事务控制实现类
AbstractPlatformTransactionManager
在获取事务时实现了 Spring 事务传播行为(创建新事务、创建新事务并挂起当前事务)。
事务提交或回滚前后回调已注册的 TransactionSynchronization
的相关方法。
6.1 创建事务
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 这里只是创建了一个事务对象,并没有绑定到底层的资源,比如 JDBC 连接。
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// 如果当前已存在事务,要根据新的传播行为来决定如何处理
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 到这里说明当前没有事务存在。
// 新事务要求当前必须存在事务,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 挂起当前事务,null 表示没有事务,但可能存在同步钩子。
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建一个事务对象状态。
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 这里会获得真实的数据库连接
// TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder());
doBegin(transaction, definition);
// 初始化 TransactionSynchronizationManager 的线程本地变量,更新为当前事务的。
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// 创建一个"空"事务:非真实的事务,但准备了同步钩子
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
// DataSourceTransactionManager
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the session holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
6.2 事务提交
事务提交并不是每次调用都提交底层的事务,而是只有初始事务才会提交。
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
// 是初始事务才会提交
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 这个事务结束后,进行清理,然后恢复被挂起的事务
cleanupAfterCompletion(status);
}
}
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
// 恢复被挂起的事务
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
triggerXXX
方法都是回调与这个事务相关的 TransactionSynchronization.xXXX
方法。
7. MyBatis 与 Spring 事务
MybatisAutoConfiguration
配置类定义了 Bean:SqlSessionFactory
和 SqlSessionTemplate
。
在构建 SqlSessionFactory
时,SqlSessionFactoryBean.buildSqlSessionFactory
方法里会把 transactionFactory
初始化为 SpringManagedTransactionFactory
的实例,并把这个实例传给 Environment
。
@MapperScan
注解会导入 MapperScannerRegistrar
来扫描 Mapper 接口类,封装成 MapperFactoryBean
,该 bean 注入了 SqlSessionTemplate
用于执行各种数据库操作。
MyBatis-Spring 模块还会把获得的 Session 封装在 SqlSessionHolder,以 SqlSessionFactory 为键,以 SqlSessionHolder 为值缓存在 TransactionSynchronizationManager 的 resource 里,方便快速获取。
为了与 Spring 的事务动作协调,还向 TransactionSynchronizationManager 注册了 SqlSessionSynchronization,以便在 suspend/resume/commit
等动作前后处理 SqlSessionHolder
。
数据库连接获取的链: 代理对象 -> SqlSessionTemplate -> SqlSessionInterceptor -> SpringManagedTransaction -> DataSourceUtils -> Connection
MyBatis 代理对象获取 SqlSession
SqlSessionInterceptor
调用 SqlSessionUtils.getSqlSession
方法来获取事务。
// SqlSessionUtils
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
// 先从当前事务找
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
// 当前没有 SqlSession,创建一个。
session = sessionFactory.openSession(executorType);
// 注册新的 SqlSession 到 TransactionSynchronizationManager
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
if (TransactionSynchronizationManager.isSynchronizationActive()) {
Environment environment = sessionFactory.getConfiguration().getEnvironment();
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
// 把当前的 SqlSessionHolder 绑定到 resource,与 ConnectionHolder 一致。
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
// 注册事务同步钩子,以便于 Spring 事务协调。
TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true);
holder.requested();
} else {
if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
} else {
throw new TransientDataAccessResourceException(
"SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
}
}
} else { }
}
SpringManagedTransaction 获取数据库连接
Executor 最终要执行数据库操作时,必须调用 SqlSession.getConnection
方法获取连接,也就会调用到 SpringManagedTransaction.getConnection
方法,如下。
// SpringManagedTransaction
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
}
// DataSourceUtils
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
try {
return doGetConnection(dataSource);
}
catch (SQLException ex) {
throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
}
}
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(dataSource.getConnection());
}
return conHolder.getConnection();
}
// Else we either got no holder or an empty thread-bound holder here.
Connection con = dataSource.getConnection();
if (TransactionSynchronizationManager.isSynchronizationActive()) {
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
return con;
}
欢迎关注我的微信公众号: coderbee笔记 。