- 线程池好处:
- 降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控
** 线程池流程图:**
线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都执行任务,则进入下个流程。
线程池判断工作队列是否已经饱满。如果工作队列没有饱满,则将新提交的任务存储在这工作队列里。如果工作队列满了,则进入下一个流程。
线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经饱满了,则交给饱和策略来处理这个任务。
**
ThreadPoolExecutor 结构图:**
如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤
需要获取全局锁)。如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。
来看看源码(JDK1.8)的实现部分:
public void execute(Runnable command) {
// 如果 command为null 则抛出异常
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取当前的原子值大小
int c = ctl.get();
// 如果当前线程小于当前的核心线程数,先加入工作队列中。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,
// 则创建一个线程执行任务。
else if (!addWorker(command, false))
reject(command); // 抛出异常
}
其中 ctl.get() 是获取原子值大小
线程池的使用
线程池工厂代码封装
package com.pwh.mycode.chapter09;
import java.util.concurrent.*;
/**
* 线程池工厂
*/
public class ThreadPoolFactory {
private static final int INIT_POOL_SIZE = 4;
private static final int MAX_POOL_SIZE = 10;
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(INIT_POOL_SIZE,
MAX_POOL_SIZE, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.AbortPolicy());
/**
* 异步
*
* @param r
* @return
*/
public static String addTaskSubmint(Runnable r) {
System.out.println("运行的线程数:" + getActiveCount(threadPoolExecutor));
System.out.println("阻塞的线程数:" + getBlockingQueue(threadPoolExecutor));
threadPoolExecutor.execute(r);
return "success";
}
/**
* 同步
*
* @param c
* @return
*/
public static String addTaskSubmit(Callable c) {
String m = null;
System.out.println("运行的线程数:" + getActiveCount(threadPoolExecutor));
System.out.println("阻塞的线程数:" + getBlockingQueue(threadPoolExecutor));
Future<?> future = threadPoolExecutor.submit(c);
try {
Object object = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "success";
}
/**
* 运行线程数
*
* @param pool
* @return
*/
public static int getActiveCount(ThreadPoolExecutor pool) {
return pool.getActiveCount();
}
/**
* 获取最大的线程数
*
* @param pool
* @return
*/
public static int getMaximunPoolSize(ThreadPoolExecutor pool) {
return pool.getMaximumPoolSize();
}
/***
* 阻塞的线程数
* @param pool
* @return
*/
public static int getBlockingQueue(ThreadPoolExecutor pool) {
return pool.getQueue().size();
}
}
线程池的创建也是通过 new ThreadPoolExecutor():
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
corePoolSize:线程池的基本大小
maximumPoolSize :允许最大的线程池数
TimeUnit unit : 时间戳
BlockingQueue
RejectedExecutionHandler handler 饱和策略 当队列和线程池都满了,说明线程池处于饱和状
态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法
处理新任务时抛出异常