关注Java领域相关技术 记录有趣的事情

Disruptor是什么

US-B.Ralph
US-B.Ralph
2016-06-15

了解Disruptor之前需要知道的前置知识

CPU Cache

关于CPU Cache,在这里将使用Lgor Ostrovsky的文章 Gallery of Processor Cache Effects来进行介绍。

为什么需要CPU Cache

随着工艺的提升最近几十年CPU的频率不断提升,而受制于制造工艺和成本限制,目前计算机的内存主要是DRAM并且DRAM的访问速度并没有质的突破。因此CPU的处理速度和内存的访问速度差距越来越大,甚至可以达到上万倍。这种情况下传统的CPU通过FSB直连内存的方式显然就会因为内存访问的等待,导致计算资源大量闲置,降低了CPU整体吞吐量。同时又由于内存数据访问的热点集中性,所以会在CPU和内存之间增加一层快速而成本较高的SDRAM做缓存。

为什么要有多级CPU Cache

随着科技发展,热点数据的体积越来越大,单纯的增加一级缓存大小的性价比已经很低了。因此,就慢慢出现了在一级缓存(L1 Cache)和内存之间又增加一层访问速度和成本都介于两者之间的二级缓存(L2 Cache)。下面是一段从What Every Programmer Should Know About Memory中摘录的解释:

Soon after the introduction of the cache the system got more complicated. The speed difference between the cache and the main memory increased again, to a point that another level of cache was added, bigger and slower than the first-level cache. Only increasing the size of the first-level cache was not an option for economical rea- sons.

下面一张图可以看出各级缓存之间的响应时间差距,以及内存到底有多慢!

CPU Cache Access Latencies in Clock Cycles

由上图可以看出高速缓存离CPU越近,速度越快,容量越小。L1缓存很小且非常快,紧挨使用它的核心。L2更大,更慢,并且仍然仅由单个内核使用。L3在现代多核计算机中也很普遍,相较L2,L3更大,也要更慢,并且在单个插槽的多个内核之间共享。最后是主内存,在所有内核和所有套接字之间共享。

当CPU执行操作时,首先要在L1中查找所需的数据,然后在L2中查找,然后在L3中查找,最后,如果不在任何高速缓存中,则需要从主内存中获取数据。当缓存离CPU越远,操作所需的时间就越长。因此,如果您经常执行某项操作,则需要确保数据位于L1缓存中。QCon演示中提供了一些测试结果说明缓存未命中时需要付出的代价:

Latency from CPU to… Approx. number of CPU cycles Approx. time in nanoseconds
Main memory ~60-80ns
QPI transit(between sockets, not drawn) ~20ns
L3 cache ~40-45 cycles, ~15ns
L2 cache ~10 cycles, ~3ns
L1 cache ~3-4 cycles, ~1ns
Register 1 cycle

这里有一篇关于多级缓存的科普文章

CPU Cache的淘汰策略

常见的淘汰策略主要有LRURandom两种。通常意义下LRU对于Cache的命中率会比Random更好,所以CPU Cache的淘汰策略选择的是LRU。当然也有些实验显示在Cache Size较大的时候Random策略会有更高的命中率

Cache Line

Cache Line是什么

Cache Line可以简单的理解为CPU Cache中的最小缓存单位。目前主流的CPU Cache的Cache Line大小都是64Bytes。

假设我们有一个512Bytes的一级缓存,那么按照64Bytes的缓存行来算,这个一级缓存所能存放的缓存行个数就是512/64 = 8个。Java中long类型是8Bytes,因此每个缓存行中可以存放8个long变量:

CacheLines

证明Cache Line的存在

根据Cache Line的作用,CPU每次从主存中加载数据时,会把相邻的数据也载入到同一个cache line中。
目前主流CPU Cache的Cache Line的大小是64Bytes,当访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个,因此可以非常快的遍历这个数组。事实上,我们可以非常快速的遍历分配给内存中连续块的任何数据结果。而如果数据结构中的各节点在内存中不是彼此相邻的(如链表),你将得不到cache line预加载所带来的优势,并且在这些数据结构中的每一个项都可能会出现缓存未命中的情况。

但是,预加载有一个缺点。想象一下,如果long不是数组的一部分,而只是一个变量,例如head,其后紧挨着另一个变量tail。现在,当加载head到缓存中时,tail也将被预加载到缓存中。

FalseSharing

这样看起来不错,但是当tail由生产者写入,而head由消费者写入时,而headtail之间并不紧密相关,反而可能被运行在不同内核上的不同的线程所使用。

FalseSharingWriteHead

假设消费者更新了head,缓存值将被更新,内存中的值也将被更新,并且包含head的所有其他缓存行都将失效,因为其他缓存行中的head仍为之前的值。

FalseSharingReadTail

如果在另一个内核上运行的某个线程想读取tail的值,则需要从主内存中重新读取整个缓存行。所以,一个与消费者无关的线程正在读取一个与head无关的值(tail),由于缓存未命中导致访问tail变慢。

如果两个单独的线程正在写入两个不同的值,则情况更糟:两个内核都将使另一个内核上的缓存行失效,每次有其他线程写入时,该线程都必须重新读取被更新的缓存行。这种情况下即使两个线程正在写入两个不同的变量,在写线程之间也发生了竞争,这种现象被称为False sharing。每次访问head都会得到tail,并且每次访问tail也会得到head。所有这些都是静默执行的,没有编译器警告你、告诉你刚刚编写的代码对于并发访问的效率是非常低的。

例1:

public class CacheLinePadding {
    private static long count = 1_0000_0000L;
    //p1-p7\p9-p15是冗余数据,为了缓存行对齐
    //无冗余数据程序需执行 4066ms
    //有冗余数据程序需执行 935ms
    private static class T {
        public volatile long p1, p2, p3, p4, p5, p6, p7;
        public volatile long x;
        public volatile long p9, p10, p11, p12, p13, p14, p15;
    }

    static T[] arr = new T[2];
    static {
        arr[0] = new T();
        arr[1] = new T();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                arr[0].x = i;
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                arr[1].x = i;
            }
        });

        long startTime = System.nanoTime();
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println((System.nanoTime() - startTime)/100_0000);
    }

}

例2:

public class T07_CacheLine {
    static long[][] arr;
    public static void main(String[] args) {
        arr = new long[1024 * 1024][];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = new long[8];
            for (int j = 0; j < 8; j++) {
                arr[i][j] = 0L;
            }
        }

