09.并发工具类
- Lock 比 synchronized 多了可中断、超时、公平锁、多条件变量
- StampedLock 增加乐观读(无锁),性能最高,但不支持重入和条件变量
- CountDownLatch 一个等多个,CyclicBarrier 多个互相等(可循环)
- 原子类基于 CAS,LongAdder 高并发累加性能优于 AtomicLong
- 推荐有界队列,无界队列可能 OOM
Lock 与 Condition
Lock 解决互斥,Condition 解决同步(是 管程 条件变量的显式实现,支持多个等待队列,比 wait/notify 更灵活)。底层原理见 AQS抽象队列同步器原理。
Lock 相比 synchronized 的优势:
| 特性 | synchronized | Lock |
|---|---|---|
| 响应中断 | ❌ | ✅ lockInterruptibly() |
| 超时获取 | ❌ | ✅ tryLock(time, unit) |
| 非阻塞获取 | ❌ | ✅ tryLock() |
| 多个条件变量 | ❌(只有一个 Wait Set) | ✅ 多个 Condition |
可见性保障:Lock 内部有 volatile 的 state 变量,结合 happens-before 规则 保证可见性。
可重入锁:同一线程可以重复获取同一把锁(ReentrantLock)。
公平锁 vs 非公平锁:公平锁按等待时间唤醒,线程切换频繁,性能低;默认用非公平锁。
Lock+Condition 可实现异步转同步(如 RPC 调用等待结果),这也是 Guarded Suspension 模式 的实现基础。
Semaphore 信号量
计数器 + 等待队列,三个操作:init(初始值)、down/acquire(-1,<0 则阻塞)、up/release(+1,≤0 则唤醒)。
核心特点:可以允许多个线程同时访问临界区(synchronized/Lock 只允许一个)。
典型场景:对象池,限制最多 N 个线程同时使用资源。
class ObjPool<T, R> {
final List<T> pool;
final Semaphore sem;
ObjPool(int size, T t) {
pool = new Vector<>();
for (int i = 0; i < size; i++) pool.add(t);
sem = new Semaphore(size);
}
R exec(Function<T, R> func) throws InterruptedException {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
ReadWriteLock 读写锁
三个原则:多读单写,写时禁读。
- 只有写锁支持条件变量,读锁调用
newCondition()抛异常 - 支持锁降级(写锁内获取读锁,再释放写锁),不支持锁升级
// 缓存读写示例(含锁降级)
V get(K key) {
r.lock();
try {
V v = m.get(key);
if (v != null) return v;
r.unlock();
w.lock();
try {
v = m.get(key); // 双重检查
if (v == null) { v = load(key); m.put(key, v); }
r.lock(); // 降级为读锁
} finally { w.unlock(); }
return v;
} finally { r.unlock(); }
}
StampedLock
在读写锁基础上增加乐观读(无锁),性能更高。
| 模式 | 说明 |
|---|---|
| 写锁 | 独占,同 WriteLock |
| 悲观读锁 | 共享,同 ReadLock |
| 乐观读 | 无锁,允许写操作并发,读完需 validate 验证 |
注意:
- 不支持重入,不支持条件变量
- 不要调用中断操作(会导致 CPU 飙升),需要中断用
readLockInterruptibly/writeLockInterruptibly
long stamp = sl.tryOptimisticRead();
int curX = x; // 读入局部变量
if (!sl.validate(stamp)) { // 验证期间是否有写操作
stamp = sl.readLock(); // 升级为悲观读锁
try { curX = x; } finally { sl.unlockRead(stamp); }
}
CountDownLatch 与 CyclicBarrier
CountDownLatch:一个线程等待多个线程完成。计数器不可重置。
CountDownLatch latch = new CountDownLatch(2);
executor.execute(() -> { pos = getPOrders(); latch.countDown(); });
executor.execute(() -> { dos = getDOrders(); latch.countDown(); });
latch.await(); // 等待两个任务都完成
CyclicBarrier:一组线程互相等待,到齐后触发回调,计数器自动重置可循环使用。
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
executor.execute(() -> check()); // 两个线程都到达后执行
});
| CountDownLatch | CyclicBarrier | |
|---|---|---|
| 等待方向 | 一个等多个 | 多个互相等 |
| 计数器 | 不可重置 | 自动重置,可循环 |
| 回调 | 无 | 支持 |
并发容器
| 容器 | 特点 |
|---|---|
CopyOnWriteArrayList |
写时复制,适合读多写少,可容忍短暂不一致,原理见 CoW 模式 |
ConcurrentHashMap |
key 无序,key/value 不能为 null |
ConcurrentSkipListMap |
key 有序,性能比 ConcurrentHashMap 高 |
CopyOnWriteArraySet |
同 CopyOnWriteArrayList |
ConcurrentSkipListSet |
同 ConcurrentSkipListMap |
Queue 分类:阻塞(Blocking)vs 非阻塞;单端(Queue)vs 双端(Deque)。推荐使用有界队列,无界队列可能导致 OOM(线程池 同理)。
无锁方案:原子类
基于 CAS 实现,无需加锁。
| 类型 | 代表类 |
|---|---|
| 原子化基本类型 | AtomicInteger、AtomicLong、AtomicBoolean |
| 原子化对象引用 | AtomicReference、AtomicStampedReference(解决 ABA) |
| 原子化数组 | AtomicIntegerArray 等 |
| 原子化对象属性更新器 | AtomicIntegerFieldUpdater 等 |
| 原子化累加器 | LongAdder(仅累加,性能优于 AtomicLong) |
线程池
核心参数与工作原理见 线程池ThreadPoolExecutor。
Future 与 CompletableFuture
Future 模式与 Promise 模式的深入讲解见 Future模式与Promise模式。
ExecutorService 的 submit 方法:
Future<?> submit(Runnable task); // 只能判断是否结束
<T> Future<T> submit(Callable<T> task); // 可 get() 获取返回值
<T> Future<T> submit(Runnable task, T result);// 通过 result 传出结果
FutureTask:同时实现 Runnable 和 Future,可直接放入 Thread 或线程池:
FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
new Thread(task).start(); // 或 executor.submit(task)
Integer result = task.get();
CompletionService:批量异步任务
内部维护阻塞队列,哪个任务先完成先被取出,适合"最快结果优先"场景:
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
cs.submit(() -> getPriceByS1());
cs.submit(() -> getPriceByS2());
cs.submit(() -> getPriceByS3());
for (int i = 0; i < 3; i++) {
Integer r = cs.take().get(); // 按完成顺序取出
executor.execute(() -> save(r));
}
take():队列空则阻塞poll():队列空返回 nullpoll(timeout, unit):超时返回 null
Fork/Join 分治模型
将大任务递归拆分为子任务,子任务结果逐层合并。
ForkJoinPool:分治任务池,内部多个任务队列,子任务提交到当前工作线程的队列ForkJoinTask(RecursiveTask有返回值 /RecursiveAction无返回值)
class Fibonacci extends RecursiveTask<Integer> {
private final int n;
public Fibonacci(int n) { this.n = n; }
protected Integer compute() {
if (n <= 1) return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork(); // 异步执行子任务
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join(); // 合并结果
}
}
ForkJoinPool fjp = new ForkJoinPool(4);
System.out.println(fjp.invoke(new Fibonacci(8)));