glmapper

SOFATracer 中 Disruptor 实践

字数统计: 6.3k阅读时长: 27 min
2018/11/10 Share

OpenTraceing 规范

SOFATracer 对 OpenTraceing 的实现

SOFATracer 就是根据 OpenTracing 规范 衍生出来的分布式 链路跟 踪的解决方案。

概念

OpenTracing 标准中有三个重要的相互关联的类型,分别是Tracer, SpanSpanContext

【下面的概念说明过程中,如不做说明,所使用的案例代码均以SOFATracer中的实现为例。】

Tracer

一个 trace 代表一个潜在的,分布式的,存在并行数据或并行执行轨迹(潜在的分布式、并行)的系统。一个trace可以认为是多个span的有向无环图(DAG)。

Tracer接口用来创建Span,以及处理如何处理Inject(serialize) 和 Extract (deserialize),用于跨进程边界传递。

SOFATracerSofaTracer这个类实现了 opentracingTracer 接口,并在此规范接口上做了一些扩展。看下Tracer 中声明的方法:

1
2
3
4
5
6
7
8
9
10
11
12
public interface Tracer {
//启动一个新的span
SpanBuilder buildSpan(String operationName);
//将SpanContext上下文Inject(注入)到carrier
<C> void inject(SpanContext spanContext, Format<C> format, C carrier);
//将SpanContext上下文从carrier中Extract(提取)
<C> SpanContext extract(Format<C> format, C carrier);

interface SpanBuilder {
// 省略
}
}

所以从接口定义来看,要实现一个Tracer,必须要实现其以下的几个能力:

启动一个新的span

SOFATracer 实现了 TracerbuildSpan 方法:

1
2
3
4
@Override
public SpanBuilder buildSpan(String operationName) {
return new SofaTracerSpanBuilder(operationName);
}

operationName :操作名称,字符串类型,表示由Span完成的工作 (例如,RPC方法名称、函数名称或一个较大的计算任务中的阶段的名称)。操作名称应该用泛化的字符串形式标识出一个Span实例。

何为泛化的字符串形式,比如现在有一个操作:获取用户 ;下面有几种标识方式:

  • 1、/get
  • 2、/get/user
  • 3、/get/user/123

方式1过于抽象,方式3过于具体。方式2是正确的操作名。

将SpanContext上下文Inject(注入)到carrier

1
2
3
4
5
6
7
8
@Override
public <C> void inject(SpanContext spanContext, Format<C> format, C carrier) {
RegistryExtractorInjector<C> registryInjector = TracerFormatRegistry.getRegistry(format);
if (registryInjector == null) {
throw new IllegalArgumentException("Unsupported injector format: " + format);
}
registryInjector.inject((SofaTracerSpanContext) spanContext, carrier);
}
  • SpanContext :实例
  • format(格式化)描述,一般会是一个字符串常量,但不做强制要求。通过此描述,通知Tracer实现,如何对SpanContext进行编码放入到carrier中。
    carrier,根据format确定。Tracer实现根据format声明的格式,将SpanContext序列化到carrier对象中。

RegistryExtractorInjector 见后面

将SpanContext上下文从carrier中Extract(提取)

1
2
3
4
5
6
7
8
@Override
public <C> SpanContext extract(Format<C> format, C carrier) {
RegistryExtractorInjector<C> registryExtractor = TracerFormatRegistry.getRegistry(format);
if (registryExtractor == null) {
throw new IllegalArgumentException("Unsupported extractor format: " + format);
}
return registryExtractor.extract(carrier);
}
  • 格式描述符(format descriptor)(通常但不一定是字符串常量),告诉Tracer的实现如何在载体对象中对SpanContext进行编码
  • 载体(carrier),其类型由格式描述符指定。Tracer的实现将根据格式描述对此载体对象中的SpanContext进行编码

返回一个SpanContext实例,可以使用这个SpanContext实例,通过Tracer创建新的Span

Format

Tracer的注入和提取来看,format都是必须的。

Inject(注入)和Extract(提取)依赖于可扩展的format参数。format参数规定了另一个参数"carrier"的类型,同时约束了"carrier"SpanContext是如何编码的。所有的Tracer实现,都必须支持下面的format

  • Text Map: 基于字符串:字符串的map,对于keyvalue不约束字符集。
  • HTTP Headers: 适合作为HTTP头信息的,基于字符串:字符串的map。(RFC 7230.在工程实践中,如何处理HTTP头具有多样性,强烈建议tracer的使用者谨慎使用HTTP头的键值空间和转义符)
  • Binary: 一个简单的二进制大对象,记录SpanContext的信息。

