侧边栏壁纸
博主头像
林雷博主等级

斜月沉沉藏海雾,碣石潇湘无限路

  • 累计撰写 132 篇文章
  • 累计创建 47 个标签
  • 累计收到 3 条评论

目 录CONTENT

文章目录

9、Executor及ThreadPoolExecutor线程池源码解析

林雷
2023-05-18 / 0 评论 / 19 点赞 / 321 阅读 / 37,306 字

20221014

一 Executor执行器

Executor执行器,在Java中作为顶级的执行器接口,通常是作为一个异步执行器来执行指定的任务。在Executor接口中只定义了一个方法,用于执行指定的任务:

/**
 * 执行器
 */
public interface Executor {
    /**
     * 执行给定的线程
     * @param command 线程
     */
    void execute(Runnable command);
}

Java中的线程池ThreadPoolExecutor就是一个Executor的实现。虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础。Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者。
如下,是Executor家族常见的一些类和接口:
ThreadPoolExecutor1-1

1.1 ExecutorService

Executor只是一个执行器,没有提供服务的功能,比如停止服务;也没有提供有返回值异步执行任务的函数,而这些功能在ExecutorService中提供了。ExecutorService 代码如下:

/**
 * 这是一个{@link Executor}的管理器, 并且提供了一些额外的方法
 */
public interface ExecutorService extends Executor {
    /**
     * 停止线程, 这里启动有序停止, 执行以前提交的任务, 但不接受新任务。如果重复调用这个方法, 则没有用其他效果。
     * 这个方法并不会等待以前的提交的任务, 而是使用{@link #awaitTermination(long, TimeUnit)}去做的
     */
    void shutdown();

    /**
     * 尝试停止所有正在执行的任务、停止正在等待的任务, 并返回正在等待任务的任务列表。
     * 这个方法并不会等待以前的提交的任务, 而是使用{@link #awaitTermination(long, TimeUnit)}去做的
     * @return 等待任务的任务列表
     */
    List<Runnable> shutdownNow();

    /**
     * 检查当前线程池是否已经shutdown
     * @return true: shutdown
     */
    boolean isShutdown();

    /**
     * 是否是终止状态
     * @return true/false
     */
    boolean isTerminated();

    /**
     * 阻塞: 直到所有任务在关闭请求(shutdown)后完成执行, 或者超时发生, 或者当前线程被中断
     * @param timeout 超时时间
     * @param unit 时间单位
     * @return true: 这个执行器被终止; false: 终止完成前发生超时
     * @throws InterruptedException 异常
     */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 提交一个任务到线程池
     * @param task 任务
     * @param <T> 回调数据
     * @return Future
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个任务到线程池
     * @param task 任务
     * @param result 回调的对象
     * @param <T> 类型
     * @return Future
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个任务到线程池
     * @param task 任务
     * @return Future
     */
    Future<?> submit(Runnable task);

    /**
     * 执行给定的所有的任务
     * @param tasks 任务
     * @param <T> 类型
     * @return Future集合
     * @throws InterruptedException 异常
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    /**
     * 执行所有的任务
     * @param tasks 任务集合
     * @param timeout 超时时间
     * @param unit 单位
     * @param <T> 类型
     * @return Future集合
     * @throws InterruptedException 异常
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 执行给定的任务, 但是这里只返回一个已经执行成功的结果
     * @param tasks 任务
     * @param <T> 类型
     * @return 执行成功的结果
     * @throws InterruptedException 异常
     * @throws ExecutionException 异常
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /**
     * 执行给定的任务, 但是这里只返回一个已经执行成功的结果
     * @param tasks 任务
     * @param timeout 超时时间
     * @param unit 时间单位
     * @param <T> 类型
     * @return 执行成功的结果
     * @throws InterruptedException 异常
     * @throws ExecutionException 异常
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService接口提供了关闭服务、等待服务关闭、提交任务并获取返回值等功能,在Executor之上扩展了服务的功能。而AbstractExecutorService是对ExecutorService做了基本的实现。接下来我们对ThreadPoolExecutor线程池做详细的解析。

二 ThreadPoolExecutor源码解析

ThreadPoolExecutor,线程池执行器,JDK中提供的标准的线程池类,通过该类我们可以很轻松实现线程池化的功能,从而避免系统无限制创建线程而影响性能。ThreadPoolExecutor是一个灵活的、稳定的线程池,也允许用户进行很多定制。

2.1 ThreadPoolExecutor使用示例

如下示例:

public class ThreadPoolExecutorTest {

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(2);//监视器
        LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(8);//阻塞队列
        //创建线程池, 指定阻塞队列和线程创建工厂
        ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, blockingQueue, new ThreadFactory() {
            AtomicInteger index = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                final Thread thread = new Thread(r);
                thread.setName("线程" + index.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        }, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                latch.countDown();
                throw new RejectedExecutionException(r.toString() + " reject");
            }
        });

        //执行一个异步任务
        executor.execute(() -> {
            try {
                System.out.println(getTime() + " - " + Thread.currentThread().getName() + "执行异步任务");
            } finally {
                latch.countDown();
            }
        });

        //执行可拿结果的异步任务
        final Future<String> future = executor.submit(() -> {
            try {
                System.out.println(getTime() + " - " + Thread.currentThread().getName() + "可回调结果的执行异步任务");
                sleep(2, TimeUnit.SECONDS);
                return "结果";
            } finally {
                latch.countDown();
            }
        });
        final String resp = future.get();//会阻塞, 直到上述线程2秒后返回结果才会被唤醒
        System.out.println(getTime() + " - " + "主线程获取结果: " + resp);

        latch.await();
        executor.shutdown();//停机
    }

    public static void sleep(int timeout, TimeUnit timeUnit) {
        try {
            timeUnit.sleep(timeout);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static String getTime() {
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
    }
}

运行效果如下(个人运行效果顺序可能不同):

2023-05-06 17:38:42.189 - 线程1可回调结果的执行异步任务
2023-05-06 17:38:42.189 - 线程0执行异步任务
2023-05-06 17:38:44.189 - 主线程获取结果: 结果

上述的示例,演示了ThreadPoolExecutor的构造过程、执行任务、可回调结果的执行任务、关机、拒绝处理等操作。接下来我们看看ThreadPoolExecutor的原理。

2.2 ThreadPoolExecutor介绍

ThreadPoolExecutor的继承关系图如下:
ThreadPoolExecutor2.2-1

ThreadPoolExecutor具有ExecutorService和Executor接口的功能,当然,作为一个线程池,其最主要的功能就是管理线程的能力,同时ThreadPoolExecutor也是ExecutorService的子类,所以它可以提供类似服务的功能,比如停机等。

2.2.1 相关全局变量

ThreadPoolExecutor的全局变量可归类为两种,一种是ThreadPoolExecutor运行过程中自身需要的,不可通过外部变更,只能通过运行过程随着线程池的逻辑变更而改变,另一种是线程池的一些参数,是可由用户自定义的参数,通过这一系列参数可对线程池做线程数量控制、阻塞队列大小控制等操作。

我们先看第一种全局变量的一些参数,即ThreadPoolExecutor自身运行过程中维护的:

/**
 * 主要的线程池控制状态, 这是一个原子的Integer, 并且这代表了两个字段的概念:
 * workCount: 有效的线程数, 低29位表示有效的线程数
 * runState: 线程的运行状态。有{@link #RUNNING}、{@link #SHUTDOWN}、{@link #STOP}、{@link #TIDYING}和{@link #TERMINATED}等状态.使用高3位表示
 * 默认的状态是RUNNING
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** 29: int类型的位数(4字节32位) - 3 */
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 线程池允许的最大容量, 即低29位全为1, 其二进制为: 00011111 11111111 11111111 11111111 */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

//线程池在运行过程中的一些状态, 如下是从高到低的(RUNNING为负数, 其他为非负数) Start
/** 值为负数, 能接受新提交的任务, 并且也能处理阻塞队列中的任务 */
private static final int RUNNING = -1 << COUNT_BITS;
/** 值为0, 关闭状态, 不再接受新提交的任务, 但可以继续处理阻塞队列中已保存的任务 */
private static final int SHUTDOWN = 0 << COUNT_BITS;
/** 不能接受新的任务, 也不处理队列中的任务, 会中断正在处理任务的线程 */
private static final int STOP = 1 << COUNT_BITS;
/** 所有的任务都已终止, workCount为0 */
private static final int TIDYING = 2 << COUNT_BITS;
/** 在terminated()方法执行完后进入该状态 */
private static final int TERMINATED = 3 << COUNT_BITS;
//线程池在运行过程中的一些状态, 如下是从高到低的(RUNNING为负数, 其他为非负数) End

ThreadPoolExecutor使用一个ctl 变量表示两层含义,低29位表示正在运行的线程数,高3位表示线程池本身的状态。使用相关位操作计算这两个值。其状态有:RUNNING、SHUTDOWN、STOP、TIDYINT、TERMINATED状态,不同的状态有不同的含义,注意,RUNNING是负数,其他状态值都是非负数。可通过上述代码备注知悉,后续在线程池运行原理再详细介绍这些状态。
其实通过一个变量表示多个含义,这在开发中是常见的,这种模式有节省内存、在现代CPU架构中运算速度比乘除要快等优点。
ctl初始的状态是RUNNING状态

还有一些变量可以理解为可定制变量或线程池过程中变量,如下:

/** 阻塞队列 */
private final BlockingQueue<Runnable> workQueue;

/** 主要的锁 */
private final ReentrantLock mainLock = new ReentrantLock();

/** 池子里面的线程任务数, 访问的话需要通过mainLock加锁访问 */
private final HashSet<Worker> workers = new HashSet<Worker>();

/** 条件等待队列 */
private final Condition termination = mainLock.newCondition();

/** 跟踪最大的线程池大小, 访问需要通过mainLock加锁访问 */
private int largestPoolSize;

/** 完成的任务数。当任务线程被终止时(termination)才会更新。访问需要通过mainLock加锁访问 */
private long completedTaskCount;

/** 线程创建工厂 */
private volatile ThreadFactory threadFactory;

/** 拒绝策略 */
private volatile RejectedExecutionHandler handler;

/** 线程等待执行的纳秒, 当超过{@link #corePoolSize}或者{@link #allowCoreThreadTimeOut}字段为true的时候使用此字段 */
private volatile long keepAliveTime;

/** 如果为false(默认为false), 核心的线程数内的线程即使在空闲状态也保持活动; 如果为false, 核心线程数的线程使用keepAliveTime超时 */
private volatile boolean allowCoreThreadTimeOut;

/** 核心线程数 */
private volatile int corePoolSize;

/** 最大线程数 */
private volatile int maximumPoolSize;

/** 默认的拒绝策略, AbortPolicy: 丢弃并抛出RejectedExecutionException异常 */
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

通过备注,我们先理解一下这些变量的部分含义:

