1. 应用场景说明
有多个事务发起的点 E0、E1,都调用到需要在事务里执行的方法 M0、M1,M0、M1 里都可能产生一些逻辑:一些作为事务的一部分执行、一些在事务提交失败时执行、一些在事务成功提交后执行。
2. 自行实现的一个简陋实现
应用命令模式把要根据事务是否提交成功来决定执行的逻辑封装成一个命令,在事务之外来执行。
定义了下面这样一个类,分别用于收集在事务提交成功和失败后执行的逻辑:
public class Actions {
private List<Runnable> transactionCommitSuccessActions = new ArrayList<Runnable>();
private List<Runnable> transactionCommitFailedActions = new ArrayList<Runnable>();
}
在开启事务前初始化一个 Actions 对象,然后事务方法调用到的方法都得传递这个对象,在事务方法结束后就可以根据事务是否成功来执行不同的队列里的逻辑。
3. Spring 内置的事务钩子
前段时间看到这篇文章 大搜车异步任务队列中间件的建设实践 才发现 Spring 一直就自带有事务钩子 TransactionSynchronizationManager
,要执行的逻辑封装成 TransactionSynchronization
对象。
钩子定义:
public interface TransactionSynchronization extends Flushable {
// 事务正常提交结束
int STATUS_COMMITTED = 0;
// 事务以回滚结束
int STATUS_ROLLED_BACK = 1;
/** Completion status in case of heuristic mixed completion or system errors. */
// 以启发式混合结束或系统错误,,,
int STATUS_UNKNOWN = 2;
default void suspend() {
}
default void resume() {
}
@Override
default void flush() {
}
default void beforeCommit(boolean readOnly) {
}
default void beforeCompletion() {
}
default void afterCommit() {
}
default void afterCompletion(int status) {
}
}
事务提交前、后执行的钩子为 beforeCommit/afterCommit
,事务结束的钩子为 beforeCompletion/afterCompletion
,事务结束的原因可能是正常提交、回滚,也可能是不确定的(比如发起了提交、但读响应失败)或系统错误。
我们知道 Spring 的事务是跟线程相关的,因此可以利用 ThreadLocal
来存储跟本线程相关的事务的钩子,这样也避免了不断传递 Actions
实例的参数的问题。
TransactionSynchronizationManager
提供了管理、执行钩子的能力:
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
// 判断当前线程相关的事务是否处于活跃状态
public static boolean isActualTransactionActive() {
return (actualTransactionActive.get() != null);
}
//
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
synchronizations.set(new LinkedHashSet<>());
}
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchs.add(synchronization);
}
AbstractPlatformTransactionManager
负责在恰当的回调点调用 TransactionSynchronizationManager
管理的钩子。
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
cleanupAfterCompletion(status);
}
}
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。