一、概述
Exchanger
可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange
方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger
可能被视为 SynchronousQueue
的双向形式。Exchanger
可能在应用程序(比如遗传算法和管道设计)中很有用。
二、算法描述
基本想法是维护一个槽指向一个结点,结点包含一个准备提供(出去)的item和一个等待填充的洞。如果一个到来的“occupying”线程看到槽是空的,CAS结点到那里并等待另一个线程发起交换。第二个“fulfilling”线程看到槽是非空的,把它CAS回空,也通过CAS洞交换item,加上唤醒occupying线程,如果它是阻塞的。在每种情况下CAS可能失败,因为一个槽一开始看起来是非空的,但在CAS时不是,或者相反,所以线程需要重试这些动作。
这个简单方法在只有少量线程使用Exchanger时工作得很好,当大量线程使用单个Exchanger时性能快速恶化,由于CAS竞争。因此我们使用一个“arena, 竞技场”,基本上是一个哈希表,有动态可变数量的槽,每一个槽都可被任意线程执行交换。进入线程基于自己的线程ID选择槽,如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往 0 号槽,那里总是存在如果表收缩的话。控制这个的特殊机制如下:
-
Waiting:0 号槽的特殊之处在于当没有竞争时它是唯一存在的槽。一个占据 0 号槽的线程在短暂自旋后如果没有线程满足它 将阻塞。在其他情况下,占据线程最终放弃并尝试其他槽。等待线程在阻塞(如果是0号槽)或放弃(如果是其他槽)并重新开始之前会自旋等待一会儿(周期小于典型的上下文切换时间)。没有理由让线程阻塞,除非不大可能出现其他线程。占据者主要避免内存竞争,所以坐在静静地那里短暂地轮询,然后它会阻塞和解除阻塞。非0号槽等待那消失,因为缺少其他线程,在每次尝试会浪费一个额外的上下文切换,在平均上这仍然比可选的方法快。
-
Sizing:通常,仅使用少量的槽足以减少竞争。特别是少量线程时,使用太多的槽可能导致和使用太少的槽一样的低性能,且那对于错误没有太多空间。变量“max”维持了实际使用的槽的最大数量。它在当一个线程看到很多CAS失败时增加。(这类似于基于目标的负载值resing一个常规的哈希表只不过这里的增长步骤仅仅是一个接一个而不是按比例的。)增长要求每个槽上竞争失败的次数达到3次。要求多次失败是因为一些CAS失败不是因为竞争,而是两个线程之间的竟态race或出现读和CAS的线程优先级。而且,突然的竞争高峰可以比平均可接受的水平高很多。当一个非0号槽的等待由于没有等到满足而流逝elapses时,尝试减小max上限。经历过等待流逝elapsed的线程移往0号槽,所以最终(或将)找到存在存在线程,即使表由于不活跃而缩减。用于增长和缩减而选择的机制和阀值都是固定地与交换代码里的索引indexing和哈希hashing混在一起的,不能很好地抽取出来。
-
Hashing:每个线程选择它的的初始槽来使用,与一个简单的哈希码一致。给定的会交会的线程的序号是一样的,但高效地随机分布在线程间。使用竞技场arenas遭遇了所有哈希表的典型开销 vs 质量之间的平衡。这里,我们使用一步 FNV-1a 哈希码,基于当前线程的ID和一个廉价的大约相当于取模的操作(用于选择下标)。用这种方式优化下标选择的一个负面是哈希码是固定的hardwired,使用了表大小的最高32位。但这个值对于已知的平台和应用的满足的。
-
Probing:在感觉到选择的槽上的竞争时,我们顺序探测表,在表里出现哈希碰撞后类似于顺序探测。(我们周期性地移动,以反序方式,来吻合表的增长和收缩规则。)此外为了减少虚假警报(false-alarms)和缓存颠簸(cache trashing)的影响,我们在移动前两次尝试初次选择的槽。
-
Padding:即使了有了竞争管理,槽的竞争仍然很严重,所以我们使用 缓存填充(cache-padding)来避免低效的内存性能。由于这个,槽只在使用时延迟实例化,来避免不必要的空间浪费。位置的隔离一开始在一个应用程序里不会成为一个议题issue,随着时间推移和GC实行压缩,槽很可能被移到临近的,这可以导致在多核处理器是那个的缓存行颠簸,除非使用了填充。
这是对论文 “A Scalable Elimination-based Exchange Channel[http://hdl.handle.net/1802/2104]” 描述的算法的一个改进版,作者是 William Scherer, Doug Lea, and Michael Scott 。
三、简单的Exchanger
根据算法描述里的一部分,做了个简单的实现。未测试。
public class SimpleExchanger {
static class Slot {
final Object offer;
final Thread waiter;
volatile Object other;
public Slot(Thread waiter, Object offer) {
this. offer = offer;
this. waiter = waiter;
}
}
private volatile AtomicReference<Slot> refer = new AtomicReference<>();
public Object exchange(Object offer) {
Slot me = new Slot(Thread. currentThread(), offer);
Slot you;
for (;;) {
if ((you = refer.get()) == null) {
if( refer.compareAndSet( null, me)) {
LockSupport. park();
return me. other;
}
} else if( refer.compareAndSet(you, null)){
you. other = offer;
LockSupport. unpark(you.waiter);
return you. offer;
}
}
}
}
四、Exchanger 核心源码
内部类和属性
/**
* arena(竞技场)的容量。设置为一个提供了足够空间处理竞争的值。在小机器上,
* 大多数槽都不会使用,但它仍然不是浪费的,因为额外的空间提供了一些机器层面的地址填充,
* 来最小化大量CAS操作的槽的地址的干扰。
* 在每个大机器上,性能最终受限于内存的带宽,不是线程/CPU数量。
*
* 这个常量在没有修改 索引(indexing)和哈希(hashing)算法时不能修改。
*/
private static final int CAPACITY = 32;
/**
* 槽数组。元素在需要时延迟初始化。
* 声明为volatile是为了确保 double -checked延迟构建有效。
*/
private volatile Slot[] arena = new Slot[CAPACITY];
/**
* `max` 的值将在没有竞争的情况下持有所有线程。当这个值小于 CAPACITY 时,能避免一些膨胀。
*/
private static final int FULL =
Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
/**
* 在阻塞或放弃等待填充之前自旋(什么都不做,除了轮询内存位置)的次数。
* 在单处理器上应当是0。在多处理器上,这个值应当足够大以便两个线程
* 尽可能快地交换元素,仅在其中一个失速(stalled,由于GC或或被取代)
* 时阻塞,但也不能太长,以免浪费CPU资源。
*
* 考虑各种差异,这个值比大多数系统上下文切换时间的平均值的一半大一点。
*/
private static final int SPINS = (NCPU == 1) ? 0 : 2000;
private static final int TIMED_SPINS = SPINS / 20;
private static final Object CANCEL = new Object();
private static final Object NULL_ITEM = new Object();
/**
* 被使用的槽序号的最大值。这个值在一个线程经历太多CAS竞争时增加,
* 在自旋等待 elapses时减少。变化只能通过compareAndSet来实施,
* 避免由于线程在设置前碰巧拖延stall而导致致数据腐化stale。
*/
private final AtomicInteger max = new AtomicInteger();
/**
* Node 持有交换数据的一部分。这个类继承自AtomicReference来表示一个洞。
* `get()` 返回洞(里的值),`compareAndSet` CAS 值到洞里。
*/
private static final class Node extends AtomicReference<Object> {
// 注意两个属性分别用final和volatile来保证内存可视性
public final Object item;
public volatile Thread waiter;
public Node(Object item) {
this.item = item;
}
}
/**
* Slot是一个具有缓存行填充的AtomicReference。由于填充显著地增加了空间(开销),
* 所有的槽都是按需创建的,这样,在能够提升吞吐量时会有不止一个槽,比使用额外空间更有价值。
*/
private static final class Slot extends AtomicReference<Object> {
// Improve likelihood of isolation on <= 64 byte cache lines
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}
hashIndex 方法
此方法用于定位线程的默认槽。
private final int hashIndex() {
long id = Thread.currentThread().getId();
int hash = ((( int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
int m = max.get();
int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1))
((0x000001f8 >>> m) & 2) | // The constants hold
((0xffff00f2 >>> m) & 1)); // a lookup table
int index;
while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on
hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
return index;
}
spinWait 方法
此方法用于自旋等待,主要是默认槽是非0号槽的线程在没有线程可交换时使用。
private static Object spinWait(Node node, Slot slot) {
int spins = SPINS;
for (;;) {
Object v = node.get();
if (v != null)
return v;
else if (spins > 0)
--spins;
else
tryCancel(node, slot);
}
}
// 注意,tryCancel并没有处理CAS失败的问题,CAS失败后重试是由调用者方法负责的。
private static boolean tryCancel(Node node, Slot slot) {
if (!node.compareAndSet( null, CANCEL))
return false;
if (slot.get() == node) // 提前检查以减少CAS竞争
slot.compareAndSet(node, null);
return true;
}
阻塞等待
当线程在 0 号槽也没有线程可交换时就会先自旋一段时间,如果仍然没有可交换线程则进入等待。
private static Object await(Node node, Slot slot) {
Thread w = Thread.currentThread();
int spins = SPINS;
for (;;) {
Object v = node.get();
if (v != null)
return v;
else if (spins > 0) // 自旋等待阶段
--spins;
else if (node.waiter == null) // 先设置好,下次循环阻塞。
node.waiter = w;
else if (w.isInterrupted()) // 中断时终止
tryCancel(node, slot);
else
// Block 阻塞
LockSupport.park(node);
}
}
doExchange 方法
doExchange 方法是Exchanger类的核心方法,所以exchange方法极其变形最终都是调用这个方法。它的实现完全遵循前面的算法描述。
private Object doExchange(Object item, boolean timed, long nanos) {
Node me = new Node(item); // 创建一个以防需要 occupying
int index = hashIndex(); // 当前槽的序号
int fails = 0; // CAS 失败计数
for (;;) {
Object y; // 当前槽的内容
Slot slot = arena[index];
if (slot == null) // 延迟初始化槽
createSlot(index); // 继续循环来重新读取
else if ((y = slot.get()) != null && // 尝试满足,也即已有线程在等待交换
slot.compareAndSet(y, null)) {
// 已有线程在等待交换,尝试交换
Node you = (Node)y; // 传输元素
if (you.compareAndSet( null, item)) {
// CAS 设置对象到等待者,考虑与其他线程 竞争,等待者取AtomicReference.value
LockSupport.unpark(you.waiter); // 成功交换,唤醒对方。
return you.item; // 返回对方的对象,完成交换者取等待者Node的item。
}
// 否则取消(由于与其他线程竞争交换失败),继续
}
else if (y == null && // 尝试占住槽
slot.compareAndSet( null, me)) {
// 没有线程在等待交换,当前等待其他线程来交换。
if (index == 0) // 0号槽特殊处理:阻塞等待
return timed ?
awaitNanos(me, slot, nanos) :
await(me, slot);
// 非0号槽自旋等待,方法返回时要么成功交换,要么被取消
Object v = spinWait(me, slot);
if (v != CANCEL) // 被fulfilled
return v;
// 取消等待,也就是等待流逝elapses。
me = new Node(item); // 丢弃已取消的结点
int m = max.get();
if (m > (index >>>= 1)) // 减小序号
max.compareAndSet(m, m - 1); // 可能缩减表
}
else if (++fails > 1) { // 每个槽上允许2次失败。
// CAS 失败处理,达到3次则增大 表,也就是增加CAS的槽。
int m = max.get();
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
index = m + 1; // 第三次失败时增加槽
else if (--index < 0)
index = m; // 循环遍历
}
}
}
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。