  • corePoolSize:核心线程数,当allowCoreThreadTimeOut为false的时候,在这个范围内的线程不会被回收
  • workQueue:阻塞队列,当线程池中运行的线程超过了corePoolSize,此时再运行新的线程将添加到这个阻塞队列中被阻塞,稍后等有线程空闲了再来执行阻塞队列的任务
  • maximumPoolSize:最大的线程数,当线程池中运行的线程数超过了corePoolSize,并且阻塞队列也满了的时候,则会创建新的线程去运行新的任务,此时就需要判断线程池中运行的数量不能超过maximumPoolSize大小,如果超过了,则当前任务会运行失败,会走到拒绝处理器处理
  • threadFactory:线程创建工厂,用户可指定线程的创建规则
  • handler:拒绝处理器,线程池中运行的线程数量超过了maximumPoolSize大小,则会调用拒绝处理
  • keepAliveTime:从阻塞队列获取任务的超时时间
  • allowCoreThreadTimeOut:当线程池中线程运行数量为corePoolSize时,并且没有新的任务运行,这个参数控制是否回收corePoolSize以内的线程

文章到这里,此时可以对ThreadPoolExecutor的一些全局变量有了认识,而对于其含义记不住也没关系,等后续介绍其原理的时候再往里套参数,就可以很容易理解这些参数的具体含义了。

2.2.2 位运算

什么是位运算,以及位运算的运算原理,我这里不做介绍了,具体看 3、位操作 文章。
上述我们介绍ThreadPoolExecutor相关变量的时候,知道变量 ctl 是表示两层含义的,那么具体怎么计算则是通过位运算操作的。

2.2.2.1 计算运行线程数

计算运行线程数,其实就是将高3位清零即可,低29位保持不变,使用函数workerCountOf(int) 函数计算:

/**
 * 计算出workerCount, ctl保存的低29位是workCount
 * @param c 一般是从ctl.get()获取的值
 * @return workerCount
 */
private static int workerCountOf(int c) {
    return c & CAPACITY;
}

CAPACITY通过上述我们知道是 (1 << 29) - 1,二进制位为:00011111 1111111111 1111111111 1111111111。
这里我做一下简单的提示,具体请详细理解 3、位操作 文章。
例如我现在有一个数是2147483671,2147483671的四字节二进制是10000000 00000000 00000000 00010111,计算过程如下:

