一 ScheduledThreadPoolExecutor源码解析
在JDK中,定时器有多种实现,如Timer,Timer是单线程消费的,如果在执行任务期间某个TimerTask耗时过久,那么会影响其他任务的调度;ScheduledThreadPoolExecutor,线程池版本的定时器,内部实现上其实与Timer类似,但是由于是线程池版本,解决了Timer的很多问题。
1.1 示例
在了解ScheduledThreadPoolExecutor原理之前,我们先看一组示例:
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledThreadPoolExecutor fixDelayExecutor = new ScheduledThreadPoolExecutor(2);
ScheduledThreadPoolExecutor fixRateExecutor = new ScheduledThreadPoolExecutor(2);
//固定延迟任务 Start
fixDelayExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(getTime() + " - 固定延迟任务");
sleep(2, TimeUnit.SECONDS);
}
}, 2, 3, TimeUnit.SECONDS);
//固定延迟任务 End
//固定频率任务 Start
fixRateExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(getTime() + " - 固定频率任务");
sleep(2, TimeUnit.SECONDS);
}
}, 2, 3, TimeUnit.SECONDS);
//固定频率任务 End
//非周期任务 Start
fixRateExecutor.schedule(new Runnable() {
@Override
public void run() {
System.out.println(getTime() + " - 非周期任务");
}
}, 2, TimeUnit.SECONDS);
//非周期任务 End
}
private static String getTime() {
return DateTime.now().toString("yyyy-MM-dd HH:mm:ss.SSS");
}
public static void sleep(int timeout, TimeUnit timeUnit) {
try {
timeUnit.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果如下(每个人的运行结果可能不同):
2023-05-17 15:49:53.125 - 非周期任务
2023-05-17 15:49:53.125 - 固定频率任务
2023-05-17 15:49:53.125 - 固定延迟任务
2023-05-17 15:49:56.120 - 固定频率任务
2023-05-17 15:49:58.162 - 固定延迟任务
2023-05-17 15:49:59.113 - 固定频率任务
2023-05-17 15:50:02.121 - 固定频率任务
2023-05-17 15:50:03.178 - 固定延迟任务
2023-05-17 15:50:05.113 - 固定频率任务
2023-05-17 15:50:08.122 - 固定频率任务
2023-05-17 15:50:08.182 - 固定延迟任务
2023-05-17 15:50:11.115 - 固定频率任务
2023-05-17 15:50:13.194 - 固定延迟任务
2023-05-17 15:50:14.120 - 固定频率任务
从运行结果来看:
- schedule(Runnable, long, TimeUnit):非周期性任务,只会达到条件时做一次调度
- scheduleWithFixedDelay(Runnable, long, long TimeUnit):固定延迟的周期性任务,从结果来看,当第一次触发之后,第二次触发是任务的运行时间+固定的延迟时间
- scheduleAtFixedRate(Runnable, long, long, TimeUnit):固定频率的周期性任务,当第一次触发之后,之后的触发是按照固定的频率触发的,与任务本身运行时间长短无关
了解了基本使用之后,我们进行原理解析。
1.2 简介
ScheduledThreadPoolExecutor是一个调度的线程池,所以其具有线程池的全部功能,其继承关系如下:
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,而ThreadPoolExecutor我们在 9、Executor及ThreadPoolExecutor源码解析 已经详细解析过了,这里不再重复。同时,ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,ScheduledExecutorService是对ExecutorService的扩展,ExecutorService我们知道没有调度的功能,而ScheduledExecutorService设计的目的是对ExecutorService在调度层面的扩展。
1.2.1 ScheduledExecutorService
ScheduledExecutorService 的接口定义如下:
public interface ScheduledExecutorService extends ExecutorService {
/**
* 运行指定的命令一次
* @param command 命令
* @param delay 延迟多久后执行
* @param unit 延迟的时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* 运行指定的命令一次
* @param callable 命令
* @param delay 延迟多久后执行
* @param unit 延迟的时间单位
* @return ScheduledFuture
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* 以固定的速率周期性调度指定的命令
* @param command 命令
* @param initialDelay 启动当前服务后延迟多久开始第一次执行命令
* @param period 速率, 周期性调度时的速率
* @param unit initialDelay和period的时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
/**
* 周期性调度指定的命令。这个与上述{@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)}是有区别的,
* 上述{@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)}如果本身任务运行时长超过了指定的速率, 那么
* 下次运行命令时依然是按照指定的速率, 而当前方法不是, 当前方法是按照上次命令执行之后按照指定的延迟(delay)继续执行的
* @param command 命令
* @param initialDelay 启动当前服务后延迟多久开始第一次执行命令
* @param delay 延迟
* @param unit initialDelay和delay的时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
通过上述的代码备注,很容易理解ScheduledExecutorService的设计思想。定义了调度命令的相关方法,而schedule(Runnable, long, TimeUnit)和schedule(Callable, long, TimeUnit)的区别就是是否有返回值。
1.2.2 全局变量及构造函数
我们先了解ScheduledThreadPoolExecutor的全局变量:
/** 如果在SHUTDOWN时取消/取消定期任务, 则为false */
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/** 如果在SHUTDOWN时取消非定期任务, 则为false */
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/** ScheduledFutureTask.cancel之后是否应当从队列中移除 */
private volatile boolean removeOnCancel = false;
/** 序列号生成器 */
private static final AtomicLong sequencer = new AtomicLong();
sequencer表示序号生成器,是全局的生成唯一的序号,该序号的作用是在优先级队列作为优先级的依据之一。
有四个构造函数,分别如下:
/**
* 构造函数
* @param corePoolSize 核心数
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
/**
* 构造函数
* @param corePoolSize 核心数
* @param threadFactory 线程工厂
*/
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
/**
* 构造函数
* @param corePoolSize 核心数
* @param handler 拒绝处理器
*/
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
/**
* 构造函数
* @param corePoolSize 核心数
* @param threadFactory 线程工厂
* @param handler 拒绝处理器
*/
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}
注意,这里使用的阻塞队列是 DelayedWorkQueue 实例,DelayedWorkQueue 是ScheduledThreadPoolExecutor的内部类实现的BlockingQueue接口,主要的调度逻辑便是通过延迟队列实现的。
先了解上述的全局字段以及构造函数,对于不理解的暂时忽略,继续往下看原理部分。
1.3 任务调度
上述示例部分我们介绍了四个调度的方法,作为线程池,本身也有一些执行任务的方法,如execute(Runnnable)、submit(Runnable)等,execute和submit相关方法调用的都是schedule相关方法,如下:
/**
* 执行给定的线程
* @param command 线程
*/
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
/**
* 执行给定的线程
* @param task 线程
*/
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
/**
* 执行给定的线程
* @param task 线程
* @param result 返回值
*/
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
/**
* 执行给定的线程
* @param task 线程
*/
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
schedule(Runnable, long, TimeUnit)、schedule(Callable, long, TimeUnit)、scheduleAtFixedRate(Runnable, long, long, TimeUnit)和scheduleWithFixedDelay(Runnable, long, long, TimeUnit) 代码分别如下:
/**
* 运行指定的命令一次
* @param command 命令
* @param delay 延迟多久后执行
* @param unit 延迟的时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
//非空校验 Start
if (command == null || unit == null)
throw new NullPointerException();
//非空校验 End
RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));//装饰任务
delayedExecute(t);//延迟执行
return t;
}
/**
* 运行指定的命令一次
* @param callable 命令
* @param delay 延迟多久后执行
* @param unit 延迟的时间单位
* @return ScheduledFuture
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
//非空校验 Start
if (callable == null || unit == null)
throw new NullPointerException();
//非空校验 End
RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//装饰任务
delayedExecute(t);//延迟执行
return t;
}
/**
* 以固定的速率周期性调度指定的命令
* @param command 命令
* @param initialDelay 启动当前服务后延迟多久开始第一次执行命令
* @param period 速率, 周期性调度时的速率
* @param unit initialDelay和period的时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
//非空校验 Start
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//非空校验 End
//装饰任务 Start
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//装饰任务 End
sft.outerTask = t;
delayedExecute(t);//执行
return t;
}
/**
* 周期性调度指定的命令。这个与上述{@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)}是有区别的,
* 上述{@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)}如果本身任务运行时长超过了指定的速率, 那么
* 下次运行命令时依然是按照指定的速率, 而当前方法不是, 当前方法是按照上次命令执行之后按照指定的延迟(delay)继续执行的
* @param command 命令
* @param initialDelay 启动当前服务后延迟多久开始第一次执行命令
* @param delay 延迟
* @param unit initialDelay和delay的时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
//非空校验 Start
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//非空校验 End
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
这四个方法的处理逻辑几乎是一样的,都是先创建RunnableScheduledFuture对象,然后交给delayedExecute(RunnableScheduledFuture) 方法处理。我们先不看schedule的两个方法,而只看scheduleWithFixedDelay() 和scheduleAtFixedRate() 两个方法,这两个方法的区别就是在创建ScheduledFutureTask对象时,最后一个参数不一样,scheduleAtFixedRate() 方法封装时是正数;而scheduleWithFixedDelay() 封装时是负数。
1.3.1 获取下一次执行时间
使用方法triggerTime(long, TimeUnit) 获取任务下一次的执行时间,代码如下:
/**
* 计算下一次触发时间
* @param delay 延迟
* @param unit 单位
* @return 纳秒时间
*/
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* 计算触发时间
* @param delay 延迟(纳秒)
* @return 纳秒时间
*/
long triggerTime(long delay) {
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* 当{@code delay}超过了{@link Long#MAX_VALUE >> 1}时会调用该方法
* @param delay delay
* @return 延迟纳秒
*/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();//获取队列第一个元素(不移除)
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);//获取延迟
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
/**
* 获取当前时间的纳秒
* @return 当前时间的纳秒
*/
final long now() {
return System.nanoTime();
}
下一次的执行时间,通常情况是 now() + delay,只是这里处理了一些极值的情况。
1.3.2 delayedExecute延迟执行
调度任务最终会执行delayedExecute(RunnableScheduledFuture) 方法:
/**
* 延迟执行
* @param task 任务
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())//SHUTDOWN则触发拒绝处理
reject(task);
else {
super.getQueue().add(task);//存放到延迟队列
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))//状态检测
task.cancel(false);
else//状态正常
ensurePrestart();//运行任务
}
}
逻辑很简单:
- 如果线程池已经SHUTDOWN, 则调用reject(Runnable) 拒绝处理,而通过 9、Executor及ThreadPoolExecutor源码解析-2.4 拒绝处理器 部分可以了解其实就是交给了线程池的拒绝处理器处理
- 如果线程池状态正常,则将任务添加到阻塞队列中,并且调用ensurePrestart()方法
ensurePrestart() 代码如下:
/**
* 确保启动, 这里会调用{@link #addWorker(Runnable, boolean)}方法, 而在{@link #addWorker(Runnable, boolean)}方法中,
* 如果传递的参数Runnable为null的话, 则会从阻塞队列中取数据
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());//当前线程数
if (wc < corePoolSize)//小于核心数
addWorker(null, true);
else if (wc == 0)//没有线程
addWorker(null, false);
}
调用了线程池的 addWorker(Runnable, boolean) 方法,而在 9、Executor及ThreadPoolExecutor源码解析-2.3 execute执行原理解析 部分可以了解,如果调用addWorker(Runnable, boolean) 方法,在一定的条件下,会创建一个空的线程消费阻塞队列中的任务。
通过调度流程解析,可以了解到,ScheduledThreadPoolExecutor的调度逻辑是在阻塞队列和任务中,其中阻塞队列是 DelayedWorkQueue对象,而任务对象是 ScheduledFutureTask 对象。
1.4 线程池任务
了解了触发时间的计算之后,就是封装ScheduledFutureTask对象了,而ScheduledFutureTask对象是RunnableScheduledFuture的子类,ScheduledFutureTask的继承关系如下:
通过上述继承关系图可知,ScheduledFutureTask是一个Runnable,也是一个Comparable,而Runnable通常来说就是线程池的任务,实现Comparable是为了确定在优先级阻塞队列的位置。
并且通过上述图,我们可以了解一下Future的架构。
1.4.1 RunnableFuture
RunnableFuture是一个Runnable接口和Future接口的子类,而作为Java程序员来说Future我们很熟悉,可以在线程运行结束后获取线程返回的结果。Future定义的方法有:
public interface Future<V> {
/**
* 尝试取消正在执行的任务。这个尝试可能会失败(因为任务可能已经执行完成)。
* @param mayInterruptIfRunning 如果线程正在运行中是否需要被中断。
* @return 取消执行任务的结果
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 检查在线程完成之前这个任务是否被取消
* @return true/false
*/
boolean isCancelled();
/**
* 检查这个任务是否计算完成。
* @return true/false
*/
boolean isDone();
/**
* 等待任务执行完成, 然后返回这个任务执行的结果
* @return 结果
* @throws InterruptedException 中断异常
* @throws ExecutionException 执行异常
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待任务执行完成, 然后返回这个任务执行的结果。如果等待timeout时间之后没有拿到结果, 则抛{@code TimeoutException}异常
* @param timeout 超时时间
* @param unit 时间单位
* @return 结果
* @throws InterruptedException 中断异常
* @throws ExecutionException 执行异常
* @throws TimeoutException 超时异常
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
RunnableFuture在Future的基础上扩展出了Runnable功能:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* run()方法
*/
void run();
}
run() 方法是Runnable中的run() 方法,当线程启动执行任务时,调用的就是该方法。
1.4.2 Delayed和ScheduledFuture
Delayed接口在Java中表示延迟的意思,它是Comparable的子类,具有排序的能力,其只有一个方法:
public interface Delayed extends Comparable<Delayed> {
/**
* 获取此对象剩余的延迟, 以给定的单位为准
* @param unit 单位
* @return 延迟
*/
long getDelay(TimeUnit unit);
}
通过getDelay(TimeUnit) 方法获取延迟任务当前到下一次执行时间的间隔大小。
ScheduledFuture并没有任何扩展功能,只是作为调度任务的一个标识:
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
1.4.3 RunnableScheduledFuture
RunnableScheduledFuture 只有一个方法:
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
/**
* 是否是周期性的任务
* @return true/false
*/
boolean isPeriodic();
}
通过isPeriodic() 方法定义当前的任务是否是周期性的任务,返回结果为true表示当前任务是周期性任务,否则为一次性任务。
1.4.4 FutureTask
FutureTask是一个普通的类(非接口非抽象类),是对Future功能的一个完整的实现,这里我们不作为主体内容介绍,后续有时间可以单独对其进行源码分析。
FutureTask整体设计是这样的:
- 存在多个状态,NEW表示初始状态,COMPLETING表示正在完成中,但是还没有完成,NORMAL表示已经完成,用户可以获取结果了,EXCEPTIONAL表示用户的线程任务运行出现了异常,CANCELLED表示任务被取消,INTERRUPTING表示正在中断,INTERRUPTED表示已经中断
- 当用户调用get()方法获取结果的时候,如果状态是NEW或者COMPLETING的话则表示线程任务没有运行结束,需要阻塞;如果状态是NORMAL的话则正常返回;如果CANCELLED、INTERRUPTING、INTERRUPTED则抛出异常CancellationException,如果状态是EXCEPTIONAL,则抛出异常ExecutionException
- 正常运行任务会调用其中的run()方法,方法正常执行完成则将状态从NEW --> COMPLETING, 最后置为NORMAL状态,并且会唤醒被阻塞的线程(get()方法时被阻塞的线程)
- 如果run()方法运行出现了异常,则将状态置为EXCEPTIONAL
大致的流程如上,具体的代码也很简单,可自行阅读。
1.4.5 ScheduledFutureTask
ScheduledFutureTask是当前调度线程池需要的任务,具有Future、Runnable、Delayed、Comparable等全部功能。对上述接口做了具体的实现。那么我们重点分析ScheduledFutureTask对象的组织逻辑。
1.4.5.1 全局变量和构造函数
ScheduledFutureTask的全局变量如下:
/** 断开FIFO连接的序列号 */
private final long sequenceNumber;
/** 任务执行的时间, 单位为纳秒 */
private long time;
/** 周期性任务的周期, 单位为纳秒。负值表示fixed-delay执行, 0表示非周期性任务 */
private final long period;
/** 周期性任务重新入队的实际任务, 实际上就是{@code this} */
RunnableScheduledFuture<V> outerTask = this;
/** 当前对象在{@link DelayedWorkQueue}队列中的索引位置 */
int heapIndex;
- sequenceNumber是通过ScheduledThreadPoolExecutor.sequencer生成的,用作任务比较大小时的条件之一
- time表示下一次任务的执行时间
- period表示周期性任务的频率,负数表示fixedDelay执行,0表示非周期性任务
- heapIndex:表示它在队列(底层是数组)中的数组索引,是为了快速定位用的
构造函数有三个:
/**
* 构造函数
* @param r 任务
* @param result 结果
* @param ns 纳秒
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 构造函数
* @param r 任务
* @param result 结果
* @param ns 纳秒
* @param period 周期
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 构造函数
* @param callable 任务
* @param ns 纳秒
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
在构造函数中指定了用户的任务,以及下一次触发的时间,如果不是周期性任务那么period为0;如果是周期性任务会指定period的值,而经过上述 “1.3 任务调度” 部分可以知道,如果是fixed-delay方式则period为负数,如果是fixed-rate方式period为正数。那么判断是否是周期性任务就通过period字段判断即可,isPeriodic() 代码如下:
/**
* 是否是周期性的任务
* @return true/false
*/
public boolean isPeriodic() {
return period != 0;
}
构造方法中会生成唯一一个sequenceNumber序列号,该序列号是作为compareTo() 方法的指标之一。
1.4.5.2 compareTo比较
在延迟阻塞队列中,通常的做法是优先级队列,因为对于一个延迟3秒执行和一个延迟2秒执行的任务,延迟2秒执行的任务应当先被线程运行,而延迟3秒执行的任务后被执行,这种情况使用优先级队列正合适。而优先级队列是需要有比较条件的,所以ScheduledFutureTask需要重写compareTo(Delayed) 方法, 其代码如下:
/**
* 比较
* @param other Delayed
* @return int
*/
public int compareTo(Delayed other) {
if (other == this)//同一个对象
return 0;
//ScheduledFutureTask比较 Start
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
long diff = time - x.time;//时间差
if (diff < 0)//当前对象的时间在other时间之前
return -1;
else if (diff > 0)//当前时间在other时间之后
return 1;
else if (sequenceNumber < x.sequenceNumber)//比较sequenceNumber
return -1;
else//其他情况直接返回1
return 1;
}
//ScheduledFutureTask比较 End
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
这里的比较是按照下一次的执行时间比较,如果当前对象的执行时间在待比较的任务的执行时间之前,则返回-1,表示比other小;如果时间相同则按照sequenceNumber比较,而sequenceNumber通常是先放入线程池的小。
如果待比较的other对象不是ScheduledFutureTask对象,则通过getDelay()方法比较大小。ScheduledFutureTask.getDelay(TimeUnit) 代码如下:
/**
* 获取此对象剩余的延迟, 以给定的单位为准
* @param unit 单位
* @return 延迟, 这里返回的是纳秒
*/
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
下一次的执行时间减去当下,就表示剩余的延迟时间。
1.4.5.3 run运行任务
ScheduledFutureTask作为一个Runnable,线程池执行任务时调用的是Runnable.run() 方法,所以我们看其run() 方法:
/**
* 任务运行的逻辑
*/
public void run() {
boolean periodic = isPeriodic();//是否是周期性任务
if (!canRunInCurrentRunState(periodic))//非RUNNING也非SHUTDOWN
cancel(false);//断掉线程池
else if (!periodic)//非周期性任务
ScheduledFutureTask.super.run();//运行用户任务
else if (ScheduledFutureTask.super.runAndReset()) {//周期性任务, 运行用户指令并返回用户运行运行的成功与否
setNextRunTime();//设置下一次运行时间
reExecutePeriodic(outerTask);//周期性任务重新入队
}
}
1.先判断线程池的状态,通过方法canRunInCurrentRunState(boolean) 判断,这个方法返回结果是否是RUNNING或者SHUTDOWN,如果既不是RUNNING也不是SHUTDOWN,那么证明线程池是STOP、TIDYING、TERMINATED其中一个,不应当执行任务,会调用cancel(boolean) 取消任务。
canRunInCurrentRunState(boolean) 代码如下:
/**
* 判断当前线程池是否是RUNNING或者SHUTDOWN状态
* @param periodic 是否是周期性的
* @return true/false
*/
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}
/**
* 检查是否是RUNNING状态或者SHUTDOWN状态。
* @param shutdownOK 如果是true的话, 那么如果是SHUTDOWN状态的也返回true
* @return true/false
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
2.如果当前任务是非周期性的(isPeriod()方法),那么执行一下其run() 方法即可。
3.如果是周期性任务,则会调用FutureTask.runAndReset() 方法,在这个方法中会运行用户的任务并返回用户的结果,同时状态不会置为NORMAL,而是NEW,因为周期性任务需要不断重复执行,所以为NEW。
FutureTask.runAndReset() 代码如下:
/**
* 运行并重置
* @return true/false
*/
protected boolean runAndReset() {
//设置当前线程为运行线程 Start
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
//设置当前线程为运行线程 End
boolean ran = false;//是否正常运行用户指令没有出异常
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call();//运行用户命令
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;//任务运行完成将其置为null
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
4.如果用户的任务执行成功,那么会调用 setNextRunTime() 设置下一次的执行时间;同时也会调用reExecutePeriodic(RunnableScheduledTask) 将任务重新入队,因为下一次的执行时间变了,所以在优先级的队列中位置可能也不一样。
setNextRunTime() 代码如下:
/**
* 设置下一次运行的时间
*/
private void setNextRunTime() {
long p = period;
if (p > 0)//大于0表示周期性任务
time += p;
else//小于0表示固定延迟, 等于0表示非周期性任务(这里不存在等于0的情况)
time = triggerTime(-p);
}
设置下一次的执行时间,如果period大于0,表示是fixed-rate模式,以固定频率执行任务,所以是通过上一次的执行时间加上周期频率;如果period小于0(能走到这里period不会等于0),表示是fixed-delay模式,是以固定的延迟执行任务,即当前任务运行完成的时间+固定的延迟,也就是triggerTime(long) 方法返回的结果值。
reExecutePeriodic(RunnableScheduledTask) 代码如下:
/**
* 周期性任务重新入队
* @param task 任务
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {//线程池是RUNNING或者SHUTDOWN状态
super.getQueue().add(task);//入队
if (!canRunInCurrentRunState(true) && remove(task))//线程池不是RUNNING也不是SHUTDOWN, 注销
task.cancel(false);
else//线程池正常
ensurePrestart();
}
}
将任务添加到阻塞队列,并通过ensurePrestart() 保证线程池中有线程在消费阻塞队列任务。ensurePrestart() 在上述已经介绍过了,这里不做重复介绍。
1.4.6 装饰任务
在上述的 “1.3 调度任务” 部分创建好ScheduledFutureTask后,会调用decorateTask(Runnable, RunnableScheduledFuture)或decorateTask(Callable, RunnableScheduledFuture) 方法装饰任务,这两个方法是protected的,如果用户需要自定义装饰,可重写这两个方法,而ScheduledThreadPoolExecutor中的实现是直接返回RunnableScheduledFuture,两个方法代码分别如下:
/**
* 装饰任务。这里直接返回{@code task}
* @param runnable 任务
* @param task 任务
* @return RunnableScheduledFuture
*/
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
/**
* 装饰任务。这里直接返回{@code task}
* @param callable 任务
* @param task 任务
* @return RunnableScheduledFuture
*/
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
1.5 DelayedWorkQueue延迟队列
DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类,实现了BlockingQueue接口,是ScheduledThreadPoolExecutor调度线程池的阻塞队列。内部使用数组的结构,是一个优先级的队列结构。作为一个队列,其最重要的功能只有两个:插入元素和获取元素。
1.5.1 全局变量
/** 初始化的队列大小: 16 */
private static final int INITIAL_CAPACITY = 16;
/** 队列 */
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
/** 重入锁 */
private final ReentrantLock lock = new ReentrantLock();
/** size大小 */
private int size = 0;
/**
* 当队列头的执行时间未到时, 说明所有的任务执行时间都未到, 如果此时有多个线程来获取任务, 那么这个线程表示为第一个线程,
* 后续线程判断这个值不为空表示队列头的执行时间未到(不用再从数组中获取数据并计算剩余的执行时间的操作), 后续线程被阻塞
*/
private Thread leader = null;
/** 等待条件队列 */
private final Condition available = lock.newCondition();
其中queue数组表示队列的元素,数组默认大小是16。
1.5.2 插入元素
通过 7、BlockingQueue之ArrayBlockingQueue源码解析 我们知道插入元素的方法有多个,add(E)、offer(E, long, TimeUnit)、put(E)、offer(E),而DelayedWorkQueue是一个无界队列,所以在插入元素的操作上不会因为空间不足而阻塞。这四个方法的原型如下:
/**
* add元素
* @param e 元素
* @return true/false
*/
public boolean add(Runnable e) {
return offer(e);
}
/**
* 存放元素。其中{@code timeout}与{@code unit}这里没用
* @param e 元素
* @param timeout 如果队列满了等待的超时时间
* @param unit 时间单位
* @return true/false
*/
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
/**
* put元素
* @param e 元素
*/
public void put(Runnable e) {
offer(e);
}
/**
* 向队列添加元素
* @param x 任务
* @return true/false
*/
public boolean offer(Runnable x) {
//非空校验 Start
if (x == null)
throw new NullPointerException();
//非空校验 End
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
//需要扩容数组 Start
if (i >= queue.length)
grow();
//需要扩容数组 End
size = i + 1;
if (i == 0) {//第一个元素
queue[0] = e;
setIndex(e, 0);
} else {//非第一个元素
siftUp(i, e);//存放元素
}
//如果是第一个元素则激活条件队列(获取数据的线程) Start
if (queue[0] == e) {
leader = null;
available.signal();
}
//如果是第一个元素则激活条件队列(获取数据的线程) End
} finally {
lock.unlock();
}
return true;
}
前面三个方法最终调用的是 offer(E) 方法,在offer(E) 方法中,先判断数组还有没有空间,如果没有的话调用 grow() 进行扩容,扩容的操作其实是比较耗时的操作,因为涉及到数组的移动;然后判断当前插入的元素是否是队列头,如果是则直接插入,如果不是则调用siftUp(int, Runnable) 方法插入,因为不是数组头的话,是需要判断当前元素在队列中的位置的,插入后可能需要涉及到数组的移动。
siftUp(int, Runnable) 代码如下:
/**
* 从指定值{@code k}的中间值开始, 往数组的左移动, 如果{@code key}
* 的时间在数组的时间之前, 则将其放入指定位置
* @param k k
* @param key RunnableScheduledFuture
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
/*
* 这个算法就是寻找放入元素的位置, 即寻找到key在数组中哪个元素之前
*/
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
这个算法也不难,使用类似二分法从中间开始,判断通过compareTo() 方法确定其在数组中的位置并移动数组,而compareTo() 我们在上文中介绍过了,主要是通过下一次的执行时间进行排列,距离当前时间越近的在前,远的在后。
如果是第一个位置插入的元素,则会唤醒available 条件队列被阻塞的线程,其实就是全局变量线程 leader。
1.5.3 获取元素
获取元素的方法有四个:peek()、poll()、poll(long, TimeUnit)、take()、,这四个方法代码分别如下:
/**
* 获取第一个位置的元素(不移除元素)
* @return RunnableScheduledFuture
*/
public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}
/**
* 获取队列中第一个元素。
* 1、如果队列中第一个元素的延迟时间没有到, 那么直接返回{@code null}
* 2、如果队列中第一个元素存在, 并且已经过了或者到了延迟时间, 那么对size做递减操作, 并重新计算队列中最后一个元素的位置
* @return RunnableScheduledFuture
*/
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];//获取队列第一个元素
if (first == null || first.getDelay(NANOSECONDS) > 0)//第一个元素为空或者第一个元素的延迟时间还没有到
return null;
else//第一个元素不为空并且第一个元素的时间已经到了或者已经过了, 则返回该元素
return finishPoll(first);
} finally {
lock.unlock();
}
}
/**
* 拿取元素
* @param timeout 超时时间
* @param unit 时间单位
* @return RunnableScheduledFuture
* @throws InterruptedException 异常
*/
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//主体逻辑, 无限循环 Start
for (; ; ) {
RunnableScheduledFuture<?> first = queue[0];//获取第一个元素
if (first == null) {//第一个元素为空
if (nanos <= 0)//没有等待时间, 直接返回
return null;
else//存在等待时间
nanos = available.awaitNanos(nanos);//等待(释放锁)
} else {//第一个元素不为空
//第一个元素的延迟时间已到 Start
long delay = first.getDelay(NANOSECONDS);//获取延迟
if (delay <= 0)
return finishPoll(first);//返回元素
//第一个元素的延迟时间已到 End
//第一个元素延迟时间未到 Start
if (nanos <= 0)//用户没有指定超时时间, 直接return
return null;
first = null;//置为空(first已无用, 释放内存而已)
//等待逻辑 Start
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
//等待逻辑 End
//第一个元素延迟时间未到 End
}
}
//主体逻辑, 无限循环 End
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
/**
* 拿取元素
* @return RunnableScheduledFuture
* @throws InterruptedException 异常
*/
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//主体逻辑, 无限循环 Start
for (; ; ) {
RunnableScheduledFuture<?> first = queue[0];//获取队列第一个元素
if (first == null)//第一个元素为空, 则阻塞等待(释放锁)
available.await();
else {//第一个元素不为空
long delay = first.getDelay(NANOSECONDS);//延迟时间
//第一个元素已经到了延迟或者已经过了延迟了 Start
if (delay <= 0)
return finishPoll(first);
//第一个元素已经到了延迟或者已经过了延迟了 End
//第一个元素不为空并且没有到执行时间 Start
first = null;//置为空(first已无用, 释放内存而已)
if (leader != null)//有线程(如果其他线程进来, 那么leader是不为空的), 等待
available.await();//等待(释放锁)
else {//没有leader
Thread thisThread = Thread.currentThread();//当前线程
leader = thisThread;//赋值
try {
available.awaitNanos(delay);//等待(释放锁)
} finally {
if (leader == thisThread)
leader = null;
}
}
//第一个元素不为空并且没有到执行时间 End
}
}
//主体逻辑, 无限循环 End
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
这里注意一下,其中peek() 方法是不管队列中的任务时间是否已到,是直接返回队头的元素,不过在线程池中我们知道用到的不是这个方法。
其他三个方法整体上是一样的,只是poll(long, TimeUnit)和take() 方法多了一步对leader的赋值和阻塞操作,而leader赋值之后,其他线程获取元素的时候不需要重新计算第一个线程的剩余时间,直接判断不为空即可。其他的逻辑是:
- 获取数组的第一个元素
- 判断第一个元素的剩余时间,如果已到或者已经过了执行时间了,那么调用finishPoll(RunnableScheduledFuture) 方法,这个方法是将数组中的元素减去,并移动数组
任务的剩余时间是通过getDelay(TimeUnit) 方法获取的,这个方法我们在上文中已经介绍过了,不再重复介绍。而finishPoll(RunnableScheduledFuture) 也很简单,代码如下:
/**
* 将队列中最后一个元素弹出, 并将这个元素(队列最后一个元素)重新计算位置插入
* 返回的值是用户的参数{@code f}
* @param f RunnableScheduledFuture
* @return f
*/
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
到这里,ScheduledThreadPoolExecutor的主体逻辑我们已经介绍完毕了。
1.6 小结
ScheduledThreadPoolExecutor的总结如下:
- ScheduledThreadPoolExecutor是继承自ThreadPoolExecutor的子类,具有线程池的全部功能;也是ScheduledExecutorService的子类,具有任务调度的功能。
- 线程池内部的任务使用ScheduledFutureTask类,该类实现了Comparable接口,其中compareTo(Delayed) 方法的主要逻辑是如果当前对象下一次的执行时间在待对比的对象的执行时间之前,则比待对比的对象小,所以对比的是下一次的执行时间;其中的run() 方法执行时,如果是一次性任务,则调用一次目标命令的run()方法;如果是周期性任务,调用完目标run()方法后,会重置下一次执行时间并将其重新放入阻塞队列;而重置下一次执行时间有两种模式:
- fixed-rate:固定频率,以上一次的执行时刻为准,加上指定的频率值就是下一次的执行时间
- fixed-delay:固定延迟,任务的run()方法运行完成之后,加上固定的延迟就是下一次的执行时间
- 线程池的队列是ScheduledThreadPoolExecutor的内部类DelayedWorkQueue,是一个无界队列。
- 添加任务时,会通过compareTo()方法确定其在队列中的位置,涉及到数组的移动
- 获取任务时,也会涉及到数组的移动。底层的获取逻辑的时间复杂度是O(logN)的
- 对于任务数比较多的系统,如集中式的任务调度系统中,其调度的完整性较时间轮算法低
- 如果ScheduledThreadPoolExecutor(1) 这种与Timer在任务调度上区别不大
评论区