        long sum = 0L;
        long start = System.currentTimeMillis();
        for (int i = 0; i < arr.length; i++) {
            for (int j = 0; j < 8; j++) {
                sum+=arr[i][j];
            }
        }
        System.out.println(System.currentTimeMillis()-start);

        System.out.println("=====================================");
        start = System.currentTimeMillis();
        for (int i = 0; i < 8; i++) {
            for (int j = 0; j < arr.length; j++) {
                sum+=arr[j][i];
            }

        }
        System.out.println(System.currentTimeMillis()-start);
    }
}

内存屏障

内存屏障是什么

内存屏障是一条CPU指令。用于:

  • 确保执行某些操作的顺序
  • 影响某些数据的可见性

编译器和CPU为了提高性能在保证最终结果相同的前提下,会重新安排指令执行的顺序。在指令间插入内存屏障来告诉CPU和编译器:在该内存屏障之前发生的事情需要留在该内存屏障指令之前,而内存屏障之后发生的事情需要留在内存屏障之后,不可以对他们重排序。

MemoryBarrier

内存屏障的另一个作用是强制同步CPU缓存。例如,写屏障会将所有在屏障之前写入的数据刷新到缓存中,因此任何其他试图读取该数据的线程都会得到最新版本的数据,无论它由哪个核心或哪个socket执行。

内存屏障与Java有什么关系

Java中的关键字volatile保证变量可见性的。如果一个成员变量被volatile修饰,在Java内存模型中,会在写入它之后插入一条写屏障指令,在读取它之前插入一条读屏障指令。

MemoryBarrierWrite

这意味着,如果写入一个volatile修饰的字段:

  • 在写入该字段之后访问该字段的任何线程都将获取到更新后的值
  • 在写入该字段之前所做的任何操作都将被保证发生,并且任何更新的数据值也将可见,因为内存屏障会将所有先前的写入刷新到缓存中。

False Sharing

False Sharing是什么

如果有多个线程操作不同的成员变量,当这些成员位于相同的缓存行时就会发生False Sharing。例如一个cache lien包含v1、v2,且cache line被线程1和线程2共享,如果此时线程3修改了v2的值,那么线程1和线程2将会强制重新加载cache line。即使对于多个线程来说这些更新操作是逻辑独立的,但是CPU是以cache line为最小读取单元的,即一致性的保持是以cache line为基本单元,而不是以单个独立的变量。这种明显没有必要的共享数据的方式被称作“False sharing”.

在Martin Thompson的false-sharing一文中有一张经典图示,如下所示:

cache-line

图中,运行在 core1上的线程想要更新变量 X 的值,同时另外一个运行在 core2 上的线程想要更新变量 Y 的值。但是,这两个X和Y处于同一条缓存行中。core1和core2上的线程将争夺缓存行的所有权,以便他们可以对其进行更新。当 core1 取得了所有权,开始更新 X,则 core2 对应的缓存行需要设为失效状态。当 core2 取得了所有权开始更新 Y,则 core1 对应的缓存行需要设为失效态。轮番争夺缓存行的所有权不但带来大量的 RFO 消息,而且如果某个线程需要读此行数据时,L1 和 L2 缓存上都是失效数据,只有 L3 缓存上是同步好的数据而 L3 的读取性能较L2、L1慢数倍,频繁通过L3同步数据将非常影响性能。更坏的情况是跨槽读取,跨槽读取时 L3 都要 miss状态,只能从内存上加载。

如何避免False Sharing?

  1. 避免False Sharing的方式
  • 在文章false-sharing中Martin Thompson介绍了Disruptor在避免False Sharing效应中的做法:
    • 缓存行填充,使用缓存行填充将不同的遍历分布到不同的缓存行中,最终使得不同的线程操作不同的缓存行。
    • cache line的大小为64Bytes,在Hotspot JVM中,对象头固定占8Bytes或12Bytes(64位操作系统开启压缩,不开压缩为16Bytes),以64位操作系统不开压缩为例,我们只需要在变量周围用6个long填充缓存行,就可以避免False Sharing。
    • 在Disruptor中使用填满cache line的方式将RingBUffer cursor 和 BatchEventProcessor sequences周围填满。
    • 文章中给出的多线程共享缓存行和不共享缓存行的测试代码:
public final class FalseSharing implements Runnable{
    public final static int NUM_THREADS = 4; // change
    public final static long ITERATIONS = 500L * 1000L * 1000L;
    private final int arrayIndex;

    private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];
    static {
        for (int i = 0; i < longs.length; i++) {
            longs[i] = new VolatileLong();
        }
    }

    public FalseSharing(final int arrayIndex) {
        this.arrayIndex = arrayIndex;
    }

    public static void main(final String[] args) throws Exception {
        final long start = System.nanoTime();
        runTest();
        System.out.println("duration = " + (System.nanoTime() - start));
    }

    private static void runTest() throws InterruptedException {
        Thread[] threads = new Thread[NUM_THREADS];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new FalseSharing(i));
        }

        for (Thread t : threads) {
            t.start();
        }

        for (Thread t : threads) {
            t.join();
        }
    }

    public void run() {
        long i = ITERATIONS + 1;
        while (0 != --i) {
            longs[arrayIndex].value = i;
        }
    }

    public final static class VolatileLong {
        public volatile long value = 0L;
        public long p1, p2, p3, p4, p5, p6; // comment out
    }
}
  • 另一种避免False Sharing的技术是私用编译指示,强制使每一个变量对齐。
  //编译器使用__declspec( align(n) ) 此处 n=64,按照 cache line 边界对齐
  __declspec (align(64)) int thread1_global_variable;
  __declspec (align(64)) int thread2_global_variable;
  • False Sharing的例子:JDK的LinkedBlockingQueue中,有两个引用分别指向Queue的head和tail。在并发环境下,head、tail经常被不同的线程修改,但head、tail有可能位于同一个缓存行中,于是就可能产生False Sharing,线程越多、核越多False Sharing对性能所产生的负面影响越大。
  1. 多线程环境下缓存行共享会对性能产生较大负面影响,我们一定要通过缓存行填充去解决掉潜在的False Sharing问题吗?
  • 其实并不一定。首先False Sharing是很隐蔽的,我们暂时无法从系统层面上通过工具来探测False Sharing事件。其次,不同类型的计算机具有不同的微架构(如 32 位系统和 64 位系统的 java 对象所占字节数就不一样),如果设计到跨平台的设计,那就更难以把握了,一个确切的填充方案只适用于一个特定的操作系统。还有,缓存的资源是有限的,如果填充会浪费珍贵的 cache 资源,并不适合大范围应用。最后,目前主流的 Intel 微架构 CPU 的 L1 缓存,已能够达到 80% 以上的命中率。综上所述,并不是每个系统都适合花大量精力去解决潜在的False Sharing问题。