在上面的注入和提取代码中,有如下代码片段:

1
2
3
4
5
6
//注入
RegistryExtractorInjector<C> registryInjector  =
TracerFormatRegistry.getRegistry(format);
//提取
RegistryExtractorInjector<C> registryExtractor =
TracerFormatRegistry.getRegistry(format);

来通过TracerFormatRegistry这个类来来看下 SOFATracer 中的 Format 的具体实现。

X-B3

在看Format之前,先了解下X-B3

1
2
Access-Control-Expose-Headers: 
X-B3-TraceId,X-B3-ParentSpanId,X-B3-SpanId

HTTP请求时其span参数通过http headers来传递追踪信息;header中对应的key分别是:

  • X-B3-TraceId: 64 encoded bits(id被encode为hex Strings)
  • X-B3-SpanId : 64 encoded bits
  • X-B3-ParentSpanId: 64 encoded bits
  • X-B3-Sampled:(是否采样) Boolean (either “1” or “0”)(下面的调用是否进行采样)
  • X-B3-Flags:a Long

SOFATracer 中的 Format

具体代码在 tracer-core -> com.alipay.common.tracer.core.registy 包下:

  • TextMapFormatter
  • TextMapB3Formatter
  • HttpHeadersFormatter
  • HttpHeadersB3Formatter
  • BinaryFormater

BinaryFormater:这个的注入和提取实现没有编解码一说;本身就是基于二进制流的操作。

TextMapB3Formatter/TextMapFormatterHttpHeadersB3Formatter/HttpHeadersFormatter 区别就在于编解码不同。HttpHeadersB3Formatter使用的是 URLDecoder.decode && URLDecoder.encode ; TextMapB3Formatter 返回的是值本身(如果为空或者null则返回空字符串)。

TextMapFormatterTextMapB3Formatter区别在于注入或者提取是使用的key不用。TextMapB3Formatter中使用的是 x-b3-{} 的字符串作为key

Span

一个span代表系统中具有开始时间和执行时长的逻辑运行单元。span之间通过嵌套或者顺序排列建立逻辑因果关系。当Span结束后(span.finish()),除了通过Span获取SpanContext外,下列其他所有方法都不允许被调用。

同样先来看下opentracing规范api 定义的 span 的定义及方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Span extends Closeable {
SpanContext context();
void finish();
void finish(long finishMicros);
void close();
Span setTag(String key, String value);
Span setTag(String key, boolean value);
Span setTag(String key, Number value);
Span log(Map<String, ?> fields);
Span log(long timestampMicroseconds, Map<String, ?> fields);
Span log(String event);
Span log(long timestampMicroseconds, String event);
Span setBaggageItem(String key, String value);
String getBaggageItem(String key);
Span setOperationName(String operationName);
Span log(String eventName, /* @Nullable */ Object payload);
Span log(long timestampMicroseconds, String eventName, /* @Nullable */ Object payload);
}

通过Span获取SpanContext

1
2
3
4
5
//SOFATracerSpan
@Override
public SpanContext context() {
return this.sofaTracerSpanContext;
}

返回值,Span构建时传入的SpanContext。这个返回值在Span结束后(span.finish()),依然可以使用。

复写操作名

1
2
3
4
5
@Override
public Span setOperationName(String operationName) {
this.operationName = operationName;
return this;
}

operationName:新的操作名,覆盖构建Span时,传入的操作名。

结束Span

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void finish() {
this.finish(System.currentTimeMillis());
}

@Override
public void finish(long endTime) {
this.setEndTime(endTime);
//关键记录:report span
this.sofaTracer.reportSpan(this);
SpanExtensionFactory.logStoppedSpan(this);
}

有一个可选参数,如果指定完成时间则使用当前指定的时间;如果省略此参数,使用当前时间作为完成时间。finish方法中会将当前span进行report操作。

为Span设置tag

Tag是一个key:value格式的数据。key必须是String类型,value可以是字符串、布尔或者数字

  • 字符串类型的value 设置tag
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public Span setTag(String key, String value) {
if (StringUtils.isBlank(key) || StringUtils.isBlank(value)) {
return this;
}
this.tagsWithStr.put(key, value);
//注意:server 还是 client 在 OpenTracing 标准中是用 tags 标识的,所以在这里进行判断
if (isServer()) {
Reporter serverReporter = this.sofaTracer.getServerReporter();
if (serverReporter != null) {
this.setLogType(serverReporter.getReporterType());
}
} else if (isClient()) {
Reporter clientReporter = this.sofaTracer.getClientReporter();
if (clientReporter != null) {
this.setLogType(clientReporter.getReporterType());
}
}
return this;
}
  • 布尔类型的value 设置tag
