跳至主要內容

ArrayBlockingQueue(有界阻塞队列)

Yaien Blog原创大约 6 分钟并发线程安全Queue

ArrayBlockingQueue(有界阻塞队列)

ArrayBlockingQueue 是 java.util.concurrent 包下的一个类,它是 BlockingQueue 接口的一个实现。这是一个由数组支持的有界阻塞队列
队列按照 FIFO (先进先出) 的规则对元素进行排序,队列的头部是在队列中存在时间最长的元素。新的元素插入到队列的尾部,队列检索操作会获取位于队列头部的元素。

原理

  1. 使用有界阻塞队列,队列中元素先进先出,存取元素操作相互排斥
  2. 使用静态数组,容量固定,在构建ArrayBlockingQueue时必须指定长度,并且没有扩容机制
  3. 线程安全使用ReentrantLock来实现,存取的是同一把锁,操作的是同一个数组队形,存取操作相互排斥
  4. 入队是从队首开始添加元素,并记录putIndex,同时唤醒notEmpty(当putIndex到达队尾时设置为0)
  5. 出队也是从队首开始取出元素,并记录takeIndex,同时唤醒notFull(当takeIndex到达队尾时设置为0)

注意:两个Index指针都是从队首像队尾移动,保证队列先进先出原则

ArrayBlockingQueue使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

核心属性

  • final Object[] items: 这是一个数组,用来保存队列中的元素。

  • int takeIndex: 表示下一个要被获取(或“take”)的元素在数组中的位置。如果队列为空,则没有具体的意义。

  • int putIndex: 表示下一个要添加(或“put”)的元素在数组中的位置。如果队列已满,则没有具体的意义。

  • int count: 表示队列中当前的元素数量。

  • final ReentrantLock lock: 重入锁,用于控制对队列的并发访问。

  • private final Condition notEmpty: 当队列为空时,获取元素的线程可以在这个条件上等待。

  • private final Condition notFull: 当队列已满时,添加元素的线程可以在这个条件上等待。

核心方法

  • void put(E e): 将指定元素插入此队列中,如果队列已满,则阻塞等待可用的空间。
  • E take(): 从队列中取出并删除一个元素,如果队列为空,当前线程则会阻塞,直到有元素可以获取。
  • boolean offer(E e, long timeout, TimeUnit unit): 尝试将元素插入队列,如果队列已满,则等待指定的等待时间。如果在指定的时间内,队列仍然没有可用空间,那么返回
    false。如果插入成功,则返回 true。
  • E poll(long timeout, TimeUnit unit): 尝试从队列中获取并删除第一个元素,并等待指定的时间。如果在指定的时间内,队列仍为空,则返回
    null。
  • int remainingCapacity(): 返回此队列中剩余的可用空间。

入队源码

    // 添加元素
    public void put(E e) throws InterruptedException {
        // 检查元素是否为空
        checkNotNull(e);
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 加可中断锁
        lock.lockInterruptibly();
        try {
            // 用while不用if是为了防止虚假唤醒
            while (count == items.length) {
                // 队列满了,表示已经没有空位了,需要挂起生产者
                notFull.await();
            }
            // 入队
            enqueue(e);
        } finally {
            // 唤醒消费者线程
            lock.unlock();
        }
    }
    
    // 元素入队
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) {
            // 环形数组实现,putIndex指针到数组尽头了,返回头部
            putIndex = 0;
        }
        count++;
        // 条件队列转到同步队列,准备唤醒消费者线程,此时队列中有元素
        notEmpty.signal();
    }

出队源码

    // 取出元素
    public E take() throws InterruptedException {
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 加可中断锁
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                // 队列空了,表示已经没有元素了,需要挂起消费者
                notEmpty.await();
            }
            // 出队
            return dequeue();
        } finally {
            // 唤醒生产者线程
            lock.unlock();
        }
    }

    // 元素出队
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) {
            // 环形数组,takeIndex 指针到数组尽头了,返回头部
            takeIndex = 0;
        }
        count--;
        if (itrs != null) {
            itrs.elementDequeued();
        }
        // notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
        notFull.signal();
        return x;
    }

思考:为什么ArrayBlockingQueue对数组操作要设计成双指针?

img
img

使用双指针的好处在于可以避免数组的复制操作。

如果使用单指针,每次删除元素时需要将后面的元素全部向前移动,这样会导致时间复杂度为 O(n)。
而使用双指针,我们可以直接将 takeIndex 指向下一个元素,而不需要将其前面的元素全部向前移动。
同样地,插入新的元素时,我们可以直接将新元素插入到 putIndex 所指向的位置,而不需要将其后面的元素全部向后移动。
这样可以使得插入和删除的时间复杂度都是 O(1) 级别,提高了队列的性能。

使用场景

  • 生产者-消费者模式:ArrayBlockingQueue可以在生产者-消费者模式中使用,这是最常见的一种应用场景。例如,你有一个系统需要处理大量的任务,但是系统的处理能力有限,不能立即处理这些任务。你可以创建一个ArrayBlockingQueue,生产者线程将任务放入队列,消费者线程从队列中取出任务进行处理。当队列已满时,生产者线程会被阻塞,直到队列中有空闲的位置;当队列为空时,消费者线程会被阻塞,直到队列中有新的任务。

  • 资源池:ArrayBlockingQueue可以用来实现一个固定大小的资源池,例如数据库连接池、线程池等。资源被放入一个ArrayBlockingQueue,需要资源时从队列中取出,用完后再放回队列。当队列为空时,如果还需要资源,则需要等待,直到有资源被放回队列。

  • 数据流处理:如果你的系统需要处理一个数据流,例如日志文件、网络数据等,你可以创建一个ArrayBlockingQueue,一个线程负责从数据流中读取数据并放入队列,其他线程从队列中取出数据进行处理。这样可以有效地分离数据读取和数据处理两个环节,提高系统的处理能力。

总的来说,它可以应用在任何需要队列的场景,并且需要队列大小有界,或者需要阻塞操作的场景。

上次编辑于:
贡献者: yanggl