Spring 事务钩子

1. 应用场景说明

有多个事务发起的点 E0、E1,都调用到需要在事务里执行的方法 M0、M1,M0、M1 里都可能产生一些逻辑:一些作为事务的一部分执行、一些在事务提交失败时执行、一些在事务成功提交后执行。

2. 自行实现的一个简陋实现

应用命令模式把要根据事务是否提交成功来决定执行的逻辑封装成一个命令,在事务之外来执行。

定义了下面这样一个类,分别用于收集在事务提交成功和失败后执行的逻辑:

public class Actions {
    private List<Runnable> transactionCommitSuccessActions = new ArrayList<Runnable>();
    private List<Runnable> transactionCommitFailedActions = new ArrayList<Runnable>();
}

在开启事务前初始化一个 Actions 对象,然后事务方法调用到的方法都得传递这个对象,在事务方法结束后就可以根据事务是否成功来执行不同的队列里的逻辑。

3. Spring 内置的事务钩子

前段时间看到这篇文章 大搜车异步任务队列中间件的建设实践 才发现 Spring 一直就自带有事务钩子 TransactionSynchronizationManager,要执行的逻辑封装成 TransactionSynchronization 对象。

钩子定义:

public interface TransactionSynchronization extends Flushable {
    // 事务正常提交结束
    int STATUS_COMMITTED = 0;

    // 事务以回滚结束
    int STATUS_ROLLED_BACK = 1;

    /** Completion status in case of heuristic mixed completion or system errors. */
    // 以启发式混合结束或系统错误,,,
    int STATUS_UNKNOWN = 2;

    default void suspend() {
    }

    default void resume() {
    }

    @Override
    default void flush() {
    }

    default void beforeCommit(boolean readOnly) {
    }

    default void beforeCompletion() {
    }

    default void afterCommit() {
    }

    default void afterCompletion(int status) {
    }
}

事务提交前、后执行的钩子为 beforeCommit/afterCommit,事务结束的钩子为 beforeCompletion/afterCompletion,事务结束的原因可能是正常提交、回滚,也可能是不确定的(比如发起了提交、但读响应失败)或系统错误。

我们知道 Spring 的事务是跟线程相关的,因此可以利用 ThreadLocal 来存储跟本线程相关的事务的钩子,这样也避免了不断传递 Actions 实例的参数的问题。

TransactionSynchronizationManager 提供了管理、执行钩子的能力:

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");

// 判断当前线程相关的事务是否处于活跃状态
public static boolean isActualTransactionActive() {
    return (actualTransactionActive.get() != null);
}

// 
public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
}

public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
        throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
}

public static void registerSynchronization(TransactionSynchronization synchronization)
        throws IllegalStateException {

    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    Set<TransactionSynchronization> synchs = synchronizations.get();
    if (synchs == null) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchs.add(synchronization);
}

AbstractPlatformTransactionManager 负责在恰当的回调点调用 TransactionSynchronizationManager 管理的钩子。

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            } else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                doCommit(status);
            } else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        } catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        } catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            } else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        } catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        try {
            triggerAfterCommit(status);
        } finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    } finally {
        cleanupAfterCompletion(status);
    }
}

欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据