并发队列-无界阻塞延迟队列DelayQueue原理探究

1
2
转载自:http://ifeve.com/%E5%B9%B6%E5%8F%91%E9%98%9F%E5%88%97-%E6%97%A0%E7%95%8C%E9%98%BB%E5%A1%9E%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97delayqueue%E5%8E%9F%E7%90%86%E6%8E%A2%E7%A9%B6/
最近在开发中正好有类似场景。

前言

DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。

DelayQueue类图结构

如图DelayQueue中内部使用的是PriorityQueue存放数据,使用ReentrantLock实现线程同步,可知是阻塞队列。另外队列里面的元素要实现Delayed接口,一个是获取当前剩余时间的接口,一个是元素比较的接口,因为这个是有优先级的队列。

offer操作

插入元素到队列,主要插入元素要实现Delayed接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {(2
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,peek并不一定是当前添加的元素,如果(2)为true,说明当前元素e的优先级最小也就即将过期的,这时候激活avaliable变量条件队列里面的线程,通知他们队列里面有元素了。

take操作

获取并移除队列首元素,如果队列没有过期元素则等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//获取但不移除队首元素(1)
E first = q.peek();
if (first == null)
available.await();//(2)
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)//(3)
return q.poll();
else if (leader != null)//(4)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;//(5)
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)//(6)
available.signal();
lock.unlock();
}
}

第一次调用take时候由于队列空,所以调用(2)把当前线程放入available的条件队列等待,当执行offer并且添加的元素就是队首元素时候就会通知最先等待的线程激活,循环重新获取队首元素,这时候first假如不空,则调用getdelay方法看该元素海剩下多少时间就过期了,如果delay<=0则说明已经过期,则直接出队返回。否者看leader是否为null,不为null则说明是其他线程也在执行take则把该线程放入条件队列,否者是当前线程执行的take方法,则调用(5)await直到剩余过期时间到(这期间该线程会释放锁,所以其他线程可以offer添加元素,也可以take阻塞自己),剩余过期时间到后,该线程会重新竞争得到锁,重新进入循环。

(6)说明当前take返回了元素,如果当前队列还有元素则调用singal激活条件队列里面可能有的等待线程。leader那么为null,那么是第一次调用take获取过期元素的线程,第一次调用的线程调用设置等待时间的await方法等待数据过期,后面调用take的线程则调用await直到signal。

poll操作

获取并移除队头过期元素,否者返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
//如果队列为空,或者不为空但是队头元素没有过期则返回null
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}

一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class DelayedEle implements Delayed {

private final long delayTime; //延迟时间
private final long expire; //到期时间
private String data; //数据

public DelayedEle(long delay, String data) {
delayTime = delay;
this.data = data;
expire = System.currentTimeMillis() + delay;
}

/**
* 剩余时间=到期时间-当前时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}

/**
* 优先队列里面优先级规则
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DelayedElement{");
sb.append("delay=").append(delayTime);
sb.append(", expire=").append(expire);
sb.append(", data='").append(data).append('\'');
sb.append('}');
return sb.toString();
}
}

public static void main(String[] args) {
DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>();

DelayedEle element1 = new DelayedEle(1000,"zlx");
DelayedEle element2 = new DelayedEle(1000,"gh");

delayQueue.offer(element1);
delayQueue.offer(element2);

element1 = delayQueue.take();
System.out.println(element1);
}

使用场景

TimerQueue的内部实现
ScheduledThreadPoolExecutor中DelayedWorkQueue是对其的优化使用