本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 作业执行类型

  • 3. Producer 发布任务

  • 4. TaskLaunchScheduledService 提交任务

  • 5. TaskExecutor 执行任务

  • 6. SchedulerEngine 处理任务的状态变更


1. 概述

本文主要分享 Elastic-Job-Cloud 调度主流程。对应到 Elastic-Job-Lite 源码解析文章如下:

  • 《Elastic-Job-Lite 源码分析 —— 作业初始化》

  • 《Elastic-Job-Lite 源码分析 —— 作业执行》

  • 《Elastic-Job-Lite 源码分析 —— 作业分片》

如果你阅读过以下文章,有助于对本文的理解:

  • 《基于Mesos的当当作业云Elastic Job Cloud》

  • 《由浅入深 | 如何优雅地写一个Mesos Framework》

另外,笔者假设你已经对 《Elastic-Job-Lite 源码分析系列》 有一定的了解。

本文涉及到主体类的类图如下( 打开大图 ):

你行好事会因为得到赞赏而愉悦 
同理,开源项目贡献者会因为 Star 而更加有动力 
为 Elastic-Job 点赞!传送门

Elastic-Job-Cloud 基于 Mesos 实现分布式作业调度,或者说 Elastic-Job-Cloud 是 Mesos 上的 框架( Framework )。

一个 Mesos 框架由两部分组成:

  • 控制器部分,称为调度器( Scheduler )。

  • 工作单元部分,称为执行器( Executor )。

Elastic-Job-Cloud 由两个项目组成:

  • Elastic-Job-Cloud-Scheduler,实现调度器,实现类为 com.dangdang.ddframe.job.cloud.scheduler.mesos.SchedulerEngine

  • Elastic-Job-Cloud-Executor,实现执行器,实现类为 com.dangdang.ddframe.job.cloud.executor.TaskExecutor

本文略微“啰嗦”,请保持耐心。搭配《用Mesos框架构建分布式应用》一起阅读,理解难度降低 99%。OK,开始我们的 Cloud 之旅。

2. 作业执行类型

在 Elastic-Job-Cloud,作业执行分成两种类型:

  • 常驻作业

常驻作业是作业一旦启动,无论运行与否均占用系统资源; 
常驻作业适合初始化时间长、触发间隔短、实时性要求高的作业,要求资源配备充足。

  • 瞬时作业

瞬时作业是在作业启动时占用资源,运行完成后释放资源。 
瞬时作业适合初始化时间短、触发间隔长、允许延迟的作业,一般用于资源不太充分,或作业要求的资源多,适合资源错峰使用的场景。

Elastic-Job-Cloud 不同于 Elastic-Job-Lite 去中心化执行调度,转变为 Mesos Framework 的中心节点调度。这里不太理解,没关系,下文看到具体代码就能明白了。

常驻作业、瞬时作业在调度中会略有不同,大体粗略流程如下:

下面,我们针对每个过程一节一节解析。

3. Producer 发布任务

在上文《Elastic-Job-Cloud 源码分析 —— 作业配置》的「3.1.1 操作云作业配置」可以看到添加云作业配置后,Elastic-Job-Cloud-Scheduler 会执行作业调度,实现代码如下:

// ProducerManager.java
/**
* 调度作业.

@param jobConfig 作业配置
*/

public void schedule(final CloudJobConfiguration jobConfig) {
   // 应用 或 作业 被禁用,不调度
   if (disableAppService.isDisabled(jobConfig.getAppName()) || disableJobService.isDisabled(jobConfig.getJobName())) {
       return;
   }
   if (CloudJobExecutionType.TRANSIENT == jobConfig.getJobExecutionType()) { // 瞬时作业
       transientProducerScheduler.register(jobConfig);
   } else if (CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) { // 常驻作业
       readyService.addDaemon(jobConfig.getJobName());
   }
}
  • 瞬时作业和常驻作业在调度上会有一定的不同。

3.1 常驻作业

常驻作业在调度时,直接添加到待执行作业队列。What?岂不是马上就运行了!No No No,答案在「5. TaskExecutor 执行任务」,这里先打住。

// ReadyService.java
/**
* 将常驻作业放入待执行队列.
*
@param jobName 作业名称
*/