1
2
3
4
public Span setTag(String key, boolean value) {
this.tagsWithBool.put(key, value);
return this;
}
  • 数字类型的value 设置tag
1
2
3
4
5
6
7
public Span setTag(String key, Number number) {
if (number == null) {
return this;
}
this.tagsWithNumber.put(key, number);
return this;
}

Log结构化数据

1
2
3
4
5
6
7
8
9
10
11
@Override
public Span log(long currentTime, Map<String, ?> map) {
AssertUtils.isTrue(currentTime >= startTime, "current time must greater than start time");
this.logs.add(new LogData(currentTime, map));
return this;
}

@Override
public Span log(Map<String, ?> map) {
return this.log(System.currentTimeMillis(), map);
}
  • Map<String, ?> map : 键必须是字符串类型,值可以是任意类型
  • currentTime : 时间戳。如果指定时间戳,那么它必须在span的开始和结束时间之内。

设置一个baggage(随行数据)元素

Baggage元素是一个键值对集合,将这些值设置给给定的SpanSpanSpanContext,以及所有和此Span有直接或者间接关系的本地Span。 也就是说,baggage元素随trace一起保持在带内传递。(译者注:带内传递,在这里指,随应用程序调用过程一起传递)

Baggage元素为OpenTracing的实现全栈集成,提供了强大的功能 (例如:任意的应用程序数据,可以在移动端创建它,显然的,它会一直传递了系统最底层的存储系统。由于它如此强大的功能,他也会产生巨大的开销,请小心使用此特性。

再次强调,请谨慎使用此特性。每一个键值都会被拷贝到每一个本地和远程的下级相关的span中,因此,总体上,他会有明显的网络和CPU开销。

1
2
3
4
5
@Override
public Span setBaggageItem(String key, String value) {
this.sofaTracerSpanContext.setBizBaggageItem(key, value);
return this;
}

SofaTracerSpan 中的属性

  • sofaTracer : 当前 tracer
  • spanReferences : 当前span的关系,ChildOf(引用) or FollowsFrom(跟随)
  • tagsWithStr : String 类型的tag 集合
  • tagsWithBool : 布尔类型的tag集合
  • tagsWithNumber : 数值类型的tag集合
  • logs : log结构化数据列表,通过span.log(map)操作的map,均存储在logs中。
  • operationName:当前span的操作名
  • sofaTracerSpanContext:当前 spanContext
  • startTime : 当前span 开始时间
  • endTime : 当前span 结束时间,在finish方法中传入。
  • logType : report时才有意义:摘要日志类型,日志能够正确打印的关键信息;当前 span 的日志类型,如:客户端为 rpc-client-digest.log,服务端为 rpc-server-digest.log
  • parentSofaTracerSpan:父亲 span,当作为客户端结束并弹出线程上下文时,需要将父亲 span 再放入

SpanContext

opentracingSpanContext 接口中只有一个baggageItems方法,通过这个方法来遍历所有的baggage元素。

1
2
3
public interface SpanContext {
Iterable<Map.Entry<String, String>> baggageItems();
}

相对于OpenTracing中其他的功能,SpanContext更多的是一个“概念”。也就是说,OpenTracing实现中,需要重点考虑,并提供一套自己的API

OpenTracing的使用者仅仅需要,在创建span、向传输协议Inject(注入)和从传输协议中Extract(提取)时,使用SpanContextreferences

OpenTracing要求,SpanContext是不可变的,目的是防止由于Span的结束和相互关系,造成的复杂生命周期问题。

Disruptor 简介

A High Performance Inter-Thread Messaging Library 高性能的线程间消息传递库

关于 Disruptor 的 一些原理分析可以参考:disruptor

案例

先通过 Disruptor 的一个小例子来有个直观的认识;先看下它的构造函数:

1
2
3
4
5
6
7
8
9
10
11
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
  • eventFactory : 在环形缓冲区中创建事件的 factory
  • ringBufferSize:环形缓冲区的大小,必须是2的幂。
  • threadFactory:用于为处理器创建线程。
  • producerType:生成器类型以支持使用正确的sequencerpublisher创建RingBuffer;枚举类型,SINGLEMULTI两个项。对应于 SingleProducerSequencerMultiProducerSequencer两种Sequencer
  • waitStrategy : 等待策略;

如果我们想构造一个disruptor,那么我们就需要上面的这些组件。从eventFactory来看,还需要一个具体的Event来作为消息事件的载体。【下面按照官方给的案例进行简单的修改作为示例】

消息事件 LongEvent ,能够被消费的数据载体

1
2
3
4
5
6
7
8
9
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
public long getValue() {
return value;
}
}

