作者:突击者 | 来源:互联网 | 2023-08-27 09:26
概述数据结构任务队列任务队列使用的自定义的队列 DelayedWorkQueue,是一个基于二叉堆的数据结构。提供了扩容、插入、弹出等基本操作。其中,堆顶元素时
概述
数据结构
任务队列
任务队列使用的自定义的队列 DelayedWorkQueue,是一个基于二叉堆的数据结构。提供了扩容、插入、弹出等基本操作。
其中,堆顶元素时待执行时间最早的元素。
/**
* 获取任务对象
*/
public RunnableScheduledFuture> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture> first = queue[0];//从数组首部获取最近的待执行任务。
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)// 判断剩余时间是否小于0, >0继续等待剩余时间。
return finishPoll(first);// 堆弹出操作
first = null; // don't retain ref while waiting
if (leader != null)// 只有获得leader节点的线程才会等待delay时间再去尝试获取任务对象,避免其它线程不必要的等待
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();// 当leader节点获取可执行任务后,唤醒其它等待线程
lock.unlock();
}
}
任务对象:ScheduledFutureTask
提交给线程池的任务被封装成ScheduledFutureTask类型,该类实现了Deleyed对象,用于任务执行时间的控制。
private class ScheduledFutureTask
extends FutureTask implements RunnableScheduledFuture {
/** Sequence number to break ties FIFO */ //用于指定任务添加的序号,当执行时间相同时,控制执行顺序
private final long sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */
private long time;// 通过具体的待执行时间点转换成的时间戳形式。(Date -> long)
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;//时间的执行周期
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture outerTask = this; //下一个待执行的任务
/**
* Index into delay queue, to support faster cancellation.
*/
int heapIndex; //在堆中的索引,用于支持快速取消
public long getDelay(TimeUnit unit) {// 计算还有多少时间去执行任务
return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {// 用于将任务保存到队列中时,判断任务的保存顺序,越早执行的,放在前面,实现一样的,比较序号值。
......
}
/**
* Returns {@code true} if this is a periodic (not a one-shot) action.
*
* @return {@code true} if periodic
*/
public boolean isPeriodic() {
return period != 0;
}
/**
* Sets the next time to run for a periodic task.
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() { //实际运行
boolean periodic = isPeriodic();// 判断是否周期任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();// 非周期运行一次
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();// 更新time值,确定下次自行时间
reExecutePeriodic(outerTask);// 将任务保存到任务队列中
}
}
}
关于触发时间的计算
/**
* Returns the trigger time of a delayed action.
*/
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay <0) ? 0 : delay));
}
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {// 根据延期时间,加上当前时间戳的值,赋值给任务对象的time属性。
return now() +
((delay <(Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}