    10000000 00000000 00000000 00010111
&   00011111 11111111 11111111 11111111
 =  00000000 00000000 00000000 00010111

通过计算发现高3位全置为0了,低29位保持不变,计算的结果就是运行的线程数了。

2.2.2.2 计算线程池运行状态

通过计算运行线程数我们其实就知道怎么计算运行状态了,其实就是将低29位置为0,高三位保持不变,使用函数runStateOf(int) 计算:

/**
 * 计算出runState, ctl保存的高3位是runState
 * @param c 一般是从ctl.get()获取的值
 * @return runState
 */
private static int runStateOf(int c) {
    return c & ~CAPACITY;//将低29位置为0, 高3位就是其状态值
}

2.2.2.3 计算运行线程数和运行状态的十进制数

通过两个数还原成一个数,如果阅读过了 3、位操作 文章就可以理解了,其实就是使用按位或运算即可,使用函数ctlOf(int, int) 来计算:

/**
 * runState与workCount的十进制
 * @param rs runState
 * @param wc workCount
 * @return 十进制数
 */
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

2.2.3 线程池状态

通过上文我们也知道ThreadPoolExecutor线程池有五个状态,分别是:RUNNING、SHUTDOWN、STOP、TIDYINT、TERMINATED,其中只有RUNNING是负数,其他状态是非负数。STOP和TIDYINT可以理解为中间状态,SHUTDOWN状态表示不接受新的任务,但是会将当前线程池的任务,即阻塞队列的任务全部运行完成 才会最终变为TERMINATED状态。
各种状态控制的功能也不尽相同,我们稍后在看execute执行原理解析的时候再详细介绍这些状态的作用。

2.2.4 构造函数

ThreadPoolExecutor有多个构造函数,但是所有的构造函数最终都指向同一个构造函数:

/**
 * 构造器
 * @param corePoolSize    核心线程数
 * @param maximumPoolSize 最大线程数
 * @param keepAliveTime   超时时间
 * @param unit            时间单位
 * @param workQueue       阻塞队列
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}

/**
 * 构造器
 * @param corePoolSize    核心线程数
 * @param maximumPoolSize 最大线程数
 * @param keepAliveTime   超时时间
 * @param unit            时间单位
 * @param workQueue       阻塞队列
 * @param threadFactory   线程创建工厂
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}

/**
 * 构造器
 * @param corePoolSize    核心线程数
 * @param maximumPoolSize 最大线程数
 * @param keepAliveTime   超时时间
 * @param unit            时间单位
 * @param workQueue       阻塞队列
 * @param handler         拒绝策略
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}

/**
 * 构造器
 * @param corePoolSize    核心线程数
 * @param maximumPoolSize 最大线程数
 * @param keepAliveTime   超时时间
 * @param unit            时间单位
 * @param workQueue       阻塞队列
 * @param threadFactory   创建线程工厂
 * @param handler         拒绝策略
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    //判断corePoolSize、maxPoolSize、keepAliveTime不能小于0并且maxPoolSize不能小于corePoolSize, 否则抛出异常 Start
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    //判断corePoolSize、maxPoolSize、keepAliveTime不能小于0并且maxPoolSize不能小于corePoolSize, 否则抛出异常 End
    //判断阻塞队列、创建工厂、拒绝策略不能为空, 否则抛出异常 Start
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    //判断阻塞队列、创建工厂、拒绝策略不能为空, 否则抛出异常 End

    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();//安全策略

    //赋值 Start
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
    //赋值 End
}

构造函数就是对上述的全局变量赋值,这里的ThreadFactory和RejectedExecutionHandler可使用默认的,对于默认的线程创建工厂和拒绝处理器可自行查看源码。

2.3 execute执行原理解析

execute(Runnable) 方法是核心的执行方法,像submit(Runnable, T)等都是通过执行execute(Runnable) 完成异步操作的,execute(Runnable) 代码如下:

/**
 * 执行给定的线程
 * @param command 线程
 */
public void execute(Runnable command) {
    //判断参数不能为空 Start
    if (command == null)
        throw new NullPointerException();
    //判断参数不能为空 End

    /*
     * 这里处理主要分成三步:
     * 1、如果运行的线程小于corePoolSize, 尝试启动一个新的线程来执行。调用addWork()方法会原子检查runState和workerCount,
     * 这样可以防止错误, 在不应该添加线程的情况下添加;
     * 2、如果任务可以成功放入队列, 然后我们依然需要检查一遍是否应该添加一个线程(因为自上次检查以后可能有已经销毁的线程了)或者
     * 自进入这个方法后线程池已经关闭了。所以我们需要重新检查状态并且在必要时回滚队列。或者启动新的线程
     * 3、如果我们不能将任务放入队列的话, 我们会尝试添加一个新的线程。如果失败, 我们知道线程已经shutdown或者饱和, 所以会拒绝这个任务
     */
    int c = ctl.get();

    //运行的线程小于corePoolSize的情况 Start
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))//添加任务成功并启动则直接退出
            return;
        c = ctl.get();//添加任务失败(可能在其他地方已经添加任务成功了, 导致workerCount超过了corePoolSize)
    }
    //运行的线程小于corePoolSize的情况 End

    if (isRunning(c) && workQueue.offer(command)) {//线程池是RUNNING状态并且存放阻塞队列成功
        int recheck = ctl.get();//再次检查状态
        if (!isRunning(recheck) && remove(command))//如果线程池不是RUNNING并且从阻塞队列中移除成功则走拒绝操作
            reject(command);
        else if (workerCountOf(recheck) == 0)//如果没有运行的线程则调用addWorker添加
            addWorker(null, false);
    } else if (!addWorker(command, false))//如果添加失败则走拒绝操作
        reject(command);
}

通过上述备注,我们已经很清楚这个方法的大概流程,大致流程如下所示:
ThreadPoolExecutor2.3-1

结合上述流程图再去理解execute(Runnable) 方法就很清晰了,判断是否是RUNNING状态的方法 isRunning(int) 代码如下:

/**
 * 判断是否是运行状态
 * @param c 状态值
 * @return true: RUNNING
 */
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

只有RUNNING状态的是负数,SHUTDOWN状态是0,所以如果小于SHUTDOWN的话则证明是RUNNING状态的。

而上述的核心方法是调用addWorker(Runnable, boolean) 方法,在这个方法里面会有创建线程并启动的逻辑,并且通过上述代码我们知道第一个Runnable形参可能是null,第二个形参boolean表示当前线程与corePoolSize还是maximumPoolSize进行大小判断,所以我们暂时可以带着第一个形参是否为空的问题来阅读addWorker(Runnable, boolean) 方法:

