侧边栏壁纸
博主头像
林雷博主等级

斜月沉沉藏海雾,碣石潇湘无限路

  • 累计撰写 132 篇文章
  • 累计创建 47 个标签
  • 累计收到 3 条评论

目 录CONTENT

文章目录

12、时间轮算法和Netty的实现

林雷
2023-05-28 / 0 评论 / 0 点赞 / 386 阅读 / 21,097 字

response

一 时间轮

时间轮方案将现实世界中的时钟概念引入到软件设计中,主要思路是定义一个时钟周期(比如时钟的12小时)和步长(比如时钟的一秒走一次),当指针每走一步的时候,会获取当前时钟刻度上挂载的任务并执行。所以说时间轮算法轮询不再遍历所有任务,而是遍历时间刻度。

1.1 时间轮算法简介

延迟队列的使用场景,时间轮都可以使用,比如:

  • 心跳检测(维护TCP连接)
  • 会话、请求超时
  • 消息延迟推送
  • 业务超时订单(30分钟内支付)

时间轮算法也存在一定的局限,比如按1秒为一个时间刻度,那么一天有86400个刻度,当我添加两个任务,一个是20秒后执行,另一个是86400秒后执行,那么其中大部分轮询都是空轮询,而且会浪费内存空间(每个刻度都有自己的任务队列)。

1.1.1 单层时间轮

单层时间轮就是我们上面介绍的,轮询线程遍历时间刻度,好比指针不断在时钟上旋转、遍历。
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickDuration),时间轮的时间格个数是固定的,如下图所示:
TimingWheel-1.1.1-1

如上图所示,相邻bucket到期时间的间隔为1秒,从0秒开始计时,1秒到期的定时任务挂在bucket=1下,2秒到期的定时任务挂在bucket=2下,第18秒到期的定时任务挂在bucket=2下,但是此时的round(圈数)=1,如果指针pointer在第一圈时,不会执行round != 0的任务。任务bucket存储到期的任务,当指针pointer指向哪时,获取bucket所有round为0的任务并执行,而round大于0的则对round递减,bucket通常使用链表存储(数组的可能存在移动,导致耗时)
如果上述的时间轮通过数组实现,可以很方便通过下标定位到定时任务链路,因此,添加、删除、执行定时任务的时间复杂度为O(1)

时间轮常见数据结构模型如下:

  • pointer/tick:指针/刻度,随着时间的推移,指针不停向前移动
  • bucket:时间轮由bucket组成,如上图有12个bucket,每个bucket下挂载了未来要到期的节点
  • tickDuration:刻度间隔,如上图,总共12个bucket,这个值设置为1秒,那么相邻两个bucket的间隔就是1秒
  • round:任务剩余的圈数(通常是任务的数据结构)

时间轮使用一个表盘指针(pointer/tick),用来表示时间轮当前指针跳动的次数,可以用 tickDuration * (pointer + 1)来表示下一次到期的任务。


1.1.2 分层时间轮

单层时间轮存在的缺点,如转动圈数大但是任务量小,会更多的空耗CPU,占用更多的内存等。那么为了解决单层时间轮的不足,就有了多层时间轮。
分层时间轮采用层级联动的方式,具有以下特点:

  • 不做遍历计算剩余的round,每一个bucket下的任务都是应该执行的
  • 当任务执行时间超过当前刻度范围时,进入下一层时间轮范围
  • 定时任务通过升级和降级来转移队列中的位置

如下图所示:
TimingWheel-1.1.2-1

如上图所示,分层时间轮通过升级和降级来联动层级,执行时间超过当前层级最大刻度时,就会进行升级,进入下一层时间轮。
而接下来我们研读的Netty时间轮的实现主要是单层时间轮,而多层时间轮可以阅读kafka的设计。

1.2 时间轮与阻塞队列