创建消息事件的factory

1
2
3
4
5
6
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}

ConsumerThreadFactory

1
2
3
4
5
6
7
public class ConsumerThreadFactory implements ThreadFactory {
private final AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "disruptor-thread-" + index.getAndIncrement());
}
}

OK ,上面的这些可以满足创建一个disruptor了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private int ringBufferCapacity = 8;
//消息事件生产Factory
LongEventFactory longEventFactory = new LongEventFactory();
//执行事件处理器线程Factory
ConsumerThreadFactory consumerThreadFactory = new ConsumerThreadFactory();
//用于环形缓冲区的等待策略。
WaitStrategy waitStrategy = new BlockingWaitStrategy();

//构建disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(
longEventFactory,
ringBufferCapacity,
longEventThreadFactory,
ProducerType.SINGLE,
waitStrategy);

现在是已经有了 disruptor 了,然后通过:start 来启动:

1
2
//启动 disruptor
disruptor.start();

到这里,已经构建了一个disruptor;但是目前怎么使用它来发布消息和消费消息呢?

发布消息

下面在 for 循环中 发布 5 条数据:

1
2
3
4
5
6
7
8
9
10
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
for (long l = 0; l < 5; l++)
{
long sequence = ringBuffer.next();
LongEvent event = ringBuffer.get(sequence);
event.set(100+l);
System.out.println("publish event :" + l);
ringBuffer.publish(sequence);
Thread.sleep(1000);
}

消息已经发布,下面需要设定当前disruptor的消费处理器。前面已经有个LongEventEventFactory ; 在disruptor中是通过 EventHandler 来进行消息消费的。

编写消费者代码

1
2
3
4
5
6
7
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Event: " + event.getValue()+" -> " + Thread.currentThread().getName());
Thread.sleep(2000);
}
}

eventHandler 设置到 disruptor 的处理链上

1
2
3
//将处理事件的事件处理程序 -> 消费事件的处理程序
LongEventHandler longEventHandler = new LongEventHandler();
disruptor.handleEventsWith(longEventHandler);

运行结果(这里):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
publish event :0
Event: 0 -> disruptor-thread-1
-------------------------------->
publish event :1
Event: 1 -> disruptor-thread-1
-------------------------------->
publish event :2
Event: 2 -> disruptor-thread-1
-------------------------------->
publish event :3
Event: 3 -> disruptor-thread-1
-------------------------------->
publish event :4
Event: 4 -> disruptor-thread-1
-------------------------------->

基本概念和原理

Disruptor

整个基于ringBuffer实现的生产者消费者模式的容器。主要属性

1
2
3
4
5
private final RingBuffer<T> ringBuffer;
private final Executor executor;
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
  • ringBuffer:内部持有一个 RingBuffer 对象,Disruptor 内部的事件发布都是依赖这个RingBuffer对象完成的。
  • executor:消费事件的线程池
  • consumerRepository:提供存储库机制,用于将EventHandlerEventProcessor关联起来
  • started : 用于标志当前Disruptor是否已经启动
  • exceptionHandler : 异常处理器,用于处理BatchEventProcessor事件周期中 uncaught exceptions

RingBuffer

环形队列[实现上是一个数组],可以类比为BlockingQueue之类的队列,ringBuffer的使用,使得内存被循环使用,减少了某些场景的内存分配回收扩容等耗时操作。

1
2
public final class RingBuffer<E> extends RingBufferFields<E> 
implements Cursored, EventSequencer<E>, EventSink<E>
  • E:在事件的交换或并行协调期间存储用于共享的数据的实现 -> 消息事件

Sequencer

RingBuffer 中 生产者的顶级父接口,其直接实现有SingleProducerSequencerMultiProducerSequencer;对应 SINGLEMULTI 两个枚举值。

EventHandler

事件处置器,改接口用于对外扩展来实现具体的消费逻辑。如上面 demo 中的 LongEventHandler ;

