09.并发工具类

总结
  • Lock 比 synchronized 多了可中断、超时、公平锁、多条件变量
  • StampedLock 增加乐观读(无锁),性能最高,但不支持重入和条件变量
  • CountDownLatch 一个等多个,CyclicBarrier 多个互相等(可循环)
  • 原子类基于 CAS,LongAdder 高并发累加性能优于 AtomicLong
  • 推荐有界队列,无界队列可能 OOM

Lock 与 Condition

Lock 解决互斥,Condition 解决同步(是 管程 条件变量的显式实现,支持多个等待队列,比 wait/notify 更灵活)。底层原理见 AQS抽象队列同步器原理

Lock 相比 synchronized 的优势

特性 synchronized Lock
响应中断 lockInterruptibly()
超时获取 tryLock(time, unit)
非阻塞获取 tryLock()
多个条件变量 ❌(只有一个 Wait Set) ✅ 多个 Condition

可见性保障:Lock 内部有 volatilestate 变量,结合 happens-before 规则 保证可见性。

可重入锁:同一线程可以重复获取同一把锁(ReentrantLock)。

公平锁 vs 非公平锁:公平锁按等待时间唤醒,线程切换频繁,性能低;默认用非公平锁。

Lock+Condition 可实现异步转同步(如 RPC 调用等待结果),这也是 Guarded Suspension 模式 的实现基础。

Semaphore 信号量

计数器 + 等待队列,三个操作:init(初始值)、down/acquire(-1,<0 则阻塞)、up/release(+1,≤0 则唤醒)。

核心特点:可以允许多个线程同时访问临界区(synchronized/Lock 只允许一个)。

典型场景:对象池,限制最多 N 个线程同时使用资源。

class ObjPool<T, R> {
    final List<T> pool;
    final Semaphore sem;

    ObjPool(int size, T t) {
        pool = new Vector<>();
        for (int i = 0; i < size; i++) pool.add(t);
        sem = new Semaphore(size);
    }

    R exec(Function<T, R> func) throws InterruptedException {
        T t = null;
        sem.acquire();
        try {
            t = pool.remove(0);
            return func.apply(t);
        } finally {
            pool.add(t);
            sem.release();
        }
    }
}

ReadWriteLock 读写锁

三个原则:多读单写,写时禁读。

// 缓存读写示例(含锁降级)
V get(K key) {
    r.lock();
    try {
        V v = m.get(key);
        if (v != null) return v;
        r.unlock();
        w.lock();
        try {
            v = m.get(key); // 双重检查
            if (v == null) { v = load(key); m.put(key, v); }
            r.lock(); // 降级为读锁
        } finally { w.unlock(); }
        return v;
    } finally { r.unlock(); }
}

StampedLock

在读写锁基础上增加乐观读(无锁),性能更高。

模式 说明
写锁 独占,同 WriteLock
悲观读锁 共享,同 ReadLock
乐观读 无锁,允许写操作并发,读完需 validate 验证

注意

long stamp = sl.tryOptimisticRead();
int curX = x; // 读入局部变量
if (!sl.validate(stamp)) { // 验证期间是否有写操作
    stamp = sl.readLock(); // 升级为悲观读锁
    try { curX = x; } finally { sl.unlockRead(stamp); }
}

CountDownLatch 与 CyclicBarrier

CountDownLatch:一个线程等待多个线程完成。计数器不可重置。

CountDownLatch latch = new CountDownLatch(2);
executor.execute(() -> { pos = getPOrders(); latch.countDown(); });
executor.execute(() -> { dos = getDOrders(); latch.countDown(); });
latch.await(); // 等待两个任务都完成

CyclicBarrier:一组线程互相等待,到齐后触发回调,计数器自动重置可循环使用。

CyclicBarrier barrier = new CyclicBarrier(2, () -> {
    executor.execute(() -> check()); // 两个线程都到达后执行
});
CountDownLatch CyclicBarrier
等待方向 一个等多个 多个互相等
计数器 不可重置 自动重置,可循环
回调 支持

并发容器

容器 特点
CopyOnWriteArrayList 写时复制,适合读多写少,可容忍短暂不一致,原理见 CoW 模式
ConcurrentHashMap key 无序,key/value 不能为 null
ConcurrentSkipListMap key 有序,性能比 ConcurrentHashMap 高
CopyOnWriteArraySet 同 CopyOnWriteArrayList
ConcurrentSkipListSet 同 ConcurrentSkipListMap

Queue 分类:阻塞(Blocking)vs 非阻塞;单端(Queue)vs 双端(Deque)。推荐使用有界队列,无界队列可能导致 OOM(线程池 同理)。

无锁方案:原子类

基于 CAS 实现,无需加锁。

类型 代表类
原子化基本类型 AtomicIntegerAtomicLongAtomicBoolean
原子化对象引用 AtomicReferenceAtomicStampedReference(解决 ABA)
原子化数组 AtomicIntegerArray
原子化对象属性更新器 AtomicIntegerFieldUpdater
原子化累加器 LongAdder(仅累加,性能优于 AtomicLong)

线程池

核心参数与工作原理见 线程池ThreadPoolExecutor

Future 与 CompletableFuture

Future 模式与 Promise 模式的深入讲解见 Future模式与Promise模式

ExecutorService 的 submit 方法

Future<?> submit(Runnable task);              // 只能判断是否结束
<T> Future<T> submit(Callable<T> task);       // 可 get() 获取返回值
<T> Future<T> submit(Runnable task, T result);// 通过 result 传出结果

FutureTask:同时实现 Runnable 和 Future,可直接放入 Thread 或线程池:

FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
new Thread(task).start(); // 或 executor.submit(task)
Integer result = task.get();

CompletionService:批量异步任务

内部维护阻塞队列,哪个任务先完成先被取出,适合"最快结果优先"场景:

CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
cs.submit(() -> getPriceByS1());
cs.submit(() -> getPriceByS2());
cs.submit(() -> getPriceByS3());

for (int i = 0; i < 3; i++) {
    Integer r = cs.take().get(); // 按完成顺序取出
    executor.execute(() -> save(r));
}

Fork/Join 分治模型

将大任务递归拆分为子任务,子任务结果逐层合并。

class Fibonacci extends RecursiveTask<Integer> {
    private final int n;
    public Fibonacci(int n) { this.n = n; }

    protected Integer compute() {
        if (n <= 1) return n;
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork(); // 异步执行子任务
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join(); // 合并结果
    }
}

ForkJoinPool fjp = new ForkJoinPool(4);
System.out.println(fjp.invoke(new Fibonacci(8)));