Disruptor 是什么

Disruptor是英国外汇交易公司LMAX金融交易平台业务逻辑处理器的核心组件。研发的初衷是解决内存队列的延迟问题(在性能测试中发现JDK内置队列延迟与I/O操作处于同样的数量级),它能够以很低的延迟(latency)产生大量交易(吞吐量),基于Disruptor开发的系统单线程能支撑每秒600万订单。

Disruptor运行在内存中(in-memory),使用事件源驱动方式(event sourcing)运行,能够在无锁的情况下实现网络的Queue并发操作。

Martin Thompson 和 Michael Barker于2010年在QCon发表了主题为LMAX – How to Do 100K TPS at Less than 1ms Latency的演讲获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍介绍LMAX系统架构(文章地址:The LMAX Architecture)。同年Disruptor获得了Oracle官方的Duke大奖。

目前,包括Apache Storm、Apache HBase Server、Spring Messaging、Apache Solr Core、Log4j2在内的很多知名项目都应用了Disruptor。具体可点击Artifacts using Disruptor Framework查看。


Disruptor 能解决什么问题 使用场景/模式

模式

  • 发布订阅模式
  • 点对点模式
  • 顺序消费模式

场景

  • 低延迟,高吞吐,有界缓存队列
  • 提高吞吐量,减少并发执行上下文切换的延迟等待

Disruptor的设计方案

Disruptor底层采用的数据结构

  • Disruptor底层采用数组。数组的访问速度比链表要快,同时,数组对处理器的缓存机制更加友好(预加载)。

  • 元素位置定位:通过位运算index &(arr.len-1),其中arr.len=2^n,快速找到元素位置。

  • 为了避免垃圾回收,在构建RingBuffer(下文会有详细介绍)时预先分配数组中的元素,并使这些对象永生。

无锁设计

CAS操作

当需要确保操作是线程安全的(特别是在多个生产者的情况下,更新下一个可用序列号)时,使用CAS(比较和交换/设置)操作。CAS是CPU级别的指令,CPU在更新数据时会拿当前值与预期值进行比较,如果要更改的值不是预期的值,则该操作将失败,显然有其他线程已经抢先对数据进行了更改。(下图中CPU1 CPU2是两个不同的内核而非两个单独的CPU)

ConcurrencyCAS

无锁队列的实现,对于传统并发队列,至少要维护两个指针,一个头指针和一个尾指针。在并发访问修改时,头指针和尾指针的维护不可避免的应用了锁。Disruptor由于是环状队列,对于Producer而言只有头指针而且锁是乐观锁,在标准Disruptor应用中,只有一个生产者,避免了头指针锁的争用。所以我们可以理解Disruptor为无锁队列。

内存屏障在Disruptor中的作用

RingBuffer cursor是被volatile修饰的变量,它是Disruptor实现无锁设计的原因之一。

BarriersWriteExample

  • 生产者:生产者在向RingBuffer中写入数据之前,首先获取下一个可写入的Entry(或一批Entry),然后更新这些Entry。更改结束后,生产者将调用commit方法,更新Entry序列号(7)。该操作实际上是对volatile修饰的cursor的写入创建一个内存屏障,使得最终所有的缓存保持最新状态(或至少使它们失效)。生产者更新完之后消费者可以获取更新的序列号,并且利用内存屏障保证了内存屏障之前所执行指令的顺序,因此消费者可以确信生产者对位置7中的Entry所做的所有更改是可用的。

  • 消费者:消费者上的sequence是被volatile修饰的,sequence可以被许多外部对象读取,例如其他消费者可能正在跟踪监控这个消费者,而且ProducerBarrier/RingBuffer(取决于Disruptor的版本)也将跟踪这个消费者,以确保RingBuffer上不会发生未读覆盖。

    • 因此,如果下游消费者(C2)看到较早的消费者(C1)已经到达序号12的位置,当C2从RingBuffer读取最多12个条目时,它将获得对C1所在位置更新之前的所有的更新。
      MemoryBarrierReadExample

    • 故在C2得到已更新的序列号(如上蓝色显示)之后所发生的所有事情,都必须发生在C1对RingBuffer进行的更新之前的所有操作(以黑色显示)之后。

内存屏障对性能的影响

内存障碍是CPU级指令,使用内存屏障时内核不会在多个线程之间进行切换。但是并不是说内存屏障没有成本。使用内存屏障带来的成本有:

  • 编译器/CPU无法重新排序指令,这将导致无法尽可能高效地使用CPU。
  • 刷新缓存对性能产生影响。

因此,不要以为使用volatile不会对性能产生影响。

Disruptor的实现中尽量避免频繁读取和写入sequence,因为每次读取或写入volatile变量都要花费成本。当认识到这一点就可以很好地利用批处理。无论在生产者端还是在消费者端,都不应该过于频繁地读取或写入序列,而是在更新序列号之前整批读取可以更新的Entry,然后进行处理。以下是BatchConsumer的一个示例:

    long nextSequence = sequence + 1;
    while (running)
    {
        try 
        {
            final long availableSequence = consumerBarrier.waitFor(nextSequence);
            while (nextSequence <= availableSequence)
            {
                entry = consumerBarrier.getEntry(nextSequence);
                handler.onAvailable(entry);
                nextSequence++;
            }
            handler.onEndOfBatch();
            sequence = entry.getSequence();
        }
        ...
        catch (final Exception ex)
        {
            exceptionHandler.handle(ex, entry);
            sequence = entry.getSequence();
            nextSequence = entry.getSequence() + 1;
        }
    }

在上面的代码中,我们使用局部变量在循环中消费者正在处理的条目上递增。这意味着我们尽可能少地读写volatile修饰的sequence字段。

缓存行对齐

Disruptor对于缓存行为64Bytes或更小的体系结构,通过添加填充来缺保RingBuffer的序列号永远不会与其他任何变量处于同一个缓存行中,从而消除了False Sharing的发生。减少了缓存未命中发生的概率。