/**
 * 添加任务并启动任务。这个方法是核心的处理方法。
 * 1、首先起两个无限循环, 这两个无限循环主要是对线程池状态进行处理的:
 *  1.1 第一层无限循环, 判断线程池的runState状态, 主要做的就是判断当前的runState是不是已经超过了SHUTDOWN状态, 如果是的话继续判断如果是SHUTDOWN
 *  状态的话, 那么判断阻塞队列是否为空:
 *      1.1.1 如果runState状态是STOP、TIDYING、TERMINATED的话则直接return false
 *      1.1.2 如果runState是SHUTDOWN状态, 并且阻塞队列为空的话, 那么也进行return false。因为如果是SHUTDOWN状态的话, 是需要处理阻塞队列
 *      里面的任务的, 所以这里做了这个判断操作
 *  1.2 第二层无限循环, 主要是判断线程池的核心数量或者最大数量, 如果当前workerCount超过了核心数量或者最大数量(通过core参数)的话则
 *  直接return false(线程池已满, 由{@link #execute(Runnable)}方法继续处理); 如果workerCount没有超过指定数据的话, 那么对
 *  workerCount + 1操作(CAS), 由于CAS可能失败, 所以这里通过第二层无限循环保证CAS操作成功, 成功了则break 外层循环
 * 2、创建一个{@link Worker}对象, 并将firstTask放入, 然后再次判断当前的runState状态:
 *  2.1 如果是RUNNING状态则添加到运行线程池中, 并start操作。start操作主要是调用{@link #runWorker(Worker)}
 *  2.2 如果状态是SHUTDOWN状态的话, 那么其实任务是从{@link #runWorker(Worker)} -> {@link #processWorkerExit(Worker, boolean)}
 *  进入的, 而此时的firstTask是为空的, 如果firstTask是为空的话, 那么也可以正常添加并start。如果firstTask不为空的话, 那么证明线程是从
 *  {@link #execute(Runnable)}进入的(在调用{@link #execute(Runnable)}时没有SHUTDOWN, 但是此时已经SHUTDOWN了)所以不再运行新的任务
 * 3、最后调用Thread.start()运行线程, 即最终会调用到{@link #runWorker(Worker)}方法中
 * @param firstTask 任务线程
 * @param core      是否是核心线程
 * @return 是否添加成功
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    /*
     * 启用两个无限循环检查ctl计算出来的runState的状态。只有在以下几种情况可以结束循环或者退出方法:
     * 1、如果线程池处于SHUTDOWN及更高的级别的话, 则直接return false
     * 2、如果core为true的话, 当线程数超过了corePoolSize则直接return false; 如果core为false的话, 则线程数超过了maxPoolSize的话, 也直接return false
     * 3、通过CAS方式对线程数自增1, 如果成功则跳出循环
     * 所以说这两个循环只要是对runState和workerCount的检测
     */
    //跳出循环的标志
    retry:
    for (; ; ) {//无限循环
        int c = ctl.get();//ctl的值
        int rs = runStateOf(c);//计算出运行状态runState

        /*
         * 这个判断是很重要的:
         * 1、判断线程池的状态是否是SHUTDOWN之后的状态, 如果是SHUTDOWN(包含SHUTDOWN、STOP、TIDYING和TERMINATED)则进一步判断;
         * 如果是SHUTDOWN以前(只有RUNNING)则不会进行return操作
         * 2、如果状态是SHUTDOWN状态, 并且阻塞队列不为空的话, 那么证明需要处理阻塞队列的任务, 也不会进行return操作。
         * 所以从这里可以看出, 如果是SHUTDOWN状态的话, 是需要处理阻塞队列中的任务的
         */
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (; ; ) {//再无限循环
            int wc = workerCountOf(c);//计算出workerCount
            //如果线程数已经超过了指定的线程数(core为true则超过corePoolSize, 否则超过maxPoolSize)的话也直接返回添加失败 Start
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //如果线程数已经超过了指定的线程数(core为true则超过corePoolSize, 否则超过maxPoolSize)的话也直接返回添加失败 End

            //如果通过CAS改将ctl的值改变成功(+1)则跳出循环 Start
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //如果通过CAS改将ctl的值改变成功(+1)则跳出循环 End

            c = ctl.get();//重新读取ctl的值
            //如果重新读取的ctl的值的runState与外层循环的runState不一致的话, 则跳出当前循环, 继续外层循环 Start
            if (runStateOf(c) != rs)
                continue retry;
            //如果重新读取的ctl的值的runState与外层循环的runState不一致的话, 则跳出当前循环, 继续外层循环 End
        }
    }

    boolean workerStarted = false;//任务线程是否启动
    boolean workerAdded = false;//任务线程是否添加
    Worker w = null;//任务
    try {
        w = new Worker(firstTask);//创建一个任务
        final Thread t = w.thread;//线程, 即Worker对象本身
        if (t != null) {//理论情况不会为空(为空可能是通过线程工厂创建的线程为空)
            //这里加锁的目的是最后计算线程池的相关状态 Start
            final ReentrantLock mainLock = this.mainLock;//锁
            mainLock.lock();//加锁
            try {
                /*
                 * 通过加锁再次检查: 避免在ThreadFactory创建线程时出现故障或者在获得锁之前shutdown
                 */
                int rs = runStateOf(ctl.get());

                /*
                 * 这里再次判断状态是否为RUNNING或者是SHUTDOWN, 因为是SHUTDOWN状态的话, 是不在接收新的任务, 即firstTask会为空,
                 * 而是从runWorker -> processWorkerExit()调用的当前方法, 此时firstTask是为null的
                 */
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {//线程状态为RUNNING或者线程是SHUTDOWN状态并且传递的线程为空
                    //检查线程是否已经激活了, 如果已经激活的话则直接抛出异常 Start
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //检查线程是否已经激活了, 如果已经激活的话则直接抛出异常 End

                    workers.add(w);//添加到HashSet集合中
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;//添加到HashSet成功
                }
            } finally {
                mainLock.unlock();//释放锁
            }
            //这里加锁的目的是最后计算线程池的相关状态 End

            //添加成功则启动线程, 此时这个t其实是一个Worker, 所以运行的是Worker.run()方法 Start
            if (workerAdded) {
                t.start();//启动线程
                workerStarted = true;//启动状态置为true
            }
            //添加成功则启动线程, 此时这个t其实是一个Worker, 所以运行的是Worker.run()方法 End
        }
    } finally {
        //最后检查是否启动成功, 没有启动成功则运行addWorkerFailed()方法处理 Start
        if (!workerStarted)
            addWorkerFailed(w);
        //最后检查是否启动成功, 没有启动成功则运行addWorkerFailed()方法处理 End
    }
    return workerStarted;
}

addWorker(Runnable, boolean) 方法表示将任务添加到线程中启动,方法一开始是判断状态,通过 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) 代码判断如果超过了SHUTDOWN的值,即当前状态可能是SHUTDOWN、STOP、TIDYINT、TERMINATED状态,并且判断如果是SHUTDOWN状态的话,阻塞队列不为空则退出方法,并返回false,这里我们可以得知,当线程池是状态SHUTDOWN时,则不接受新的任务,并且需要消费完阻塞队列中所有的任务(暂时不会终止线程池);在第二层for循环中,我们可以看出addWorker(Runnable, boolean) 第二个形参的作用,如果是true的话,则将当前线程池的任务数量与corePoolSize做对比,否则与maximumPoolSize做对比,并且在第二层for循环调用compareAndIncrementWorkerCount(int) 方法对workerCount进行CAS增加1的操作,如果CAS操作成功则退出双层for循环。compareAndIncrementWorkerCount(int) 代码如下:

/**
 * 尝试以CAS方式改变ctl的值。这里主要是增加workerCount值(workCount+1)
 * @param expect 希望的值
 * @return true: 更新成功; false: 更新失败
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

接下来就是创建Worker对象,并将Worker对象持有的线程启动,那么接下来我们看Worker对象是什么。

2.3.1 Worker实例

Worker对象是ThreadPoolExecutor的内部类,表示一个任务,其继承关系如下:
ThreadPoolExecutor2.3.1-1

可以看出Worker不仅是一个Runnable子类,也是AbstractQueuedSynchronizer的子类,所以其本身就具有锁的功能。所以此时我们就可以对Worker对象进行解析:

  • Worker对象是怎么作为线程启动的
  • Worker对象是怎么获取锁以及释放锁的

Worker对象的全局变量如下:

/** 线程 */
final Thread thread;
/** 要运行的初始任务, 可能为空 */
Runnable firstTask;
/** 完成任务数 */
volatile long completedTasks;

2.3.1.1 Worker中的线程

通过上述的全局变量我们可以知道,Worker持有Thread的引用,即相当于一个线程类,在上文addWorker(Runnable, boolean) 方法就是启动这个线程的,那么我们首先看它的构造过程,Worker只有一个构造函数:

/**
 * 构造器
 * @param firstTask 要运行的初始任务
 */
