在之前的文章《踩坑 Druid 连接池》说踩了坑,后面经人提醒,发现根因是一个等待获取连接的 Job 线程被终止了,通过直接调用线程的 stop 方法终止的,这种方式破坏了 ReentrantLock 锁的模型。
下面这个方法是在持有锁的情况下执行的,执行到 1491 行时,job 线程会把自己加入条件对象的等待队列、然后释放锁,等待其他线程来唤醒;
其他线程调用 notEmpty.signal() 方法时,会把 job 线程从条件对象的等待队列转移到 AQS 的获取队列上,让 job 线程重新获取锁、继续执行。
当上一个持有锁的线程释放锁后,它会唤醒下一个,即执行 662 行。
线程被唤醒后执行下面的方法,问题是如果被唤醒的线程(job 线程)已经被终止了,就不会执行这个方法,导致 job 线程的等待节点一直在 head.next 位置,它之后的线程也不会被唤醒。
要能能明白的是,JVM 里 Thread 实例还在、但对应的操作系统线程是已终止,这是可以的,只是是不正常的。
HikariCP 为了达到高性能,做了不少有意思的实现,通过一个定制的数据结构 ConcurrentBag 来管理连接。
private final CopyOnWriteArrayList<T> sharedList;
private final ThreadLocal<List<Object>> threadList;
private final SynchronousQueue<T> handoffQueue;
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// 优先从本地队列获取,竞争小
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// 获取不到再从全局队列获取
// 读多写少,全局队列适合用 COW 的实现队列
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem(waiting);
// 还获取不到则在同步队列上等待获取
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
}
新连接加入池里:
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
如上,新连接首先加入到全局队列,然后看是否有线程在等待,有则通过 handoffQueue 进行交付。
可以发现,HikariCP 与 Druid 在这块的不同是,它不通过 Lock/Condition 来协调创建连接与获取连接的线程。
SynchronousQueue 底层的实现是无锁的,因此即使出现某个等待线程被终止,也不会影响后续的线程。它是适合于这种传递的场景的。
欢迎关注我的微信公众号: coderbee笔记 。