Java ThreadPoolExecutor


Let's start by opening the java.util.concurrent.ThreadPoolExecutor class and reading its documentation comment to get a rough idea of what it does.
From it we learn that the class's main job is to create and manage thread pools. Reading on, we come across the following passage:

To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods Executors.newCachedThreadPool() (unbounded thread pool, with automatic thread reclamation), Executors.newFixedThreadPool(int) (fixed size thread pool) and Executors.newSingleThreadExecutor() (single background thread), that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class:

In short, to be useful across a wide range of contexts the class exposes many parameters for manual configuration and tuning, but programmers are generally encouraged to create thread pools through the factory methods of java.util.concurrent.Executors, which can create:

  1. Executors.newCachedThreadPool() — an unbounded thread pool

It is a thread pool that can grow without bound;
it is best suited to large numbers of short-lived tasks;
corePoolSize is 0 and maximumPoolSize is effectively unlimited, so the number of threads can grow arbitrarily;
keepAliveTime is 60 seconds, meaning a thread idle for more than 60 seconds is terminated;
it uses a SynchronousQueue to hold waiting tasks. This blocking queue has no storage capacity, so whenever a request arrives a worker thread must be found to handle it; if no thread is currently idle, a new one is created.

  2. Executors.newFixedThreadPool(int) — a fixed-size thread pool

It is a thread pool of fixed size;
corePoolSize and maximumPoolSize are both set to the user-specified thread count nThreads;
keepAliveTime is 0, meaning any surplus idle thread would be stopped immediately, although here keepAliveTime has no effect anyway;
the blocking queue is a LinkedBlockingQueue, which is unbounded;
because the queue is unbounded, tasks can never be rejected, and the actual number of threads always stays at nThreads, so maximumPoolSize and keepAliveTime have no effect.

  3. Executors.newSingleThreadExecutor() — a single-thread executor

It creates only one worker thread to process tasks;
the blocking queue it uses is also a LinkedBlockingQueue.

Let's look at how each of them is actually created:

/** Unbounded pool: no arguments; corePoolSize starts at 0 and maximumPoolSize is Integer.MAX_VALUE. **/
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

/** Fixed-size pool: takes a thread count and uses it for both corePoolSize and maximumPoolSize. **/
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

/** Single-thread pool: no arguments; both corePoolSize and maximumPoolSize are fixed at 1. **/
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

As you can see, all of these officially recommended factory methods ultimately create their pools through constructors of ThreadPoolExecutor.
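
Before moving on to the constructor, here is a minimal usage sketch of one of these factory pools (the class name, task count and timeout below are made up for illustration):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FactoryPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // A fixed pool of 4 threads; extra tasks wait in the (unbounded) LinkedBlockingQueue.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            pool.execute(() ->
                    System.out.println(Thread.currentThread().getName() + " runs task " + taskId));
        }
        // Stop accepting new tasks, then wait for the submitted ones to finish.
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}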

Next, let's look at the constructors of ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

As we can see, there are four constructors, and the first three all delegate to the fourth, so it is the fourth constructor's parameters we should focus on.
Beyond the brief descriptions in the method's Javadoc,
we can go back to the class-level documentation, which explains these core parameters as follows:

Core and maximum pool sizes
    A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(java.lang.Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
Keep-alive times
    If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see getKeepAliveTime(java.util.concurrent.TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method setKeepAliveTime(long, java.util.concurrent.TimeUnit). Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero.
Queuing
    Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
        If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
        If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
        If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
    There are three general strategies for queuing:
        1. Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
        2. Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each other's execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
        3. Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.

Having walked through the class documentation above, let's summarize what each of these parameters does (a construction sketch follows the list):

int corePoolSize : the size of the core pool. Threads are created lazily, one per incoming task; once the number of threads reaches corePoolSize, newly arriving tasks are placed in the work queue instead. If you want all core threads created up front, call prestartAllCoreThreads() or prestartCoreThread(), which pre-start the core threads (all of them, or a single one) directly.
int maximumPoolSize : the maximum number of threads the pool may create. Threads beyond the core are only added once the queue has reached its capacity; when maximumPoolSize threads are already running, the pool counts as saturated and new tasks are handed to the RejectedExecutionHandler.
long keepAliveTime : how long an idle thread may wait for new tasks before being terminated; the pool eventually settles back down to corePoolSize threads.
TimeUnit unit : the time unit of the keepAliveTime argument.
BlockingQueue<Runnable> workQueue : a blocking queue that holds tasks waiting to be executed; tasks only end up here once demand exceeds the corePoolSize threads.
ThreadFactory threadFactory : the factory used to create worker threads, which lets you, for example, give the threads meaningful names.
RejectedExecutionHandler handler : what to do with new tasks once the pool is saturated (the queue is full and all maximumPoolSize threads, i.e. the core plus the extra maximumPoolSize - corePoolSize threads, are busy). The JDK ships four saturation policies: abort, discard, discard-oldest, and caller-runs.
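
Putting these parameters together, a hand-configured pool might look like the following minimal sketch (the concrete sizes, the thread-name prefix and the choice of CallerRunsPolicy are illustrative assumptions, not recommendations):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ManualPoolDemo {
    public static void main(String[] args) {
        // A ThreadFactory that gives worker threads readable names.
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "demo-pool-" + seq.getAndIncrement());
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime + unit
                new ArrayBlockingQueue<Runnable>(100),      // bounded workQueue
                namedFactory,                               // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler (saturation policy)

        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " is working"));
        pool.shutdown();
    }
}

The four built-in handlers are ThreadPoolExecutor.AbortPolicy (the default), DiscardPolicy, DiscardOldestPolicy and CallerRunsPolicy, corresponding to the four saturation policies listed above.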

One important concept in all of this is the queue:

Every pool has a corePoolSize; once the core threads are all occupied, incoming tasks are first placed in the blocking queue, which acts like the data channel between producers and consumers. The following blocking queues are available (a short construction sketch follows the list):

  1. ArrayBlockingQueue: a bounded blocking queue backed by an array. Bounded means its capacity is finite; the capacity must be specified at construction time and cannot be changed afterwards.
  2. DelayQueue: blocks on its elements. Every element of a DelayQueue must implement the java.util.concurrent.Delayed interface, whose single method long getDelay(TimeUnit unit) returns how long the element must still be held; a zero or negative value means the element has expired and may be released, at which point DelayQueue hands it out via take(). DelayQueue is useful for things like closing idle connections, expiring cached objects, and timeout handling.
  3. LinkedBlockingQueue: the capacity is optional. If you specify one at construction time the queue is bounded; if you don't, it is "unbounded", which in practice means a default capacity of Integer.MAX_VALUE. It is implemented as a linked list.
  4. PriorityBlockingQueue: an unbounded queue whose ordering rules are the same as java.util.PriorityQueue. Note that PriorityBlockingQueue does not accept null elements, and every element inserted must implement java.lang.Comparable; the queue's priority ordering is defined by that implementation.
  5. SynchronousQueue: a queue with no internal storage. A thread inserting an element blocks until another thread takes it, and a non-blocking offer simply fails if no consumer is already waiting.
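
As a quick reference, here is how each of these queues would typically be constructed (the capacities are arbitrary example values; DelayQueue is omitted because it holds Delayed elements rather than plain Runnables):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueChoices {
    public static void main(String[] args) {
        // Bounded: the capacity is fixed at construction time.
        BlockingQueue<Runnable> bounded = new ArrayBlockingQueue<Runnable>(100);
        // "Unbounded": without an explicit capacity it defaults to Integer.MAX_VALUE.
        BlockingQueue<Runnable> unbounded = new LinkedBlockingQueue<Runnable>();
        // Unbounded, ordered by the elements' Comparable implementation.
        BlockingQueue<Runnable> priority = new PriorityBlockingQueue<Runnable>();
        // No internal capacity: each insert must be matched by a take.
        BlockingQueue<Runnable> handoff = new SynchronousQueue<Runnable>();
    }
}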

Among the three pools above,
CachedThreadPool uses a SynchronousQueue precisely so that a new thread is created the moment a task arrives and no idle thread is available, while FixedThreadPool and SingleThreadExecutor use a default LinkedBlockingQueue as the medium for storing tasks, with a default capacity of Integer.MAX_VALUE.
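
The effect of the SynchronousQueue choice is easy to see in isolation. The sketch below (names are illustrative) shows that a non-blocking offer fails whenever no consumer is already waiting, which is exactly what pushes CachedThreadPool into creating a new thread:

import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    public static void main(String[] args) {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
        // No thread is blocked in take(), so the non-blocking offer is refused.
        boolean queued = queue.offer(() -> System.out.println("task"));
        System.out.println(queued); // prints: false
        // Inside ThreadPoolExecutor.execute(), this failed offer is what makes
        // the pool call addWorker() and start a fresh thread for the task.
    }
}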

Besides the three common pools above, the widely used microservice framework Dubbo also implements two thread pools of its own, which are worth introducing here:

  1. LimitedThreadPool
package com.alibaba.dubbo.common.threadpool.support.limited;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Creates a thread pool that creates new threads as needed until limits reaches. This thread pool will not shrink
* automatically.
*/
public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

This implementation is very close to the JDK's CachedThreadPool; the biggest difference is that keepAliveTime goes from 60 seconds to Long.MAX_VALUE, i.e. effectively infinite, which means threads are never reclaimed once created (hence the Javadoc note that the pool "will not shrink automatically").

  2. EagerThreadPool
package com.alibaba.dubbo.common.threadpool.support.eager;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
* EagerThreadPool
* When the core threads are all in busy,
* create new thread instead of putting task into blocking queue.
*/
public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}

From everything above we know that, in a standard pool, when all core threads are busy new tasks are first placed into the queue; only when the queue is full are additional threads created, and once the thread count would exceed maximumPoolSize the surplus tasks are handled separately (rejected).
EagerThreadPool works the other way around: once the thread count exceeds the core size, newly added tasks are not put into the blocking queue; instead new threads are created until the maximum thread limit is reached, and only then are further tasks queued.

It achieves this with its custom TaskQueue and EagerThreadPoolExecutor.

// Extends LinkedBlockingQueue
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    // Overrides offer
    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // there are idle worker threads (fewer pending tasks than current threads): just queue the task
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // the pool can still grow (current threads < maximumPoolSize): return false so the executor
        // creates a new thread for the task, i.e. pretend the blocking queue is full
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // the pool is already at its maximum (currentPoolThreadSize >= max): queue the task
        return super.offer(runnable);
    }

    // retry the offer; used when the executor has rejected a task
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}

The key trick is to pretend the queue is full so that the pool creates a new thread to execute the task.

// Extends ThreadPoolExecutor
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count: the number of tasks that have been submitted but not yet completed
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    // Overrides execute
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // count the task before handing it to the pool
        submittedTaskCount.incrementAndGet();
        try {
            // run the normal ThreadPoolExecutor.execute() logic
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            // if the pool rejected the task, try to put it into the blocking queue directly
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
        }
    }
}

Having looked at the logic of TaskQueue and EagerThreadPoolExecutor, let's go back to ThreadPoolExecutor.execute():

public void execute(Runnable command) {
    /* throw a NullPointerException if the submitted task is null */
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    /* if the current worker count is below corePoolSize, call addWorker to run the task on a new thread */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    /* if the core is already full and the pool is still running, try to add the task to the blocking queue */
    // with EagerThreadPool, workQueue.offer(command) calls TaskQueue.offer, which as shown above
    // returns false while the current thread count is still below the maximum
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /* if queuing fails, start a new thread to run the task */
    // tip: this is the key point -- when the offer to the blocking queue fails, addWorker is
    // called directly to create a new thread for the task
    else if (!addWorker(command, false))
        reject(command);
}

Looking at the ThreadPoolExecutor source, we can see that when the insert into the blocking queue fails, addWorker is called directly to create a new thread for the task.
That matches up exactly with the overridden TaskQueue.offer method.
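
To make that interplay concrete, here is a minimal, self-contained sketch of the same "pretend the queue is full" trick using only JDK classes. It is a simplification of what Dubbo does (class names, capacities and sleep times are made up; the real TaskQueue additionally compares the submitted-task count against the pool size and retries rejected tasks):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class EagerQueueSketch {

    // A queue that reports "full" while the pool may still grow, so that
    // ThreadPoolExecutor.execute() calls addWorker() instead of queuing.
    static class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private volatile ThreadPoolExecutor pool;

        EagerQueue(int capacity) { super(capacity); }

        void setPool(ThreadPoolExecutor pool) { this.pool = pool; }

        @Override
        public boolean offer(Runnable r) {
            if (pool != null && pool.getPoolSize() < pool.getMaximumPoolSize()) {
                return false; // pretend the queue is full -> a new thread is created
            }
            return super.offer(r); // pool is at its maximum -> really queue the task
        }
    }

    public static void main(String[] args) throws InterruptedException {
        EagerQueue queue = new EagerQueue(100);
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 4, 60L, TimeUnit.SECONDS, queue);
        queue.setPool(pool);

        // Submit 4 slow tasks: the pool grows straight to 4 threads instead of
        // queuing everything behind the single core thread.
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            });
        }
        Thread.sleep(100);
        System.out.println("pool size = " + pool.getPoolSize()); // expected: 4
        pool.shutdown();
    }
}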