Worker(Runnable firstTask) {
    setState(-1); //默认状态
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

Worker在初始化的时候,首先是将AQS的state指定为-1,至于为什么是-1,等稍后我们看runWorker运行任务时的流程就知道了。从这里可以看出使用了线程创建的工厂来创建线程,并将this作为其任务运行,我们知道Thread启动的时候是调用Runnable的run()方法,所以我们追踪到run() 方法:

@Override
public void run() {
    runWorker(this);
}

run() 方法主要将功能委托给了ThreadPoolExecutor.runWorker(Worker) 方法了,稍后我们详解这个方法。

2.3.1.2 加锁解锁逻辑

Worker使用的是独占锁的逻辑,所以其加锁的逻辑我们主要看 tryAcquire(int) 方法,解锁的逻辑看 tryRelease(int) 方法(至于为什么看这两个方法,具体请看 1、AbstractQueuedSynchronizer原理分析 文章)。

/**
 * 加锁.只有无锁时才会加锁成功
 * @param unused 锁计数
 * @return true: 加锁成功; false: 加锁失败
 */
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {//只有无锁时才能加锁成功
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
/**
 * 释放锁。这里不抛异常的话就会成功
 * @param unused 锁计数
 * @return true/false
 */
protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

加锁和解锁逻辑都很简单,加锁主要是将AQS的state从值为0变为1,CAS成功则表示加锁成功,否则表示加锁失败;解锁是直接将state值置为0后返回成功
需要注意一点的是,我们知道初始化时state值为-1,如果一开始就对Worker加锁的话,是会加锁失败的,所以接下来我们看runWorker运行任务的逻辑具体是怎么实现的,就明白了。

2.3.2 runWorker运行任务

runWorker(Worker) 代码如下:

/**
 * 运行任务线程。注意, 这里是一个while循环获取, 在while循环里面:
 * 1、判断Worker.firstTask线程, 如果可以获取到线程, 那么会调用firstTask.run()方法; 最终会在finally中将firstTask置为null, 继续循环;
 * 2、如果firstTask为空, 即获取不到线程, 那么会调用{@link #getTask()}方法去获取线程。而在这个方法里面就会保持核心线程数,
 * 具体请看: {@link #getTask()}备注。如果此时还是获取不到的话, 那么就会到finally方法, 调用{@link #processWorkerExit(Worker, boolean)}处理
 * @param w Work
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();//当前线程
    Runnable task = w.firstTask;//要运行的任务
    w.firstTask = null;//置为空
    w.unlock();//先释放锁
    boolean completedAbruptly = true;//是否是突然完成(比如没有进入try代码块, 类似突然终止线程)
    try {
        //while循环(可能会阻塞), 主体逻辑 Start
        while (task != null || (task = getTask()) != null) {
            w.lock();//加锁
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);//run()方法之前执行
                Throwable thrown = null;
                try {
                    task.run();//调用firstTask.run()方法
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);//run()方法之后执行
                }
            } finally {
                task = null;
                w.completedTasks++;//完成的任务数加
                w.unlock();//释放锁
            }
        }
        //while循环(可能会阻塞), 主体逻辑 End
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);//退出处理
    }
}

首先是调用Worker.unlock() 逻辑,其实就是将其state值置为0,后面使用加锁逻辑就可以加锁成功了。接着使用一个while循环,循环条件是任务不能为空,我们知道这里的task是可能为空的,如果firstTask为空则从调用getTask()接口从阻塞队列中获取任务,对于阻塞队列我们知道 (具体可以阅读 7、BlockingQueue之ArrayBlockingQueue源码解析 文章),如果阻塞队列为空的话,来获取数据则会被阻塞。稍后我们阅读getTask() 方法逻辑。
然后就是对状态进行判断,判断完状态之后就是真正的执行用户的任务了:

  • 加锁
  • 调用beforeExecute(Thread, Runnable) 方法前置处理
  • 调用用户的任务的run()方法
  • 执行afterExecute(Runnable, Throwable)方法后置处理
  • 解锁

运行任务的时候,有两个空方法,即beforeExecute(Thread, Runnable) 和 afterExecute(Runnable, Throwable) 方法,这两个方法的设计目的是为了扩展的,可以方便用户对运行任务时的扩展。如果从while循环中出来的话,最后就会调用processWorkerExit(Worker, boolean) 方法结束当前线程的生命周期,即会结束Worker对象的生命周期。
runWorker(Worker) 方法,对任务的获取有两种:

  • 用户调用execute(Runnable) 等方法传递的Runnable
  • 调用addWorker(Runnable, boolean) 方法时第一个形参Runnable为空,则从阻塞队列中获取

第一种情况,我们通过execute(Runnable) 方法知道,如果是用户调用的话是不可能是为空的,那么就是第二种程序本身调用addWorker(Runnable, boolean) 方法时传递的第一个形参为空。如果Runnable为空的话,就会进入getTask() 方法获取任务来执行。

2.3.3 getTask从阻塞队列获取任务

  • getTask() 方法代码如下:
/**
 * 获取线程任务。注意这里, 是阻塞获取的, 也是保持核心线程的一个重要方法, 该方法主要是在{@link #addWorker(Runnable, boolean)}的while循环条件中调用的:
 * 1、首先判断线程池的runState状态, 如果runState状态>=SHUTDOWN状态, 并且runState>=STOP状态或者阻塞队列为空的话, 那么证明线程池已经
 * 非RUNNING了, 所以判断了阻塞队列是否为空, 如果为空的话则return null, 即不再处理任务了。但是如果是SHUTDOWN状态, 并且队列不为空的话则会
 * 继续往下走代码逻辑
 * 2、通过两个参数控制超时的动作: {@link #allowCoreThreadTimeOut} 参数控制是否超时(默认为false) 或者 workerCount是否超过了核心线程数
 *  2.1 如果workerCount超过了最大线程数, 那么线程池不应该处理了, 所以返回null。由{@link #addWorker(Runnable, boolean)}方法可知, 如果
 *  getTask()也返回null的话, 则会调用{@link #processWorkerExit(Worker, boolean)}处理
 * 3、从阻塞队列中获取线程任务, 如果由步骤[2]获取到的参数是true的话, 那么则使用queue.poll(time, unit)获取, 如果超时获取不到则返回null;
 * 如果由步骤[2]获取到的参数是false的话, 那么则使用queue.take()获取, 永远阻塞获取。所以从这里我们可以得知几个信息:
 *  3.1 如果{@link #allowCoreThreadTimeOut}为true或者workerCount超过了核心线程数的话, 调用阻塞队列是可能超时的, 超时返回null; 由
 *  {@link #addWorker(Runnable, boolean)}可知, 返回null之后调用了{@link #processWorkerExit(Worker, boolean)}会销毁线程。即如果
 *  {@link #allowCoreThreadTimeOut}为true的话, 核心线程是可以被销毁的; 为false的话, 核心线程是不会被销毁的, 因为以一直阻塞直到拿到线程(也就是
 *  说不会调用{@link #processWorkerExit(Worker, boolean)}方法)
 *  3.2 如果{@link #allowCoreThreadTimeOut}为false的话(默认为false), 则核心线程数不会被销毁; 但是超过了核心线程数的线程, 即:
 *  workerCount在 (corePoolSize, maxPoolSize)之间的话, 则会销毁超过了maxPoolSize的线程(因为调用的是poll(time, unit)方法), 超时
 *  返回null, 然后在{@link #addWorker(Runnable, boolean)} 中获取不到的时候会调用{@link #processWorkerExit(Worker, boolean)}
 *  销毁线程的处理
 * @return 任务线程
 */
private Runnable getTask() {
    boolean timedOut = false; //从阻塞队列中获取任务是否超时

    for (; ; ) {//一直循环
        int c = ctl.get();
        int rs = runStateOf(c);//runState状态

        //检查线程是否已经shutdown或者阻塞队列是否为空, 如果是的话则返回空 Start
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //检查线程是否已经shutdown或者阻塞队列是否为空, 如果是的话则返回空 End

        int wc = workerCountOf(c);//当前的线程数

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //如果当前线程已经超过了maxPoolSize或者已经超时并且队列是空的则继续循环 Start
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))//对workerCount递减成功则直接返回空
                return null;
            continue;
        }
        //如果当前线程已经超过了maxPoolSize或者已经超时并且队列是空的则继续循环 End

        try {
            //从阻塞队列中获取线程 Start
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            //从阻塞队列中获取线程 End
            timedOut = true;//未获取到则超时
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

在这个方法中,主要是通过阻塞队列获取任务,注意代码:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

这一行代码是获取一个boolean变量timed,如果为true的话,调用的是BlockQueue.poll(long, TimeUnit) 方法,否则调用的是BlockQueue.take() 方法,第一个方法是阻塞指定长的时间如果队列还是空则返回空,第二个方法是无限期的阻塞,直到队列有数据进来后被唤醒。如果返回为空,通过上文runWorker(Worker) 的while循环得知,会就退出while循环,进入processWorkerExit方法处理结束Worker生命周期。所以说allowCoreThreadTimeOut参数可以控制线程池核心线程数以内的线程是否可以被回收的能力。那么processWorkerExit(Worker, boolean) 方法具体怎么处理线程生命周期的,我们往下看。

2.3.4 processWorkerExit结束Worker生命周期

processWorkerExit(Worker, boolean) 代码如下:

/**
 * 当任务执行完成时的退出处理。该方法是在finally中调用的, 所以一定会走到这里。是在{@link #runWorker(Worker)}的finally中调用的
 * @param w 任务
 * @param completedAbruptly 是否是突然完成(比如未走try代码块, 类似强制停机这种), 即用户的真实任务未运行
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果是突然完成的, 那么workerCount未调整, 所以这里对workerCount做递减操作 Start
    if (completedAbruptly)
        decrementWorkerCount();
    //如果是突然完成的, 那么workerCount未调整, 所以这里对workerCount做递减操作 End

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;//完成的任务数
        workers.remove(w);//删除任务
    } finally {
        mainLock.unlock();
    }

    tryTerminate();//看看线程池状态是否是终止, 如果是终止的话则改变相关状态

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {//RUNNING或者SHUTDOWN状态
        /*
         * 这个if主要做的是通过allowCoreThreadTimeOut参数设置是否为true, 如果为true的话(上面workers.remove()已经销毁了线程), 则直接退出。即销毁了核心线程数以内的线程;
         * 如果为false的话, 会创建一个新的Worker, 但是这个Worker的firstTask是null, 所以保持一个空的线程放到核心的池子中
         */
        if (!completedAbruptly) {//非突然停止
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false);//添加一个空的线程放到池子中
    }
}