了解了上述的时间轮,并且阅读过 11、ScheduledThreadPoolExecutor源码解析 文章,可以对时间轮以及阻塞队列的定时做一个对比:

  • 时间轮算法,刻度/指针在不停的跳动,跳动一格,就执行对应bucket上挂载的任务,所以获取任务的时间复杂度是O(1);而ScheduledThreadPoolExecutor中使用的是DelayedWorkQueue延迟阻塞队列,底层是小顶堆的数组结构,当获取任务后存在数组的移动,时间复杂度是O(logN)
  • 时间轮算法,在插入任务时,只需要通过触发时间、tickDuration、tick/pointer和bucket数计算出所在的bucket和剩余的round数,直接插入即可,时间复杂度是O(1);而ScheduledThreadPoolExecutor中的DelayedWorkQueue使用的数组形式的优先级(触发时间排列)队列,插入时数组存在移动的可能,时间复杂度是O(logN)
  • 时间轮算法,在任务间的间隔比较大,bucket较多的情况下存在空耗CPU的情况;相对这种情况,DelayedWorkQueue会阻塞,几乎不存在空跑CPU
  • 任务量较大时建议使用时间轮算法,如集中式的调度系统。

接下来我们看Netty的时间轮算法的实现。

二 Netty时间轮实现

我这里使用的是Netty 4.1.77.final为原型去理解的,相关代码在common项目中,具体可自行去github上克隆代码查阅。
Netty时间轮算法的实现相对来说比较简单,它是以单层时间轮为理论实现的,在会话超时检测、维持心跳等方面有具体的应用。
Netty实现时间轮有三个部分组成:

  • Timer:非JDK中的Timer,而是netty自定义的Timer接口,主要是提供对外的API,用户调用相关API执行任务
  • TimerTask:Timer执行的用户任务接口
  • Timeout:Timeout记录了任务相关的会话信息,如截止时间、剩余轮数、对应的轮子的哪一个bucket等。

2.1 示例

先看一组示例:

public class WheelTimerTest {
    public static void main(String[] args) throws Exception {
        final ExecutorService executorService = Executors.newFixedThreadPool(4);
        //定义定时器, 每1秒中指针跳动一次, 跳动一圈是60秒
        Timer timer = new HashedWheelTimer(new ThreadFactory() {
            final AtomicInteger counter = new AtomicInteger();
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程" + counter.getAndIncrement());
            }
        }, 1, TimeUnit.SECONDS, 60, false, -1, executorService);

        //执行周期任务(每5秒执行一次) Start
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println(getTime() + " - 周期任务执行......");
                timer.newTimeout(this, 4, TimeUnit.SECONDS);
            }
        }, 2, TimeUnit.SECONDS);
        //执行周期任务(每5秒执行一次) End

        //执行一次任务 Start
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println(getTime() + " - 一次性任务执行结束.....");
            }
        }, 2, TimeUnit.SECONDS);
        //执行一次任务 End
    }

    public static String getTime() {
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }
}

运行效果如下:

2023-05-19 19:00:46 - 一次性任务执行结束.....
2023-05-19 19:00:46 - 周期任务执行......
2023-05-19 19:00:51 - 周期任务执行......
2023-05-19 19:00:56 - 周期任务执行......
2023-05-19 19:01:01 - 周期任务执行......
2023-05-19 19:01:06 - 周期任务执行......

上述示例中使用netty提供的HashedWheelTimer作为Timer的实现类来执行任务的,每一秒跳动一次指针,周期性任务每5秒运行一次(延迟时间+跳动时间),一个时间轮周期是60秒。
从上述示例中也能看出,netty并没有提供周期性任务的函数,只有一次性的任务,而周期性的任务就是在任务中继续将自己(this)放到时间轮里,以便下次执行。

2.2 三大组件

netty的时间轮三大组件,即Timer、TimerTask、Timeout,三个组件分别饰演不同的功能。

2.2.1 Timer

Timer是提供给用户执行任务的定时器接口,其中只有两个函数:

public interface Timer {
    /**
     * 在指定的{@code delay}延迟之后调度一下(只调度一次){@link TimerTask}任务
     * @param task 任务
     * @param delay 延迟时间
     * @param unit 时间单位
     * @return Timeout
     */
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    /**
     * 停止
     * @return Timeout
     */
    Set<Timeout> stop();
}

newTimeout(TimerTask, long, TimeUnit) 表示延迟多久后执行一次用户的任务,而任务使用TimerTask的实现类。netty中唯一的实现类是HashedWheelTimer。

2.2.2 TimerTask

TimerTask表示执行器中要执行的任务接口,原型如下:

