跳至主要內容

DelayQueue(无界阻塞队列)

Yaien Blog原创大约 4 分钟并发线程安全Queue

DelayQueue(无界阻塞队列)

DelayQueue是Java并发包java.util.concurrent中的一个类,它实现了BlockingQueue接口。这是一个无界阻塞队列,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。

原理

  1. 使用优先级队列实现的无界阻塞队列
  2. 优先级队列(PriorityQueue)与PriorityBlockingQueue类似,不过没有阻塞功能
  3. 线程安全使用ReentrantLock来实现,通过Condition available控制阻塞条件
  4. 入队不阻塞,并且队列没有边界,与优先级队列入队相同
  5. 出队为空时阻塞,不为空时检查堆顶元素过期时间,小于等于0则出队,否则表示元素还未过期,阻塞
  6. 阻塞时先判断leader线程是否为空(为了保证优先级),不为空表示已经有线程阻塞了,为空则将当前线程设置为leader,并按照过期时间进行阻塞

特性

  • 队列中的元素必须实现Delayed接口。在创建元素时,可以定义该元素的存活时间,当从队列获取元素时,只有满足该存活时间的元素才能被取出。
  • 向队列中插入元素的操作(例如put和offer)永远不会被阻塞。只有当队列为空,或者队列中的元素没有到达其存活时间时,获取元素的操作(例如take和poll)才会被阻塞。

核心属性

  • final PriorityQueue q: 实际存储队列元素的数据结构。它是一个优先队列,队列中的元素按到期时间排序,最早到期的元素在队列的头部。

  • Thread leader: 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程。

  • final transient ReentrantLock lock: 锁,用于控制对队列的并发访问。

  • final Condition available: 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知。

入队源码

public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 入队
        q.offer(e);
        if (q.peek() == e) {
            // 若入队的元素位于队列头部,说明当前元素延迟最小
            // 将 leader 置空
            leader = null;
            // available条件队列转同步队列,准备唤醒阻塞在available上的线程
            available.signal();
        }
        return true;
    } finally {
        lock.unlock(); // 解锁,真正唤醒阻塞的线程
    }
}

出队源码

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)   
            if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
                available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
            else {
                long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间             
                if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
                    return q.poll();
                
                // 如果delay大于0 ,则下面要阻塞了
                // 将first置为空方便gc
                first = null; 
                // 如果有线程争抢的Leader线程,则进行无限期等待。
                if (leader != null)
                    available.await();
                else {
                    // 如果leader为null,把当前线程赋值给它
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待剩余等待时间
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
        if (leader == null && q.peek() != null)
            // available条件队列转同步队列,准备唤醒阻塞在available上的线程
            available.signal();
        // 解锁,真正唤醒阻塞的线程
        lock.unlock();
    }
}

使用场景

  • 任务调度:DelayQueue可以用于任务调度,例如一个任务需要在10分钟后执行,可以将这个任务放入DelayQueue,并设置延迟时间为10分钟,10分钟后这个任务就能从队列中取出,然后执行。Java的ScheduledThreadPoolExecutor就使用了DelayQueue进行任务的调度。

  • 缓存系统:在一个缓存系统中,DelayQueue可以用于存储缓存项,缓存项在创建时设置一个到期时间,到期后缓存项就能从DelayQueue中取出,然后清除。这种方式可以避免需要定时扫描所有缓存项来查找并清除过期的缓存项。

  • 会话管理:在网络编程中,可以使用DelayQueue来管理用户的会话,当用户的会话在一定时间内没有活动(例如没有发送或接收数据),那么这个会话就认为是过期的,可以从DelayQueue中取出并关闭。这种方式可以防止资源的浪费,提高服务器的处理能力。

  • 消息重发:在消息队列中,如果消息没有被正确处理,可以将消息放入DelayQueue并设置一个延迟时间,延迟时间过后这个消息就能从DelayQueue中取出并重新发送。

总的来说,其实只要是需要延迟处理的任务,都可以使用DelayQueue来实现。

上次编辑于:
贡献者: yanggl