这里我们可以看到,allowCoreThreadTimeOut控制着核心线程的生命周期

  • allowCoreThreadTimeOut为true的话,则会直接return,即Worker对象的生命周期结束,也就是线程池销毁了一个线程
  • 如果为false,则判断当前线程的数量是否大于等于corePoolSize的大小,如果大于等于corePoolSize,那么也会结束当前Worker对象的生命周期,这种情况就是销毁超过corePoolSize的大小,其实也就是销毁(corePoolSize, maximumPoolSize] 之间的线程;如果当前线程的数量没有超过corePoolSize大小,那么就调用addWorker(null, false) 方法进行递归,而addWorker(Runnable, boolean) 方法我们在上文已经介绍过了,这里就是添加一个空闲线程,其任务为空,等待有新的任务进入而使用这个Worker线程运行任务

程序读到这里,其实对于线程池运行线程逻辑、管理线程的原理已经结束了。

2.4 拒绝处理器

上述execute(Runnable) 方法我们知道,在一些情况下会进入拒绝处理:

  • 线程数已经超过了corePoolSize,并且存放阻塞队列成功,但是此时线程池停机了,并且从阻塞队列移除任务成功,则进入reject(Runnable) 拒绝处理
  • 线程数已经超过了corePoolSize,但是此时线程池停机了或者说存放阻塞队列失败了(比如阻塞队列已满),最后调用addWorker(Runnable, boolea) 方法失败(比如当前线程超过了maximumPoolSize大小),也会进入reject(Runnable) 拒绝处理。

reject(Runnable) 代码如下:

/**
 * 拒绝操作。主要是调用RejectExecutionHandler.rejectedExecution方法进行操作
 * @param command 线程
 */
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

这里将功能交给了RejectedExecutionHandler做处理,而通过上述的全局变量可以得知,如果我们没有传递handler参数的话,则会使用AbortPolicy作为默认的RejectedExecutionHandler进行拒绝处理。
我们首先看一下RejectedExecutionHandler接口:

/**
 * 线程池中的任务不能运行时的处理器
 */
public interface RejectedExecutionHandler {

    /**
     * 处理
     * @param r 线程
     * @param executor 线程池
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

RejectedExecutionHandler只定义了一个方法,用于拒绝时调用。而在JDK中默认为我们提供了4中拒绝处理器,我们也可以实现RejectedExecutionHandler接口定义自己的拒绝处理器。
ThreadPoolExecutor2.4-1

2.4.1 AbortPolicy终止处理

AbortPolicy也是ThreadPoolExecutor默认的拒绝处理器:

/**
 * 丢弃任务并抛出异常
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * 构造器
     */
    public AbortPolicy() {
    }

    /**
     * 抛出异常{@link RejectedExecutionException}
     * @param r 线程
     * @param e 线程池
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                " rejected from " +
                e.toString());
    }
}

AbortPolicy会抛出RejectedExecutionException异常,此时这个任务也就会被终止。

2.4.2 DiscardPolicy丢弃处理

DiscardPolicy表示直接丢弃,不做任何处理:

/**
 * 直接丢弃任务, 什么都不做
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * 构造器
     */
    public DiscardPolicy() {
    }

    /**
     * 不做任何操作
     * @param r 线程
     * @param e 线程池
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

不做任何处理,通过上述的execute(Runnable) 方法原理我们可以得知,当前用户传递的Runnable没有被运行就直接退出方法了。

2.4.3 CallerRunsPolicy普通任务处理

CallerRunsPolicy会被当做普通任务运行:

/**
 * 如果线程池是RUNNING状态的, 则执行run()方法
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * 构造器
     */
    public CallerRunsPolicy() {
    }

    /**
     * 处理
     * @param r 线程
     * @param e 线程池
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();//运行run()方法
        }
    }
}

CallerRunsPolicy会调用Runnable.run()方法运行,注意,这里不是在线程池中运行,而是调用execute(Runnable) 方法的线程运行(比如说主线程),并且只是作为一个普通的函数运行,并非启动线程运行,所以对于长时任务会阻塞调用线程

2.4.4 DiscardOldestPolicy丢弃最早的任务并运行当前任务处理

DiscardOldestPolicy表示丢弃阻塞队列中最早的任务,并将当前任务作为一个线程运行:

/**
 * 当线程池是RUNNING状态时, 从阻塞队列中获取一个最早放入阻塞队列的线程并运行该线程
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * 构造器
     */
    public DiscardOldestPolicy() {
    }