public interface TimerTask {
    /**
     * 任务的执行方法
     * @param timeout Timeout
     * @throws Exception 异常
     */
    void run(Timeout timeout) throws Exception;
}

只定义了run(Timeout) 方法,表示运行指定的逻辑。

2.2.3 Timeout

Timeout表示任务在运行的过程中的一些数据,如关联的Timer、关联的TimerTask对象、是否已到期/过期等功能,可以表示任务运行的一次会话,其原型如下:

public interface Timeout {
    /**
     * 获取关联的{@link Timer}对象
     * @return Timer
     */
    Timer timer();

    /**
     * 获取关联的任务{@link TimerTask} 对象
     * @return TimerTask
     */
    TimerTask task();

    /**
     * 判断关联的任务{@link TimerTask}是否已过期
     * @return true/false
     */
    boolean isExpired();

    /**
     * 判断关联的任务{@link TimerTask}是否已被取消
     * @return true/false
     */
    boolean isCancelled();

    /**
     * 取消
     * @return true/false
     */
    boolean cancel();
}

2.3 HashedWheelTimer的实现

HashedWheelTimer是netty提供的时间轮算法的实现组件,HashedWheelTimer是Timer子类。

2.3.1 相关全局字段

HashedWheelTimer的全局字段有很多,有一些是辅助性的,有一些是功能性的,这里我们不详细介绍辅助性的功能,主要关注功能性的,相关字段如下:

/** 日志 */
static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class);

/** 实例的计数器 */
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();

private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
/** 实例的计数限制 */
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);

/** {@link #workerState}的原子更新器 */
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");

private final ResourceLeakTracker<HashedWheelTimer> leak;
/** 工作任务 */
private final Worker worker = new Worker();
/** 工作线程 */
private final Thread workerThread;

/** {@link #workerState}对应的init */
public static final int WORKER_STATE_INIT = 0;
/** {@link #workerState}对应的start */
public static final int WORKER_STATE_STARTED = 1;
/** {@link #workerState}对应的shutdown */
public static final int WORKER_STATE_SHUTDOWN = 2;

/** 运行状态。0: init, 1: started, 2: shutdown */
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState;

/** 刻度间隔 */
private final long tickDuration;

/** 轮子桶 */
private final HashedWheelBucket[] wheel;
/** 掩码, 是{@link #wheel}的长度 - 1, 为了从{@link #wheel}获取元素时避免数组越界  */
private final int mask;
/** 等待初始化开始时间{@link #startTime} */
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
/** Timeout的队列 */
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
/** 已经被取消的Timeout的队列 */
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
/** pending Timeout的数量 */
private final AtomicLong pendingTimeouts = new AtomicLong(0);
/** 最大的pending Timeout的数量 */
private final long maxPendingTimeouts;
/** 执行器 */
private final Executor taskExecutor;

/** 开始时间 */
private volatile long startTime;

主要的功能性字段如下:

  • worker:是一个Runnable子类,核心的逻辑线程,表示跳动指针、执行任务的线程
  • workerThread:线程类,运行Worker对象
  • workerState:当前Timer的工作状态,其值有0(init)、1(started)、2(shutdown),初始值是0(init)
  • tickDuration:每个刻度的间隔,单位是纳秒
  • wheel:HashedWheelBucket的数组,表示轮子中的每一个桶,内部关联HashedWheelTimeout链表
  • timeouts:HashedWheelTimeout的队列,表示添加的任务队列,Worker线程会消费该队列的内容以执行任务。
  • taskExecutor:任务的执行器,表示运行用户TimerTask.run()方法的执行器,默认的执行器是ImmediateExecutor(不是以线程方式执行用户任务,而是直接调用TimerTask.run()方法)
  • startTime:定时器的开始时间,当第一个任务进入时赋值

2.3.2 构造函数

HashedWheelTimer的构造函数有多个,但是最终都会指向同一个:

/**
 * 构造函数, 这里的线程工厂使用 {@link Executors#defaultThreadFactory()}
 */
public HashedWheelTimer() {
    this(Executors.defaultThreadFactory());
}

