Java中的队列

#java #并发 #最佳实践

总结
  • 并发队列分两类:阻塞队列(基于锁)和非阻塞队列(基于 CAS)
  • 阻塞队列实现了 BlockingQueue 接口,线程安全,队满/队空时自动阻塞
  • 生产环境必须用有界队列,无界队列(如 LinkedBlockingQueue 默认)会 OOM
  • 选型核心:有界用 ArrayBlockingQueue,高吞吐用 LinkedBlockingQueue,延迟任务用 DelayQueue,无锁高性能用 ConcurrentLinkedQueue

1. 整体分类

Java并发队列

JDK 并发队列按实现方式分为两大类:

类型 实现机制 特点
阻塞队列 ReentrantLock + Condition 队满/队空时线程阻塞等待,线程安全
非阻塞队列 CAS 原子操作 不阻塞线程,并发性能更高

阻塞队列底层依赖 AQS 提供的 ReentrantLockCondition 实现等待/唤醒机制;非阻塞队列则依赖 CAS 保证原子性。

2. 阻塞队列

ArrayBlockingQueue

有界数组队列,最常用。初始化必须指定容量。

内部用一把 ReentrantLock + notEmptynotFull 两个 Condition 控制并发:

读写共用同一把锁,生产和消费互斥。

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("task");       // 队满则阻塞
queue.offer("task", 1, TimeUnit.SECONDS); // 超时返回 false
String task = queue.take(); // 队空则阻塞

LinkedBlockingQueue

链表实现,可有界可无界(默认 Integer.MAX_VALUE,生产环境务必指定容量)。

takeLockputLock 两把独立的锁,读写不互斥,吞吐量高于 ArrayBlockingQueue

// 生产环境必须指定容量,否则无界队列可能 OOM
BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);

线程池newFixedThreadPoolnewSingleThreadExecutor 默认使用的就是无界 LinkedBlockingQueue,这也是为什么生产环境不推荐直接用 Executors 工厂方法的原因之一。

PriorityBlockingQueue

基于最小堆的无界优先级队列,每次出队返回优先级最高的元素。

元素必须实现 Comparable 接口或传入 Comparator。因为是无界队列,put 不会阻塞,只有 take 在队空时阻塞。扩容时先释放锁再用 CAS 保证只有一个线程执行扩容。

PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
// Task 实现 Comparable,优先级高的先出队

DelayQueue

支持延迟获取的无界阻塞队列,底层用 PriorityQueue 存储。

元素必须实现 Delayed 接口,只有延迟时间到期的元素才能被取出。常用于:

public class DelayTask implements Delayed {
    private final long expireTime; // 到期时间戳

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.expireTime, ((DelayTask) o).expireTime);
    }
}

DelayQueue<DelayTask> queue = new DelayQueue<>();
queue.put(new DelayTask(5000)); // 5 秒后才能取出
DelayTask task = queue.take();  // 阻塞直到有到期元素

SynchronousQueue

无缓冲队列,内部不存储任何元素。每一个 put 必须等待一个 take,反之亦然,生产者和消费者必须配对才能完成传递。

线程池newCachedThreadPool 使用的就是 SynchronousQueue:有空闲线程就直接交付任务,没有就新建线程,实现了任务的即时传递。

SynchronousQueue<String> queue = new SynchronousQueue<>();
// 生产者线程
new Thread(() -> queue.put("data")).start();
// 消费者线程,两者必须同时就绪
new Thread(() -> queue.take()).start();

LinkedTransferQueue

无界阻塞队列,可以看作 LinkedBlockingQueue + SynchronousQueue 的结合体。

transfer() 方法:如果有等待的消费者,直接交付;否则入队等待消费者取走。相比 SynchronousQueue 多了内部缓冲,相比 LinkedBlockingQueue 用 CAS 无锁操作性能更好。

3. 非阻塞队列

ConcurrentLinkedQueue

基于单向链表的无界非阻塞队列,用 CAS 保证线程安全,不阻塞线程。

适合读多写少、对延迟敏感的场景。size() 方法需要遍历链表,时间复杂度 O(n),高频调用要注意性能。

ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item");
String item = queue.poll(); // 队空返回 null,不阻塞

ConcurrentLinkedDeque

双端队列,支持从头部和尾部同时插入/删除,同样基于 CAS 实现。

适合多生产者多消费者、需要双端操作的场景(如工作窃取算法)。

4. BlockingQueue 核心 API

BlockingQueue

方法

方法 队满/队空时的行为 说明
add(e) 抛出异常 队满抛 IllegalStateException
offer(e) 返回 false 不阻塞
put(e) 阻塞等待 直到有空位
offer(e, t, unit) 超时返回 false 等待指定时间
remove() 抛出异常 队空抛 NoSuchElementException
poll() 返回 null 不阻塞
take() 阻塞等待 直到有元素
poll(t, unit) 超时返回 null 等待指定时间

生产环境推荐用 offer + 超时 或 put,避免用 add / remove 抛异常的方式。

5. 选型指南

场景 推荐队列
通用有界缓冲,读写均衡 ArrayBlockingQueue
高吞吐生产消费,读写分离 LinkedBlockingQueue(指定容量)
按优先级处理任务 PriorityBlockingQueue
延迟/定时任务 DelayQueue
线程间直接交付,无需缓冲 SynchronousQueue
无锁高性能,不需要阻塞 ConcurrentLinkedQueue
需要双端操作 ConcurrentLinkedDeque