public long p1, p2, p3, p4, p5, p6, p7; // cache line padding
private volatile long cursor = INITIAL_CURSOR_VALUE;
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding

Disruptor原理

这部分主要译自Trisha 关于Disruptor的一系列博文,在文末我会给出原文地址。

Trisha 在介绍Disruptor原理时使用的是Disruptor较早的版本,新版本(2.0及之后)相比较老版本最主要的变化有:

  • 更贴切的命名;
    • 下图分别新老版本的配置示意图
      1P3C-Diamond-RingBuffer

    • 可以看到新版本中ProducerBarrier不再作为一个实体存在,而是被PublishPort接口所替代,而RingBuffer本身实现了PublishPort接口;
      NewWorldOrder

    • DependencyBarrier替代了ConsumerBarrier,厘清了此对象的职责;

    • Publisher替代了Producer,EventProcessor替代了Consumer。这样可以更能精确地体现出它们的行为;
    • RingBuffer中存储数据的Entry,改名叫Event了,相应的处理器就是EventProcessor;
  • ProducerBarrier被整合到了RingBuffer中。

  • 将Disruptor Wizard加入了主代码库,使得事件发布者和事件处理者之间的接驳更加简单了。

下面我们开始介绍Disruptor的原理

RingBuffer结构

RingBuffer是Disruptor的核心,它是一个环形缓存区,可以用做在不同上下文间传递数据的buffer。其底层是一个数组,之所以选择数组有以下几点原因:

  • 数组比链表快,根据之前对缓存行的解释,当数组中的一个元素被加载时,与其相邻的数组元素也会被预加载,因此在这样的结构中,CPU无需时不时去主内存加载数组中的下一个元素。
  • 数组在构建时可以预先分配内存,使得数组对象永生(除非程序终止),这就意味着对于RingBuffer来说无需垃圾回收且仅占用预先分配好的固定大小内存。
  • 数组不像链表那样,需要为每一个添加到其上面的对象创造节点对象,当删除节点时,还需要执行相应的内存清理操作。RingBuffer实质是一个普通的数组,当数据填充满队列(即到达2^n-1位置)之后,继续写入数据时,会从0开始,覆盖之前的数据,对历史数据采用的是覆盖方式,避免了jvm的GC。
    RingBuffer
  • RingBuffer的结构是一个带有指向下一个可用节点的指针的数组。数组的大小为2的n次方,这样元素定位可以通过位运算index\&(arr.len-1) ,效率会更高。
    RingBufferInitial
  • 随着数据不停地写入缓存区,序列不断地增加,直到绕过这个环:
    RingBufferWrapped

向RingBuffer中写入数据

ProduceBarriers

Disruptor 中没有提供关于生产者的接口,而是让生产者通过 ProducerBarrier 对象向 RingBuffer 中写入数据。写入数据的过程可分为两步(two-phase commit):

  • 生产者向RingBuffer申请下一个节点;
  • 生产者将数据写入节点中,并调用ProduceBarrier的commit方法;

一个生产者的情况:

  • 避免发生未读覆盖:
    • ConsumerTrackingProducerBarrier对象中维护着所有正在访问RingBuffer的消费者。为什么生产者需要关心消费者的状态呢?因为Disruptor与队列不同,在Disruptor中由消费者负责通知生产者处理到了哪个序列号,而不是由RingBuffer负责协调。所以为了防止RingBuffer中Entry被覆盖,需要检查所有消费者读到了哪里。
      PreventRingFromWrapping

    • 上图中Consumer1读到了序号为12的位置(红色),Consumer2正在读取序号为3的Entry,因此如果Consumer2想要赶上Consumer1的进度,需要读完整个RingBuffer一圈的距离。

    • 现在Producer想要向RingBuffer中序号为3的Entry(因为3是RingBuffer当前光标(12)的下一个节点的位置)写入数据。由于Consumer2正在占用3这个Entry所以ProducerBarrier不会向3中写入数据,而是在这里自旋等待,直到Consumer2消费完离开。
  • 申请下一个节点

    • 当Consumer2消费了一批节点后向前移动了它的序号,可能挪到了序号9(消费者采用批处理的方式,真实情况下可能会到12的位置)。
      ProducerNextEntry

    • 此时ProducerBarrier发现序号3的位置可以使用了,它会抢占这个节点上的Entry并把下一个序号13更新成Entry的序号,然后把Entry交给Producer,生产者向序号为13的Entry中写入数据。

  • 提交数据

    • 当生产者向序号13的Entry中写入数据后,它会要求ProducerBarrier提交。
      ProducerCommit)

    • ProducerBarrier先等待RingBuffer的游标追上当前的位置即的位置12,然后ProducerBarrier 更新RingBuffer的游标移动到刚才写入的Entry的序号上,并且ProducerBarrier会通知消费者有新的数据产生了(ProducerBarrier会通知ConsumerBarrier 上的 WaitStrategy对象,不同的WaitStrategy有不同的提醒方式)。

    • 此时Consumer1可以读序号为13的Entry中的数据,Consumer2可以读序号为13及之前的所有数据。
  • ProducerBarrier上的批处理

    • Disruptor可以同时在生产者和消费者两端实现批处理。
    • 由于ConsumerTrackingProducerBarrier对象中维护着所有正在访问RingBuffer的消费者,所以ProducerBarrier清楚的知道最慢的消费者的位置,它很容易就可以发现当前哪些节点可以用。
      ProducerBatching

    • 本例中Consumer1位于12,最慢的消费者Consumer2位于9的位置,所以ProducerBarrier知道节点3、4、5、6、7、8是可以使用的,它可以让生产者向这些可以使用的节点中写入数据而不需要反复检查消费者的位置。

    • 总结一下:Producer当前位于a位置,当Producer向RingBuffer申请下一个可以写入的序号时,RingBuffer会返回当前可以写入的最大序号n,若有m个元素可以写入,判断时否会发生未读覆盖。若返回正确,则生成者开始批量写入数据。