1
2
3
4
//回调接口,用于处理{@link RingBuffer}中可用的事件
public interface EventHandler<T> {
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
  • event : RingBuffer 已经发布的事件
  • sequence : 正在处理的事件 的序列号
  • endOfBatch : 用来标识否是来自 RingBuffer 的批次中的最后一个事件

SequenceBarrier

消费者路障。规定了消费者如何向下走。事实上,该路障算是变向的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
final class ProcessingSequenceBarrier implements SequenceBarrier {
//当等待(探测)的需要不可用时,等待的策略
private final WaitStrategy waitStrategy;
//依赖的其它Consumer的序号,这个用于依赖的消费的情况,
//比如A、B两个消费者,只有A消费完,B才能消费。
private final Sequence dependentSequence;
private volatile boolean alerted = false;
//Ringbuffer的写入指针
private final Sequence cursorSequence;
//RingBuffer对应的Sequencer
private final Sequencer sequencer;
//exclude method
}

waitStrategy 决定了消费者采用何种等待策略。

WaitStrategy

Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}.

EventProcessor 的等待策略;具体实现在 disruptor 中有8种,

这些等待策略不同的核心体现是在如何实现 waitFor 这个方法上。

EventProcessor

事件处理器,实际上可以理解为消费者模型的框架,实现了线程Runnablerun方法,将循环判断等操作封在了里面。该接口有三个实现类:

1、BatchEventProcessor

1
2
3
4
5
6
7
8
9
10
public final class BatchEventProcessor<T> implements EventProcessor {
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final Sequence sequence = new Sequence( Sequencer.INITIAL_CURSOR_VALUE);
private final TimeoutHandler timeoutHandler;
//exclude method
}
  • ExceptionHandler:异常处理器
  • DataProvider:数据来源,对应 RingBuffer
  • EventHandler:处理 Event 的回调对象
  • SequenceBarrier:对应的序号屏障
  • TimeoutHandler:超时处理器,默认情况为空,如果要设置,只需要要将关联的EventHandler实现TimeOutHandler即可。

如果我们选择使用 EventHandler 的时候,默认使用的就是 BatchEventProcessor,它与EventHandler是一一对应,并且是单线程执行。

如果某个RingBuffer有多个BatchEventProcessor,那么就会每个BatchEventProcessor对应一个线程。

2、WorkProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class WorkProcessor<T> implements EventProcessor {
private final AtomicBoolean running = new AtomicBoolean(false);
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer<T> ringBuffer;
private final SequenceBarrier sequenceBarrier;
private final WorkHandler<? super T> workHandler;
private final ExceptionHandler<? super T> exceptionHandler;
private final Sequence workSequence;

private final EventReleaser eventReleaser = new EventReleaser() {
@Override
public void release() {
sequence.set(Long.MAX_VALUE);
}
};
private final TimeoutHandler timeoutHandler;
}

基本和 BatchEventProcessor 类似,不同在于,用于处理Event的回调对象是WorkHandler

原理图

无消费者情况下,生产者保持生产,但是 remainingCapacity 保持不变

在写demo的过程中,本来想通过不设定 消费者 来观察 RingBuffer 可用容量变化的。但是验证过程中,一直得不到预期的结果,(注:没有设置消费者,只有生产者),先看结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
publish event :0
bufferSie:8
remainingCapacity:8
cursor:0
-------------------------------->
publish event :1
bufferSie:8
remainingCapacity:8
cursor:1
-------------------------------->
publish event :2
bufferSie:8
remainingCapacity:8
cursor:2
-------------------------------->
publish event :3
bufferSie:8
remainingCapacity:8
cursor:3
-------------------------------->
publish event :4
bufferSie:8
remainingCapacity:8
cursor:4
-------------------------------->
publish event :5
bufferSie:8
remainingCapacity:8
cursor:5
-------------------------------->
publish event :6
bufferSie:8
remainingCapacity:8
cursor:6
-------------------------------->
publish event :7
bufferSie:8
remainingCapacity:8
cursor:7
-------------------------------->
publish event :8
bufferSie:8
remainingCapacity:8
cursor:8
-------------------------------->
publish event :9
bufferSie:8
remainingCapacity:8
cursor:9
-------------------------------->

从结果来看,remainingCapacity 的值应该随着 发布的数量 递减的;但是实际上它并没有发生任何变化。

来看下ringBuffer.remainingCapacity() 这个方法:

1
2
3
4
5
6
7
8
9
/**
* Get the remaining capacity for this ringBuffer.
*
* @return The number of slots remaining.
*/
public long remainingCapacity()
{
return sequencer.remainingCapacity();
}

这里面又使用 sequencer.remainingCapacity()这个方法来计算的。上面的例子中使用的是ProducerType.SINGLE,那来看SingleProducerSequencer 这个里面remainingCapacity的实现。

