线程池原理


  • 线程池好处
  1. 降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控

** 线程池流程图:**

  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都执行任务,则进入下个流程。

  2. 线程池判断工作队列是否已经饱满。如果工作队列没有饱满,则将新提交的任务存储在这工作队列里。如果工作队列满了,则进入下一个流程。

  3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经饱满了,则交给饱和策略来处理这个任务。

**
ThreadPoolExecutor 结构图:**

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤
    需要获取全局锁)。

  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
    RejectedExecutionHandler.rejectedExecution()方法。

来看看源码(JDK1.8)的实现部分:

public void execute(Runnable command) {
         // 如果 command为null 则抛出异常
       if (command == null)
           throw new NullPointerException();
       /*
        * Proceed in 3 steps:
        *
        * 1. If fewer than corePoolSize threads are running, try to
        * start a new thread with the given command as its first
        * task.  The call to addWorker atomically checks runState and
        * workerCount, and so prevents false alarms that would add
        * threads when it shouldn't, by returning false.
        *
        * 2. If a task can be successfully queued, then we still need
        * to double-check whether we should have added a thread
        * (because existing ones died since last checking) or that
        * the pool shut down since entry into this method. So we
        * recheck state and if necessary roll back the enqueuing if
        * stopped, or start a new thread if there are none.
        *
        * 3. If we cannot queue task, then we try to add a new
        * thread.  If it fails, we know we are shut down or saturated
        * and so reject the task.
        */
         
         // 获取当前的原子值大小
       int c = ctl.get();
         // 如果当前线程小于当前的核心线程数,先加入工作队列中。
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
       }
       // 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           if (! isRunning(recheck) && remove(command))
               reject(command);
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
       }
   // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,
   // 则创建一个线程执行任务。
       else if (!addWorker(command, false)) 
           reject(command);  // 抛出异常
   }

其中 ctl.get() 是获取原子值大小

线程池的使用

线程池工厂代码封装

package com.pwh.mycode.chapter09;

import java.util.concurrent.*;

/**
 * 线程池工厂
 */
public class ThreadPoolFactory {

    private static final int INIT_POOL_SIZE = 4;

    private static final int MAX_POOL_SIZE = 10;


    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(INIT_POOL_SIZE,
            MAX_POOL_SIZE, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadPoolExecutor.AbortPolicy());


    /**
     * 异步
     *
     * @param r
     * @return
     */
    public static String addTaskSubmint(Runnable r) {
        System.out.println("运行的线程数:" + getActiveCount(threadPoolExecutor));
        System.out.println("阻塞的线程数:" + getBlockingQueue(threadPoolExecutor));
        threadPoolExecutor.execute(r);
        return "success";
    }


    /**
     * 同步
     *
     * @param c
     * @return
     */
    public static String addTaskSubmit(Callable c) {
        String m = null;
        System.out.println("运行的线程数:" + getActiveCount(threadPoolExecutor));
        System.out.println("阻塞的线程数:" + getBlockingQueue(threadPoolExecutor));
        Future<?> future = threadPoolExecutor.submit(c);

        try {
            Object object = future.get();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return "success";
    }





    /**
     * 运行线程数
     *
     * @param pool
     * @return
     */
    public static int getActiveCount(ThreadPoolExecutor pool) {
        return pool.getActiveCount();
    }


    /**
     * 获取最大的线程数
     *
     * @param pool
     * @return
     */
    public static int getMaximunPoolSize(ThreadPoolExecutor pool) {
        return pool.getMaximumPoolSize();
    }

    /***
     * 阻塞的线程数
     * @param pool
     * @return
     */
    public static int getBlockingQueue(ThreadPoolExecutor pool) {

        return pool.getQueue().size();
    }


}

线程池的创建也是通过 new ThreadPoolExecutor():

    public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

corePoolSize:线程池的基本大小

maximumPoolSize :允许最大的线程池数

TimeUnit unit : 时间戳

BlockingQueue workQueue :阻塞队列(此处是接口,JDK1.8)

RejectedExecutionHandler handler 饱和策略 当队列和线程池都满了,说明线程池处于饱和状
态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法
处理新任务时抛出异常


文章作者: coderpwh
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 coderpwh !
  目录