多个生产者的情况

  • 申请下一个节点

    • 多生产者的场景中,下一个可以写入的序号并不是RingBuffer游标+1这么简单,因为有可能其他生产者正在向RingBuffer中写入数据导致某些Entry正在被生产者占用但还没有提交的情况。
      ProducersNextEntry

    • 生产者向ClaimStrategy申请下一个可用节点时,Producer1拿到序号是13,Producer2拿到序号是14,尽管RingBuffer当前的游标位于12的位置。这是因为ClaimSequence不但负责分发序号,而且负责跟踪哪些序号已经被分配。
      ProducersCommit

    • 现在每个生产者都拥有了自己的Entry及其序号。假设Producer1由于某些原因未能及时提交数据,Producer2已经准备好提交了,并且向ProducerBarrier发出了提交数据的请求。

    • 上面我们在介绍单生产者场景时说到,ProducerBarrier只有在RingBuffer游标到达准备提交的前一个位置时他才会提交。所以当前情况下Producer2提交数据的请求并不会被执行。
      ProducersCommit2

    • 当Producer1完成数据的生产并申请提交序号为13的Entry中的数据时,ProducerBarrier让ClaimStratrgy先等待RingBuffer的游标到达序号12的位置,然后将RingBuffer的游标移动到13,并且ProducerBarrier通知ConsumerBarrier 上的 WaitStrategy对象,RingBuffer中有数据更新了。

    • 接下来ProducerBarrier可以完成Producer2的数据提交的请求,并让RingBuffer游标移动到14的位置,并通知所有人RingBuffer中有数据更新了。
    • 尽管生产者在不同的时间完成数据的写入,但是RingBuffer中内容的顺序总是遵循nextEntry()的调用顺序。如果一个生产者在写入 RingBuffer 的时候暂停了,只有当它解除暂停后,其他等待中的提交才会立即执行。
