SOFATracer 中 Disruptor 实践
OpenTraceing 规范
SOFATracer 对 OpenTraceing 的实现
SOFATracer 就是根据 OpenTracing 规范 衍生出来的分布式 链路跟 踪的解决方案。
概念
OpenTracing
标准中有三个重要的相互关联的类型,分别是Tracer
, Span
和 SpanContext
。
【下面的概念说明过程中,如不做说明,所使用的案例代码均以SOFATracer中的实现为例。】
Tracer
一个 trace
代表一个潜在的,分布式的,存在并行数据或并行执行轨迹(潜在的分布式、并行)的系统。一个trace
可以认为是多个span
的有向无环图(DAG
)。
Tracer接口用来创建Span,以及处理如何处理Inject(serialize) 和 Extract (deserialize),用于跨进程边界传递。
SOFATracer
中 SofaTracer
这个类实现了 opentracing
的 Tracer
接口,并在此规范接口上做了一些扩展。看下Tracer
中声明的方法:
1 | public interface Tracer { |
所以从接口定义来看,要实现一个Tracer,必须要实现其以下的几个能力:
启动一个新的span
SOFATracer
实现了 Tracer
中 buildSpan
方法:
1 |
|
operationName
:操作名称,字符串类型,表示由Span完成的工作 (例如,RPC方法名称、函数名称或一个较大的计算任务中的阶段的名称)。操作名称应该用泛化的字符串形式标识出一个Span实例。
何为泛化的字符串形式,比如现在有一个操作:获取用户 ;下面有几种标识方式:
- 1、/get
- 2、/get/user
- 3、/get/user/123
方式1过于抽象,方式3过于具体。方式2是正确的操作名。
将SpanContext上下文Inject(注入)到carrier
1 |
|
SpanContext
:实例format
(格式化)描述,一般会是一个字符串常量,但不做强制要求。通过此描述,通知Tracer实现,如何对SpanContext进行编码放入到carrier中。
carrier,根据format确定。Tracer实现根据format声明的格式,将SpanContext序列化到carrier对象中。
RegistryExtractorInjector 见后面
将SpanContext上下文从carrier中Extract(提取)
1 |
|
- 格式描述符(
format descriptor
)(通常但不一定是字符串常量),告诉Tracer
的实现如何在载体对象中对SpanContext
进行编码 - 载体(
carrier
),其类型由格式描述符指定。Tracer
的实现将根据格式描述对此载体对象中的SpanContext
进行编码
返回一个SpanContext
实例,可以使用这个SpanContext
实例,通过Tracer
创建新的Span
。
Format
从Tracer
的注入和提取来看,format
都是必须的。
Inject
(注入)和Extract
(提取)依赖于可扩展的format
参数。forma
t参数规定了另一个参数"carrier"
的类型,同时约束了"carrier"
中SpanContext
是如何编码的。所有的Tracer
实现,都必须支持下面的format
。
Text Map
: 基于字符串:字符串的map
,对于key
和value
不约束字符集。HTTP Headers
: 适合作为HTTP
头信息的,基于字符串:字符串的map
。(RFC 7230.
在工程实践中,如何处理HTTP
头具有多样性,强烈建议tracer
的使用者谨慎使用HTTP
头的键值空间和转义符)Binary
: 一个简单的二进制大对象,记录SpanContext
的信息。
在上面的注入和提取代码中,有如下代码片段:
1 | //注入 |
来通过TracerFormatRegistry
这个类来来看下 SOFATracer
中的 Format
的具体实现。
X-B3
在看Format
之前,先了解下X-B3
。
1 | Access-Control-Expose-Headers: |
HTTP
请求时其span
参数通过http headers
来传递追踪信息;header
中对应的key
分别是:
- X-B3-TraceId: 64 encoded bits(id被encode为hex Strings)
- X-B3-SpanId : 64 encoded bits
- X-B3-ParentSpanId: 64 encoded bits
- X-B3-Sampled:(是否采样) Boolean (either “1” or “0”)(下面的调用是否进行采样)
- X-B3-Flags:a Long
SOFATracer 中的 Format
具体代码在 tracer-core -> com.alipay.common.tracer.core.registy
包下:
- TextMapFormatter
- TextMapB3Formatter
- HttpHeadersFormatter
- HttpHeadersB3Formatter
- BinaryFormater
BinaryFormater:这个的注入和提取实现没有编解码一说;本身就是基于二进制流的操作。
TextMapB3Formatter/TextMapFormatter 和 HttpHeadersB3Formatter/HttpHeadersFormatter 区别就在于编解码不同。HttpHeadersB3Formatter
使用的是 URLDecoder.decode
&& URLDecoder.encode
; TextMapB3Formatter
返回的是值本身(如果为空或者null
则返回空字符串)。
TextMapFormatter和TextMapB3Formatter区别在于注入或者提取是使用的key
不用。TextMapB3Formatter
中使用的是 x-b3-{}
的字符串作为key
。
Span
一个span
代表系统中具有开始时间和执行时长的逻辑运行单元。span
之间通过嵌套或者顺序排列建立逻辑因果关系。当Span
结束后(span.finish()
),除了通过Span
获取SpanContext
外,下列其他所有方法都不允许被调用。
同样先来看下opentracing
规范 api
定义的 span
的定义及方法:
1 | public interface Span extends Closeable { |
通过Span获取SpanContext
1 | //SOFATracerSpan |
返回值,Span
构建时传入的SpanContext
。这个返回值在Span
结束后(span.finish()
),依然可以使用。
复写操作名
1 |
|
operationName:新的操作名,覆盖构建Span
时,传入的操作名。
结束Span
1 |
|
有一个可选参数,如果指定完成时间则使用当前指定的时间;如果省略此参数,使用当前时间作为完成时间。finish
方法中会将当前span
进行report
操作。
为Span设置tag
Tag
是一个key:value
格式的数据。key
必须是String
类型,value
可以是字符串、布尔或者数字。
- 字符串类型的value 设置tag
1 |
|
- 布尔类型的value 设置tag
1 | public Span setTag(String key, boolean value) { |
- 数字类型的value 设置tag
1 | public Span setTag(String key, Number number) { |
Log结构化数据
1 |
|
- Map<String, ?> map : 键必须是字符串类型,值可以是任意类型
- currentTime : 时间戳。如果指定时间戳,那么它必须在
span
的开始和结束时间之内。
设置一个baggage(随行数据)元素
Baggage
元素是一个键值对集合,将这些值设置给给定的Span
,Span
的SpanContext
,以及所有和此Span
有直接或者间接关系的本地Span
。 也就是说,baggage
元素随trace
一起保持在带内传递。(译者注:带内传递,在这里指,随应用程序调用过程一起传递)
Baggage
元素为OpenTracing
的实现全栈集成,提供了强大的功能 (例如:任意的应用程序数据,可以在移动端创建它,显然的,它会一直传递了系统最底层的存储系统。由于它如此强大的功能,他也会产生巨大的开销,请小心使用此特性。
再次强调,请谨慎使用此特性。每一个键值都会被拷贝到每一个本地和远程的下级相关的span
中,因此,总体上,他会有明显的网络和CPU
开销。
1 |
|
SofaTracerSpan 中的属性
- sofaTracer : 当前 tracer
- spanReferences : 当前span的关系,ChildOf(引用) or FollowsFrom(跟随)
- tagsWithStr : String 类型的tag 集合
- tagsWithBool : 布尔类型的tag集合
- tagsWithNumber : 数值类型的tag集合
- logs : log结构化数据列表,通过span.log(map)操作的map,均存储在logs中。
- operationName:当前span的操作名
- sofaTracerSpanContext:当前 spanContext
- startTime : 当前span 开始时间
- endTime : 当前span 结束时间,在finish方法中传入。
- logType : report时才有意义:摘要日志类型,日志能够正确打印的关键信息;当前 span 的日志类型,如:客户端为 rpc-client-digest.log,服务端为 rpc-server-digest.log
- parentSofaTracerSpan:父亲 span,当作为客户端结束并弹出线程上下文时,需要将父亲 span 再放入
SpanContext
opentracing
中 SpanContext
接口中只有一个baggageItems
方法,通过这个方法来遍历所有的baggage
元素。
1 | public interface SpanContext { |
相对于OpenTracing
中其他的功能,SpanContext
更多的是一个“概念”。也就是说,OpenTracing
实现中,需要重点考虑,并提供一套自己的API
。
OpenTracing
的使用者仅仅需要,在创建span
、向传输协议Inject
(注入)和从传输协议中Extract
(提取)时,使用SpanContext
和references
,
OpenTracing
要求,SpanContext
是不可变的,目的是防止由于Span
的结束和相互关系,造成的复杂生命周期问题。
Disruptor 简介
A High Performance Inter-Thread Messaging Library 高性能的线程间消息传递库
关于 Disruptor 的 一些原理分析可以参考:disruptor
案例
先通过 Disruptor
的一个小例子来有个直观的认识;先看下它的构造函数:
1 | public Disruptor( |
- eventFactory : 在环形缓冲区中创建事件的
factory
- ringBufferSize:环形缓冲区的大小,必须是2的幂。
- threadFactory:用于为处理器创建线程。
- producerType:生成器类型以支持使用正确的
sequencer
和publisher
创建RingBuffer
;枚举类型,SINGLE
、MULTI
两个项。对应于SingleProducerSequencer
和MultiProducerSequencer
两种Sequencer
。 - waitStrategy : 等待策略;
如果我们想构造一个disruptor
,那么我们就需要上面的这些组件。从eventFactory
来看,还需要一个具体的Event
来作为消息事件的载体。【下面按照官方给的案例进行简单的修改作为示例】
消息事件 LongEvent ,能够被消费的数据载体
1 | public class LongEvent { |
创建消息事件的factory
1 | public class LongEventFactory implements EventFactory<LongEvent> { |
ConsumerThreadFactory
1 | public class ConsumerThreadFactory implements ThreadFactory { |
OK ,上面的这些可以满足创建一个disruptor
了:
1 | private int ringBufferCapacity = 8; |
现在是已经有了 disruptor
了,然后通过:start
来启动:
1 | //启动 disruptor |
到这里,已经构建了一个disruptor
;但是目前怎么使用它来发布消息和消费消息呢?
发布消息
下面在 for
循环中 发布 5 条数据:
1 | RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); |
消息已经发布,下面需要设定当前disruptor
的消费处理器。前面已经有个LongEvent
和 EventFactory
; 在disruptor
中是通过 EventHandler
来进行消息消费的。
编写消费者代码
1 | public class LongEventHandler implements EventHandler<LongEvent> { |
将 eventHandler
设置到 disruptor
的处理链上
1 | //将处理事件的事件处理程序 -> 消费事件的处理程序 |
运行结果(这里):
1 | publish event :0 |
基本概念和原理
Disruptor
整个基于ringBuffer
实现的生产者消费者模式的容器。主要属性
1 | private final RingBuffer<T> ringBuffer; |
ringBuffer
:内部持有一个RingBuffer
对象,Disruptor
内部的事件发布都是依赖这个RingBuffer
对象完成的。executor
:消费事件的线程池consumerRepository
:提供存储库机制,用于将EventHandler
与EventProcessor
关联起来started
: 用于标志当前Disruptor
是否已经启动exceptionHandler
: 异常处理器,用于处理BatchEventProcessor
事件周期中uncaught exceptions
。
RingBuffer
环形队列[实现上是一个数组],可以类比为BlockingQueue
之类的队列,ringBuffer
的使用,使得内存被循环使用,减少了某些场景的内存分配回收扩容等耗时操作。
1 | public final class RingBuffer<E> extends RingBufferFields<E> |
- E:在事件的交换或并行协调期间存储用于共享的数据的实现 -> 消息事件
Sequencer
RingBuffer
中 生产者的顶级父接口,其直接实现有SingleProducerSequencer
和MultiProducerSequencer
;对应 SINGLE
、MULTI
两个枚举值。
EventHandler
事件处置器,改接口用于对外扩展来实现具体的消费逻辑。如上面 demo
中的 LongEventHandler
;
1 | //回调接口,用于处理{@link RingBuffer}中可用的事件 |
event
:RingBuffer
已经发布的事件sequence
: 正在处理的事件 的序列号endOfBatch
: 用来标识否是来自RingBuffer
的批次中的最后一个事件
SequenceBarrier
消费者路障。规定了消费者如何向下走。事实上,该路障算是变向的锁。
1 | final class ProcessingSequenceBarrier implements SequenceBarrier { |
waitStrategy
决定了消费者采用何种等待策略。
WaitStrategy
Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}.
EventProcessor
的等待策略;具体实现在 disruptor
中有8种,
这些等待策略不同的核心体现是在如何实现 waitFor
这个方法上。
EventProcessor
事件处理器,实际上可以理解为消费者模型的框架,实现了线程Runnable
的run
方法,将循环判断等操作封在了里面。该接口有三个实现类:
1、BatchEventProcessor
1 | public final class BatchEventProcessor<T> implements EventProcessor { |
- ExceptionHandler:异常处理器
- DataProvider:数据来源,对应
RingBuffer
- EventHandler:处理
Event
的回调对象 - SequenceBarrier:对应的序号屏障
- TimeoutHandler:超时处理器,默认情况为空,如果要设置,只需要要将关联的
EventHandler
实现TimeOutHandler
即可。
如果我们选择使用 EventHandler
的时候,默认使用的就是 BatchEventProcessor
,它与EventHandler
是一一对应,并且是单线程执行。
如果某个RingBuffer
有多个BatchEventProcessor
,那么就会每个BatchEventProcessor
对应一个线程。
2、WorkProcessor
1 | public final class WorkProcessor<T> implements EventProcessor { |
基本和 BatchEventProcessor
类似,不同在于,用于处理Event
的回调对象是WorkHandler
。
原理图
无消费者情况下,生产者保持生产,但是 remainingCapacity
保持不变
在写demo
的过程中,本来想通过不设定 消费者 来观察 RingBuffer
可用容量变化的。但是验证过程中,一直得不到预期的结果,(注:没有设置消费者,只有生产者),先看结果:
1 | publish event :0 |
从结果来看,remainingCapacity
的值应该随着 发布的数量 递减的;但是实际上它并没有发生任何变化。
来看下ringBuffer.remainingCapacity()
这个方法:
1 | /** |
这里面又使用 sequencer.remainingCapacity()
这个方法来计算的。上面的例子中使用的是ProducerType.SINGLE
,那来看SingleProducerSequencer
这个里面remainingCapacity
的实现。
1 |
|
来解释下这段代码的含义:
假设当前 ringBuffer
的 bufferSize
是 8 ;上次申请到的序列号是 5,其实也就是说已经生产过占用的序列号是5;假设当前已经消费到的序列号是 3,那么剩余的容量为: 8-(5-2) = 5;
因为这里我们可以确定 bufferSize
和 produced
的值了,那么 remainingCapacity
的结果就取决于getMinimumSequence
的计算结果了。
1 | public static long getMinimumSequence(final Sequence[] sequences, long minimum) |
这个方法是从 Sequence
数组中获取最小序列 。如果sequences
为空,则返回 minimum
。回到上一步,看下sequences
这个数组是从哪里过来的,它的值在哪里设置的。
1 | long consumed = Util.getMinimumSequence(gatingSequences, nextValue); |
gatingSequences
是 SingleProducerSequencer
父类 AbstractSequencer
中的成员变量:
1 | protected volatile Sequence[] gatingSequences = new Sequence[0]; |
gatingSequences
是在下面这个方法里面来管理的。
1 | /** |
这个方法的调用栈向前追溯有这几个地方调用了:
WorkerPool
来管理多个消费者;hangdlerEventsWith
这个方法也是用来设置消费者的。但是在上面的测试案例中我们是想通过不设定消费者 只设定生成者 来观察 环形队列的占用情况,所以gatingSequences
会一直是空的,因此在计算时会把 produced
的值作为 minimum
返回。这样每次计算就相当于:
1 | return getBufferSize() - (produced - produced) === getBufferSize(); |
也就验证了为何在不设定消费者的情况下,remainingCapacity
的值会一直保持不变。
SOFATracer 中 Disruptor 实践
SOFATracer
中,AsyncCommonDigestAppenderManager
对 disruptor
进行了封装,用于处理外部组件的Tracer
摘要日志。该部分借助 AsyncCommonDigestAppenderManager
的源码来分析下SOFATracer
如何使用disruptor
的。
SOFATracer
中使用了两种不同的事件模型,一种是SOFATracer
内部使用的 StringEvent
, 一种是 外部扩展使用的 SofaTacerSpanEvent
。这里以 SofaTacerSpanEvent
这种事件模型来分析。StringEvent
消息事件模型对应的是 AsyncCommonAppenderManager
类封装的disruptor
。
SofaTracerSpanEvent ( -> LongEvent)
定义消息事件模型,SofaTacerSpanEvent
和 前面 demo
中的 LongEvent
基本结构是一样的,主要是内部持有的消息数据不同,LongEvent
中是一个long
类型的数据,SofaTacerSpanEvent
中持有的是 SofaTracerSpan
。
1 | public class SofaTracerSpanEvent { |
Consumer ( -> LongEventHandler)
Consumer
是 AsyncCommonDigestAppenderManager
的内部类;实现了 EventHandler
接口,这个consumer
就是作为消费者存在的。
在AsyncCommonAppenderManager
中也有一个,这个地方个人觉得可以抽出去,这样可以使得AsyncCommonDigestAppenderManager/AsyncCommonAppenderManager
的代码看起来更干净;
1 | private class Consumer implements EventHandler<SofaTracerSpanEvent> { |
SofaTracerSpanEventFactory (-> LongEventFactory)
用于产生消息事件的 Factory
1 | public class SofaTracerSpanEventFactory implements EventFactory<SofaTracerSpanEvent> { |
ConsumerThreadFactory (-> LongEventThreadFactory )
用来产生消费线程的 Factory
。
1 | public class ConsumerThreadFactory implements ThreadFactory { |
构建disruptor
disruptor
的构建是在 AsyncCommonDigestAppenderManager
的构造函数中完成的。
1 | public AsyncCommonDigestAppenderManager(int queueSize, int consumerNumber) { |
启动 disruptor
disruptor
的启动委托给了 AsyncCommonDigestAppenderManager
的start
方法来执行。
1 | public void start(final String workerName) { |
来看下,SOFATracer
中 具体是在哪里调用这个start
的:
CommonTracerManager
: 这个里面持有了AsyncCommonDigestAppenderManager
类的一个单例对象,并且是static
静态代码块中调用了start
方法;这个用来输出普通日志。SofaTracerDigestReporterAsyncManager
:这里类里面也是持有了AsyncCommonDigestAppenderManager
类的一个单例对像,并且提供了getSofaTracerDigestReporterAsyncManager
方法来获取该单例,在这个方法中调用了start
方法;该对象用来输出摘要日志。
发布事件
前面的demo
中是通过一个for
循环来发布事件的,在 SOFATracer
中 的事件发布无非就是当有Tracer
日志需要输出时会触发发布,那么对应的就是日志的 append
操作,将日志 append
到环形缓冲区。
1 | public boolean append(SofaTracerSpan sofaTracerSpan) { |
SOFATracer 事件发布的调用逻辑:
追溯调用的流程,可以知道当前 span
调用 finish
时或者 SOFATracer
中调用reportSpan
时 就相当于发布了一个消息事件。
小结
本文对 SOFATracer
中使用 Disruptor
来进行日志输出的代码进行了简单的分析,更多内部细节原理可以自行看下SOFATracer
的代码。SOFATracer
作为一种比较底层的中间件组件,在实际的业务开发中基本是无法感知的。但是作为技术来学习,还是有很多点可以挖一挖。
SOFATracer 中 Disruptor 实践
http://www.glmapper.com/2018/11/10/sofa/sofa-tracer-disruptor/