1
2
3
4
5
6
7
8
9
10
11
@Override
public long remainingCapacity()
{
//上次申请完毕的序列值
long nextValue = this.nextValue;
//计算当前已经消费到的序列值
long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
//当前生产到的序列值
long produced = nextValue;
return getBufferSize() - (produced - consumed);
}

来解释下这段代码的含义:

假设当前 ringBufferbufferSize 是 8 ;上次申请到的序列号是 5,其实也就是说已经生产过占用的序列号是5;假设当前已经消费到的序列号是 3,那么剩余的容量为: 8-(5-2) = 5;

因为这里我们可以确定 bufferSizeproduced 的值了,那么 remainingCapacity 的结果就取决于getMinimumSequence的计算结果了。

1
2
3
4
5
6
7
8
9
public static long getMinimumSequence(final Sequence[] sequences, long minimum)
{
for (int i = 0, n = sequences.length; i < n; i++)
{
long value = sequences[i].get();
minimum = Math.min(minimum, value);
}
return minimum;
}

这个方法是从 Sequence 数组中获取最小序列 。如果sequences 为空,则返回 minimum。回到上一步,看下sequences这个数组是从哪里过来的,它的值在哪里设置的。

1
long consumed = Util.getMinimumSequence(gatingSequences, nextValue);

gatingSequencesSingleProducerSequencer父类 AbstractSequencer 中的成员变量:

1
protected volatile Sequence[] gatingSequences = new Sequence[0];

gatingSequences 是在下面这个方法里面来管理的。

1
2
3
4
5
6
7
8
/**
* @see Sequencer#addGatingSequences(Sequence...)
*/
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

这个方法的调用栈向前追溯有这几个地方调用了:

WorkerPool来管理多个消费者;hangdlerEventsWith 这个方法也是用来设置消费者的。但是在上面的测试案例中我们是想通过不设定消费者 只设定生成者 来观察 环形队列的占用情况,所以gatingSequences 会一直是空的,因此在计算时会把 produced 的值作为 minimum 返回。这样每次计算就相当于:

1
return getBufferSize() - (produced - produced) === getBufferSize();

也就验证了为何在不设定消费者的情况下,remainingCapacity 的值会一直保持不变。

SOFATracer 中 Disruptor 实践

SOFATracer中,AsyncCommonDigestAppenderManagerdisruptor 进行了封装,用于处理外部组件的Tracer摘要日志。该部分借助 AsyncCommonDigestAppenderManager 的源码来分析下SOFATracer如何使用disruptor的。

SOFATracer中使用了两种不同的事件模型,一种是SOFATracer内部使用的 StringEvent , 一种是 外部扩展使用的 SofaTacerSpanEvent。这里以 SofaTacerSpanEvent 这种事件模型来分析。StringEvent 消息事件模型对应的是 AsyncCommonAppenderManager 类封装的disruptor

SofaTracerSpanEvent ( -> LongEvent)

定义消息事件模型,SofaTacerSpanEvent 和 前面 demo 中的 LongEvent 基本结构是一样的,主要是内部持有的消息数据不同,LongEvent 中是一个long类型的数据,SofaTacerSpanEvent中持有的是 SofaTracerSpan

1
2
3
4
5
6
7
8
9
public class SofaTracerSpanEvent {
private volatile SofaTracerSpan sofaTracerSpan;
public SofaTracerSpan getSofaTracerSpan() {
return sofaTracerSpan;
}
public void setSofaTracerSpan(SofaTracerSpan sofaTracerSpan) {
this.sofaTracerSpan = sofaTracerSpan;
}
}

Consumer ( -> LongEventHandler)

ConsumerAsyncCommonDigestAppenderManager 的内部类;实现了 EventHandler 接口,这个consumer就是作为消费者存在的。

AsyncCommonAppenderManager中也有一个,这个地方个人觉得可以抽出去,这样可以使得AsyncCommonDigestAppenderManager/AsyncCommonAppenderManager的代码看起来更干净;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private class Consumer implements EventHandler<SofaTracerSpanEvent> {
//日志类型集合,非该集合内的日志类型将不会被处理
protected Set<String> logTypes = Collections.synchronizedSet(new HashSet<String>());
@Override
public void onEvent(SofaTracerSpanEvent event, long sequence, boolean endOfBatch)
throws Exception {
// 拿到具体的消息数据 sofaTracerSpan
SofaTracerSpan sofaTracerSpan = event.getSofaTracerSpan();
// 如果没有数据,则不做任何处理
if (sofaTracerSpan != null) {
try {
String logType = sofaTracerSpan.getLogType();
// 验证当前日志类型是否可以被当前consumer消费
if (logTypes.contains(logType)) {
// 获取编码类型
SpanEncoder encoder = contextEncoders.get(logType);
//获取 appender
TraceAppender appender = appenders.get(logType);
// 对数据进行编码处理
String encodedStr = encoder.encode(sofaTracerSpan);
if (appender instanceof LoadTestAwareAppender) {
((LoadTestAwareAppender) appender).append(encodedStr,
TracerUtils.isLoadTest(sofaTracerSpan));
} else {
appender.append(encodedStr);
}
// 刷新缓冲区,日志输出
appender.flush();
}
} catch (Exception e) {
// 异常省略
}
}
}

public void addLogType(String logType) {
logTypes.add(logType);
}
}

