一 时间轮
时间轮方案将现实世界中的时钟概念引入到软件设计中,主要思路是定义一个时钟周期(比如时钟的12小时)和步长(比如时钟的一秒走一次),当指针每走一步的时候,会获取当前时钟刻度上挂载的任务并执行。所以说时间轮算法轮询不再遍历所有任务,而是遍历时间刻度。
1.1 时间轮算法简介
延迟队列的使用场景,时间轮都可以使用,比如:
- 心跳检测(维护TCP连接)
- 会话、请求超时
- 消息延迟推送
- 业务超时订单(30分钟内支付)
时间轮算法也存在一定的局限,比如按1秒为一个时间刻度,那么一天有86400个刻度,当我添加两个任务,一个是20秒后执行,另一个是86400秒后执行,那么其中大部分轮询都是空轮询,而且会浪费内存空间(每个刻度都有自己的任务队列)。
1.1.1 单层时间轮
单层时间轮就是我们上面介绍的,轮询线程遍历时间刻度,好比指针不断在时钟上旋转、遍历。
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickDuration),时间轮的时间格个数是固定的,如下图所示:
如上图所示,相邻bucket到期时间的间隔为1秒,从0秒开始计时,1秒到期的定时任务挂在bucket=1下,2秒到期的定时任务挂在bucket=2下,第18秒到期的定时任务挂在bucket=2下,但是此时的round(圈数)=1,如果指针pointer在第一圈时,不会执行round != 0的任务。任务bucket存储到期的任务,当指针pointer指向哪时,获取bucket所有round为0的任务并执行,而round大于0的则对round递减,bucket通常使用链表存储(数组的可能存在移动,导致耗时)
如果上述的时间轮通过数组实现,可以很方便通过下标定位到定时任务链路,因此,添加、删除、执行定时任务的时间复杂度为O(1)。
时间轮常见数据结构模型如下:
- pointer/tick:指针/刻度,随着时间的推移,指针不停向前移动
- bucket:时间轮由bucket组成,如上图有12个bucket,每个bucket下挂载了未来要到期的节点
- tickDuration:刻度间隔,如上图,总共12个bucket,这个值设置为1秒,那么相邻两个bucket的间隔就是1秒
- round:任务剩余的圈数(通常是任务的数据结构)
时间轮使用一个表盘指针(pointer/tick),用来表示时间轮当前指针跳动的次数,可以用 tickDuration * (pointer + 1)来表示下一次到期的任务。
1.1.2 分层时间轮
单层时间轮存在的缺点,如转动圈数大但是任务量小,会更多的空耗CPU,占用更多的内存等。那么为了解决单层时间轮的不足,就有了多层时间轮。
分层时间轮采用层级联动的方式,具有以下特点:
- 不做遍历计算剩余的round,每一个bucket下的任务都是应该执行的
- 当任务执行时间超过当前刻度范围时,进入下一层时间轮范围
- 定时任务通过升级和降级来转移队列中的位置
如下图所示:
如上图所示,分层时间轮通过升级和降级来联动层级,执行时间超过当前层级最大刻度时,就会进行升级,进入下一层时间轮。
而接下来我们研读的Netty时间轮的实现主要是单层时间轮,而多层时间轮可以阅读kafka的设计。
1.2 时间轮与阻塞队列
了解了上述的时间轮,并且阅读过 11、ScheduledThreadPoolExecutor源码解析 文章,可以对时间轮以及阻塞队列的定时做一个对比:
- 时间轮算法,刻度/指针在不停的跳动,跳动一格,就执行对应bucket上挂载的任务,所以获取任务的时间复杂度是O(1);而ScheduledThreadPoolExecutor中使用的是DelayedWorkQueue延迟阻塞队列,底层是小顶堆的数组结构,当获取任务后存在数组的移动,时间复杂度是O(logN)
- 时间轮算法,在插入任务时,只需要通过触发时间、tickDuration、tick/pointer和bucket数计算出所在的bucket和剩余的round数,直接插入即可,时间复杂度是O(1);而ScheduledThreadPoolExecutor中的DelayedWorkQueue使用的数组形式的优先级(触发时间排列)队列,插入时数组存在移动的可能,时间复杂度是O(logN)
- 时间轮算法,在任务间的间隔比较大,bucket较多的情况下存在空耗CPU的情况;相对这种情况,DelayedWorkQueue会阻塞,几乎不存在空跑CPU
- 任务量较大时建议使用时间轮算法,如集中式的调度系统。
接下来我们看Netty的时间轮算法的实现。
二 Netty时间轮实现
我这里使用的是Netty 4.1.77.final为原型去理解的,相关代码在common项目中,具体可自行去github上克隆代码查阅。
Netty时间轮算法的实现相对来说比较简单,它是以单层时间轮为理论实现的,在会话超时检测、维持心跳等方面有具体的应用。
Netty实现时间轮有三个部分组成:
- Timer:非JDK中的Timer,而是netty自定义的Timer接口,主要是提供对外的API,用户调用相关API执行任务
- TimerTask:Timer执行的用户任务接口
- Timeout:Timeout记录了任务相关的会话信息,如截止时间、剩余轮数、对应的轮子的哪一个bucket等。
2.1 示例
先看一组示例:
public class WheelTimerTest {
public static void main(String[] args) throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(4);
//定义定时器, 每1秒中指针跳动一次, 跳动一圈是60秒
Timer timer = new HashedWheelTimer(new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程" + counter.getAndIncrement());
}
}, 1, TimeUnit.SECONDS, 60, false, -1, executorService);
//执行周期任务(每5秒执行一次) Start
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println(getTime() + " - 周期任务执行......");
timer.newTimeout(this, 4, TimeUnit.SECONDS);
}
}, 2, TimeUnit.SECONDS);
//执行周期任务(每5秒执行一次) End
//执行一次任务 Start
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println(getTime() + " - 一次性任务执行结束.....");
}
}, 2, TimeUnit.SECONDS);
//执行一次任务 End
}
public static String getTime() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
}
运行效果如下:
2023-05-19 19:00:46 - 一次性任务执行结束.....
2023-05-19 19:00:46 - 周期任务执行......
2023-05-19 19:00:51 - 周期任务执行......
2023-05-19 19:00:56 - 周期任务执行......
2023-05-19 19:01:01 - 周期任务执行......
2023-05-19 19:01:06 - 周期任务执行......
上述示例中使用netty提供的HashedWheelTimer作为Timer的实现类来执行任务的,每一秒跳动一次指针,周期性任务每5秒运行一次(延迟时间+跳动时间),一个时间轮周期是60秒。
从上述示例中也能看出,netty并没有提供周期性任务的函数,只有一次性的任务,而周期性的任务就是在任务中继续将自己(this)放到时间轮里,以便下次执行。
2.2 三大组件
netty的时间轮三大组件,即Timer、TimerTask、Timeout,三个组件分别饰演不同的功能。
2.2.1 Timer
Timer是提供给用户执行任务的定时器接口,其中只有两个函数:
public interface Timer {
/**
* 在指定的{@code delay}延迟之后调度一下(只调度一次){@link TimerTask}任务
* @param task 任务
* @param delay 延迟时间
* @param unit 时间单位
* @return Timeout
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* 停止
* @return Timeout
*/
Set<Timeout> stop();
}
newTimeout(TimerTask, long, TimeUnit) 表示延迟多久后执行一次用户的任务,而任务使用TimerTask的实现类。netty中唯一的实现类是HashedWheelTimer。
2.2.2 TimerTask
TimerTask表示执行器中要执行的任务接口,原型如下:
public interface TimerTask {
/**
* 任务的执行方法
* @param timeout Timeout
* @throws Exception 异常
*/
void run(Timeout timeout) throws Exception;
}
只定义了run(Timeout) 方法,表示运行指定的逻辑。
2.2.3 Timeout
Timeout表示任务在运行的过程中的一些数据,如关联的Timer、关联的TimerTask对象、是否已到期/过期等功能,可以表示任务运行的一次会话,其原型如下:
public interface Timeout {
/**
* 获取关联的{@link Timer}对象
* @return Timer
*/
Timer timer();
/**
* 获取关联的任务{@link TimerTask} 对象
* @return TimerTask
*/
TimerTask task();
/**
* 判断关联的任务{@link TimerTask}是否已过期
* @return true/false
*/
boolean isExpired();
/**
* 判断关联的任务{@link TimerTask}是否已被取消
* @return true/false
*/
boolean isCancelled();
/**
* 取消
* @return true/false
*/
boolean cancel();
}
2.3 HashedWheelTimer的实现
HashedWheelTimer是netty提供的时间轮算法的实现组件,HashedWheelTimer是Timer子类。
2.3.1 相关全局字段
HashedWheelTimer的全局字段有很多,有一些是辅助性的,有一些是功能性的,这里我们不详细介绍辅助性的功能,主要关注功能性的,相关字段如下:
/** 日志 */
static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class);
/** 实例的计数器 */
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
/** 实例的计数限制 */
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);
/** {@link #workerState}的原子更新器 */
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final ResourceLeakTracker<HashedWheelTimer> leak;
/** 工作任务 */
private final Worker worker = new Worker();
/** 工作线程 */
private final Thread workerThread;
/** {@link #workerState}对应的init */
public static final int WORKER_STATE_INIT = 0;
/** {@link #workerState}对应的start */
public static final int WORKER_STATE_STARTED = 1;
/** {@link #workerState}对应的shutdown */
public static final int WORKER_STATE_SHUTDOWN = 2;
/** 运行状态。0: init, 1: started, 2: shutdown */
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState;
/** 刻度间隔 */
private final long tickDuration;
/** 轮子桶 */
private final HashedWheelBucket[] wheel;
/** 掩码, 是{@link #wheel}的长度 - 1, 为了从{@link #wheel}获取元素时避免数组越界 */
private final int mask;
/** 等待初始化开始时间{@link #startTime} */
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
/** Timeout的队列 */
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
/** 已经被取消的Timeout的队列 */
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
/** pending Timeout的数量 */
private final AtomicLong pendingTimeouts = new AtomicLong(0);
/** 最大的pending Timeout的数量 */
private final long maxPendingTimeouts;
/** 执行器 */
private final Executor taskExecutor;
/** 开始时间 */
private volatile long startTime;
主要的功能性字段如下:
- worker:是一个Runnable子类,核心的逻辑线程,表示跳动指针、执行任务的线程
- workerThread:线程类,运行Worker对象
- workerState:当前Timer的工作状态,其值有0(init)、1(started)、2(shutdown),初始值是0(init)
- tickDuration:每个刻度的间隔,单位是纳秒
- wheel:HashedWheelBucket的数组,表示轮子中的每一个桶,内部关联HashedWheelTimeout链表
- timeouts:HashedWheelTimeout的队列,表示添加的任务队列,Worker线程会消费该队列的内容以执行任务。
- taskExecutor:任务的执行器,表示运行用户TimerTask.run()方法的执行器,默认的执行器是ImmediateExecutor(不是以线程方式执行用户任务,而是直接调用TimerTask.run()方法)
- startTime:定时器的开始时间,当第一个任务进入时赋值
2.3.2 构造函数
HashedWheelTimer的构造函数有多个,但是最终都会指向同一个:
/**
* 构造函数, 这里的线程工厂使用 {@link Executors#defaultThreadFactory()}
*/
public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}
/**
* 构造函数, 这里的线程工厂使用 {@link Executors#defaultThreadFactory()}
* @param tickDuration 时间间隔
* @param unit 时间单位
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}
/**
* 构造函数, 这里的线程工厂使用 {@link Executors#defaultThreadFactory()}
* @param tickDuration 时间间隔
* @param unit 间隔单位
* @param ticksPerWheel 每个轮子bucket的数量
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
/**
* 构造函数, 这里的tickDuration时间间隔设置为100毫秒
* @param threadFactory 线程创建工厂
*/
public HashedWheelTimer(ThreadFactory threadFactory) {
this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
/**
* 构造函数, 这里的每个轮子的是有512个bucket
* @param threadFactory 线程创建工厂
* @param tickDuration 刻度间隔
* @param unit 时间单位
*/
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}
/**
* 构造函数
* @param threadFactory 线程创建工厂
* @param tickDuration 刻度间隔
* @param unit 时间单位
* @param ticksPerWheel 每个轮子的刻度
*/
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
/**
* 构造函数
* @param threadFactory 线程创建工厂
* @param tickDuration 刻度间隔
* @param unit 时间单位
* @param ticksPerWheel 每个轮子bucket的数量
* @param leakDetection true表示leak detection总是被启用, false表示只有当worker thread不是daemon线程时才会被启用
*/
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
/**
* 构造函数, 这里使用的执行器是{@link ImmediateExecutor}
* @param threadFactory 线程创建工厂
* @param tickDuration 刻度间隔
* @param unit 时间单位
* @param ticksPerWheel 每个轮子bucket的数量
* @param leakDetection true表示leak detection总是被启用, false表示只有当worker thread不是daemon线程时才会被启用
* @param maxPendingTimeouts 最大的Timeout pending数量
*/
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
maxPendingTimeouts, ImmediateExecutor.INSTANCE);
}
/**
* 构造函数, 这里使用的执行器是{@link ImmediateExecutor}
* @param threadFactory 线程创建工厂
* @param tickDuration 刻度间隔
* @param unit 时间单位
* @param ticksPerWheel 每个轮子bucket的数量
* @param leakDetection true表示leak detection总是被启用, false表示只有当worker thread不是daemon线程时才会被启用
* @param maxPendingTimeouts 最大的Timeout pending数量
* @param taskExecutor 执行任务的执行器
*/
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) {
//参数校验 Start
checkNotNull(threadFactory, "threadFactory");
checkNotNull(unit, "unit");
checkPositive(tickDuration, "tickDuration");
checkPositive(ticksPerWheel, "ticksPerWheel");
this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
//参数校验 End
wheel = createWheel(ticksPerWheel);//创建轮子, 是2的倍数
mask = wheel.length - 1;//掩码
long duration = unit.toNanos(tickDuration);//纳秒
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
workerThread = threadFactory.newThread(worker);//创建线程
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
不同的构造函数使用的默认值不同,但是最终都会指向最后一个构造函数。最后一个构造函数会对相关变量进行赋值
2.3.2.1 创建桶
在构造函数中,会创建HashedWheelBucket数组,表示在一个时间轮周期内有多少个小格,即表示指针转动一圈的次数。
使用函数createWheel(int) 创建桶,代码如下:
/**
* 创建轮子桶, 桶的数量是2的倍数, 并且是大于等于{@code ticksPerWheel}
* @param ticksPerWheel 桶的数量参考
* @return HashedWheelBucket数组
*/
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");//不能越界
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);//桶的数量
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];//创建数组
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();//填充数组
}
return wheel;
}
/**
* 计算数量, 结果为2的倍数, 并且大于或等于{@code ticksPerWheel}
* @param ticksPerWheel 参考数量
* @return 2的倍数
*/
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
创建的逻辑保证HashedWheelBucket的数组的大小是2的倍数(通过函数normalizeTicksPerWheel(int) 保证),这里使用的是左移操作,位操作这里不做介绍,具体请看 3、位操作 文章。
2.3.2.2 创建线程
构造函数创建Thread(workerThread)使用的是工厂创建的,线程工厂我们可以自定义,也可以使用默认的线程工厂,线程的任务为Worker对象
workerThread = threadFactory.newThread(worker);//创建线程
而构造函数的其他代码是一些辅助性的字段的赋值,有兴趣可自行阅读。
2.3.3 HashedWheelTimeout
HashedWheelTimeout可以表示一次任务的会话,是一个链表的结构,这里我们先简单看一下其全局变量以及构造函数,至于具体的运行组织逻辑我们稍后再看。
全局变量如下:
/** {@link #state}状态值, 0, 表示init */
private static final int ST_INIT = 0;
/** {@link #state}状态值, 1, 表示cancelled, 取消 */
private static final int ST_CANCELLED = 1;
/** {@link #state}状态值, 2, 表示expired, 到期或过期了 */
private static final int ST_EXPIRED = 2;
/** {@link #state}的原子更新器 */
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
/** 关联的Timer */
private final HashedWheelTimer timer;
/** 关联的任务 */
private final TimerTask task;
/** 截止时间 */
private final long deadline;
/** 状态, 默认为INIT, 0 */
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int state = ST_INIT;
/**
* 剩余的轮数, 是在{@link Worker#transferTimeoutsToBuckets()}中计算的, 在被添加到{@link HashedWheelBucket}之前计算出来的
* 比如一个时间轮, 有4个桶, 每个桶的刻度间隔是2s, 如果这个时候存放的延迟任务是10s后执行, 那么时间轮一圈下来不应当执行这个任务,
* 1圈后的一格才能执行, 这个就表示剩余的圈数
*/
long remainingRounds;
/** 下一个元素 */
HashedWheelTimeout next;
/** 上一个元素 */
HashedWheelTimeout prev;
/** 对应的桶 */
HashedWheelBucket bucket;
其构造函数如下:
/**
* 构造函数
* @param timer Timer
* @param task 任务
* @param deadline 截止时间
*/
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
2.3.4 HashedWheelBucket
HashedWheelBucket表示一个轮子的格子,例如现实世界的钟表,12点到1点之间的格子即一个HashedWheelBucket。内部封装了HashedWheelTimeout的链表头和链表尾:
/** 链表头 */
private HashedWheelTimeout head;
/** 链表尾 */
private HashedWheelTimeout tail;
2.3.5 时间轮图样
通过上述对一些类的简单介绍,我们可以对netty的时间轮做一个综合的图解,如下所示:
- startTime:表示时间轮启动第一个任务的开始时间,通过startTime和tickDuration就能计算出下一次tick的时间点
- bucket:一圈有多少个bucket,每个bucket内部维护HashedWheelTimeout双向链表,当tick转动到指定的bucket就获取对应的HashedWheelTimeout执行任务;
- tick:相当于指针,每次跳动都会自增1,跳动一圈相当于跳动HashedWheelBucket数组的容量,上图中表示转动一圈会跳动4次。通过按位与(Array.length - 1)计算即可得到当前数组的索引
- tickDuration:刻度间隔,表示tick跳动一次所消耗的时间。
有了上述的内容和任务的触发时间,我们就可以计算出很多东西,如任务的剩余圈数、任务所在的HashedWheelBucket索引的位置(添加任务)。
例如,现在的startTime为2023-05-19 00:00:00,tickDuration为2秒,HashedWheelBucket数组容量为4(变量为length,即转动一圈是8秒), 我添加一个任务下一次执行时间为2023-05-19 00:00:10(变量为deadline),tick从0开始转动。通过这些信息,怎么计算任务所在的HashedWheelBucket数组的位置?怎么计算任务的剩余圈数?
对于上述的问题,就是个小学数学问题,只是我们需要将上述信息转换为数学方程式,如下:
我们知道tick转动一圈是4次,8秒,需要转动第5次才能执行该任务,那么任务应当是在数组的索引为1的位置上,并且剩余的圈数是1。
那么我们假设剩余圈数为x,那么x乘以length加上当前已经跳动的tick次数就表示任务总共需要跳动多少次,而deadline除以tickDuration就可以表示其位置,也就是任务总共需要跳动多少次,而所以方程式可以如下:
x * length + tick = deadline / tickDuraion
最后得出剩余圈数 x = (deadline / tickDuraion - tick) / length
而位置就是deadline除以tickDuration,但是需要注意当前tick的值,如果当前tick的值较大则应当是当前tick的值(因为tick是一直在跳动的,可能转动后任务才添加进来的情况),最后得出位置为:
index = max((deadline/tickDuration), tick) & (length -1)
经过上述的分析,再来分析代码就很简单了。
2.3.6 newTimeout执行任务
添加一个任务使用的函数是 newTimeout(TimerTask, long, TimeUnit) ,代码如下:
/**
* 在指定的{@code delay}延迟之后调度一下(只调度一次){@link TimerTask}任务
* @param task 任务
* @param delay 延迟时间
* @param unit 时间单位
* @return Timeout
*/
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
//参数校验 Start
checkNotNull(task, "task");
checkNotNull(unit, "unit");
//参数校验 End
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
//不能超过最大的pending数 Start
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
//不能超过最大的pending数 End
start();//启动(第一次会阻塞)
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;//计算截止时间, startTime是在start()方法中计算出的
//防止溢出
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);//创建Timeout
timeouts.add(timeout);//添加到队列中
return timeout;
}
先校验相关参数,然后会调用start() 函数启动,而启动的过程其实对startTime赋值,以及驱动线程;最后计算出任务的下次执行时间,创建HashedWheelTimeout并添加到队列timeouts队列中。
start() 函数如下:
/**
* 启动线程{@link #workerThread}, 通过构造函数我们知道workerThread中的任务是{@link #worker},
* 也就是会调用{@link Worker#run()}方法
*/
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {//worker状态
case WORKER_STATE_INIT://当前状态是初始化
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {//更新为启动状态
workerThread.start();//启动
}
break;
case WORKER_STATE_STARTED://当前已启动
break;
case WORKER_STATE_SHUTDOWN://当前已停止
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
//workerThread启动成功后会重置startTime值, 也会唤醒startTimeInitialized Start
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
//ignore
}
}
//workerThread启动成功后会重置startTime值, 也会唤醒startTimeInitialized End
}
当第一次添加任务的时候,HashedWheelTimer的状态值为INIT,所以会将INIT改为STARTED,并启动Worker线程。注意,这里会使用CountDownLatch等待,等待Worker线程释放锁,也就是说第一次添加任务会简单阻塞。那么我们看Worker的启动逻辑。
2.3.7 Worker线程运行
Worker在整个生命周期表示时钟,是对指针tick进行不停的跳动,跳动一次获取对应bucket位置上的Timeout并执行相关任务。其run() 方法如下:
@Override
public void run() {
//初始化startTime开始时间 Start
startTime = System.nanoTime();
if (startTime == 0) {
startTime = 1;
}
//初始化startTime开始时间 End
startTimeInitialized.countDown();//唤醒start()方法
//主体逻辑 Start
do {
final long deadline = waitForNextTick();//等待时间轮的指针转动, 并返回时间轮触发任务的时间线
if (deadline > 0) {
int idx = (int) (tick & mask);//wheel数组的下标
processCancelledTasks();//处理已被取消的任务
HashedWheelBucket bucket = wheel[idx];//获取当前刻度对应的桶
transferTimeoutsToBuckets();//将队列中的Timeout传输到对应的桶上
bucket.expireTimeouts(deadline);//运行任务当前时间线以内的
tick++;//下一轮
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
//主体逻辑 End
//程序结束了 Start
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);//清空桶
}
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
//程序结束了 End
}
在线程任务中,对startTime赋值之后,就调用了CountDownLatch.countDown()释放锁了,所以在上述start() 方法中就会唤醒,再往上的话就是newTimeout(TimerTask, long, TimeUnit) 函数就可以计算出任务的下一次触发时间点了。
Worker线程的主体逻辑:
- 只要HashedWheelTimer的状态没有变更,则一直在循环当中
- 使用函数waitForNextTick() 等待指针的下一次跳动,并返回指针下一次跳动的时间点。假如现在是00:00:00,tickDuration为2秒一次,那么下一次被激活应当是00:00:02,而如果时间还没到00:00:02的话则应当被阻塞
- processCancelledTasks()表示处理已经被注销的任务,有兴趣可自行研究
- 获取当前bucket
- 使用函数transferTimeoutsToBuckets()将队列timeouts的所有任务传输到对应的bucket上
- bucket.expireTimeouts(long) 表示执行deadline时间点之前的所有任务(当前的bucket)
2.3.7.1 waitForNextTick等待下次跳动
waitForNextTick() 代码如下:
/**
* 计算下一次的截止时间
* @return 下一次截止时间
*/
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);//下一个刻度的截止时间
for (; ; ) {
final long currentTime = System.nanoTime() - startTime;//触发的时间线
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//当前时间是否到了下一个刻度的运行时间
if (sleepTimeMs <= 0) {//运行
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
if (sleepTimeMs == 0) {
sleepTimeMs = 1;
}
}
try {
Thread.sleep(sleepTimeMs);//未到时间轮的转动截止时间
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
使用Thread.sleep(long) 等待下次指针的跳动
2.3.7.2 transferTimeoutsToBuckets将任务传输到对应的bucket上
transferTimeoutsToBuckets() 代码如下:
/**
* 将Timeout传输到桶中
*/
private void transferTimeoutsToBuckets() {
for (int i = 0; i < 100000; i++) {//传输100000个值(当前队列最大值)
//获取Timeout Start
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
//获取Timeout End
//Timeout状态为已取消 Start
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
//Timeout状态为已取消 End
//计算剩余的圈数 Start
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
//计算剩余的圈数 End
//计算timeout对象应当在哪个桶里 Start
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
//计算timeout对象应当在哪个桶里 End
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
通过函数newTimeout(TimerTask, long, TimeUnit) 函数我们知道,添加任务是将任务添加到timeouts的队列中,那么这里就是将队列的数据传输到对应的bucket上,所以会计算出剩余的圈数和所在的位置,计算过程我们在上文 “2.3.5 时间轮图样” 部分已经详细介绍过了,这里不做算法解析了。
2.3.7.3 执行任务
执行任务是执行当前bucket的任务,即HashedWheelBucket.expireTimeouts(long) ,代码如下:
/**
* 执行所有超时的任务
* @param deadline 截止时间
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;//链表头
//处理所有超时的任务
while (timeout != null) {
HashedWheelTimeout next = timeout.next;//链表头部的下一个元素
if (timeout.remainingRounds <= 0) {//到了转动桶
next = remove(timeout);
if (timeout.deadline <= deadline) {//到运行时间了
timeout.expire();//运行任务
} else {
throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {//被取消了
next = remove(timeout);//移除
} else {//未到转动轮数
timeout.remainingRounds--;
}
timeout = next;
}
}
如果任务HashedWheelTimeout没有剩余的圈数了,就可以执行了,执行任务会调用HashWheelTimeout.expire() 函数,代码如下:
/**
* 执行任务, 一般是到期了才会调用该方法
*/
public void expire() {
//变更状态 Start
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
//变更状态 End
try {
timer.taskExecutor.execute(this);//执行
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
+ " for execution.", t);
}
}
}
/**
* 任务运行, 这里调用的是{@link #task}的run方法
*/
@Override
public void run() {
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
将this交给了Executor执行,this的run() 函数就会调用TimerTask.run(Timeout) 方法,即真正用户的任务。
到这里netty的时间轮算法就介绍完毕了,对于其他的一些函数可自行阅读其源码。
评论区