public long tryNext(int n) throws InsufficientCapacityException {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do{
        current = cursor.get();
        next = current + n;

        if (!hasAvailableCapacity(gatingSequences, n, current)) {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    //cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据
    while (!cursor.compareAndSet(current, next));

    return next;
}

从RingBuffer中读取数据

ConsumerBarrier与Consumer

  • ConsumerBarrier是由RingBuffer创建并代表消费者与RingBuffer交互的对象;
    ConsumerWaitFor

  • Consumer是一个从RingBuffer中读取数据的线程,它可以访问ConsumerBarrier对象;

获取下一个可以访问的节点

  • 就像RingBuffer需要一个序号才能找到下一个可用节点一样,Consumer也需要一个序号来指示下一个将要访问的节点的位置,上图中Consumer处理完序号8之前的所有数据,下一个将要访问的位置的序号是9;

    • Consumer可以访问ConsumerBarrier的waitFor()方法,获取下一个可以访问的序号;
final long availableSeq = consumerBarrier.waitFor(nextSequence);
  • ConsumerBarrier返回RingBuffer的最大可访问序号12。ConsumerBarrier采用何种策略等待这个序号由WaitStrategy来决定。

等待数据写入RingBuffer

  • 接下来,消费者会一直原地停留,等待更多数据被写入RingBuffer。并且,一旦数据写入后消费者会收到通知——节点9、10、11、12 已写入。现在序号到12了,消费者可以让ConsumerBarrie去拿这些序号节点里的数据了。
    ConsumerRequest

  • 拿到数据后Consumer会自更新自己的序号(或者游标cursor)。

这样设计的好处

  • 这样做是怎样有助于平缓延迟的峰值了。以前需要逐个节点地询问“我可以拿下一个数据吗?现在可以了么?现在呢?”,现在Consumer只需要简单的说“当你拿到的数字比我这个要大的时候请告诉我”,waitFor会告诉它有多少个新的节点可以读取数据了。因为这些新的节点的确已经写入了数据(RingBuffer本身的序号已经更新),而且Consumer对这些节点的唯一操作是读而不是写,因此访问不用加锁。
  • 当有多个Consumer去读同一个RingBuffe时不需要加锁,也不需要用另外的队列来协调不同的线程(Consumer)。BatchHandler接口可以批处理那些需要处理的节点。

Disruptor中组件的依赖关系

多生产者的场景下,Disruptor通过 ProducerBarrier 保证写入的操作顺序。在多个消费者的场景中Disruptor的是使用会变得比较复杂,某些情况下我们需要多个消费者在访问 RingBuffer 的时候互相等待(依赖)。像很多应用里,有一连串的工作需要在实际执行业务逻辑之前完成 (happen before) —— 例如,在做任何操作之前,我们都必须先保证消息写入磁盘。下面我们看以下这样的场景:

菱形结构

现在有这样一个问题:有一个生产者、三个消费者,Consumer3代表需要业务逻辑,Consumer2代表数据采集或预处理,Consumer1代表备份接收到的数据。Consumer3必须等另外两个消费者处理完成才能开始处理。
1P3C-Diamond

使用队列实现菱形结构

1P3C-Diamond-Queue

这种方案中,每个处理阶段都会用队列分开,一条消息从P1传输到C3需要完整的穿过四个队列,每个队列在消息进入和取出的时候都会产生成本。

使用Disruptor实现菱形结构

1P3C-Diamond-RingBuffer

在Disruptor中一切都由RingBuffer管理,所有参与者都只依赖RingBuffer,所有的交互都是基于Barrier(ProducerBarrier、ConsumerBarrier)和所依赖的目标序号(Producer、Consumer、RingBuffer中均维护有序号)来实现的。

  • 生产者:我们现在讨论的场景是上文中单生产者模型。这个场景中Producer不需要关注所有Consumer,只需要关注Consumer3就可以了(Consumer3需要等另外两个Consumer处理完才能处理),只要Consumer3的位置向前移动,RingBuffer的后续节点就会空闲出来。
  • 管理消费者的依赖关系需要两个 ConsumerBarrier 对象。第一个仅仅与 Ring Buffer 交互,C1 和 C2 消费者向它申请下一个可访问节点。第二个 ConsumerBarrier 只知道消费者 C1 和 C2,它返回两个消费者访问过的消息序号中较小的那个。

消费者之间的依赖关系

1P3C-Diamond-RingBuffer-Example

  • 场景:生产者 P1 已经在 RingBuffer 里写到序号 22的位置了,消费者 C1 已经访问和处理完了序号 21 之前的所有数据。消费者 C2 处理到了序号 18的位置。消费者 C3(依赖其他消费者),才处理到序号 15的位置。
    1P3C-Diamond-RingBuffer-Example2

  • 此时 P1 不能继续向 RingBuffer 中写入数据了,因为序号 15 的位置占据了我们想要写入序号 23 的数据节点的位置。

  • 第一个 ConsumerBarrier(CB1)告诉 C1 和 C2 可以去访问序号 22 前面的所有数据。第二个 ConsumerBarrier (CB2) 不但会检查 RingBuffer 的序号,也会检查另外两个消费者的序号并且返回它们之间的最小值。因此,三号消费者被告知可以访问 Ring Buffer 里序号 18 前面的数据。注意C3还是直接从 RingBuffer 中读取数据——并不是由 C1 和 C2 把数据节点从 RingBuffer 里取出再传递给 C3 的。
  • 这产生了一个问题:如果所有数据都来自于 RingBuffer,那么 C3 如何读到前面两个消费者处理完成的数据呢?如果 C3 消费者关心的只是先前的消费者是否已经完成它们的工作(例如,把数据复制到别的地方),那么这一切都没有问题。但是,如果 C3 消费者需要访问先前的消费者的处理结果,它又从哪里去获取呢?

更新数据节点

  • 为了使C3能够获取到其他消费者的处理结果,可以在C1、C2处理完之后将处理结果写入 RingBuffer 数据节点 (Entry) 中。这样,当 C3 消费者从 RingBuffer 取出节点时,它就可以拿到其他消费者的处理结果了。
  • 这里真正重要的地方是节点 (Entry) 对象的每一个字段应该只允许一个消费者写入。这样可以避免产生并发写入冲突 (write-contention),保证消息处理速度。
    FizzBuzzEntry

  • 例如,在FizzBuzzEntry中有两个字段:fizz和buzz。当FizzCustomer拿到FizzBuzzEntry时,它只能写入fizz,如果是BuzzCustomer它只能写入buzz。FizzBuzzCustomer只能去读取这两个字段不会做写入操作。

Disruptor 与相互依赖(等待)的多个消费者关联的关键点:

  • 使用多个 ConsumerBarrier 来管理消费者之间的依赖(等待)关系。
  • 使用 ProducerBarrier 监视依赖关系中最后一个消费者。
  • 只允许每个消费者更新数据节点 (Entry) 中每一个独立字段。

菱形结构代码实现

//未使用DSL实现菱形结构
Executor executor = Executors.newCachedThreadPool();
BatchHandler handler1 = new MyBatchHandler1();
BatchHandler handler2 = new MyBatchHandler2();
BatchHandler handler3 = new MyBatchHandler3();
RingBuffer ringBuffer = new RingBuffer(ENTRY_FACTORY, RING_BUFFER_SIZE);
ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier();
BatchConsumer consumer1 = new BatchConsumer(consumerBarrier1, handler1);
BatchConsumer consumer2 = new BatchConsumer(consumerBarrier1, handler2);
ConsumerBarrier consumerBarrier2 =
    ringBuffer.createConsumerBarrier(consumer1, consumer2);
BatchConsumer consumer3 = new BatchConsumer(consumerBarrier2, handler3);
executor.execute(consumer1);
executor.execute(consumer2);
executor.execute(consumer3);
ProducerBarrier producerBarrier =
    ringBuffer.createProducerBarrier(consumer3);
//使用DSL实现菱形结构
dw.consumeWith(handler1a, handler2a);
dw.after(handler1a).consumeWith(handler1b);
dw.after(handler2a).consumeWith(handler2b);
dw.after(handler1b, handler2b).consumeWith(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();

菱形样式的并行结构代码示例

两条消费者链并行运行,最终消费者依赖于两者

dw.consumeWith(handler1a,handler2a); 
dw.after(handler1a).consumeWith(handler1b); 
dw.after(handler2a).consumeWith(handler2b); 
dw.after(handler1b,handler2b).consumeWith(handler3); 
ProducerBarrier producerBarrier = dw.createProducerBarrier();

Disruptor等待策略

消费者等待策略

类名 策略采取的措施 适用场景
BlockingWaitStrategy 加锁:等待生产者唤醒,被唤醒后循环检查依赖的sequence是否已经消费 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
LiteBlockingWaitStrategy 加锁:与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个waitFor()和signalAll(),可以减少lock的次数 CPU资源紧缺,吞吐量和延迟并不重要的场景
LiteTimeoutBlockingWaitStrategy 加锁:与LiteBlockingWaitStrategy相比增加了阻塞超时时间,超时后抛出异常 CPU资源紧缺,吞吐量和延迟并不重要的场景
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略:根据时间参数和传入的等待策略来决定时殷弘哪种策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep:sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁:有超时限制,超时后抛出异常 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋:尝试100次然后Thread.yield()让出CPU 性能和CPU资源之间有很好的折中。延迟比较均匀

生产者等待策略

//休眠1ns
LockSupport.parkNanos(1);

与JDK队列进行比较

JDK中的队列

队列 数据结构 保证线程安全的手段 有界与否
ArrayBlockingQueue ArrayList 悲观锁-ReentrantLock 有界
LinkedBlockingQueue LinkedList 悲观锁-ReentrantLock 无界
ConcurrentLinkedQueue LinkedList 乐观锁-CAS 无界
LinkedTransferQueue LinkedList CAS+LockSupport 无界
PriorityBlockingQueue Heap 悲观锁-ReentrantLock 无界,可自增长至Integer.MAX_VALUE – 8
DelayQueue Heap 悲观锁-ReentrantLock 无界,可自增长至Integer.MAX_VALUE – 8
SynchronousQueue:非公平交易 Dual Stack CAS+LockSupport 容量为0
SynchronousQueue:公平交易 Queue CAS+LockSupport 容量为0

队列中保证线程安全的手段

其中阻塞队列利用了 ReentrantLock 以及它的 Condition 来实现,而非阻塞队列则是利用 CAS 方法实现线程安全。

悲观锁

认为不锁住资源,别的线程会来争抢,可能造成错误结果,为了确保数据正确,每一次访问数据时,会先加锁再访问,让其他线程无法修改。典型:synchronized、Lock、ReentrantLock。

乐观锁

认为自己操作共享资源时,其他线程不会过来干扰,所以不会锁住对象,同时为了确保数据的准确性,在更新之前会对比现在的数据是否等于修改之前的数据,判断数据有没有被其他线程修改过,如果没有修改过说明只有自己在操作数据,可以正常更新否则放弃本次修改并采取一定措施,如报错、重试。典型应用:AtomicXXX中利用CAS更新数据。

Distuptor与JDK自带队列比较

比较对象的选择

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列有LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者通过锁的方式来实现线程安全,而ConcurrentLinkedQueue以及LinkedTransferQueue都是利用CAS操作来实现的。

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。

ArrayBlockingQueue 有什么缺点

ArrayBlockingQueue在使用过程中因为加锁和伪共享会导致严重性能问题。

加锁

ArrayBlockingQueue中通过ReentrantLock以及它的两个condition来控制并发。

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

锁的成本: 传统阻塞队列使用锁保证线程安全。而锁通过操作系统内核的上下文切换实现,会暂停线程去等待锁直到释放。执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态。这种状态会导致高水平的写入争用。

伪共享

伪共享问题导致的性能低下。ArrayBlockingQueue有三个成员变量:

  • takeIndex:需要被取走的元素下标
  • putIndex:可被元素插入的位置的下标
  • count:队列中元素的数量
    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

这三个变量很容易被放到同一个缓存行中,但是它们之间的修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行失效,需要从主内存中重新读取。

垃圾回收

队列是垃圾的重要来源,队列中的元素和用于存储元素的节点对象需要进行频繁的重新分配。

Disruptor与ArrayBlockingQueued性能比较

这里引用官方给出的测试结果,查看测试结果原文点这里:

官方采用Disruptor1.x版本与ArrayBlockingQueue进行比较。

官方测试案例

unicast 1p-1c

three step pipeline 1p-3c

sequencer 3P-1c

multicast 1p-3c

diamond 1p-3c

吞吐量性能测试( ops per sec)
  • Nehalem 2.8Ghz – Windows 7 SP1 64-bit
模型 ArrayBlockingQueue Disruptor
Unicast: 1P – 1C 5,339,256 25,998,336
Pipeline: 1P – 3C 2,128,918 16,806,157
Sequencer: 3P – 1C 5,539,531 13,403,268
Multicast: 1P – 3C 1,077,384 9,377,871
Diamond: 1P – 3C 2,113,941 16,143,613
  • Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit
模型 ArrayBlockingQueue Disruptor
Unicast: 1P – 1C 4,057,453 22,381,378
Pipeline: 1P – 3C 2,006,903 15,857,913
Sequencer: 3P – 1C 2,056,118 14,540,519
Multicast: 1P – 3C 260,733 10,860,121
Diamond: 1P – 3C 2,082,725 15,295,197
延迟性能测试(ns)
  • 2.2Ghz Core i7-2720QM running Java 1.6.0_25 64-bit on Ubuntu 11.04.
模型 Array Blocking Queue (ns) Disruptor (ns)
Mean Latency 32,757 52
99% observations less than 2,097,152 128
99.99% observations less than 4,194,304 8,192

Distuptor的使用

单生产者单消费者

/**
 * <pre>
 * UniCast a series of items between 1 publisher and 1 event processor.
 *
 * +----+    +-----+
 * | P1 |--->| EP1 |
 * +----+    +-----+
 *
 * Disruptor:
 * ==========
 *              track to prevent wrap
 *              +------------------+
 *              |                  |
 *              |                  v
 * +----+    +====+    +====+   +-----+
 * | P1 |--->| RB |<---| SB |   | EP1 |
 * +----+    +====+    +====+   +-----+
 *      claim      get    ^        |
 *                        |        |
 *                        +--------+
 *                          waitFor
 *
 * P1  - Publisher 1
 * RB  - RingBuffer
 * SB  - SequenceBarrier
 * EP1 - EventProcessor 1
 *
 * </pre>
 */
public final class OneToOneSequencedBatchThroughputTest extends AbstractPerfTestDisruptor
{
    public static final int BATCH_SIZE = 10;
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 1000L * 1000L * 100L;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE);
    private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS) * BATCH_SIZE;
    ///////////////////////////////////////////////////////////////////////////////////////////////

    private final RingBuffer<ValueEvent> ringBuffer =
        createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor =
        new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);

    {
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }
    ///////////////////////////////////////////////////////////////////////////////////////////////

    @Override
    protected int getRequiredProcessorCount()
    {
        return 2;
    }

    @Override
    protected PerfTestContext runDisruptorPass() throws InterruptedException
    {
        PerfTestContext perfTestContext = new PerfTestContext();
        final CountDownLatch latch = new CountDownLatch(1);
        long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS * BATCH_SIZE;
        handler.reset(latch, expectedCount);
        executor.submit(batchEventProcessor);
        long start = System.currentTimeMillis();

        final RingBuffer<ValueEvent> rb = ringBuffer;

        for (long i = 0; i < ITERATIONS; i++)
        {
            long hi = rb.next(BATCH_SIZE);
            long lo = hi - (BATCH_SIZE - 1);
            for (long l = lo; l <= hi; l++)
            {
                rb.get(l).setValue(i);
            }
            rb.publish(lo, hi);
        }

        latch.await();
        perfTestContext.setDisruptorOps((BATCH_SIZE * ITERATIONS * 1000L) / (System.currentTimeMillis() - start));
        perfTestContext.setBatchData(handler.getBatchesProcessed(), ITERATIONS * BATCH_SIZE);
        waitForEventProcessorSequence(expectedCount);
        batchEventProcessor.halt();

        failIfNot(expectedResult, handler.getValue());

        return perfTestContext;
    }

    private void waitForEventProcessorSequence(long expectedCount) throws InterruptedException
    {
        while (batchEventProcessor.getSequence().get() != expectedCount)
        {
            Thread.sleep(1);
        }
    }

    public static void main(String[] args) throws Exception
    {
        OneToOneSequencedBatchThroughputTest test = new OneToOneSequencedBatchThroughputTest();
        test.testImplementations();
    }
}

