分布式作业 Elastic-Job-Lite 源码分析 —— 作业配置
1. 概述
本文主要分享 Elastic-Job-Lite 作业配置。
涉及到主要类的类图如下( 打开大图 ):
黄色的类在
elastic-job-common-core
项目里,为 Elastic-Job-Lite、Elastic-Job-Cloud 公用作业配置类。
另外建议你已经( 非必须 ):
阅读过《官方文档 —— 配置手册》
运行过 JavaMain.java
你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门
2. 作业配置
一个作业( ElasticJob )的调度,需要配置独有的一个作业调度器( JobScheduler ),两者是 1 : 1
的关系。这点大家要注意下,当然下文看代码也会看到。
作业调度器的创建可以配置四个参数:
注册中心( CoordinatorRegistryCenter ):用于协调分布式服务。必填。
Lite作业配置( LiteJobConfiguration ):必填。
作业事件总线( JobEventBus ):对作业事件异步监听。选填。
作业监听器( ElasticJobListener ):对作业执行前,执行后进行同步监听。选填。
2.1 注册中心配置
Elastic-Job 抽象了注册中心接口( RegistryCenter ),并提供了默认基于 Zookeeper 的注册中心实现( ZookeeperRegistryCenter )。
ZookeeperRegistryCenter 对应配置类为 ZookeeperConfiguration。该类注释很完整,可以点击链接直接查看源码,这里我们重点说下 namespace
属性。如果你有多个不同 Elastic-Job集群 时,使用相同 Zookeeper,可以配置不同的 namespace
进行隔离。
注册中心的初始化,我们会在《Elastic-Job-Lite 源码解析 —— 注册中心》详细分享。
2.2 Lite作业配置
LiteJobConfiguration 继承自接口 JobRootConfiguration,作为 Elastic-Job-Lite 里的作业( LiteJob )配置。Elastic-Job-Cloud 的作业( CloudJob )对应另外的配置类,也实现了该接口。
public final class LiteJobConfiguration implements JobRootConfiguration {
private final JobTypeConfiguration typeConfig;
private final boolean monitorExecution;
private final int maxTimeDiffSeconds;
private final int monitorPort;
private final String jobShardingStrategyClass;
private final int reconcileIntervalMinutes;
private final boolean disabled;
private final boolean overwrite;
// .... 省略部分get方法
public static class Builder {
// .... 省略部分属性
public final LiteJobConfiguration build() {
return new LiteJobConfiguration(jobConfig, monitorExecution, maxTimeDiffSeconds, monitorPort, jobShardingStrategyClass, reconcileIntervalMinutes, disabled, overwrite);
}
}
}
typeConfig
:作业类型配置。必填。monitorExecution
:监控作业运行时状态。默认为false
。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行》详细分享。每次作业执行时间和间隔时间均非常短的情况,建议不监控作业运行时状态以提升效率。因为是瞬时状态,所以无必要监控。请用户自行增加数据堆积监控。并且不能保证数据重复选取,应在作业中实现幂等性。
每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取。monitorPort
:作业监控端口。默认为-1
,不开启作业监控端口。选填。在《Elastic-Job-Lite 源码解析 —— 作业监控服务》详细分享。建议配置作业监控端口, 方便开发者dump作业信息。
使用方法: echo “dump” | nc 127.0.0.1 9888maxTimeDiffSeconds
:设置最大容忍的本机与注册中心的时间误差秒数。默认为-1
,不检查时间误差。选填。jobShardingStrategyClass
:作业分片策略实现类全路径。默认为使用分配侧路。选填。在《Elastic-Job-Lite 源码解析 —— 作业分片策略》详细分享。reconcileIntervalMinutes
:修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复。默认为10
。在《Elastic-Job-Lite 源码解析 —— 自诊断修复 》详细分享。disabled
:作业是否禁用执行。默认为false
。选填。overwrite
:设置使用本地作业配置覆盖注册中心的作业配置。默认为false
。选填。建议使用运维平台( console )配置作业配置,统一管理。Builder 类:使用该类配置 LiteJobConfiguration 属性,调用
#build()
方法最终生成作业配置。参见:《JAVA设计模式 — 生成器模式(Builder)》。
2.2.1 作业类型配置
作业类型配置接口( JobTypeConfiguration ) 有三种配置实现,针对三种作业类型:
配置实现 | 作业 | 说明 |
---|---|---|
SimpleJobConfiguration | SimpleJob | 简单作业。例如:订单过期作业 |
DataflowJobConfiguration | DataflowJob | 数据流作业。TODO:笔者暂时未了解流式处理数据,不误人子弟 |
ScriptJobConfiguration | ScriptJob | 脚本作业。例如:调用 shell 脚本备份数据库作业 |
三种配置类属性对比如:
属性 | SimpleJob | DataflowJob | ScriptJob | 说明 |
---|---|---|---|---|
coreConfig | √ | √ | √ | 作业核心配置 |
jobType | JobType.SIMPLE | JobType.DATAFLOW | JobType.SCRIPT | 作业类型 |
jobClass | √ | √ | √ (默认:ScriptJob.class) | 作业实现类全路径 |
streamingProcess | √ | 是否流式处理数据 | ||
scriptCommandLine | √ | 脚本型作业执行命令行 |
作业类型配置不仅仅适用于 Elastic-Job-Lite,也适用于 Elastic-Job-Cloud。
2.2.2 作业核心配置
作业核心配置( JobCoreConfiguration ),我们可以看到在每种作业类型配置都有该属性( coreConfig
)。
public final class JobCoreConfiguration {
private final String jobName;
private final String cron;
private final int shardingTotalCount;
private final String shardingItemParameters;
private final String jobParameter;
private final boolean failover;
private final boolean misfire;
private final String description;
private final JobProperties jobProperties;
public static class Builder {
// .... 省略部分属性
public final JobCoreConfiguration build() {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(cron), "cron can not be empty.");
Preconditions.checkArgument(shardingTotalCount > 0, "shardingTotalCount should larger than zero.");
return new JobCoreConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter, failover, misfire, description, jobProperties);
}
}
}
jobName
:作业名称。必填。cron
:cron表达式,用于控制作业触发时间。必填。shardingTotalCount
:作业分片总数。如果一个作业启动超过作业分片总数的节点,只有shardingTotalCount
会执行作业。必填。在《Elastic-Job-Lite 源码解析 —— 作业分片策略 》详细分享。shardingItemParameters
:分片序列号和参数。选填。分片序列号和参数用等号分隔,多个键值对用逗号分隔
分片序列号从0开始,不可大于或等于作业分片总数
如:
0=a,1=b,2=cjobParameter
:作业自定义参数。选填。作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
例:每次获取的数据量、作业实例从数据库读取的主键等failover
:是否开启作业执行失效转移。开启表示如果作业在一次作业执行中途宕机,允许将该次未完成的作业在另一作业节点上补偿执行。默认为false
。选填。在《Elastic-Job-Lite 源码解析 —— 作业失效转移 》详细分享。misfire
:是否开启错过作业重新执行。默认为true
。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行 》详细分享。description
:作业描述。选填。jobProperties
:作业属性配置。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行 》详细分享。public final class JobProperties {
private EnumMap<JobPropertiesEnum, String> map = new EnumMap<>(JobPropertiesEnum.class);
public enum JobPropertiesEnum {
/**
* 作业异常处理器.
*/
JOB_EXCEPTION_HANDLER("job_exception_handler", JobExceptionHandler.class, DefaultJobExceptionHandler.class.getCanonicalName()),
/**
* 线程池服务处理器.
*/
EXECUTOR_SERVICE_HANDLER("executor_service_handler", ExecutorServiceHandler.class, DefaultExecutorServiceHandler.class.getCanonicalName());
private final String key;
private final Class<?> classType;
private final String defaultValue;}
}JOB_EXCEPTION_HANDLER
:用于扩展异常处理类。EXECUTOR_SERVICE_HANDLER
:用于扩展作业处理线程池类。通过这个属性,我们可以自定义每个作业的异常处理和线程池服务。
2.3 作业事件配置
通过作业事件配置( JobEventConfiguration ),实现对作业事件的异步监听、处理。在《Elastic-Job-Lite 源码解析 —— 作业事件追踪》详细分享。
2.4 作业监听器
通过配置作业监听器( ElasticJobListener ),实现对作业执行的同步监听、处理。在《Elastic-Job-Lite 源码解析 —— 作业监听器》详细分享。
3. 作业配置服务
多个 Elastic-Job-Lite 使用相同注册中心和相同 namespace
组成集群,实现高可用。集群中,使用作业配置服务( ConfigurationService ) 共享作业配置。
public final class ConfigurationService {
/**
* 时间服务
*/
private final TimeService timeService;
/**
* 作业节点数据访问类
*/
private final JobNodeStorage jobNodeStorage;
public ConfigurationService(final CoordinatorRegistryCenter regCenter, final String jobName) {
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
timeService = new TimeService();
}
}
JobNodeStorage,封装注册中心,提供存储服务。在《Elastic-Job-Lite 源码解析 —— 作业数据存储》详细分享。
TimeService,时间服务,提供当前时间查询。
public final class TimeService {
/**
* 获取当前时间的毫秒数.
*
* @return 当前时间的毫秒数
*/
public long getCurrentMillis() {
return System.currentTimeMillis();
}}
3.1 读取作业配置
/**
* 读取作业配置.
*
* @param fromCache 是否从缓存中读取
* @return 作业配置
*/
public LiteJobConfiguration load(final boolean fromCache) {
String result;
if (fromCache) { // 缓存
result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
if (null == result) {
result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
}
} else {
result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
}
return LiteJobConfigurationGsonFactory.fromJson(result);
}
3.2 持久化作业配置
/**
* 持久化分布式作业配置信息.
*
* @param liteJobConfig 作业配置
*/
public void persist(final LiteJobConfiguration liteJobConfig) {
checkConflictJob(liteJobConfig);
if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
}
}
调用
#checkConflictJob(...)
方法校验注册中心存储的作业配置的作业实现类全路径(jobClass
)和当前的是否相同,如果不同,则认为是冲突,不允许存储:private void checkConflictJob(final LiteJobConfiguration liteJobConfig) {
Optional<LiteJobConfiguration> liteJobConfigFromZk = find();
if (liteJobConfigFromZk.isPresent()
&& !liteJobConfigFromZk.get().getTypeConfig().getJobClass().equals(liteJobConfig.getTypeConfig().getJobClass())) { // jobClass 是否相同
throw new JobConfigurationException("Job conflict with register center. The job '%s' in register center's class is '%s', your job class is '%s'",
liteJobConfig.getJobName(), liteJobConfigFromZk.get().getTypeConfig().getJobClass(), liteJobConfig.getTypeConfig().getJobClass());
}
}当注册中心未存储该作业配置 或者 当前作业配置允许替换注册中心作业配置(
overwrite = true
)时,持久化作业配置。
3.3 校验本机时间是否合法
/**
* 检查本机与注册中心的时间误差秒数是否在允许范围.
*
* @throws JobExecutionEnvironmentException 本机与注册中心的时间误差秒数不在允许范围所抛出的异常
*/
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
if (-1 == maxTimeDiffSeconds) {
return;
}
long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
if (timeDiff > maxTimeDiffSeconds * 1000L) {
throw new JobExecutionEnvironmentException(
"Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
}
}
Elastic-Job-Lite 作业触发是依赖本机时间,相同集群使用注册中心时间为基准,校验本机与注册中心的时间误差是否在允许范围内(
LiteJobConfiguration.maxTimeDiffSeconds
)。
666. 彩蛋
Elastic-Job-Lite 源码解析系列第一篇文章,希望大家多多支持,预计全部更新完会有 15+ 篇。Elastic-Job-Cloud 源码系列后续也会更新。
道友,分享一波微信朋友圈支持支持支持,可好?
更多相关文章
- 注册中心 Eureka 源码解析 —— StringCache
- 注册中心 Eureka 源码解析 —— Eureka-Server 集群同步
- 注册中心 Eureka 源码解析 —— 网络通信
- 分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业失效转移
- 注册中心 Eureka 源码解析 —— EndPoint 与 解析器
- 分布式作业 Elastic-Job-Lite 源码分析 —— 作业分片
- 分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业分片策略
- 注册中心 Eureka 源码解析 —— 任务批处理
- 消息中间件 RocketMQ 源码解析 —— 调试环境搭建