/**
 * 构造函数, 这里的线程工厂使用 {@link Executors#defaultThreadFactory()}
 * @param tickDuration 时间间隔
 * @param unit 时间单位
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(Executors.defaultThreadFactory(), tickDuration, unit);
}

/**
 * 构造函数, 这里的线程工厂使用 {@link Executors#defaultThreadFactory()}
 * @param tickDuration 时间间隔
 * @param unit 间隔单位
 * @param ticksPerWheel 每个轮子bucket的数量
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}

/**
 * 构造函数, 这里的tickDuration时间间隔设置为100毫秒
 * @param threadFactory 线程创建工厂
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100, TimeUnit.MILLISECONDS);
}

/**
 * 构造函数, 这里的每个轮子的是有512个bucket
 * @param threadFactory 线程创建工厂
 * @param tickDuration 刻度间隔
 * @param unit 时间单位
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}

/**
 * 构造函数
 * @param threadFactory 线程创建工厂
 * @param tickDuration 刻度间隔
 * @param unit 时间单位
 * @param ticksPerWheel 每个轮子的刻度
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}

/**
 * 构造函数
 * @param threadFactory 线程创建工厂
 * @param tickDuration 刻度间隔
 * @param unit 时间单位
 * @param ticksPerWheel 每个轮子bucket的数量
 * @param leakDetection true表示leak detection总是被启用, false表示只有当worker thread不是daemon线程时才会被启用
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}

/**
 * 构造函数, 这里使用的执行器是{@link ImmediateExecutor}
 * @param threadFactory 线程创建工厂
 * @param tickDuration 刻度间隔
 * @param unit 时间单位
 * @param ticksPerWheel 每个轮子bucket的数量
 * @param leakDetection true表示leak detection总是被启用, false表示只有当worker thread不是daemon线程时才会被启用
 * @param maxPendingTimeouts 最大的Timeout pending数量
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
            maxPendingTimeouts, ImmediateExecutor.INSTANCE);
}

/**
 * 构造函数, 这里使用的执行器是{@link ImmediateExecutor}
 * @param threadFactory 线程创建工厂
 * @param tickDuration 刻度间隔
 * @param unit 时间单位
 * @param ticksPerWheel 每个轮子bucket的数量
 * @param leakDetection true表示leak detection总是被启用, false表示只有当worker thread不是daemon线程时才会被启用
 * @param maxPendingTimeouts 最大的Timeout pending数量
 * @param taskExecutor 执行任务的执行器
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
                        boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) {
    //参数校验 Start
    checkNotNull(threadFactory, "threadFactory");
    checkNotNull(unit, "unit");
    checkPositive(tickDuration, "tickDuration");
    checkPositive(ticksPerWheel, "ticksPerWheel");
    this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
    //参数校验 End

    wheel = createWheel(ticksPerWheel);//创建轮子, 是2的倍数
    mask = wheel.length - 1;//掩码

    long duration = unit.toNanos(tickDuration);//纳秒

    if (duration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }

    if (duration < MILLISECOND_NANOS) {
        logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
                tickDuration, MILLISECOND_NANOS);
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        this.tickDuration = duration;
    }

    workerThread = threadFactory.newThread(worker);//创建线程

    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    this.maxPendingTimeouts = maxPendingTimeouts;

    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

不同的构造函数使用的默认值不同,但是最终都会指向最后一个构造函数。最后一个构造函数会对相关变量进行赋值

2.3.2.1 创建桶

在构造函数中,会创建HashedWheelBucket数组,表示在一个时间轮周期内有多少个小格,即表示指针转动一圈的次数。
使用函数createWheel(int) 创建桶,代码如下:

/**
 * 创建轮子桶, 桶的数量是2的倍数, 并且是大于等于{@code ticksPerWheel}
 * @param ticksPerWheel 桶的数量参考
 * @return HashedWheelBucket数组
 */
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");//不能越界

    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);//桶的数量
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];//创建数组
    for (int i = 0; i < wheel.length; i++) {
        wheel[i] = new HashedWheelBucket();//填充数组
    }
    return wheel;
}

/**
 * 计算数量, 结果为2的倍数, 并且大于或等于{@code ticksPerWheel}
 * @param ticksPerWheel 参考数量
 * @return 2的倍数
 */
private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = 1;
    while (normalizedTicksPerWheel < ticksPerWheel) {
        normalizedTicksPerWheel <<= 1;
    }
    return normalizedTicksPerWheel;
}