    /**
     * 处理
     * @param r 线程
     * @param e 线程池
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

注意,这里的Runnable并不会立马被执行,首先将阻塞队列获取最早的任务并丢弃,然后执行execute(Runnable) 方法,我们知道这个方法处理就是如果线程数超过了corePoolSize,依然会将Runnable存放到阻塞队列中,等待有空闲线程再来处理阻塞队列中的任务。

2.5 重置参数

我们上文提到了,线程池有很多参数,比如corePoolSize、maximumPoolSize等,在线程池运行之后,我们依然可以对这些参数进行重新赋值,即调用相关的setXX(XX) 方法。这里我们主要介绍两个:setCorePoolSize和setMaximumPoolSize方法,而其他的一些方法可自行阅读源码。

2.5.1 setCorePoolSize设置核心线程数

setCorePoolSize(int) 代码如下:

/**
 * 重置核心线程数大小
 * @param corePoolSize 新的核心线程数大小
 */
public void setCorePoolSize(int corePoolSize) {
    //不能小于0 Start
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    //不能小于0 End

    int delta = corePoolSize - this.corePoolSize;//新的核心线程数与旧的核心线程数的差值
    this.corePoolSize = corePoolSize;//赋值
    if (workerCountOf(ctl.get()) > corePoolSize)//如果当前线程池持有的线程数量超过了新的核心线程数的大小
        interruptIdleWorkers();//中断当前线程池的所有线程
    else if (delta > 0) {//新的线程数大于旧的线程数, 则需要创建新的线程去处理
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {//创建新的线程
            if (workQueue.isEmpty())
                break;
        }
    }
}

设置核心线程数的时候,需要考虑几个问题:

  • 当前线程池保持的核心线程数超过了新的核心线程数怎么办
  • 如果新的核心线程数超过了原来的核心线程数的时候怎么办

而对于上述第一个问题,我们可以对线程做中断处理,但是中断处理的时候需要注意如果已经在运行的线程不应当中断;而第二个问题,在else if(delta > 0){} 代码块中已经体现,就是调用addWorker(null. true) 方法添加空闲线程等待。

interruptIdleWorkers(boolean) 中断代码如下:

/**
 * 将等待的线程的状态置为中断。如果线程已经在运行的话则不作中断操作
 * @param onlyOne 是否只中断一个
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加锁
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {//线程未中断并且加锁成功(表示线程没有在运行)
                try {
                    t.interrupt();//中断
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)//如果只中断一个的话, 直接退出循环
                break;
        }
    } finally {
        mainLock.unlock();//释放锁
    }
}

注意这里,会通过Worker.tryLock() 方法尝试加锁,而上文runWorker(Worker) 方法我们可以得知,当获取到任务后,就会对Worker加锁,如果runWorker(Worker)加锁成功,那么此处就不会加锁成功,所以此处就不会对Worker进行销毁。

2.5.2 setMaximumPoolSize设置最大线程数

setMaximumPoolSize(int) 代码如下:

/**
 * 设置最大线程数
 * @param maximumPoolSize 最大线程数
 */
public void setMaximumPoolSize(int maximumPoolSize) {
    //参数校验 Start
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    //参数校验 End

    this.maximumPoolSize = maximumPoolSize;//赋值
    if (workerCountOf(ctl.get()) > maximumPoolSize)//当前运行的线程超过了最大线程数, 则中断
        interruptIdleWorkers();//中断所有线程
}

与上述setCorePoolSize(int) 比较类似,当前运行的线程超过了新的maximumPoolSize大小,则调用interruptIdleWorkers() 做中断处理:

/**
 * 中断所有的线程
 */
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

最终调用的也是interruptIdleWorkers(boolean)

2.6 关闭线程池

关闭线程池有两个方法:

  • shutdown():将线程池的状态置为SHUTDOWN,不接收新的任务,将当前线程池和阻塞队列所有的任务运行完
  • shutdownNow():将线程池的状态置为STOP,中断所有任务,清空阻塞队列
    两个方法的代码分别如下:
/**
 * 停止线程, 这里启动有序停止, 执行以前提交的任务, 但不接受新任务。如果重复调用这个方法, 则没有用其他效果。
 * 这个方法并不会等待以前的提交的任务, 而是使用{@link #awaitTermination(long, TimeUnit)}去做的
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加锁
    try {
        checkShutdownAccess();//检查权限
        advanceRunState(SHUTDOWN);//将状态改为SHUTDOWN
        interruptIdleWorkers();//中断所有等待的线程
        onShutdown(); //onShutdown, 空方法, 留给子类覆写
    } finally {
        mainLock.unlock();//释放锁
    }
    tryTerminate();
}
/**
 * 尝试停止所有正在执行的任务、停止正在等待的任务, 并返回正在等待任务的任务列表。
 * 这个方法并不会等待以前的提交的任务, 而是使用{@link #awaitTermination(long, TimeUnit)}去做的
 * @return 等待任务的任务列表
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//检查权限
        advanceRunState(STOP);//将状态改为STOP
        interruptWorkers();//中断
        tasks = drainQueue();//清除阻塞队列中所有的线程
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

两个方法的执行逻辑整体上差不多:

  • 检查权限
  • 调用advanceRunState(int) 更新ThreadPoolExecutor的状态,shutdown()传递的是SHUTDOWN, shutdownNow()传递的是STOP
  • 中断线程。shutdown()调用的是interruptIdleWorkers() 方法(上文已介绍);而shutdownNow()调用的是interruptWorkers() 方法,并且会调用drainQueue()清空阻塞队列。
  • 调用tryTerminate()方法

2.6.1 advanceRunState更新状态

advanceRunState(int) 代码如下:

/**
 * 将runState状态改变, 通常是改变为SHUTDOWN或者STOP。如果要改成TIDYING或者TERMINATED的话需要调用{@link #tryTerminate()}方法
 * @param targetState 状态.SHUTDOWN或者STOP
 */
private void advanceRunState(int targetState) {
    for (; ; ) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

直接使用自旋对runState做CAS操作。shutdown()传递的是SHUTDOWN状态,通过上述线程池的运行原理我们可以得知,如果是SHUTDOWN状态的话,则会运行完阻塞队列所有的任务才会终止线程池;shutdownNow() 传递的是STOP,而如果是SHUTDOWN状态以上的话(包括STOP、TIDYINT、TERMINATED)则就会直接停止

2.6.2 interruptWorkers终止线程(包括正在运行的任务)

shutdownNow() 调用的方法是interruptWorkers() 方法:

/**
 * 将所有的线程置为中断, 不管线程是否已经启动都会中断
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();//线程启动的情况也中断
    } finally {
        mainLock.unlock();
    }
}

Worker.interrtuptIfStarted() 代码如下:

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

表示不管线程是不是正在运行任务,都直接中断。

2.6.3 drainQueue清空阻塞队列

drainQueue() 代码如下:

/**
 * 清除阻塞队列中的所有线程并放到集合中返回
 * @return 阻塞队列中所有的线程
 */
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

将阻塞队列的所有任务移除。

2.6.4 tryTerminate终止操作

tryTerminate() 代码如下:

/**
 * 尝试终止线程池, 这里起一个无限循环, 具体流程如下:
 * 1、如果当前线程池的状态是RUNNING的话, 那么就直接return, 不做终止操作
 * 2、如果当前状态是超过了TIDYING状态的话, 那么证明当前线程池的状态已经是TERMINATED状态了, 也不用再操作了, 直接return
 * 3、如果当前状态是SHUTDOWN状态, 因为SHUTDOWN状态是需要将阻塞队列中的任务给运行完成, 所以这里如果阻塞队列不为空的话, 也需要return操作(不做终止操作),
 * 然后通过{@link #runWorker(Worker)}的while循环从任务队列取任务, 直到阻塞队列为空获取不到。然后会进入{@link #processWorkerExit(Worker, boolean)}
 * 调用当前方法进行终止操作
 * 4、如果没有进入上述三个步骤, 那么表示是可以终止的, 然后将runState状态置为TIDYING, 如果通过CAS改变成功的话,
 * 那么调用{@link #terminated()}方法, 该方法是空方法, 留给子类覆写的
 * 5、最后将线程池的状态置为TERMINATED, 线程池终止
 */
final void tryTerminate() {
    for (; ; ) {
        int c = ctl.get();//ctl状态值
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            /*
             * 如果是运行状态或者已经是TIDYING状态或者是SHUTDOWN状态并且阻塞队列不为空的话则直接return
             * 1、运行(RUNNING)状态就不做终止处理了
             * 2、如果是TIDYING以上的话, 那么说明状态已经是TERMINATED, 也是需要return退出
             * 3、如果是SHUTDOWN状态, 因为SHUTDOWN状态需要处理队列中的任务, 所以判断队列不为空的话也不做terminate处理。而是继续在getTask
             * and task.run操作(循环, 具体可以看addWorker()操作)
             */
            return;
        if (workerCountOf(c) != 0) { //当前有任务在运行, 直接中断
            interruptIdleWorkers(ONLY_ONE);//中断线程
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//将状态置为TIDYING
                try {
                    terminated();//空方法, 留给子类覆写
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));//最后将状态置为TERMINATED
                    termination.signalAll();
                }
                return;//退出
            }
        } finally {
            mainLock.unlock();//释放锁
        }
    }
}