多生产者单消费者

/**
 * <pre>
 *
 * Sequence a series of events from multiple publishers going to one event processor.
 *
 * +----+
 * | P1 |------+
 * +----+      |
 *             v
 * +----+    +-----+
 * | P1 |--->| EP1 |
 * +----+    +-----+
 *             ^
 * +----+      |
 * | P3 |------+
 * +----+
 *
 * Disruptor:
 * ==========
 *             track to prevent wrap
 *             +--------------------+
 *             |                    |
 *             |                    v
 * +----+    +====+    +====+    +-----+
 * | P1 |--->| RB |<---| SB |    | EP1 |
 * +----+    +====+    +====+    +-----+
 *             ^   get    ^         |
 * +----+      |          |         |
 * | P2 |------+          +---------+
 * +----+      |            waitFor
 *             |
 * +----+      |
 * | P3 |------+
 * +----+
 *
 * P1  - Publisher 1
 * P2  - Publisher 2
 * P3  - Publisher 3
 * RB  - RingBuffer
 * SB  - SequenceBarrier
 * EP1 - EventProcessor 1
 *
 * </pre>
 *
 * @author mikeb01
 */
public final class ThreeToOneSequencedBatchThroughputTest extends AbstractPerfTestDisruptor
{
    private static final int NUM_PUBLISHERS = 3;
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 1000L * 1000L * 100L;
    private final ExecutorService executor = Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
    ///////////////////////////////////////////////////////////////////////////////////////////////