创建的逻辑保证HashedWheelBucket的数组的大小是2的倍数(通过函数normalizeTicksPerWheel(int) 保证),这里使用的是左移操作,位操作这里不做介绍,具体请看 3、位操作 文章。

2.3.2.2 创建线程

构造函数创建Thread(workerThread)使用的是工厂创建的,线程工厂我们可以自定义,也可以使用默认的线程工厂,线程的任务为Worker对象

workerThread = threadFactory.newThread(worker);//创建线程

而构造函数的其他代码是一些辅助性的字段的赋值,有兴趣可自行阅读。

2.3.3 HashedWheelTimeout

HashedWheelTimeout可以表示一次任务的会话,是一个链表的结构,这里我们先简单看一下其全局变量以及构造函数,至于具体的运行组织逻辑我们稍后再看。
全局变量如下:

/** {@link #state}状态值, 0, 表示init */
private static final int ST_INIT = 0;
/** {@link #state}状态值, 1, 表示cancelled, 取消 */
private static final int ST_CANCELLED = 1;
/** {@link #state}状态值, 2, 表示expired, 到期或过期了 */
private static final int ST_EXPIRED = 2;
/** {@link #state}的原子更新器 */
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

/** 关联的Timer */
private final HashedWheelTimer timer;
/** 关联的任务 */
private final TimerTask task;
/** 截止时间 */
private final long deadline;

/** 状态, 默认为INIT, 0 */
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int state = ST_INIT;

/**
 * 剩余的轮数, 是在{@link Worker#transferTimeoutsToBuckets()}中计算的, 在被添加到{@link HashedWheelBucket}之前计算出来的
 * 比如一个时间轮, 有4个桶, 每个桶的刻度间隔是2s, 如果这个时候存放的延迟任务是10s后执行, 那么时间轮一圈下来不应当执行这个任务,
 * 1圈后的一格才能执行, 这个就表示剩余的圈数
 */
long remainingRounds;

/** 下一个元素 */
HashedWheelTimeout next;
/** 上一个元素 */
HashedWheelTimeout prev;

/** 对应的桶 */
HashedWheelBucket bucket;

其构造函数如下:

/**
 * 构造函数
 * @param timer Timer
 * @param task 任务
 * @param deadline 截止时间
 */
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
    this.timer = timer;
    this.task = task;
   this.deadline = deadline;
}

2.3.4 HashedWheelBucket

HashedWheelBucket表示一个轮子的格子,例如现实世界的钟表,12点到1点之间的格子即一个HashedWheelBucket。内部封装了HashedWheelTimeout的链表头和链表尾:

/** 链表头 */
private HashedWheelTimeout head;
/** 链表尾 */
private HashedWheelTimeout tail;

2.3.5 时间轮图样

通过上述对一些类的简单介绍,我们可以对netty的时间轮做一个综合的图解,如下所示:
TimingWheel-2.3.5-1

  • startTime:表示时间轮启动第一个任务的开始时间,通过startTime和tickDuration就能计算出下一次tick的时间点
  • bucket:一圈有多少个bucket,每个bucket内部维护HashedWheelTimeout双向链表,当tick转动到指定的bucket就获取对应的HashedWheelTimeout执行任务;
  • tick:相当于指针,每次跳动都会自增1,跳动一圈相当于跳动HashedWheelBucket数组的容量,上图中表示转动一圈会跳动4次。通过按位与(Array.length - 1)计算即可得到当前数组的索引
  • tickDuration:刻度间隔,表示tick跳动一次所消耗的时间。

有了上述的内容和任务的触发时间,我们就可以计算出很多东西,如任务的剩余圈数、任务所在的HashedWheelBucket索引的位置(添加任务)。

例如,现在的startTime为2023-05-19 00:00:00,tickDuration为2秒,HashedWheelBucket数组容量为4(变量为length,即转动一圈是8秒), 我添加一个任务下一次执行时间为2023-05-19 00:00:10(变量为deadline),tick从0开始转动。通过这些信息,怎么计算任务所在的HashedWheelBucket数组的位置?怎么计算任务的剩余圈数?


