跳至主要內容

LinkedBlockingQueue(链表结构的阻塞队列)

Yaien Blog原创2023年6月5日大约 4 分钟并发线程安全Queue

LinkedBlockingQueue(链表结构的阻塞队列)

LinkedBlockingQueue是java.util.concurrent包中的一个类,它实现了BlockingQueue接口,是一个基于链表结构的阻塞队列,按FIFO(先进先出)排序元素,也是一种典型的生产者和消费者模型的阻塞队列。

原理

  1. 实现的是无界阻塞队列,可以指定容量,默认Integer.MAX_VALUE,先进先出,存取互不干扰
  2. 数据结构选用链表,可指定容量,内存存粗Node元素
  3. 实现两把锁,存取互不干扰,存取操作的是不同的Node对象,但是删除元素时存取元素都会加锁
  4. 入队是从队尾入队,有last指针记录
  5. 出队从队首出,有head指针记录

核心属性

  • Node[] items:链表实现,用来存储队列中的元素。每个节点包含一个元素和指向下一个节点的链接。

  • ReentrantLock takeLock:可重入锁,用于控制元素的移除操作。当多个线程试图移除队列中的元素时,这个锁确保了只有一个线程可以执行该操作。

  • ReentrantLock putLock:可重入锁,用于控制元素的插入操作。当多个线程试图向队列中插入元素时,这个锁确保了只有一个线程可以执行该操作。

  • Condition notEmpty:用于协调消费者线程。当队列为空,消费者线程试图移除元素时,它们会等待这个条件变量。

  • Condition notFull:用于协调生产者线程。当队列已满,生产者线程试图插入元素时,它们会等待这个条件变量。

  • AtomicInteger count:用来记录队列中当前的元素数量。

  • capacity (int):队列的容量,如果在创建队列时没有指定容量,那么容量将等于 Integer.MAX_VALUE。

入队源码

    // 添加元素
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        // 将元素封装到Node对象中
        LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
        // 获取插入锁
        final ReentrantLock putLock = this.putLock;
        // 获取队列元素数量
        final AtomicInteger count = this.count;
        // 加锁
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // 队列满了,阻塞生产者线程到等待队列
            while (count.get() == capacity) {
                notFull.await();
            }
            // 入队
            enqueue(node);
            // 队列元素数量+1
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                // 队列中还有位置,将notFull移到同步队列等待唤醒
                notFull.signal();
            }
        } finally {
            // 解锁,唤醒阻塞的线程
            putLock.unlock();
        }
        // 唤醒消费者消费
        if (c == 0){
            signalNotEmpty();
        }
    }

出队源码

    // 元素出队
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 获取队列元素数量
        final AtomicInteger count = this.count;
        // 获取出队锁
        final ReentrantLock takeLock = this.takeLock;
        // 加锁
        takeLock.lockInterruptibly();
        try {
            // 队列中元素数量为0,表示没有元素了,阻塞消费者线程
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 出队
            x = dequeue();
            // 队列元素数量-1
            c = count.getAndDecrement();
            if (c > 1)
                // 消费者线程转到等待队列,等待唤醒
                notEmpty.signal();
        } finally {
            // 解锁
            takeLock.unlock();
        }
        if (c == capacity)
            // 唤醒生产者线程
            signalNotFull();
        return x;
    }

使用场景

  • 任务队列:在多线程编程中,常常需要使用一个任务队列来存储待处理的任务。例如,在一个网页爬虫程序中,可以创建一个LinkedBlockingQueue,用于存储待爬取的网页URL。一个或多个生产者线程负责从网络上发现新的URL并将它们添加到队列中,一个或多个消费者线程负责从队列中取出URL并下载网页内容。

  • 日志处理:在服务器应用中,可能需要处理大量的日志消息。可以创建一个LinkedBlockingQueue,用于存储待处理的日志消息。一个或多个生产者线程负责生成日志消息并将它们添加到队列中,一个或多个消费者线程负责从队列中取出日志消息并写入日志文件或发送到日志服务器。

  • 资源池: LinkedBlockingQueue也可以用于创建资源池,例如数据库连接池、线程池等。当需要一个资源时,可以从队列中取出,当不再需要这个资源时,可以将它放回队列。这样可以有效地复用资源,提高系统的效率。

思考:线程池为什么使用LinkedBlockingQueue而不是ArrayBlockingQueue呢?
因为LinkedBlockingQueue的入队和出队是两把锁,存取元素互不干扰。
ArrayBlockingQueue则是使用的同一把锁,存取元素时相互排斥。
LinkedBlockingQueue这种锁分离的方式可以有效地减少锁竞争,从而提高线程池的并发性能。

上次编辑于: 2024/3/9 11:11:30
贡献者: yanggl