Java8 Striped64 和 LongAdder

数据 striping

根据维基百科的这段说明

In computer data storage, data striping is the technique of segmenting logically sequential data, such as a file, so that consecutive segments are stored on different physical storage devices.

Striping is useful when a processing device requests data more quickly than a single storage device can provide it. By spreading segments across multiple devices which can be accessed concurrently, total data throughput is increased. It is also a useful method for balancing I/O load across an array of disks. Striping is used across disk drives in redundant array of independent disks (RAID) storage, network interface controllers, different computers in clustered file systems and grid-oriented storage, and RAM in some systems.

数据 striping 就是把逻辑上连续的数据分为多个段,使这一序列的段存储在不同的物理设备上。通过把段分散到多个设备上可以增加访问并发性,从而提升总体的吞吐量。

Striped64

JDK 8 的 java.util.concurrent.atomic 下有一个包本地的类 Striped64 ,它持有常见表示和机制用于类支持动态 striping 到 64bit 值上。

设计思路

这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是包私有的,通过子类直接访问。

表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended )的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对性能有巨大的副作用),在没有这个防备下。

部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。

一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,会增加竞争和减少本地性,这仍然好于其他选择。

通过 ThreadLocalRandom 维护线程探针字段,作为每线程的哈希码。我们让它们为 0 来保持未初始化直到它们在槽 0 竞争。然后初始化它们为通常不会互相冲突的值。当执行更新操作时,竞争和/或表冲突通过失败了的 CAS 来指示。根据冲突,如果表的大小小于容量,它的大小加倍,除非有些线程持有了锁。如果一个哈希后的槽是空的,且锁可得,创建新的Cell。否则,如果槽存在,重试CAS。重试通过 “重散列,double hashing” 来继续,使用一个次要的哈希算法(Marsaglia XorShift)来尝试找到一个自由槽位。

表的大小是有上限的,因为,当线程数多于CPU数时,假如每个线程绑定到一个CPU上,存在一个完美的哈希函数映射线程到槽上,消除了冲突。当我们到达容量,我们随机改变碰撞线程的哈希码搜索这个映射。因为搜索是随机的,冲突只能通过CAS失败来知道,收敛convergence 是慢的,因为线程通常不会一直绑定到CPU上,可能根本不会发生。然而,尽管有这些限制,在这些案例下观察到的竞争频率显著地低。

当哈希到特定 Cell 的线程终止后,Cell 可能变为空闲的,表加倍后导致没有线程哈希到扩展的 Cell 也会出现这种情况。我们不尝试去检测或移除这些 Cell,在实例长期运行的假设下,观察到的竞争水平将重现,所以 Cell 将最终被再次需要。对于短期存活的实例,这没关系。

设计思路小结

  • striping和缓存行填充:通过把类数据 striping 为 64bit 的片段,使数据成为缓存行友好的,减少CAS竞争。
  • 分解表示:对于一个数字 5,可以分解为一序列数的和:2 + 3,这个数字加 1 也等价于它的分解序列中的任一 数字加 1:5 + 1 = 2 + (3 + 1)
  • 通过把分解序列存放在表里面,表的条目都是填充后的 Cell;限制表的大小为 2 的幂,则可以用掩码来实现索引;同时把表的大小限制为大于等于CPU数量的最小的 2 的幂。
  • 当表的条目上出现竞争时,在到达容量前表扩容一倍,通过增加条目来减少竞争。

Cell 类

Cell 类是 Striped64 的静态内部类。通过注解 @sun.misc.Contended 来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。本质上是一个填充了的、提供了CAS更新的volatile变量。

@sun.misc.Contended static final class Cell {
     volatile long value;
     Cell(long x) { value = x; }
     final boolean cas(long cmp, long val) {
          return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
     }

     // Unsafe mechanics
     private static final sun.misc.Unsafe UNSAFE;
     private static final long valueOffset;
     static {
          try {
               UNSAFE = sun.misc.Unsafe.getUnsafe();
               Class<?> ak = Cell.class;
               valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
          } catch (Exception e) {
               throw new Error(e);
          }
     }
}

Striped64

Striped64 通过一个 Cell 数组维持了一序列分解数的表示,通过 base 字段维持数的初始值,通过 cellsBusy 字段来控制 resing 和/或 创建Cell。它还提供了对数进行累加的机制。

abstract class Striped64 extends Number {
    static final int NCPU = Runtime.getRuntime().availableProcessors();

     // 存放 Cell 的表。当不为空时大小是 2 的幂。
    transient volatile Cell[] cells;

     // base 值,在没有竞争时使用,也作为表初始化竞争时的一个后备。
    transient volatile long base;

     // 自旋锁,在 resizing 和/或 创建Cell时使用。
    transient volatile int cellsBusy;
}

累加机制 longAccumulate

设计思路里针对机制的实现,核心逻辑。该方法处理涉及初始化、resing、创建新cell、和/或竞争的更新。

