1.设计层面 首先要有任务抽象,这里可以是Runnable及Callable的实现类。有了任务后,执行任务的一般是线程,但如果当前线程不够时希望任务能够被暂时缓存,之后有空闲线程后再被消费。所以就有了任务队列。 空闲线程就是从这个队列取任务进行处理,可以将这个职责抽象一个Worker类。 然后有一个核心流程服务类,执行用户提交的任务,如果无线程可用时执行相应的往任务队列塞任务或拒绝等逻辑。 这个核心流程实现类就是ThreadPoolExecutor。 2.任务加入到线程池代码分析 类是:ThreadPoolExecutor 基本流程对应流程图,可参考深入线程池一。 代码如下:
 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}


ctl 是一个 AtomicInteger 类型, 它的低29位 用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是536870911; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应如下:
  • 111: RUNNING
  • 000: SHUTDOWN
  • 001: STOP
  • 010: TIDYING
  • 110: TERMINATED

代码流程如下: 1.首先校验入参任务是否为空,为空抛异常 2.获取ctl,【int c = ctl.get(); 】这里刚进来时是:-536870912 3.[workerCountOf(c) ]用于获取当前已有线程数,如果小于核心线程数则执行: if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } 这里的addWorker(command, true))用于创建新的worker逻辑。以下会分析。 这里只要知道如果创建新的Worker成功后,会返回给调用方,此时表示已经新建了一个Worker用于处理这个任务。
4.否则尝试将任务提交到任务队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } 这里必须要求整个线程池状态必须是Running状态。即如果线程处理于SHUTDOWN,STOP,TIDYING,TERMINATED等状态时是不会再将任务加入到任务队列。即然要关闭线程池,就不要再加入任务到队列了,没意义。 这里可以看到,如果在加入到队列后,还会再次检查线程池状态,如果这里变成不可用状态则会执行拒绝策略。 这里的使用意义在于,如果外部调用shutdown方法,在执行到这段代码时,会执行相应的拒绝策略,如果使用默认的拒绝策略则当前任务不会被执行。如果使用的是以阻塞的形式往队列加任务时,则任务还是会被执行。
5.如果队列已满,执行以下代码,这里主要是想看能否创建线程来执行 else if (!addWorker(command, false)) reject(command);
可以看到当前如果线程数已经大于【最大线程数】则会执行【reject(command)】来执行拒绝策略。
6.addWorker方法,职责是想办法创建新的Worker来执行任务,代码如下: 方法参数:【boolean core】表示是否是小于核心线程数的逻辑,这个方法对于小于核心线程数及大于核心线程数的两种情况都可以复用。 然后主要逻辑分成两块: a.cas设置线程数大小,失败重试 b.创建Worker执行任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//获取状态及当前线程数值,共32位,高3位用于存储状态,低29位用于存储线程数
int rs = runStateOf(c);

// 这里需要保证后续shutdown时,worker退出处理时要求在队列还有任务时,并且当前没有线程执行任务时,这里不会返回失败
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

//线程池还是running状态,或者虽然是shutdown,但还有任务需要执行,这时是由Worker退出时会调用这块代码
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


更多相关文章

  1. Java Executor多线程框架
  2. java线程--volatile实现可见性
  3. Maven:主线程中的NoClassDefFoundError
  4. java线程池使用场景和使用方法较详细文摘
  5. java中多线程安全问题产生&解决方案——同步方法

随机推荐

  1. 使用RxJava和RxAndroid封装RxBus,实现Even
  2. 解决:AndroidStudio 下使用AIDL不能生成对
  3. 哪个移动系统适合程序员?
  4. [置顶] Android系统体系结构分析
  5. Android消息推送接收后,通知栏的显示
  6. Android RxJava 使用RxJava开发
  7. Android自定义View-----上下拖动布局--Sl
  8. monkeyrunner截图限时免费阅读#1
  9. Android使用SQLite数据库(3)
  10. Android 实现从本地读取图片更改聊天背景