分布式链路追踪 SkyWalking 源码分析 —— 应用于应用实例的注册
本文主要基于 SkyWalking 3.2.6 正式版
1. 概述
2. Collector 注册相关 API
2.1 应用的注册 API
2.2 应用实例的正常注册 API
2.3 应用实例的恢复注册 API
2.4 应用实例的心跳 API
3. Agent 调用注册 API
1. 概述
本文主要分享 应用与应用实例的注册。先来简单了解下注册的整体流程:
应用启动,Agent 向 Collector 注册应用。
注册应用成功后,Agent 向 Collector 注册应用实例。
下面,我们分成两个小节,分别从 API 的实现与调用,分享代码的具体实现。
友情提示:推荐阅读 《探针与Collector间通讯协议》 。
2. Collector 注册相关 API
Collector 注册相关 API 相关有四个接口:
2.1 应用的注册 API
2.2 应用实例的正常注册 API
2.3 应用实例的恢复注册 API
2.4 应用实例的心跳 API
API 处理的流程大体如下:
绿框部分,【2.3】【2.4】两个 API ,直接 Service 调用 DAO 方法,无需经过 Graph / Stream 相关方法。
2.1 应用的注册 API
我们先来看看 API 的定义,ApplicationRegisterService.proto
,如下图所示:
其中,KeyWithIntegerValue 数据类型,在
KeyWithIntegerValue.proto
中定义。
2.1.1 ApplicationRegisterServiceHandler#register(...)
ApplicationRegisterServiceHandler#register(Application, StreamObserver<ApplicationMapping>)
, 代码如下:
第 51 行:获得请求的应用编码(
applicationCode
)数组。第 54 至 64 行:循环应用编码数组,获取或创建应用。
第 57 行:调用
IApplicationIDService#getOrCreate(applicationCode)
方法,获取或创建应用,最终获得应用编号(applicationId
)。第 60 至 63 行:获得到应用编号(
applicationId != 0
),则添加到响应。为什么会存在获得不到的情况呢?在下文中,我们会看到,实际异步保存应用,所以会存在获取失败的情况。当获取失败,调用方( 例如 Agent )可以重新发起该请求进行注册应用,从而在异步保存应用,获取到应用编号。第 67 至 68 行:响应。
2.1.2 IApplicationIDService#getOrCreate(...)
org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService
,继承 Service 接口,应用编号服务接口。
定义了
#getOrCreate(applicationCode)
接口方法,根据应用编码获取或创建应用,并获得应用编号。
org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService
,实现 IApplicationIDService 接口,应用编号服务实现类。
实现了
#getOrCreate(applicationCode)
方法,代码如下:第 66 行:调用
ApplicationCacheService#get(applicationCode)
方法,从缓存中获取应用编号。在 《SkyWalking 源码分析 —— Collector Cache 缓存组件》 有详细解析。第 69 至 76 行:当获取不到应用编号时,获取 Application 对应的
Graph<Application>
对象,调用Graph#start(application)
方法,进行流式处理。在这过程中,会保存应用到存储器。第 77 行:返回应用编号。
2.1.3 Graph#start(application)
在 #createApplicationRegisterGraph()
方法中,我们可以看到 Application 对应的 Graph<Application>
对象的创建。
org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterRemoteWorker
,继承 AbstractRemoteWorker 抽象类,应用注册远程 Worker 。Factory 内部类,实现 AbstractRemoteWorkerProvider 抽象类,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3.2.3 AbstractRemoteWorker」 有详细解析。
AbstractRemoteWorker ,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3.2.3 AbstractRemoteWorker」 有详细解析。
#id()
实现方法,返回 10006 。#selector
实现方法,返回Selector.ForeverFirst
。在 《SkyWalking 源码分析 —— Collector Remote 远程通信服务》 有详细解析。#onWork(Application)
实现方法,调用Next#execute(message)
方法,提交数据给下面的节点,继续流式处理。总结:ApplicationRegisterRemoteWorker ,使用 Collector 集群的第一个节点( 按照
ip
排序 ) 进行后续的流式处理,即,保存应用。
org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterSerialWorker
,继承 AbstractLocalAsyncWorker 抽象类,异步保存应用 Worker 。第 58 行:调用
ApplicationCacheService#get(applicationCode)
方法,从缓存中获取应用编号。第 60 行:当获取不到应用编号时,使用
applicationCode
创建应用并保存。第 62 行:调用
ApplicationH2RegisterDAO#getMinApplicationId()
方法,获得 Application 记录的应用编号的最小值。--------- 分隔 ---------
第 63 行:当
min == 0
时,说明没有 Application 记录。第 64 至 68 行:创建第一条、特殊的 Application 记录。该记录
applicationId = 1
,applicationCode = User
,用于表示用户发起请求。在 SkyWaling UI 中,我们可以看到该条 Application 记录如下图:第 70 至 74 行:创建当前请求的对应的 Application 记录,并且
applicationId = -1
。--------- 分隔 ---------
第 76 行:调用
ApplicationH2RegisterDAO#getMaxApplicationId()
方法,获得 Application 记录的应用编号的最大值。第 77 行:调用
IdAutoIncrement#increment(min, max)
方法,获得应用编号。该方法较为有趣,在下文详细解析。第 79 至 82 行:创建当前请求的对应的 Application 记录。
--------- 分隔 ---------
第 85 行:调用
ApplicationEsRegisterDAO#save(Application)
方法,保存应用。Factory 内部类,实现 AbstractLocalAsyncWorkerProvider 抽象类,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3.2.2 AbstractLocalAsyncWorker」 有详细解析。
AbstractLocalAsyncWorker ,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3.2.2 AbstractLocalAsyncWorker」 有详细解析。
#id()
实现方法,返回 101 。#onWork(Application)
实现方法,保存应用( Application )。代码如下( 以 ES 作为 DAO 实现为例子 ):
IdAutoIncrement#increment(min, max)
方法,双向均匀自增。可能看起来比较奇怪,以上文 Application 的调用举例子:
min | max | result | applicationCode |
---|---|---|---|
0 | / | 1 | User |
0 | / | -1 | 应用 A |
-1 | 1 | 2 | 应用 B |
-1 | 2 | -2 | 应用 C |
-2 | 2 | 3 | 应用 D |
【User】和【应用 A】是直接获得
result
,不调用#increment(min, max)
方法。总的来说,我们可以看到,以
min + max = 0
为中心点( 实际以0
为中心点), 双向均匀自增。
TODO 【4007】
2.1.4 Application
org.skywalking.apm.collector.storage.table.register.Application
,应用。例如记录在 ES 如下图:
2.2 应用实例的正常注册 API
我们先来看看 API 的定义,InstanceDiscoveryService
,如下图所示:
整体代码和 「2.1 应用的注册 API」 非常相似,所以本小节,更多的是提供代码的链接地址。
2.2.1 InstanceDiscoveryServiceHandler#register(...)
InstanceDiscoveryServiceHandler#register(ApplicationInstance, StreamObserver<ApplicationInstanceMapping>)
,注册应用实例。
2.2.2 IInstanceIDService#getOrCreate(...)
org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService
,继承 Service 接口,应用实例编号服务接口。
定义了
#getOrCreate(applicationCode)
接口方法,根据应用编号 + AgentUUID,获取或创建应用实例,并获得应用编号。
org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService
,实现 IInstanceIDService 接口,应用编号服务实现类。
实现了
#getOrCreate(applicationCode)
方法。
2.2.3 Graph#start(instance)
在 #createInstanceRegisterGraph()
方法中,我们可以看到 Instance 对应的 Graph<Instance>
对象的创建。
org.skywalking.apm.collector.agent.stream.worker.register.InstanceRegisterRemoteWorker
,继承 AbstractRemoteWorker 抽象类,应用实例注册远程 Worker 。org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterSerialWorker
,继承 AbstractLocalAsyncWorker 抽象类,异步保存应用 Worker 。不同 Application ,Instance 的应用实例编号,从
"1"
正向递增。InstanceEsRegisterDAO#save(Instance)
2.2.4 Instance
org.skywalking.apm.collector.storage.table.register.Instance
,应用实例。例如记录在 ES 如下图:
2.3 应用实例的恢复注册 API
我们先来看看 API 的定义,InstanceDiscoveryService.proto
,如下图所示:
其中,Downstream 数据类型,在
Downstream.proto
中定义。
一般情况下,Agent 在注册应用时候成功后,如果因为各种原因原因和 Collector 断开了 gRPC Channel 连接( 例如,网络 ),恢复连接后,需要调用该 API ,进行恢复注册。
2.3.1 InstanceDiscoveryServiceHandler#recover(...)
InstanceDiscoveryServiceHandler#recover(ApplicationInstanceRecover, StreamObserver<Downstream>)
, 代码如下:
第 71 行:调用
TimeBucketUtils#getSecondTimeBucket(time)
方法,将registerTime
转成 timeBucket 。第 73 行:调用
IInstanceIDService#recover(instanceId, applicationId, registerTime, osInfo)
方法,恢复注册应用实例。第 75 至 76 行:响应。
2.3.2 IInstanceIDService#recover(...)
InstanceIDService#recover(instanceId, applicationId, registerTime, osInfo)
实现方法,恢复注册。代码如下:
第 96 至 103 行:创建 Instance 对象,用于下面更新操作。
第 99 行:TODO 【4008】
第 106 行:调用 InstanceEsRegisterDAO#save(Instance) 方法,更新应用实例。
2.4 应用实例的心跳 API
我们先来看看 API 的定义,InstanceDiscoveryService.proto
,如下图所示:
其中,Downstream 数据类型,在
Downstream.proto
中定义。
一般情况下,Agent 在注册应用时候成功后,定时向 Collector 发送心跳,记录应用存活。
2.4.1 InstanceDiscoveryServiceHandler#heartbeat(...)
InstanceDiscoveryServiceHandler#heartbeat(ApplicationInstanceHeartbeat, StreamObserver<org.skywalking.apm.network.proto.Downstream>) ,目前该方法暂未实现。实现后,会首先调用一个 Service 方法,而后调用 InstanceEsRegisterDAO#updateHeartbeatTime(instanceId, heartbeatTime)
方法,记录应用实例的心跳时间。
3. Agent 调用注册 API
org.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient
,实现 BootService 、GRPCChannelListener 、Runnable 、TracingContextListener 接口,注册应用与实例的客户端。该客户端会调用上述所有 API 。
PROCESS_UUID
静态属性,Agent UUID ,使用 UUID 算法生成,去除多余"-"
。---------- 分割线 ----------
status
属性,gRPC 连接状态。applicationRegisterServiceBlockingStub
/instanceDiscoveryServiceBlockingStub
/serviceNameDiscoveryServiceBlockingStub
属性,对应 gRPC 提供 API 的阻塞 Stub 。needRegisterRecover
属性,是否需要发起恢复的注册。如上五个属性,在
#statusChanged(GRPCChannelStatus)
实现方法,根据 gRPC 连接状态的变更,创建或销毁 Stub 。在
#boot()
实现方法,将自己作为监听器( 因为实现了 GRPCChannelListener 接口 )添加到 GRPCChannelManager 中,从而监听 gRPC Channel 的状态。在 《SkyWalking 源码分析 —— Agent Remote 远程通信服务》 有详细解析。---------- 分割线 ----------
applicationRegisterFuture
属性,注册应用与实例的定时任务。#boot()
实现方法,创建applicationRegisterFuture
。该定时任务无初始化延迟,每Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL
( 默认:3 s ) 执行一次#run()
方法。---------- 分割线 ----------
lastSegmentTime
属性,最后记录 Segment 的时间。#afterFinished()
实现方法,记录 Segment 最后的时间。#afterBoot()
实现方法,将自己作为监听器( 因为实现了 TracingContextListener 接口 )添加到 GRPCChannelManager 中,从而监听 Segment 的记录。在 《SkyWalking 源码分析 —— Agent 收集 Trace 数据》 有详细解析。
#run()
实现方法,执行应用的注册,应用实例的正常注册、恢复注册、心跳的逻辑。
第 123 行:循环,当 gRPC 处于连接中,并且需要重试(
shouldTry
)。可能对shouldTry
会比较疑惑?该变量用于,应用的注册成功后,重新标记shouldTry = true
,继续执行应用实例的注册。第 126 至 135 行:当本地应用编号为空时,说明应用暂未注册,调用 「2.1 应用的注册 API」 。
第 138 至 148 行:当本地应用实例编号为空时,说明应用实例暂未注册,调用 「2.2 应用实例的正常注册 API」 。
第 150 至 158 行:当需要发起恢复注册时,即 gRPC Channel 断开后重连成功,调用 「2.3 应用实例的恢复注册 API」 。
第 159 至 167 行:当现在时间超过
lastSegmentTime
一分钟,调用 「2.4 应用实例的心跳 API」 。第 170 至 173 行:同步应用字典、操作字典。在 《SkyWalking 源码分析 —— Agent DictionaryManager 字典管理》 详细解析。
第 178 至 180 行:当发生异常时,调用
GRPCChannelManager#reportError(t)
方法,处理异常,例如请求超时。
更多相关文章
- DoDAF2.0方法论探究
- http协议请求方法都有哪些?网络安全学习提升
- 【前端词典】8 个提高 JS 性能的方法
- AngularJS 日期时间选择组件(附详细使用方法)
- 5 种方法教你用Python玩转histogram直方图
- IDEA Debug 无法进入断点的解决方法
- libp2p-rs kad 使用及调试方法
- 只会爬虫不会反爬虫?动图详解利用 User-Agent 进行反爬虫的原理和
- 如何不再当分母?我告诉你一个方法