逻辑如下:

  • if 表已初始化

    • if 映射到的槽是空的,加锁后再次判断,如果仍然是空的,初始化cell并关联到槽。
    • else if (槽不为空)在槽上之前的CAS已经失败,重试。
    • else if (槽不为空、且之前的CAS没失败,)在此槽的cell上尝试更新
    • else if 表已达到容量上限或被扩容了,重试。
    • else if 如果不存在冲突,则设置为存在冲突,重试。
    • else if 如果成功获取到锁,则扩容。
    • else 重散列,尝试其他槽。
  • else if 锁空闲且获取锁成功,初始化表

  • else if 回退 base 上更新且成功则退出
  • else 继续
final void longAccumulate(long x, LongBinaryOperator fn,
                                boolean wasUncontended) {
     int h;
     if ((h = getProbe()) == 0) {
          // 未初始化的
          ThreadLocalRandom.current(); // 强制初始化
          h = getProbe();
          wasUncontended = true;
     }

     // 最后的槽不为空则 true,也用于控制扩容,false重试。
     boolean collide = false;
     for (;;) {
          Cell[] as; Cell a; int n; long v;
          if ((as = cells) != null && (n = as.length) > 0) {
               // 表已经初始化

               if ((a = as[(n - 1) & h]) == null) {
                    // 线程所映射到的槽是空的。

                    if (cellsBusy == 0) {       // 尝试关联新的Cell

                         // 锁未被使用,乐观地创建并初始化cell。
                         Cell r = new Cell(x);

                         if (cellsBusy == 0 && casCellsBusy()) {
                              // 锁仍然是空闲的、且成功获取到锁

                              boolean created = false;
                              try {               // 在持有锁时再次检查槽是否空闲。
                                   Cell[] rs; int m, j;
                                   if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        // 所映射的槽仍为空

                                        rs[j] = r;          // 关联 cell 到槽
                                        created = true;
                                   }
                              } finally {
                                   cellsBusy = 0;     // 释放锁
                              }
                              if (created)
                                   break;               // 成功创建cell并关联到槽,退出
                              continue;           // 槽现在不为空了
                         }
                    }
                    // 锁被占用了,重试
                    collide = false;
               }
               // 槽被占用了

               else if (!wasUncontended)       // 已经知道 CAS 失败
                    wasUncontended = true;      // 在重散列后继续

               // 在当前槽的cell上尝试更新
               else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                  fn.applyAsLong(v, x))))
                    break;

               // 表大小达到上限或扩容了;
               // 表达到上限后就不会再尝试下面if的扩容了,只会重散列,尝试其他槽
               else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale

               //  如果不存在冲突,则设置为存在冲突
               else if (!collide)
                    collide = true;

               // 有竞争,需要扩容
               else if (cellsBusy == 0 && casCellsBusy()) {
                    // 锁空闲且成功获取到锁
                    try {
                         if (cells == as) {      // 距上一次检查后表没有改变,扩容:加倍
                              Cell[] rs = new Cell[n << 1];
                              for (int i = 0; i < n; ++i)
                                   rs[i] = as[i];
                              cells = rs;
                         }
                    } finally {
                         cellsBusy = 0;     // 释放锁
                    }
                    collide = false;
                    continue;                   // 在扩容后的表上重试
               }

               // 没法获取锁,重散列,尝试其他槽
               h = advanceProbe(h);
          }
          else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
               // 加锁的情况下初始化表

               boolean init = false;
               try {                           // Initialize table
                    if (cells == as) {
                         Cell[] rs = new Cell[2];
                         rs[h & 1] = new Cell(x);
                         cells = rs;
                         init = true;
                    }
               } finally {
                    cellsBusy = 0;     // 释放锁
               }
               if (init)
                    break;     // 成功初始化,已更新,跳出循环
          }
          else if (casBase(v = base, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
               // 表未被初始化,可能正在初始化,回退使用 base。
               break;                          // 回退到使用 base
     }
}

LongAdder

LongAdder 继承自 Striped64,它的方法只针对简单的情况:cell存在且更新无竞争,其余情况都通过 Striped64 的longAccumulate方法来完成。

public void add(long x) {
     Cell[] as; long b, v; int m; Cell a;
     if ((as = cells) != null || !casBase(b = base, b + x)) {
          // cells 不为空 或在 base 上cas失败。也即出现了竞争。
          boolean uncontended = true;

          //
          if (as == null || (m = as.length - 1) < 0 ||
               (a = as[getProbe() & m]) == null ||
               !(uncontended = a.cas(v = a.value, v + x)))
               // 如果所映射的槽不为空,且成功更新则返回,否则进入复杂处理流程。
               longAccumulate(x, null, uncontended);
     }
}

// 获取当前的和。base值加上每个cell的值。
public long sum() {
     Cell[] as = cells; Cell a;
     long sum = base;
     if (as != null) {
          for (int i = 0; i < as.length; ++i) {
               if ((a = as[i]) != null)
                    sum += a.value;
          }
     }
     return sum;
}

欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

Java8 Striped64 和 LongAdder》有一个想法

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据