对于上述的问题,就是个小学数学问题,只是我们需要将上述信息转换为数学方程式,如下:
我们知道tick转动一圈是4次,8秒,需要转动第5次才能执行该任务,那么任务应当是在数组的索引为1的位置上,并且剩余的圈数是1。
那么我们假设剩余圈数为x,那么x乘以length加上当前已经跳动的tick次数就表示任务总共需要跳动多少次,而deadline除以tickDuration就可以表示其位置,也就是任务总共需要跳动多少次,而所以方程式可以如下:
x * length + tick = deadline / tickDuraion
最后得出剩余圈数 x = (deadline / tickDuraion - tick) / length
而位置就是deadline除以tickDuration,但是需要注意当前tick的值,如果当前tick的值较大则应当是当前tick的值(因为tick是一直在跳动的,可能转动后任务才添加进来的情况),最后得出位置为:
index = max((deadline/tickDuration), tick) & (length -1)


经过上述的分析,再来分析代码就很简单了。

2.3.6 newTimeout执行任务

添加一个任务使用的函数是 newTimeout(TimerTask, long, TimeUnit) ,代码如下:

/**
 * 在指定的{@code delay}延迟之后调度一下(只调度一次){@link TimerTask}任务
 * @param task 任务
 * @param delay 延迟时间
 * @param unit 时间单位
 * @return Timeout
 */
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    //参数校验 Start
    checkNotNull(task, "task");
    checkNotNull(unit, "unit");
    //参数校验 End

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    //不能超过最大的pending数 Start
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
    }
    //不能超过最大的pending数 End

    start();//启动(第一次会阻塞)

    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;//计算截止时间, startTime是在start()方法中计算出的

    //防止溢出
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);//创建Timeout
    timeouts.add(timeout);//添加到队列中
    return timeout;
}

先校验相关参数,然后会调用start() 函数启动,而启动的过程其实对startTime赋值,以及驱动线程;最后计算出任务的下次执行时间,创建HashedWheelTimeout并添加到队列timeouts队列中。

start() 函数如下:

/**
 * 启动线程{@link #workerThread}, 通过构造函数我们知道workerThread中的任务是{@link #worker},
 * 也就是会调用{@link Worker#run()}方法
 */
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {//worker状态
        case WORKER_STATE_INIT://当前状态是初始化
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {//更新为启动状态
                workerThread.start();//启动
            }
            break;
        case WORKER_STATE_STARTED://当前已启动
            break;
        case WORKER_STATE_SHUTDOWN://当前已停止
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    //workerThread启动成功后会重置startTime值, 也会唤醒startTimeInitialized Start
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            //ignore
        }
    }
    //workerThread启动成功后会重置startTime值, 也会唤醒startTimeInitialized End
}

当第一次添加任务的时候,HashedWheelTimer的状态值为INIT,所以会将INIT改为STARTED,并启动Worker线程。注意,这里会使用CountDownLatch等待,等待Worker线程释放锁,也就是说第一次添加任务会简单阻塞。那么我们看Worker的启动逻辑。

2.3.7 Worker线程运行

Worker在整个生命周期表示时钟,是对指针tick进行不停的跳动,跳动一次获取对应bucket位置上的Timeout并执行相关任务。其run() 方法如下:

@Override
public void run() {
    //初始化startTime开始时间 Start
    startTime = System.nanoTime();
    if (startTime == 0) {
        startTime = 1;
    }
    //初始化startTime开始时间 End

    startTimeInitialized.countDown();//唤醒start()方法

    //主体逻辑 Start
    do {
        final long deadline = waitForNextTick();//等待时间轮的指针转动, 并返回时间轮触发任务的时间线
        if (deadline > 0) {
            int idx = (int) (tick & mask);//wheel数组的下标
            processCancelledTasks();//处理已被取消的任务
            HashedWheelBucket bucket = wheel[idx];//获取当前刻度对应的桶
            transferTimeoutsToBuckets();//将队列中的Timeout传输到对应的桶上
            bucket.expireTimeouts(deadline);//运行任务当前时间线以内的
            tick++;//下一轮
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    //主体逻辑 End

    //程序结束了 Start
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);//清空桶
    }

    for (; ; ) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
    //程序结束了 End
}

