Apollo 源码解析 —— Config Service 记录 Instance
摘要: 原创出处 http://www.iocoder.cn/Apollo/config-service-audit-instance/ 「芋道源码」欢迎转载,保留摘要,谢谢!
- 1. 概述
- 2. 实体
- 3. InstanceConfigAuditUtil
- 666. 彩蛋
1. 概述
老艿艿:本系列假定胖友已经阅读过 《Apollo 官方 wiki 文档》 。
在 Portal 的应用详情页,我们可以看到每个 Namespace 下的实例列表。如下图所示:
- 实例( Instance ),实际就是 Apollo 的客户端。
本文分享实例相关的实体和如何存储的。
2. 实体
2.1 Instance
com.ctrip.framework.apollo.biz.entity.Instance
,Instance 实体。代码如下:
@Entity
@Table(name = "Instance")
public class Instance {
/**
* 编号
*/
@Id
@GeneratedValue
@Column(name = "Id")
private long id;
/**
* App 编号
*/
@Column(name = "AppId", nullable = false)
private String appId;
/**
* Cluster 名字
*/
@Column(name = "ClusterName", nullable = false)
private String clusterName;
/**
* 数据中心的 Cluster 名字
*/
@Column(name = "DataCenter", nullable = false)
private String dataCenter;
/**
* 客户端 IP
*/
@Column(name = "Ip", nullable = false)
private String ip;
/**
* 数据创建时间
*/
@Column(name = "DataChange_CreatedTime", nullable = false)
private Date dataChangeCreatedTime;
/**
* 数据最后更新时间
*/
@Column(name = "DataChange_LastTime")
private Date dataChangeLastModifiedTime;
@PrePersist
protected void prePersist() {
if (this.dataChangeCreatedTime == null) {
dataChangeCreatedTime = new Date();
}
if (this.dataChangeLastModifiedTime == null) {
dataChangeLastModifiedTime = dataChangeCreatedTime;
}
}
}
id
字段,编号,自增。appId
+clusterName
+dataCenter
+ip
组成唯一索引,通过这四个字段唯一一个实例( 客户端 )。
2.2 InstanceConfig
com.ctrip.framework.apollo.biz.entity.InstanceConfig
,Instance Config 实体,记录 Instance 对 Namespace 的配置的获取情况。如果一个 Instance 使用了多个 Namespace ,则会记录多条InstanceConfig 。
代码如下:
@Entity
@Table(name = "InstanceConfig")
public class InstanceConfig {
/**
* 编号
*/
@Id
@GeneratedValue
@Column(name = "Id")
private long id;
/**
* Instance 编号,指向 {@link Instance#id}
*/
@Column(name = "InstanceId")
private long instanceId;
/**
* App 编号
*/
@Column(name = "ConfigAppId", nullable = false)
private String configAppId;
/**
* Cluster 名字
*/
@Column(name = "ConfigClusterName", nullable = false)
private String configClusterName;
/**
* Namespace 名字
*/
@Column(name = "ConfigNamespaceName", nullable = false)
private String configNamespaceName;
/**
* Release Key ,对应 {@link Release#releaseKey}
*/
@Column(name = "ReleaseKey", nullable = false)
private String releaseKey;
/**
* 配置下发时间
*/
@Column(name = "ReleaseDeliveryTime", nullable = false)
private Date releaseDeliveryTime;
/**
* 数据创建时间
*/
@Column(name = "DataChange_CreatedTime", nullable = false)
private Date dataChangeCreatedTime;
/**
* 数据最后更新时间
*/
@Column(name = "DataChange_LastTime")
private Date dataChangeLastModifiedTime;
@PrePersist
protected void prePersist() {
if (this.dataChangeCreatedTime == null) {
dataChangeCreatedTime = new Date();
}
if (this.dataChangeLastModifiedTime == null) {
dataChangeLastModifiedTime = dataChangeCreatedTime;
}
}
}
id
字段,编号,自增。instanceId
+configAppId
+ConfigNamespaceName
组成唯一索引,因为一个 Instance 可以使用多个 Namespace 。releaseKey
字段,Release Key ,对应Release.releaseKey
字段。releaseDeliveryTime
字段,配置下发时间。- 通过
releaseKey
+releaseDeliveryTime
字段,可以很容易判断 Instance 在当前 Namespace 获取配置的情况。 configClusterName
字段,Cluster 名字。
3. InstanceConfigAuditUtil
在 《Apollo 源码解析 —— Config Service 配置读取接口》 中,我们看到,客户端读取配置时,会调用 Config Service 的 GET /configs/{appId}/{clusterName}/{namespace:.+}
接口。在接口中,会调用 InstanceConfigAuditUtil#audit(...)
的方法,代码如下:
private void auditReleases(String appId, String cluster, String dataCenter, String clientIp,
List<Release> releases) {
if (Strings.isNullOrEmpty(clientIp)) {
//no need to audit instance config when there is no ip
return;
}
// 循环 Release 数组
for (Release release : releases) {
// 记录 InstanceConfig
instanceConfigAuditUtil.audit(appId, cluster, dataCenter, clientIp, release.getAppId(),
release.getClusterName(),
release.getNamespaceName(), release.getReleaseKey());
}
}
下面我们来看看 InstanceConfigAuditUtil 的具体实现。
com.ctrip.framework.apollo.configservice.util.InstanceConfigAuditUtil
,实现 InitializingBean 接口,InstanceConfig 审计工具类。
3.1 构造方法
/**
* {@link #audits} 大小
*/
private static final int INSTANCE_CONFIG_AUDIT_MAX_SIZE = 10000;
/**
* {@link #instanceCache} 大小
*/
private static final int INSTANCE_CACHE_MAX_SIZE = 50000;
/**
* {@link #instanceConfigReleaseKeyCache} 大小
*/
private static final int INSTANCE_CONFIG_CACHE_MAX_SIZE = 50000;
private static final long OFFER_TIME_LAST_MODIFIED_TIME_THRESHOLD_IN_MILLI = TimeUnit.MINUTES.toMillis(10);//10 minutes
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
/**
* ExecutorService 对象。队列大小为 1 。
*/
private final ExecutorService auditExecutorService;
/**
* 是否停止
*/
private final AtomicBoolean auditStopped;
/**
* 队列
*/
private BlockingQueue<InstanceConfigAuditModel> audits = Queues.newLinkedBlockingQueue(INSTANCE_CONFIG_AUDIT_MAX_SIZE);
/**
* Instance 的编号的缓存
*
* KEY:{@link #assembleInstanceKey(String, String, String, String)}
* VALUE:{@link Instance#id}
*/
private Cache<String, Long> instanceCache;
/**
* InstanceConfig 的 ReleaseKey 的缓存
*
* KEY:{@link #assembleInstanceConfigKey(long, String, String)}
* VALUE:{@link InstanceConfig#id}
*/
private Cache<String, String> instanceConfigReleaseKeyCache;
@Autowired
private InstanceService instanceService;
public InstanceConfigAuditUtil() {
auditExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("InstanceConfigAuditUtil", true));
auditStopped = new AtomicBoolean(false);
instanceCache = CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).maximumSize(INSTANCE_CACHE_MAX_SIZE).build();
instanceConfigReleaseKeyCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS).maximumSize(INSTANCE_CONFIG_CACHE_MAX_SIZE).build();
}
- 基础属性
- KEY ,使用
instanceId
+configAppId
+ConfigNamespaceName
,恰好是 InstanceConfig 的唯一索引的字段。 - VALUE ,使用
releaseKey
。 - KEY ,使用
appId
+clusterName
+dataCenter
+ip
,恰好是 Instance 的唯一索引的字段。 - VALUE ,使用
id
。 instanceCache
属性,Instance 的编号的缓存。其中:instanceConfigReleaseKeyCache
属性,InstanceConfig 的ReleaseKey 的缓存。其中:
- KEY ,使用
- 线程相关
- InstanceConfigAuditUtil 记录 Instance 和 InstanceConfig 是提交到队列,使用线程池异步处理。
auditExecutorService
属性,ExecutorService 对象。队列大小为 1 。auditStopped
属性,是否停止。audits
属性,队列。
3.2 初始化任务
#afterPropertiesSet()
方法,通过 Spring 调用,初始化任务。代码如下:
1: @Override
2: public void afterPropertiesSet() {
3: // 提交任务
4: auditExecutorService.submit(() -> {
5: // 循环,直到停止或线程打断
6: while (!auditStopped.get() && !Thread.currentThread().isInterrupted()) {
7: try {
8: // 获得队首 InstanceConfigAuditModel 元素,非阻塞
9: InstanceConfigAuditModel model = audits.poll();
10: // 若获取不到,sleep 等待 1 秒
11: if (model == null) {
12: TimeUnit.SECONDS.sleep(1);
13: continue;
14: }
15: // 若获取到,记录 Instance 和 InstanceConfig
16: doAudit(model);
17: } catch (Throwable ex) {
18: Tracer.logError(ex);
19: }
20: }
21: });
22: }
- 第 4 至 21 行:提交任务到
auditExecutorService
中。- 第 6 至 20 行:循环,直到停止或线程打断。
- 第 9 行:调用
BlockingQueue#poll()
方法,获得队首InstanceConfigAuditModel 元素,非阻塞。 - 第 10 至 14 行:若获取不到,sleep 等待 1 秒。
- 第 16 行:若获取到,调用
#doAudit(InstanceConfigAuditModel)
方法,记录 Instance 和 InstanceConfig 。详细解析,见 「3.4 doAudit」 。
3.3 audit
#audit(...)
方法,添加到队列中。代码如下:
public boolean audit(String appId, String clusterName, String dataCenter, String
ip, String configAppId, String configClusterName, String configNamespace, String releaseKey) {
return this.audits.offer(new InstanceConfigAuditModel(appId, clusterName, dataCenter, ip,
configAppId, configClusterName, configNamespace, releaseKey));
}
创建 InstanceConfigAuditModel 对象,代码如下:
public static class InstanceConfigAuditModel {
private String appId;
private String clusterName;
private String dataCenter;
private String ip;
private String configAppId;
private String configClusterName;
private String configNamespace;
private String releaseKey;
/**
* 入队时间
*/
private Date offerTime;
public InstanceConfigAuditModel(String appId, String clusterName, String dataCenter, String
clientIp, String configAppId, String configClusterName, String configNamespace, String
releaseKey) {
this.offerTime = new Date(); // 当前时间
this.appId = appId;
this.clusterName = clusterName;
this.dataCenter = Strings.isNullOrEmpty(dataCenter) ? "" : dataCenter;
this.ip = clientIp;
this.configAppId = configAppId;
this.configClusterName = configClusterName;
this.configNamespace = configNamespace;
this.releaseKey = releaseKey;
}
}offerTime
属性,入队时间,取得当前时间,避免异步处理的时间差。
调用
BlockingQueue#offset(InstanceConfigAuditModel)
方法,添加到队列audits
中。
3.4 doAudit
#doAudit(InstanceConfigAuditModel)
方法,记录 Instance 和 InstanceConfig 。代码如下:
1: void doAudit(InstanceConfigAuditModel auditModel) {
2: // 拼接 instanceCache 的 KEY
3: String instanceCacheKey = assembleInstanceKey(auditModel.getAppId(), auditModel.getClusterName(),
4: auditModel.getIp(), auditModel.getDataCenter());
5: // 获得 Instance 编号
6: Long instanceId = instanceCache.getIfPresent(instanceCacheKey);
7: // 查询不到,从 DB 加载或者创建,并添加到缓存中。
8: if (instanceId == null) {
9: instanceId = prepareInstanceId(auditModel);
10: instanceCache.put(instanceCacheKey, instanceId);
11: }
12:
13: // 获得 instanceConfigReleaseKeyCache 的 KEY
14: // load instance config release key from cache, and check if release key is the same
15: String instanceConfigCacheKey = assembleInstanceConfigKey(instanceId, auditModel.getConfigAppId(),
16: auditModel.getConfigNamespace());
17: // 获得缓存的 cacheReleaseKey
18: String cacheReleaseKey = instanceConfigReleaseKeyCache.getIfPresent(instanceConfigCacheKey);
19: // 若相等,跳过
20: // if release key is the same, then skip audit
21: if (cacheReleaseKey != null && Objects.equals(cacheReleaseKey, auditModel.getReleaseKey())) {
22: return;
23: }
24: // 更新对应的 instanceConfigReleaseKeyCache 缓存
25: instanceConfigReleaseKeyCache.put(instanceConfigCacheKey, auditModel.getReleaseKey());
26: // 获得 InstanceConfig 对象
27: // if release key is not the same or cannot find in cache, then do audit
28: InstanceConfig instanceConfig = instanceService.findInstanceConfig(instanceId, auditModel.getConfigAppId(),
29: auditModel.getConfigNamespace());
30:
31: // 若 InstanceConfig 已经存在,进行更新
32: if (instanceConfig != null) {
33: // ReleaseKey 发生变化
34: if (!Objects.equals(instanceConfig.getReleaseKey(), auditModel.getReleaseKey())) {
35: instanceConfig.setConfigClusterName(auditModel.getConfigClusterName());
36: instanceConfig.setReleaseKey(auditModel.getReleaseKey());
37: instanceConfig.setReleaseDeliveryTime(auditModel.getOfferTime()); // 配置下发时间,使用入队时间
38: // 时间过近,例如 Client 先请求的 Config Service A 节点,再请求 Config Service B 节点的情况。
39: } else if (offerTimeAndLastModifiedTimeCloseEnough(auditModel.getOfferTime(), instanceConfig.getDataChangeLastModifiedTime())) {
40: //when releaseKey is the same, optimize to reduce writes if the record was updated not long ago
41: return;
42: }
43: // 更新
44: //we need to update no matter the release key is the same or not, to ensure the
45: //last modified time is updated each day
46: instanceConfig.setDataChangeLastModifiedTime(auditModel.getOfferTime());
47: instanceService.updateInstanceConfig(instanceConfig);
48: return;
49: }
50:
51: // 若 InstanceConfig 不存在,创建 InstanceConfig 对象
52: instanceConfig = new InstanceConfig();
53: instanceConfig.setInstanceId(instanceId);
54: instanceConfig.setConfigAppId(auditModel.getConfigAppId());
55: instanceConfig.setConfigClusterName(auditModel.getConfigClusterName());
56: instanceConfig.setConfigNamespaceName(auditModel.getConfigNamespace());
57: instanceConfig.setReleaseKey(auditModel.getReleaseKey());
58: instanceConfig.setReleaseDeliveryTime(auditModel.getOfferTime());
59: instanceConfig.setDataChangeCreatedTime(auditModel.getOfferTime());
60: // 保存 InstanceConfig 对象到数据库中
61: try {
62: instanceService.createInstanceConfig(instanceConfig);
63: } catch (DataIntegrityViolationException ex) {
64: // concurrent insertion, safe to ignore
65: }
66: }
============ Instance 相关 ============
第 2 至 4 行:拼接
instanceCache
的 KEY 。第 6 行:调用
Cache#getIfPresent(key)
从缓存instanceCache
中获得 Instance 编号。第 7 至 11 行:查询不到,从 DB 加载或者创建,并添加到缓存中。
#prepareInstanceId(InstanceConfigAuditModel)
方法,代码如下:private long prepareInstanceId(InstanceConfigAuditModel auditModel) {
// 查询 Instance 对象
Instance instance = instanceService.findInstance(auditModel.getAppId(), auditModel
.getClusterName(), auditModel.getDataCenter(), auditModel.getIp());
// 已存在,返回 Instance 编号
if (instance != null) {
return instance.getId();
}
// 若 Instance 不存在,创建 Instance 对象
instance = new Instance();
instance.setAppId(auditModel.getAppId());
instance.setClusterName(auditModel.getClusterName());
instance.setDataCenter(auditModel.getDataCenter());
instance.setIp(auditModel.getIp());
// 保存 Instance 对象到数据库中
try {
return instanceService.createInstance(instance).getId();
} catch (DataIntegrityViolationException ex) {
// 发生唯一索引冲突,意味着已经存在,进行查询 Instance 对象,并返回
// return the one exists
return instanceService.findInstance(instance.getAppId(), instance.getClusterName(),
instance.getDataCenter(), instance.getIp()).getId();
}
}- ©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任
更多相关文章
- 对比Excel学习SQL(4):计算字段
- mysql正则匹配模糊查询某个字段
- PHP二维数组按照指定的字段排序的函数
- php根据数组中某一字段来实现排序
- 初始化 Java 数据字段(学习 Java 编程语言 033)
- pgsql 修改字段类型为json
- 使用jQuery验证的MVC Razor View不验证空字段
- 使用jQuery在AJAX调用中的url字段格式
- 可编辑的表格:jQuery+PHP实现实时编辑表格字段内容
随机推荐
- ©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任