public void addDaemon(final String jobName) {
   if (regCenter.getNumChildren(ReadyNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
       log.warn("Cannot add daemon job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
       return;
   }
   Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
   if (!cloudJobConfig.isPresent() || CloudJobExecutionType.DAEMON != cloudJobConfig.get().getJobExecutionType() || runningService.isJobRunning(jobName)) {
       return;
   }
   // 添加到待执行队列
   regCenter.persist(ReadyNode.getReadyJobNodePath(jobName), "1");
}

// ReadyNode.java
final class ReadyNode {

    static final String ROOT = StateNode.ROOT + "/ready";

    private static final String READY_JOB = ROOT + "/%s"// %s = ${JOB_NAME}
}
  • ReadyService,待执行作业队列服务,提供对待执行作业队列的各种操作方法。

  • 待执行作业队列存储在注册中心( Zookeeper )的持久数据节点 /${NAMESPACE}/state/ready/${JOB_NAME},存储值为待执行次数。例如此处,待执行次数为 1。使用 zkClient 查看如下:

    [zk: localhost:2181(CONNECTED) 4] ls /elastic-job-cloud/state/ready
    [test_job_simple]
    [zk: localhost:2181(CONNECTED) 5] get /elastic-job-cloud/state/ready/test_job_simple
    1
  • 在运维平台,我们可以看到待执行作业队列:


  • 从官方的 RoadMap 来看,待执行作业队列未来会使用 Redis 存储以提高性能。

    FROM http://elasticjob.io/docs/elastic-job-cloud/03-design/roadmap/ 
    Redis Based Queue Improvement

3.2 瞬时作业

瞬时作业在调度时,使用发布瞬时作业任务的调度器( TransientProducerScheduler )调度作业。当瞬时作业到达作业执行时间,添加到待执行作业队列。

3.2.1 TransientProducerScheduler

TransientProducerScheduler,发布瞬时作业任务的调度器,基于 Quartz 实现对瞬时作业的调度。初始化代码如下:

// TransientProducerScheduler.java
void start() {
   scheduler = getScheduler();
   try {
       scheduler.start();
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}

private Scheduler getScheduler() {
   StdSchedulerFactory factory = new StdSchedulerFactory();
   try {
       factory.initialize(getQuartzProperties());
       return factory.getScheduler();
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}

private Properties getQuartzProperties() {
   Properties result = new Properties();
   result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
   result.put("org.quartz.threadPool.threadCount", Integer.toString(Runtime.getRuntime().availableProcessors() * 2)); // 线程池数量
   result.put("org.quartz.scheduler.instanceName""ELASTIC_JOB_CLOUD_TRANSIENT_PRODUCER");
   result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName());
   result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
   return result;
}

3.2.2 注册瞬时作业

调用 TransientProducerScheduler#register(...) 方法,注册瞬时作业。实现代码如下:

// TransientProducerScheduler.java
private final TransientProducerRepository repository;

synchronized void register(final CloudJobConfiguration jobConfig) {
   String cron = jobConfig.getTypeConfig().getCoreConfig().getCron();
   // 添加 cron 作业集合
   JobKey jobKey = buildJobKey(cron);
   repository.put(jobKey, jobConfig.getJobName());
   // 调度 作业
   try {
       if (!scheduler.checkExists(jobKey)) {
           scheduler.scheduleJob(buildJobDetail(jobKey), buildTrigger(jobKey.getName()));
       }
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}
  • 调用 #buildJobKey(…) 方法,创建 Quartz JobKey。你会发现很有意思的使用的是 cron 参数作为主键。Why?在看下 !scheduler.checkExists(jobKey) 处,相同 JobKey( cron ) 的作业不重复注册到 Quartz Scheduler。Why?此处是一个优化,相同 cron 使用同一个 Quartz Job,Elastic-Job-Cloud-Scheduler 可能会注册大量的瞬时作业,如果一个瞬时作业创建一个 Quartz Job 太过浪费,特别是 cron每分钟、每5分钟、每小时、每天已经覆盖了大量的瞬时作业的情况。因此,相同 cron 使用同一个 Quartz Job。

  • 调用 TransientProducerRepository#put(...) 以 Quartz JobKey 为主键聚合作业。

    final class TransientProducerRepository {
    /**
     * cron 作业集合
     * key:作业Key
     */

    private final ConcurrentHashMap&lt;JobKey, List&lt;String&gt;&gt; cronTasks = new ConcurrentHashMap&lt;&gt;(2561);

    synchronized void put(final JobKey jobKey, final String jobName) {
        remove(jobName);
        List&lt;String&gt; taskList = cronTasks.get(jobKey);
        if (null == taskList) {
            taskList = new CopyOnWriteArrayList&lt;&gt;();
            taskList.add(jobName);
            cronTasks.put(jobKey, taskList);
            return;
        }
        if (!taskList.contains(jobName)) {
            taskList.add(jobName);
        }
    }
    }
  • 调用 #buildJobDetail(...) 创建 Quartz Job 信息。实现代码如下:

    private JobDetail buildJobDetail(final JobKey jobKey) {
        JobDetail result = JobBuilder.newJob(ProducerJob.class) // ProducerJob.java
                .withIdentity(jobKey).build();
        result.getJobDataMap().put("repository", repository);
        result.getJobDataMap().put("readyService", readyService);
        return result;
    }
    • JobBuilder#newJob(…) 的参数是 ProducerJob,下文会讲解到。

  • 调用 #buildTrigger(...) 创建 Quartz Trigger。实现代码如下:

    private Trigger buildTrigger(final String cron) {
       return TriggerBuilder.newTrigger()
               .withIdentity(cron)
               .withSchedule(CronScheduleBuilder.cronSchedule(cron) // cron
               .withMisfireHandlingInstructionDoNothing())
               .build();
    }

3.2.3 ProducerJob

ProducerJob,当 Quartz Job 到达 cron 执行时间( 即作业执行时间),将相应的瞬时作业添加到待执行作业队列。实现代码如下:

public static final class ProducerJob implements Job {

   private TransientProducerRepository repository;

   private ReadyService readyService;

   @Override
   public void execute(final JobExecutionContext context) throws JobExecutionException {
       List<String> jobNames = repository.get(context.getJobDetail().getKey());
       for (String each : jobNames) {
           readyService.addTransient(each);
       }
   }
}
  • 调用 TransientProducerRepository#get(...) 方法,获得该 Job 对应的作业集合。实现代码如下:

    final class TransientProducerRepository {
    /**
     * cron 作业集合
     * key:作业Key
     */

    private final ConcurrentHashMap&lt;JobKey, List&lt;String&gt;&gt; cronTasks = new ConcurrentHashMap&lt;&gt;(2561);

    List&lt;String&gt; get(final JobKey jobKey) {
        List&lt;String&gt; result = cronTasks.get(jobKey);
        return null == result ? Collections.&lt;String&gt;emptyList() : result;
    }
    }
  • 调用 ReadyService#addTransient(...) 方法,添加瞬时作业到待执行作业队列。实现代码如下:

    /**
    * 将瞬时作业放入待执行队列.

    @param jobName 作业名称
    */

    public void addTransient(final String jobName) {
       //
       if (regCenter.getNumChildren(ReadyNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
           log.warn("Cannot add transient job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
           return;
       }
       //
       Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
       if (!cloudJobConfig.isPresent() || CloudJobExecutionType.TRANSIENT != cloudJobConfig.get().getJobExecutionType()) {
           return;
       }
       // 
       String readyJobNode = ReadyNode.getReadyJobNodePath(jobName);
       String times = regCenter.getDirectly(readyJobNode);
       if (cloudJobConfig.get().getTypeConfig().getCoreConfig().isMisfire()) {
           regCenter.persist(readyJobNode, Integer.toString(null == times ? 1 : Integer.parseInt(times) + 1));
       } else {
           regCenter.persist(ReadyNode.getReadyJobNodePath(jobName), "1");
       }
    }
    • 添加瞬时作业到待执行作业队列 和 添加常驻作业到待执行作业队列基本是一致的。

    • 当作业配置允许 misfire,则不断累积作业可执行次数。

3.3 小结

无论是常驻作业还是瞬时作业,都会加入到待执行作业队列。目前我们看到瞬时作业的每次调度是 TransientProducerScheduler 负责。那么常驻作业的每次调度呢?「5. TaskExecutor 执行任务」会看到它的调度,这是 Elastic-Job-Cloud 设计巧妙有趣的地方。


艿艿:因为本文实在有点太长了,微信有文章长度限制,麻烦胖友访问 http://www.iocoder.cn/Elastic-Job/cloud-job-scheduler-and-executor-first/ 进行继续阅读。

啊啊啊啊,我当初是怎么写完的。。。。。


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 分布式作业系统 Elastic-Job-Cloud 源码分析 —— 作业配置
  2. 分布式作业 Elastic-Job-Lite 源码分析 —— 注册中心
  3. 分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复
  4. 分布式做系统 Elastic-Job-Lite 源码分析 —— 作业初始化
  5. 分布式消息队列 RocketMQ源码解析:事务消息
  6. 分布式消息队列 RocketMQ源码解析:Filtersrv
  7. 分布式消息队列 RocketMQ 源码分析 —— 定时消息与消息重试
  8. 分布式消息队列 RocketMQ 源码分析 —— 高可用
  9. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

随机推荐

  1. PHP如何实现支付宝支付功能(图文详解)
  2. PHP 数组常用函数总结
  3. PHP重置数组为连续数字索引的三种方式
  4. PHP中箭头函数的实例详解
  5. PHP根据键值合并数组
  6. php单例模式 使用场景和使用方法
  7. 学习PHP死循环写法和作用
  8. PHP 简单实现延时操作
  9. 详细解读PHP中return用法(附代码)
  10. PHP yield 协程 生成器用法的了解