ArrayBlockingQueue(有界阻塞队列)
ArrayBlockingQueue(有界阻塞队列)
ArrayBlockingQueue 是 java.util.concurrent 包下的一个类,它是 BlockingQueue 接口的一个实现。这是一个由数组支持的有界阻塞队列。
队列按照 FIFO (先进先出) 的规则对元素进行排序,队列的头部是在队列中存在时间最长的元素。新的元素插入到队列的尾部,队列检索操作会获取位于队列头部的元素。
原理
- 使用有界阻塞队列,队列中元素先进先出,存取元素操作相互排斥
- 使用静态数组,容量固定,在构建ArrayBlockingQueue时必须指定长度,并且没有扩容机制
- 线程安全使用ReentrantLock来实现,存取的是同一把锁,操作的是同一个数组队形,存取操作相互排斥
- 入队是从队首开始添加元素,并记录putIndex,同时唤醒notEmpty(当putIndex到达队尾时设置为0)
- 出队也是从队首开始取出元素,并记录takeIndex,同时唤醒notFull(当takeIndex到达队尾时设置为0)
注意:两个Index指针都是从队首像队尾移动,保证队列先进先出原则
ArrayBlockingQueue使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。
核心属性
final Object[] items: 这是一个数组,用来保存队列中的元素。
int takeIndex: 表示下一个要被获取(或“take”)的元素在数组中的位置。如果队列为空,则没有具体的意义。
int putIndex: 表示下一个要添加(或“put”)的元素在数组中的位置。如果队列已满,则没有具体的意义。
int count: 表示队列中当前的元素数量。
final ReentrantLock lock: 重入锁,用于控制对队列的并发访问。
private final Condition notEmpty: 当队列为空时,获取元素的线程可以在这个条件上等待。
private final Condition notFull: 当队列已满时,添加元素的线程可以在这个条件上等待。
核心方法
- void put(E e): 将指定元素插入此队列中,如果队列已满,则阻塞等待可用的空间。
- E take(): 从队列中取出并删除一个元素,如果队列为空,当前线程则会阻塞,直到有元素可以获取。
- boolean offer(E e, long timeout, TimeUnit unit): 尝试将元素插入队列,如果队列已满,则等待指定的等待时间。如果在指定的时间内,队列仍然没有可用空间,那么返回
false。如果插入成功,则返回 true。 - E poll(long timeout, TimeUnit unit): 尝试从队列中获取并删除第一个元素,并等待指定的时间。如果在指定的时间内,队列仍为空,则返回
null。 - int remainingCapacity(): 返回此队列中剩余的可用空间。
入队源码
// 添加元素
public void put(E e) throws InterruptedException {
// 检查元素是否为空
checkNotNull(e);
// 获取锁
final ReentrantLock lock = this.lock;
// 加可中断锁
lock.lockInterruptibly();
try {
// 用while不用if是为了防止虚假唤醒
while (count == items.length) {
// 队列满了,表示已经没有空位了,需要挂起生产者
notFull.await();
}
// 入队
enqueue(e);
} finally {
// 唤醒消费者线程
lock.unlock();
}
}
// 元素入队
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) {
// 环形数组实现,putIndex指针到数组尽头了,返回头部
putIndex = 0;
}
count++;
// 条件队列转到同步队列,准备唤醒消费者线程,此时队列中有元素
notEmpty.signal();
}
出队源码
// 取出元素
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
// 加可中断锁
lock.lockInterruptibly();
try {
while (count == 0) {
// 队列空了,表示已经没有元素了,需要挂起消费者
notEmpty.await();
}
// 出队
return dequeue();
} finally {
// 唤醒生产者线程
lock.unlock();
}
}
// 元素出队
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) {
// 环形数组,takeIndex 指针到数组尽头了,返回头部
takeIndex = 0;
}
count--;
if (itrs != null) {
itrs.elementDequeued();
}
// notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
notFull.signal();
return x;
}
思考:为什么ArrayBlockingQueue对数组操作要设计成双指针?
使用双指针的好处在于可以避免数组的复制操作。
如果使用单指针,每次删除元素时需要将后面的元素全部向前移动,这样会导致时间复杂度为 O(n)。
而使用双指针,我们可以直接将 takeIndex 指向下一个元素,而不需要将其前面的元素全部向前移动。
同样地,插入新的元素时,我们可以直接将新元素插入到 putIndex 所指向的位置,而不需要将其后面的元素全部向后移动。
这样可以使得插入和删除的时间复杂度都是 O(1) 级别,提高了队列的性能。
使用场景
生产者-消费者模式:ArrayBlockingQueue可以在生产者-消费者模式中使用,这是最常见的一种应用场景。例如,你有一个系统需要处理大量的任务,但是系统的处理能力有限,不能立即处理这些任务。你可以创建一个ArrayBlockingQueue,生产者线程将任务放入队列,消费者线程从队列中取出任务进行处理。当队列已满时,生产者线程会被阻塞,直到队列中有空闲的位置;当队列为空时,消费者线程会被阻塞,直到队列中有新的任务。
资源池:ArrayBlockingQueue可以用来实现一个固定大小的资源池,例如数据库连接池、线程池等。资源被放入一个ArrayBlockingQueue,需要资源时从队列中取出,用完后再放回队列。当队列为空时,如果还需要资源,则需要等待,直到有资源被放回队列。
数据流处理:如果你的系统需要处理一个数据流,例如日志文件、网络数据等,你可以创建一个ArrayBlockingQueue,一个线程负责从数据流中读取数据并放入队列,其他线程从队列中取出数据进行处理。这样可以有效地分离数据读取和数据处理两个环节,提高系统的处理能力。
总的来说,它可以应用在任何需要队列的场景,并且需要队列大小有界,或者需要阻塞操作的场景。