ThreadPoolExecutor的使用

/ 多线程 / 没有评论 / 388浏览

ThreadPoolExecutor也就是线程池。它就是Java为我们开发多线程程序时提供的一个开发框架。它可以统一的管理线程的创建、销毁、优化、监控等,在使用线程池时比我们直接使用原始的线程类更加方便。既然线程池这么方便,那它到底是怎么实现上述的功能呢?下面我们先看一下当用线程池启动一个线程时它的流程图。

sjms.jpg

线程池的处理流程如下:

当我们用线程池启动一个任务时,线程池首先会检查核心线程池里面的线程数是否已经超过corePoolSize。如果没有超过则创建一个新的线程执行任务。如果超过了,那么将当前执行的任务添加到线程池的工作队列中,但在加入之前会先检查工作队列是否已经满了,如果工作队列已经满了,那么此时它会检查线程池中的线程是否超过了允许的最大数量。如果没有超过则创建线程执行任务,如果超过了最大数量,则按照无法执行的策略处理。

线程池的创建:在创建ThreadPoolExecutor时,会需要传递几个必要的参数,下面我们详细看一下它们每个参数所代表的含义。

  1. ArrayBlockingQueue:是基于数组结构的任务队列。此队列按先进先出的原则对任务进行排序。
  2. LinkedBlockingQueue:是基于链表结构的任务队列。此队列也是按先进先出的原则对任务进行排序。但性能通常要比ArrayBlockingQueue高
  3. SynchronousQueue:一个不存储元素的任务队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
  4. PriorityBlockingQueue:是一个具有优先级的任务队列。此队列中的元素必须能够比较。
  1. AbortPolicy:直接抛出异常。 
  2. CallerRunsPolicy:只用调用者所在线程来运行任务。 
  3. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。 
  4. DiscardPolicy:不处理,丢弃掉。

下面我们用具体的代码来详细说明一下ThreadPoolExecutor的使用。

public class Test {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(1);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 1, TimeUnit.DAYS, arrayBlockingQueue);
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(String.format("任务: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(String.format("任务: %s", Thread.currentThread().getName()));
}
});
System.out.println(String.format("列队中线程数:%s", arrayBlockingQueue.size()));
Thread.sleep(1000);
threadPool.shutdown();
}
}
任务: pool-1-thread-1
任务: pool-1-thread-2
列队中线程数:0

按照上面的分析,因为我们创建ThreadPoolExecutor对象时初始化的空闲线程是2个,并且我们添加到线程池中的数量也是2个,所以当前任务是由核心线程池执行的任务并不会将任务添加到对列中。如果我在继续向线程池中提交任务,那么因为超过了我们设置的corePoolSize数量,所以此时队列中就有了我们新提交的任务了。

public class Test {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(1);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 1, TimeUnit.DAYS, arrayBlockingQueue);
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(String.format("任务1: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(String.format("任务2: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(String.format("任务3: %s", Thread.currentThread().getName()));
}
});
System.out.println(String.format("列队中线程数:%s", arrayBlockingQueue.size()));
Thread.sleep(1000);
threadPool.shutdown();
}
}
任务1: pool-1-thread-1
任务2: pool-1-thread-2
任务3: pool-1-thread-1
列队中线程数:1

因为我们设置的线程池的最大线程数是3也就是maximumPoolSize的值。如果超过这个值,并且我们没有更改相应的饱和策略,那么此时就会抛出异常信息。

public class Test {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(1);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 1, TimeUnit.DAYS, arrayBlockingQueue);
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务1: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务2: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务3: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务4: %s", Thread.currentThread().getName()));
}
});
System.out.println(String.format("列队中线程数:%s", arrayBlockingQueue.size()));
Thread.sleep(1000);
threadPool.shutdown();
}
}
列队中线程数:1
任务1: pool-1-thread-1
任务2: pool-1-thread-2
任务4: pool-1-thread-3
任务3: pool-1-thread-1

我们发现程序居然没有报错,这是因为什么呢。这是因为参数maximumPoolSize的作是指线程池最大能允许的最大并发数是3也就是说同时可以执行3个线程,但是我们别忘了线程池中还有一个队列呢。队列里存储的就是将要被执行的任务,只是现在已经超过了最大并发数所以队列里的任务只能等待线程池中有其它任务执行完后,它才可以执行。所以此时线程池中允许我们提交任务的最大数就是maximumPoolSize加上队列的数量。但如果我们继续向线程池有添加任务,那么线程池就会报错了,因为已经没有地方存储新任务了,队列也已经满了,所以只能走饱和策略的默认策略就是抛出异常。

public class Test {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(1);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 1, TimeUnit.DAYS, arrayBlockingQueue);
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务1: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务2: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务3: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务4: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务5: %s", Thread.currentThread().getName()));
}
});
Thread.sleep(1000);
System.out.println(String.format("列队中线程数:%s", arrayBlockingQueue.size()));
Thread.sleep(1000);
threadPool.shutdown();
}
}
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Test$5@2503dbd3 rejected from java.util.concurrent.ThreadPoolExecutor@4b67cf4d[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at Test.main(Test.java:59)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
任务2: pool-1-thread-2
任务1: pool-1-thread-1
任务4: pool-1-thread-3
任务3: pool-1-thread-2

下面我们修改一下线程池的饱和策略。

public class Test {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(1);
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Thread thread = new Thread(r);
thread.start();
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 1, TimeUnit.DAYS, arrayBlockingQueue, rejectedExecutionHandler);
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务1: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务2: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务3: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务4: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务5: %s", Thread.currentThread().getName()));
}
});
Thread.sleep(1000);
System.out.println(String.format("列队中线程数:%s", arrayBlockingQueue.size()));
Thread.sleep(1000);
threadPool.shutdown();
}
}
列队中线程数:1
任务1: pool-1-thread-1
任务2: pool-1-thread-2
任务4: pool-1-thread-3
任务5: Thread-0
任务3: pool-1-thread-1

线程池中相关方法的介绍

public class Test {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(1);
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Thread thread = new Thread(r);
thread.start();
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 1, TimeUnit.DAYS, arrayBlockingQueue, rejectedExecutionHandler);
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务1: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务2: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务3: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务4: %s", Thread.currentThread().getName()));
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务5: %s", Thread.currentThread().getName()));
}
});
System.out.println(String.format("taskCount%s", threadPool.getTaskCount()));
System.out.println(String.format("completedTaskCount%s", threadPool.getTaskCount()));
System.out.println(String.format("largestPoolSize%s", threadPool.getLargestPoolSize()));
System.out.println(String.format("getPoolSize%s", threadPool.getPoolSize()));
System.out.println(String.format("getActiveCount%s", threadPool.getActiveCount()));
Thread.sleep(1000);
threadPool.shutdown();
}
}
taskCount:4
completedTaskCount:4
largestPoolSize:3
getPoolSize:3
getActiveCount:3
任务1: pool-1-thread-1
任务5: Thread-0
任务2: pool-1-thread-2
任务4: pool-1-thread-3
任务3: pool-1-thread-1
taskCount:线程池需要执行的任务数。虽然我们向线程池中提交了5个任务,但第5个任务并不是由线程池执行的,是我们修改了饱和策略自己执行的。所以此值返回结果是4。
completedTaskCount:线程池中完成的任务数。
largestPoolSize:线程池中曾经创建过的最大线程数。也就是有多少个线程同是执行,也叫最大并发数。
getPoolSize:线程池中的线程数。如果线程池不销毁,那么线程池里的线程也不会自动销毁。
getActiveCount:活动的线程数。