    private final RingBuffer<ValueEvent> ringBuffer =
        createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());

    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor =
        new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
    private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS];

    {
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            valuePublishers[i] = new ValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS, 10);
        }

        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }
    ///////////////////////////////////////////////////////////////////////////////////////////////

    @Override
    protected int getRequiredProcessorCount()
    {
        return 4;
    }

    @Override
    protected PerfTestContext runDisruptorPass() throws Exception
    {
        PerfTestContext perfTestContext = new PerfTestContext();
        final CountDownLatch latch = new CountDownLatch(1);
        handler.reset(latch, batchEventProcessor.getSequence().get() + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS));

        Future<?>[] futures = new Future[NUM_PUBLISHERS];
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i] = executor.submit(valuePublishers[i]);
        }
        executor.submit(batchEventProcessor);

        long start = System.currentTimeMillis();
        cyclicBarrier.await();

        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i].get();
        }

        latch.await();

        perfTestContext.setDisruptorOps((ITERATIONS * 1000L) / (System.currentTimeMillis() - start));
        perfTestContext.setBatchData(handler.getBatchesProcessed(), ITERATIONS);
        batchEventProcessor.halt();

        return perfTestContext;
    }

    public static void main(String[] args) throws Exception
    {
        new ThreeToOneSequencedBatchThroughputTest().testImplementations();
    }
}

多生产者多消费者

/**
 * <pre>
 *
 * Sequence a series of events from multiple publishers going to one event processor.
 *
 * Disruptor:
 * ==========
 *             track to prevent wrap
 *             +--------------------+
 *             |                    |
 *             |                    |
 * +----+    +====+    +====+       |
 * | P1 |--->| RB |--->| SB |--+    |
 * +----+    +====+    +====+  |    |
 *                             |    v
 * +----+    +====+    +====+  | +----+
 * | P2 |--->| RB |--->| SB |--+>| EP |
 * +----+    +====+    +====+  | +----+
 *                             |
 * +----+    +====+    +====+  |
 * | P3 |--->| RB |--->| SB |--+
 * +----+    +====+    +====+
 *
 * P1 - Publisher 1
 * P2 - Publisher 2
 * P3 - Publisher 3
 * RB - RingBuffer
 * SB - SequenceBarrier
 * EP - EventProcessor
 *
 * </pre>
 */
public final class ThreeToThreeSequencedThroughputTest extends AbstractPerfTestDisruptor
{
    private static final int NUM_PUBLISHERS = 3;
    private static final int ARRAY_SIZE = 3;
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 1000L * 1000L * 180L;
    private final ExecutorService executor =
        Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
    ///////////////////////////////////////////////////////////////////////////////////////////////

    @SuppressWarnings("unchecked")
    private final RingBuffer<long[]>[] buffers = new RingBuffer[NUM_PUBLISHERS];
    private final SequenceBarrier[] barriers = new SequenceBarrier[NUM_PUBLISHERS];
    private final LongArrayPublisher[] valuePublishers = new LongArrayPublisher[NUM_PUBLISHERS];

    private final LongArrayEventHandler handler = new LongArrayEventHandler();
    private final MultiBufferBatchEventProcessor<long[]> batchEventProcessor;

    private static final EventFactory<long[]> FACTORY = new EventFactory<long[]>()
    {
        @Override
        public long[] newInstance()
        {
            return new long[ARRAY_SIZE];
        }
    };

    {
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            buffers[i] = RingBuffer.createSingleProducer(FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
            barriers[i] = buffers[i].newBarrier();
            valuePublishers[i] = new LongArrayPublisher(
                cyclicBarrier,
                buffers[i],
                ITERATIONS / NUM_PUBLISHERS,
                ARRAY_SIZE);
        }

        batchEventProcessor = new MultiBufferBatchEventProcessor<long[]>(buffers, barriers, handler);

        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            buffers[i].addGatingSequences(batchEventProcessor.getSequences()[i]);
        }
    }
    ///////////////////////////////////////////////////////////////////////////////////////////////

    @Override
    protected int getRequiredProcessorCount()
    {
        return 4;
    }

    @Override
    protected PerfTestContext runDisruptorPass() throws Exception
    {
        PerfTestContext perfTestContext = new PerfTestContext();
        final CountDownLatch latch = new CountDownLatch(1);
        handler.reset(latch, ITERATIONS);

        Future<?>[] futures = new Future[NUM_PUBLISHERS];
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i] = executor.submit(valuePublishers[i]);
        }
        executor.submit(batchEventProcessor);

        long start = System.currentTimeMillis();
        cyclicBarrier.await();

        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i].get();
        }

        latch.await();

        perfTestContext.setDisruptorOps((ITERATIONS * 1000L * ARRAY_SIZE) / (System.currentTimeMillis() - start));
        perfTestContext.setBatchData(handler.getBatchesProcessed(), ITERATIONS * ARRAY_SIZE);
        batchEventProcessor.halt();

        return perfTestContext;
    }

    public static void main(String[] args) throws Exception
    {
        new ThreeToThreeSequencedThroughputTest().testImplementations();
    }
}

参考

Gallery of Processor Cache Effects

Dissecting the Disruptor: What’s so special about a ring buffer?

Dissecting the Disruptor: How do I read from the ring buffer?

Dissecting the Disruptor: Writing to the ring buffer

Dissecting the Disruptor: Wiring up the dependencies

Dissecting the Disruptor: Why it’s so fast (part one) – Locks Are Bad

Dissecting the Disruptor: Why it’s so fast (part two) – Magic cache line padding

Dissecting the Disruptor: Demystifying Memory Barriers

Disruptor 2.0 – All Change Please

The LMAX Architecture

False Sharing

disruptor_github

LMAX Disruptor_github

Memory Barriers/Fences

Lock-Based vs Lock-Free Concurrent Algorithms

The results of latency and throughput testing against a queue implementation

What Every Programmer Should Know About Memory

US-B.Ralph
Java并发编程语言

Leave a Comment

邮箱地址不会被公开。 必填项已用*标注

5 × 4 =