HikariCP 连接池–高性能数据结构

HikariCP,日语的含义是“光”,号称目前最快的数据库连接池。

它的高性能来自两个方面:

  1. 利用 Javassist 在字节码层面优化代理对象的创建,提升代理对象的调用性能。
  2. 在数据结构上采用定制的 FastListConcurrentBag 来提升性能。

本文主要关注这两种数据结构的实现。

1. FastList

我们用 JDBC 编程的时候,首先是获取 Connection、创建 Statement、执行查询得到 ResultSet,执行完成后依次关闭:ResultSetStatementConnection,特别是一个逻辑里创建了多个 PreparedStatement 时,一般用完就关闭。

为了防止用户忘了关闭 StatementResultSet,连接池需要跟踪创建的 StatementResultSet,在连接返回到连接池时关闭这两类资源。

public abstract class ProxyConnection implements Connection
{
    // 跟踪本连接创建的语句
    private final FastList<Statement> openStatements;

   private synchronized <T extends Statement> T trackStatement(final T statement) {
      openStatements.add(statement);
      return statement;
   }

   // Statement 关闭时回调此方法
   final synchronized void untrackStatement(final Statement statement) {
      openStatements.remove(statement);
   }

   // 创建语句时加入跟踪列表
   public Statement createStatement() throws SQLException {
      return ProxyFactory.getProxyStatement(this, trackStatement(delegate.createStatement()));
   }

    // Connection.close 方法回调此方法关闭打开的语句
    private synchronized void closeStatements() {
      final int size = openStatements.size();
      if (size > 0) {
         for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
            // 利用 try-with-resources 机制进行关闭
            try (Statement ignored = openStatements.get(i)) {
               // automatic resource cleanup
            } catch (SQLException e) {
               LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
                           poolEntry.getPoolName(), delegate);
               leakTask.cancel();
               poolEntry.evict("(exception closing Statements during Connection.close())");
               // 包装的代理对象不再持有底层的连接
               delegate = ClosedConnection.CLOSED_CONNECTION;
            }
         }

         openStatements.clear();
      }
}

关闭的顺序跟创建的顺序是相反的,要关闭并移除的对象一般在列表的末尾。而 ArrayList 的移除对象是从列表头部开始的,在这种场景下不高效。FastList 的实现是逆序查找要删除对象;对于根据下标进行的操作,移除了对下标合法性的检查,由连接池来保证。

public boolean remove(Object element) {
  for (int index = size - 1; index >= 0; index--) {
     if (element == elementData[index]) {
        final int numMoved = size - index - 1;
        if (numMoved > 0) {
           System.arraycopy(elementData, index + 1, elementData, index, numMoved);
        }
        elementData[--size] = null;
        return true;
     }
  }

  return false;
}

// 没有下标合法性检查
public T set(int index, T element) {
  T old = elementData[index];
  elementData[index] = element;
  return old;
}

FastList 算是结合业务特点进行优化的典型。

2. ConcurrentBag

一般的连接池实现会采用两个阻塞队列(空闲、已分配)来维护已创建的连接,而对阻塞队列的访问一般会涉及加锁、释放锁的操作,开销比较大,因此 HikariCP 采用自定义实现的 ConcurrentBag 来减少锁操作。

2.0 关键属性与接口

// 全局、共享的连接队列
private final CopyOnWriteArrayList<T> sharedList;

// 线程本地变量,用于减少竞争
private final ThreadLocal<List<Object>> threadList;

// 等待获取连接的线程数量
private final AtomicInteger waiters;

// 用于分配连接的队列
private final SynchronousQueue<T> handoffQueue;

// 元素必须实现的接口,定义了元素的状态,支持 CAS 操作来设置状态
public interface IConcurrentBagEntry {
  int STATE_NOT_IN_USE = 0;
  int STATE_IN_USE = 1;
  int STATE_REMOVED = -1;
  int STATE_RESERVED = -2;

  boolean compareAndSet(int expectState, int newState);
  void setState(int newState);
  int getState();
}

ConcurrentBag 的线程本地连接队列并不是严格意义的线程私有,其实仍然是共享的,其他线程可以通过共享的全局连接队列访问到某线程本地连接队列里的线程,所以线程获取到连接后要检查连接的状态,bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE) 操作成功才能认为拿到了连接。

2.1 添加新连接

新连接首先加入全局连接队列,如果当前有等待的线程、且新连接未被占用(加入全局队列后可能马上被其他线程获取),则尝试直接分配给等待者。

public void add(final T bagEntry) {
  if (closed) {
     LOGGER.info("ConcurrentBag has been closed, ignoring add()");
     throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
  }

  sharedList.add(bagEntry);

  // spin until a thread takes it or none are waiting
  while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
     Thread.yield();
  }
}

2.2 分配连接

申请连接时优先从线程本地的连接队列申请,没有空闲的才尝试全局连接队列,如果都没有则轮询 handoffQueue

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
  // 首先尝试线程本地
  final List<Object> list = threadList.get();
  for (int i = list.size() - 1; i >= 0; i--) {
     // 从本地列表移除
     final Object entry = list.remove(i);
     @SuppressWarnings("unchecked")
     final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
     if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
        return bagEntry;
     }
  }

  // Otherwise, scan the shared list ... then poll the handoff queue
  final int waiting = waiters.incrementAndGet();
  try {
     // 扫描全局共享队列
     for (T bagEntry : sharedList) {
        // 只是遍历,没有从全局列表移除
        if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
           // If we may have stolen another waiter's connection, request another bag add.
           if (waiting > 1) {
              listener.addBagItem(waiting - 1);
           }
           return bagEntry;
        }
     }

     listener.addBagItem(waiting);

    // 在剩余的时间轮询 handoffQueue 直至超时或拿到连接。
     timeout = timeUnit.toNanos(timeout);
     do {
        final long start = currentTime();
        final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
        if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
           return bagEntry;
        }

        timeout -= elapsedNanos(start);
     } while (timeout > 10_000);

     return null;
  } finally {
     waiters.decrementAndGet();
  }
}

2.3 释放

public void requite(final T bagEntry) {
  // 设置状态为 未使用
  bagEntry.setState(STATE_NOT_IN_USE);

  // 如果有等待线程,尝试直接分配
  for (int i = 0; waiters.get() > 0; i++) {
     if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
        return;
     } else if ((i & 0xff) == 0xff) {
        parkNanos(MICROSECONDS.toNanos(10));
     } else {
        Thread.yield();
     }
  }

  // 没有等待的线程,空闲连接
  // 控制本地连接队列的大小,如果每次获取的连接都加入到本地连接队列,
  // 可能导致本地连接队列持有所有的连接,这样就失去了本地连接队列的意义。
  final List<Object> threadLocalList = threadList.get();
  if (threadLocalList.size() < 50) {
     threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
  }
}

2.4 小结

从上面的代码可以看到,连接的申请与释放都不会对 sharedList 进行修改操作、只有读操作,因此可以采用 CopyOnWriteArrayList 这种适用于读多写少的并发安全数据结构。

本地连接队列相当于对全局连接队列进行分组,是全局连接队列的子集,这些子集之间可能存在交集,因此获取连接时需要 CAS 更新状态来判断是否真正获得了连接。

本地连接队列是动态的,在申请连接时会清理已被其他线程申请的连接,这样可以保持本地连接队列尽量小,而且基本都是没有竞争的。

通过分组,减少了对全局连接队列的并发竞争,也是一种通过分段减小竞争的思想。


欢迎关注我的微信公众号: coderbee笔记

发表评论

电子邮件地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据