SofaTracerSpanEventFactory (-> LongEventFactory)

用于产生消息事件的 Factory

1
2
3
4
5
6
public class SofaTracerSpanEventFactory implements EventFactory<SofaTracerSpanEvent> {
@Override
public SofaTracerSpanEvent newInstance() {
return new SofaTracerSpanEvent();
}
}

ConsumerThreadFactory (-> LongEventThreadFactory )

用来产生消费线程的 Factory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ConsumerThreadFactory implements ThreadFactory {
private String workName;
public String getWorkName() {
return workName;
}
public void setWorkName(String workName) {
this.workName = workName;
}
@Override
public Thread newThread(Runnable runnable) {
Thread worker = new Thread(runnable, "Tracer-AsyncConsumer-Thread-" + workName);
worker.setDaemon(true);
return worker;
}
}

构建disruptor

disruptor 的构建是在 AsyncCommonDigestAppenderManager 的构造函数中完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public AsyncCommonDigestAppenderManager(int queueSize, int consumerNumber) {
// 使用这个计算来保证realQueueSize是2的次幂(返回当前 大于等于queueSize的最小的2的次幂数 )
int realQueueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
//构建disruptor,使用的是 ProducerType.MULTI
//等待策略是 BlockingWaitStrategy
disruptor = new Disruptor<SofaTracerSpanEvent>(new SofaTracerSpanEventFactory(),
realQueueSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy());
//消费者列表
this.consumers = new ArrayList<Consumer>(consumerNumber);

for (int i = 0; i < consumerNumber; i++) {
Consumer consumer = new Consumer();
consumers.add(consumer);
//设置异常处理程序
disruptor.setDefaultExceptionHandler(new ConsumerExceptionHandler());
//绑定消费者
disruptor.handleEventsWith(consumer);
}

//是否允许丢弃,从配置文件获取
this.allowDiscard = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_ALLOW_DISCARD, DEFAULT_ALLOW_DISCARD));

if (allowDiscard) {
   //是否记录丢失日志的数量
this.isOutDiscardNumber = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_NUMBER,
DEFAULT_IS_OUT_DISCARD_NUMBER));
//是否记录丢失日志的TraceId和RpcId
this.isOutDiscardId = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_ID,
DEFAULT_IS_OUT_DISCARD_ID));
//丢失日志的数量达到该阈值进行一次日志输出
this.discardOutThreshold = Long.parseLong(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_DISCARD_OUT_THRESHOLD,
DEFAULT_DISCARD_OUT_THRESHOLD));
if (isOutDiscardNumber) {
this.discardCount = new PaddedAtomicLong(0L);
}
}
}

启动 disruptor

disruptor的启动委托给了AsyncCommonDigestAppenderManagerstart方法来执行。

1
2
3
4
public void start(final String workerName) {
this.threadFactory.setWorkName(workerName);
this.ringBuffer = this.disruptor.start();
}

来看下,SOFATracer 中 具体是在哪里调用这个start 的:

  • CommonTracerManager : 这个里面持有了AsyncCommonDigestAppenderManager 类的一个单例对象,并且是static 静态代码块中调用了start方法;这个用来输出普通日志。
  • SofaTracerDigestReporterAsyncManager:这里类里面也是持有了AsyncCommonDigestAppenderManager 类的一个单例对像,并且提供了getSofaTracerDigestReporterAsyncManager方法来获取该单例,在这个方法中调用了start方法;该对象用来输出摘要日志。

发布事件

