本文主要基于 SkyWalking 3.2.6 正式版

  • 1. 概述
  • 2. TraceSegmentServiceClient
    • 2.1 实现 BootService 接口
    • 2.2 实现 GRPCChannelListener 接口
    • 2.3 实现 TracingContextListener 接口
    • 2.4 实现 IConsumer 接口
  • 666. 彩蛋

1. 概述

分布式链路追踪系统,链路的追踪大体流程如下:

  1. Agent 收集 Trace 数据。
  2. Agent 发送 Trace 数据给 Collector 。
  3. Collector 接收 Trace 数据。
  4. Collector 存储 Trace 数据到存储器,例如,数据库。

本文主要分享【第二部分】 SkyWalking Agent 发送 Trace 数据

考虑到减少外部组件的依赖,Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent 写入内存消息队列后台线程异步】发送给 Collector 。

本文涉及的类非常少,如下图所示:

2. TraceSegmentServiceClient

org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient ,TraceSegment 发送服务客户端。它是一个服务,也是一个客户端,负责将 TraceSegment 异步发送到 Collector 。

我们先来看看 TraceSegmentServiceClient 的属性

  • TIMEOUT 静态属性,发送等待超时时长,单位:毫秒。
  • lastLogTime 属性,最后打印日志时间。该属性主要用于开发调试。
  • segmentUplinkedCounter 属性,TraceSegment 发送数量。
  • segmentAbandonedCounter 属性,TraceSegment 被丢弃数量。在 Agent 未连接上 Collector 时,产生的 TraceSegment 将被丢弃。
  • carrier 属性,内存队列。在 《SkyWalking 源码分析 —— DataCarrier 异步处理库》 有对 DataCarrier 的详细解析。
  • serviceStub 属性,非阻塞 Stub 。
  • status 属性,连接状态。

下面,我们来介绍 TraceSegmentServiceClient 实现的接口以及对应的方法。

2.1 实现 BootService 接口

#beforeBoot() 方法,代码如下:

  • 第 86 行:调用 GRPCChannelManager#addChannelListener(this) 方法,将自己添加到 GRPCChannelManager 中,作为一个监听器,从而调用 #statusChanged(GRPCChannelStatus) 方法,实现对连接状态status )的监听处理。

#boot() 方法,代码如下:

  • 第 95 至 97 行:创建 DataCarrier 对象,作为内存队列,并设置自己作为消费者,从而调用 #consume(List<TraceSegment> ) 方法,实现异步发送 TraceSegment 到 Collector 。

#afterBoot() 方法,代码如下:

  • 第 102 行:调用TracingContext.ListenerManager#add(this) 方法,将自己添加到 ListenerManager 中,作为一个监听器,从而调用 #afterFinished(TraceSegment) 方法,实现收集到新的 TraceSegment ,添加到内存队列。

#shutdown() 方法,代码如下:

  • 第 107 行:调用 DataCarrier#shutdownConsumers() 方法,停止消费。

2.2 实现 GRPCChannelListener 接口

#statusChanged(GRPCChannelStatus) 方法,代码如下:

  • 第 211 至 214 行:连接成功,创建 Stub 对象。
  • 第 215 行:记录连接状态。

2.3 实现 TracingContextListener 接口

#afterFinished(TraceSegment) 方法,代码如下:

  • 第 197 至 199 行:当 TraceSegment.ignore = true 时,忽略该 TraceSegment 。
  • 第 201 行:提交 TraceSegment 到内存队列。

2.4 实现 IConsumer 接口

#consume(List<TraceSegment>) 方法,代码如下:

  • ------ 连接中 ------
  • 第 119 行:创建 org.skywalking.apm.agent.core.remote。GRPCStreamServiceStatus 对象。
  • 第 122 至 141 行:创建 StreamObserver 对象。在下面,我们可以看到 Agent 发送 TraceSegment 给 Collector 是非阻塞的方式,通过该对象,观察执行结果。
    • 第 130 行 || 第 139 行:当发生错误或者完成时,调用 GRPCStreamServiceStatus#finished() 方法,标记完成。为什么呢?下面会看到。
    • 第 134 行:调用 GRPCChannelManager#reportError(Throwable) 方法,汇报错误。如果是连接错误,GRPCChannelManager 会负责断开重连。
  • 第 144 至 151 行:逐条非阻塞发送 TraceSegment 请求。
    • 注意,此处若等待完成超时,TraceSegment 依然在发送,或者被 Collector 处理中,直到最终的成功或失败。
    • DistributedTraceId#toUniqueId()
    • ID#transform()
    • AbstractTracingSpan#transform()
    • ExitSpan#transform()
    • LogDataEntity#transform()
    • TraceSegmentRef#transform()
    • KeyValuePair#transform()
    • 第 146 行:调用 TraceSegment#transform() 方法,将 TraceSegment 转换成 org.skywalking.apm.network.proto.UpstreamSegment 对象,用于 gRPC 传输,参见 TraceSegmentService.proto 的数据结构定义。
    • 第 154 行:调用 StreamObserver#onCompleted() 方法,标记全部请求发送完成。
    • 第 157 至 159 行:调用 GRPCStreamServiceStatus#wait4Finish(maxTimeout) 方法,等待 Collector 处理完成。这就是为什么上面需要调用 GRPCStreamServiceStatus#finished() 方法。完成后,记录数量到 segmentUplinkedCounter
  • ------ 未连接 ------
  • 第 161 行:记录数量到 segmentAbandonedCounter 。
  • ------ ALL ------
  • 调用 #printUplinkStatus() 方法,每三十秒,打印一次 segmentUplinkedCounter 和 segmentAbandonedCounter 数据。主要用于开发调试。另外,该方法会重置 segmentUplinkedCounter 和 segmentAbandonedCounter 计数。

ps:目前 DataCarrier 最长每 20 秒消费一次。

#onError(List<TraceSegment>, Throwable) 方法,当消费发生异常时,打印日志。


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 芋道 Spring Boot JPA 入门(二)之基于方法名查询
  2. DoDAF2.0方法论探究
  3. http协议请求方法都有哪些?网络安全学习提升
  4. 【前端词典】8 个提高 JS 性能的方法
  5. AngularJS 日期时间选择组件(附详细使用方法)
  6. 5 种方法教你用Python玩转histogram直方图
  7. IDEA Debug 无法进入断点的解决方法
  8. libp2p-rs kad 使用及调试方法
  9. 只会爬虫不会反爬虫?动图详解利用 User-Agent 进行反爬虫的原理和

随机推荐

  1. Android 蓝牙状态机以及蓝牙启动状态机
  2. Android事件分发机制
  3. Tiny210(Android)串口收发测试通过
  4. android 流量统计实现思路
  5. TextView中ellipsize属性焦点异常处理
  6. 如何为香蕉派 banana pi BPI-M2编译Andro
  7. android菜单Tips
  8. Android 编程设置 APN
  9. [android]Activity切换动画
  10. Android(安卓)EGL_BAD_CONFIG error,配置E