最终线程池通过调用tryTerminate() 方法终止线程池。

2.7 其他方法

ThreadPoolExecutor还有很多其他的方法,这里我介绍几个可能用到的

2.7.1 预启动

我们通过上文得知,在线程池创建的时候并不会启动核心线程数,除非调用execute(Runnable) 或submit(Runnable)等方法才会启动核心线程数,如果线程池是刚创建的话,那么第一次运行任务并不会使用到线程池的优点。而ThreadPoolExecutor提供了两个预启动核心线程的方法:
int prestartAllCoreThreads():预启动所有的核心线程,核心线程在池子中等待新的任务
boolean prestartCoreThread():预启动一个核心线程
使用这两个方法的前提需要是allowCoreThreadTimeOut参数为false,否则没有意义

/**
 * 预启动所有的核心线程
 * @return 启动的核心线程数
 */
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))//启动空任务的线程池
        ++n;
    return n;
}
/**
 * 预启动一个核心线程
 * @return true/false
 */
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
}

可以在创建完ThreadPoolExecutor对象后调用这两个方法,以避免第一次运行任务而创建线程的开销。

2.7.2 获取完成的任务数

getCompletedTaskCount() 代码如下:

public long getCompletedTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers)
            n += w.completedTasks;
        return n;
    } finally {
        mainLock.unlock();
    }
}

2.7.3 获取正在运行任务的线程数

getActiveCount() 代码如下:

/**
 * 获取线程池中活跃的线程数, 即线程正在运行任务的线程数
 * @return 活跃的线程数
 */
public int getActiveCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        int n = 0;
        for (Worker w : workers)
            if (w.isLocked())//只有持有锁的线程才是活跃的
                ++n;
        return n;
    } finally {
        mainLock.unlock();
    }
}

使用Worker.isLocked() 判断当前线程是否正在运行任务,因为如果有任务的话,Worker是加锁的。

2.8 总结

到这里线程池的原理已经介绍完毕,而对于ThreadPoolExecutor的一些其他方法可自行阅读,比如submit() 提交任务等。下面我们可以对线程池做一个总结:

  • 线程池通过一个原子的int(4字节,32位)控制线程池的运行状态和当前运行的线程数。高3位表示运行状态,低29位表示线程池的运行线程数量。通过位操作进行计算各自的值
  • 线程池有5种状态,每种状态在功能上有一定区别:
    • RUNNING:负数,表示正在运行的状态
    • SHUTDOWN:0,表示线程池正在停止的过程中,不会接收新的任务,但是会将当前阻塞队列中所有的任务运行完成
    • STOP:值为1 << 29,表示当前线程池基本不工作,不会接收新的任务,也不会运行阻塞队列中的任务,而是将阻塞队列所有的数据清空
    • TIDYINT,值为 2<<29,表示当前线程池停止工作,此时需要运行terminated() 方法,该方法是空方法,留给子类覆写
    • TERMINATED,值为3<<29,表示当前线程池终止了,为这个状态的时候当前的ThreadPoolExecutor对象生命周期就结束了
  • ThreadPoolExecutor使用内部类Worker作为一个工作线程,Worker是一个AQS,也是一个Runnable,通过锁控制保证线程生命周期等操作的安全,Worker对象也就是一个线程。Worker整体设计就是一个消费者模式,生产者是ThreadPoolExecutor,Worker线程通过消费ThreadPoolExecutor中阻塞队列的数据任务来运行用户的任务,而用户通过execute(Runnable) 方法将任务存放到阻塞队列(一定条件)。
  • 通过参数allowCoreThreadTimeOut控制是否销毁核心线程数以内的线程:
    • 如果为true,表示只要有线程空闲的时候,都会被销毁,包括核心线程数以内的线程。这种状态其实不利于线程池的工作,因为线程池最初的目的就是免于创建线程的开销,而回收了核心线程,那么新任务来的时候又要有创建线程开销,违背了线程池的设计初衷
    • 如果为false,表示会回收[corePoolSize, maximumPoolSize) 数量的线程,即核心线程数以内的线程会在池子中不被销毁,等有新的任务进来的时候,直接使用空闲线程运行该任务
  • ThreadPoolExecutor运行的基本流程如下:
    • 如果当前运行的线程数小于corePoolSize的时候,就会创建新的线程运行该任务
    • 如果运行的线程数超过了corePoolSize,就会将任务先存放到阻塞队列中,等待有线程空闲下来就会从阻塞队列中获取任务继续运行
    • 如果运行的线程数超过了corePoolSize,并且阻塞队列已经满了放不下了,那么就会创建新的线程来运行任务,但是线程池的总数不能超过maximumPoolSize,如果超过了maximumPoolSize大小,那么就会创建失败,进入拒绝处理
  • JDK中默认提供了四种拒绝处理:
    • AbortPolicy:会抛出异常终止操作,也是默认的拒绝处理器
    • DiscardPolicy:不做任何操作,丢弃处理
    • CallerRunsPolicy:会运行用户的run()方法,而不是启动线程执行
    • DiscardOldestPolicy:会丢弃阻塞队列中最早的任务,从而运行新的任务,新的任务也不会被立即执行
  • ThreadPoolExecutor留了一些空方法用于扩展,用户可自定义这些空方法而实现一些功能,常见的有:
    • beforeExecute(Thread, Runnable):执行用户的Runnable.run()方法之前处理
    • afterExecute(Runnable, Throwable):执行用户的Runnable.run() 完成后处理
    • terminated():当线程池终止时,状态变为TIDYINT时被调用(状态TERMINATED之前的状态)。
  • 如果在线程池启动后立马调用预启动操作的话,此时还没有任务,需要将allowCoreThreadTimeOut设置为false,否则没有意义,空耗CPU

ThreadPoolExecutor线程池的原理就介绍到这里,具体各个方法的逻辑仔细体会。而对于为什么使用线程池以及线程池在生产环境怎么设置大小、使用规则等情况我们下一章节介绍。

19

评论区