前面的demo中是通过一个for循环来发布事件的,在 SOFATracer 中 的事件发布无非就是当有Tracer日志需要输出时会触发发布,那么对应的就是日志的 append 操作,将日志 append 到环形缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public boolean append(SofaTracerSpan sofaTracerSpan) {
long sequence = 0L;
//是否允许丢弃
if (allowDiscard) {
try {
//允许丢弃就使用tryNext尝试申请序列,申请不到抛出异常
sequence = ringBuffer.tryNext();
} catch (InsufficientCapacityException e) {
//是否输出丢失日志的TraceId和RpcId
if (isOutDiscardId) {
SofaTracerSpanContext sofaTracerSpanContext = sofaTracerSpan
.getSofaTracerSpanContext();
if (sofaTracerSpanContext != null) {
SynchronizingSelfLog.warn("discarded tracer: traceId["
+ sofaTracerSpanContext.getTraceId()
+ "];spanId[" + sofaTracerSpanContext.getSpanId()
+ "]");
}
}
//是否输出丢失日志的数量
if ((isOutDiscardNumber) && discardCount.incrementAndGet() == discardOutThreshold) {
discardCount.set(0);
if (isOutDiscardNumber) {
SynchronizingSelfLog.warn("discarded " + discardOutThreshold + " logs");
}
}

return false;
}
} else {
// 不允许丢弃则使用next方法
sequence = ringBuffer.next();
}

try {
SofaTracerSpanEvent event = ringBuffer.get(sequence);
event.setSofaTracerSpan(sofaTracerSpan);
} catch (Exception e) {
SynchronizingSelfLog.error("fail to add event");
return false;
}
//发布
ringBuffer.publish(sequence);
return true;
}

SOFATracer 事件发布的调用逻辑:

追溯调用的流程,可以知道当前 span 调用 finish时或者 SOFATracer中调用reportSpan时 就相当于发布了一个消息事件。

小结

本文对 SOFATracer 中使用 Disruptor 来进行日志输出的代码进行了简单的分析,更多内部细节原理可以自行看下SOFATracer的代码。SOFATracer 作为一种比较底层的中间件组件,在实际的业务开发中基本是无法感知的。但是作为技术来学习,还是有很多点可以挖一挖。

SOFATracer GitHub 传送门

如果有小伙伴对中间件感兴趣,欢迎加入我们团队,欢迎来撩;对 SOFA 技术体系有兴趣的可以关注我们 ALIPAY SOFA 社区;附团队镇楼图。

原文作者:GuoLei Song

原文链接:http://www.glmapper.com/2018/11/10/sofa-tracer-disruptor/

发表日期:November 10th 2018, 12:26:20 pm

更新日期:December 14th 2019, 11:04:39 am

版权声明:转载请注明出处

CATALOG
  1. 1. OpenTraceing 规范
  2. 2. SOFATracer 对 OpenTraceing 的实现
    1. 2.1. 概念
    2. 2.2. Tracer
      1. 2.2.1. 启动一个新的span
      2. 2.2.2. 将SpanContext上下文Inject(注入)到carrier
      3. 2.2.3. 将SpanContext上下文从carrier中Extract(提取)
      4. 2.2.4. Format
      5. 2.2.5. X-B3
      6. 2.2.6. SOFATracer 中的 Format
    3. 2.3. Span
      1. 2.3.1. 通过Span获取SpanContext
      2. 2.3.2. 复写操作名
      3. 2.3.3. 结束Span
      4. 2.3.4. 为Span设置tag
      5. 2.3.5. Log结构化数据
      6. 2.3.6. 设置一个baggage(随行数据)元素
      7. 2.3.7. SofaTracerSpan 中的属性
      8. 2.3.8. SpanContext
  3. 3. Disruptor 简介
  4. 4. 案例
    1. 4.1. 消息事件 LongEvent ,能够被消费的数据载体
    2. 4.2. 创建消息事件的factory
    3. 4.3. ConsumerThreadFactory
    4. 4.4. 发布消息
    5. 4.5. 编写消费者代码
    6. 4.6. 运行结果(这里):
  5. 5. 基本概念和原理
    1. 5.1. Disruptor
    2. 5.2. RingBuffer
    3. 5.3. Sequencer
    4. 5.4. EventHandler
    5. 5.5. SequenceBarrier
    6. 5.6. WaitStrategy
    7. 5.7. EventProcessor
    8. 5.8. 原理图
  6. 6. 无消费者情况下,生产者保持生产,但是 remainingCapacity 保持不变
  7. 7. SOFATracer 中 Disruptor 实践
    1. 7.1. SofaTracerSpanEvent ( -> LongEvent)
    2. 7.2. Consumer ( -> LongEventHandler)
    3. 7.3. SofaTracerSpanEventFactory (-> LongEventFactory)
    4. 7.4. ConsumerThreadFactory (-> LongEventThreadFactory )
    5. 7.5. 构建disruptor
    6. 7.6. 启动 disruptor
    7. 7.7. 发布事件
  8. 8. 小结