在线程任务中,对startTime赋值之后,就调用了CountDownLatch.countDown()释放锁了,所以在上述start() 方法中就会唤醒,再往上的话就是newTimeout(TimerTask, long, TimeUnit) 函数就可以计算出任务的下一次触发时间点了。
Worker线程的主体逻辑:

  • 只要HashedWheelTimer的状态没有变更,则一直在循环当中
  • 使用函数waitForNextTick() 等待指针的下一次跳动,并返回指针下一次跳动的时间点。假如现在是00:00:00,tickDuration为2秒一次,那么下一次被激活应当是00:00:02,而如果时间还没到00:00:02的话则应当被阻塞
  • processCancelledTasks()表示处理已经被注销的任务,有兴趣可自行研究
  • 获取当前bucket
  • 使用函数transferTimeoutsToBuckets()将队列timeouts的所有任务传输到对应的bucket上
  • bucket.expireTimeouts(long) 表示执行deadline时间点之前的所有任务(当前的bucket)

2.3.7.1 waitForNextTick等待下次跳动

waitForNextTick() 代码如下:

/**
 * 计算下一次的截止时间
 * @return 下一次截止时间
 */
private long waitForNextTick() {
    long deadline = tickDuration * (tick + 1);//下一个刻度的截止时间

    for (; ; ) {
        final long currentTime = System.nanoTime() - startTime;//触发的时间线
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//当前时间是否到了下一个刻度的运行时间

        if (sleepTimeMs <= 0) {//运行
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
            if (sleepTimeMs == 0) {
                sleepTimeMs = 1;
            }
        }

        try {
            Thread.sleep(sleepTimeMs);//未到时间轮的转动截止时间
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

使用Thread.sleep(long) 等待下次指针的跳动

2.3.7.2 transferTimeoutsToBuckets将任务传输到对应的bucket上

transferTimeoutsToBuckets() 代码如下:

/**
 * 将Timeout传输到桶中
 */
private void transferTimeoutsToBuckets() {
    for (int i = 0; i < 100000; i++) {//传输100000个值(当前队列最大值)
        //获取Timeout Start
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        //获取Timeout End

        //Timeout状态为已取消 Start
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            continue;
        }
        //Timeout状态为已取消 End

        //计算剩余的圈数 Start
        long calculated = timeout.deadline / tickDuration;
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        //计算剩余的圈数 End

        //计算timeout对象应当在哪个桶里 Start
        final long ticks = Math.max(calculated, tick);
        int stopIndex = (int) (ticks & mask);
        //计算timeout对象应当在哪个桶里 End

        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

通过函数newTimeout(TimerTask, long, TimeUnit) 函数我们知道,添加任务是将任务添加到timeouts的队列中,那么这里就是将队列的数据传输到对应的bucket上,所以会计算出剩余的圈数和所在的位置,计算过程我们在上文 “2.3.5 时间轮图样” 部分已经详细介绍过了,这里不做算法解析了。

2.3.7.3 执行任务

执行任务是执行当前bucket的任务,即HashedWheelBucket.expireTimeouts(long) ,代码如下:

/**
 * 执行所有超时的任务
 * @param deadline 截止时间
 */
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;//链表头

    //处理所有超时的任务
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;//链表头部的下一个元素
        if (timeout.remainingRounds <= 0) {//到了转动桶
            next = remove(timeout);
            if (timeout.deadline <= deadline) {//到运行时间了
                timeout.expire();//运行任务
            } else {
                throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {//被取消了
            next = remove(timeout);//移除
        } else {//未到转动轮数
            timeout.remainingRounds--;
        }
        timeout = next;
    }
}

如果任务HashedWheelTimeout没有剩余的圈数了,就可以执行了,执行任务会调用HashWheelTimeout.expire() 函数,代码如下:

/**
 * 执行任务, 一般是到期了才会调用该方法
 */
public void expire() {
    //变更状态 Start
    if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
        return;
    }
    //变更状态 End

    try {
        timer.taskExecutor.execute(this);//执行
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
                    + " for execution.", t);
        }
    }
}

/**
 * 任务运行, 这里调用的是{@link #task}的run方法
 */
@Override
public void run() {
    try {
        task.run(this);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
        }
    }
}

将this交给了Executor执行,this的run() 函数就会调用TimerTask.run(Timeout) 方法,即真正用户的任务
到这里netty的时间轮算法就介绍完毕了,对于其他的一些函数可自行阅读其源码。

0

评论区