8.Netty的ByteBuf

#netty #review

总结
  • ByteBuf 用双指针(readerIndex/writerIndex)解决了 NIO ByteBuffer 读写切换麻烦的问题,容量还能动态扩展
  • 分三个维度:Heap/Direct、Pooled/Unpooled、Unsafe/非Unsafe,I/O 场景用 PooledDirectByteBuf,业务数据用 UnpooledHeapByteBuf
  • 基于引用计数管理生命周期,retain() +1,release() -1,归零触发回收或归还对象池
  • 池化 ByteBuf 必须手动 release(),否则内存泄漏;非池化的 JVM GC 会兜底,但堆外内存也建议手动释放
  • 释放原则:谁最后使用,谁负责释放;传递给下一个组件的,由接收方释放
  • 继承 SimpleChannelInboundHandler 可以自动释放 Inbound ByteBuf

1. 为什么不用 NIO ByteBuffer?

NIO ByteBuffer

NIO 原生的 ByteBuffer 用起来有两个明显的痛点:

ByteBuf 针对这两点做了改进:

Netty ByteBuf

2. ByteBuf 分类

ByteBuf代码结构

三个维度正交组合:

Heap / Direct:堆内 vs 堆外。Heap 在 JVM 堆上分配,底层是字节数组,GC 自动管理;Direct 是堆外内存,不受 JVM 堆限制,底层依赖 JDK 的 ByteBuffer,减少数据拷贝次数。

Netty 收发数据默认用 DirectBuffer,做 Socket 读写时不需要把堆内数据再拷一份到堆外,省掉了一次内存拷贝。

Pooled / Unpooled:池化 vs 非池化。Pooled 从预分配的内存池里取,用完放回去,适合高频分配场景;Unpooled 每次直接向系统申请,用完由 GC 回收。

Unsafe / 非 Unsafe:操作方式不同。Unsafe 通过 JDK 的 Unsafe 对象直接操作物理内存(offset + index),非 Unsafe 通过数组下标操作,更安全。

怎么选

场景 推荐类型 原因
业务数据处理 UnpooledHeapByteBuf 实现简单,GC 自动回收,不用担心内存泄漏
I/O 读写 PooledDirectByteBuf 避免堆内到堆外的拷贝,性能更好,但必须手动 release()

3. 引用计数与内存管理

ByteBuf 实现了 ReferenceCounted 接口,用引用计数管理生命周期:

ByteBuf buffer = ctx.alloc().directBuffer();
assert buffer.refCnt() == 1;

buffer.retain();          // refCnt = 2
buffer.release();         // refCnt = 1
buffer.release();         // refCnt = 0,触发回收

引用计数归零时,Pooled ByteBuf 会归还到对象池,Unpooled ByteBuf 会被释放。如果一个 ByteBuf 已经不可达但引用计数还大于 0,它就既不会被 GC 回收,也不会归还对象池——内存泄漏就这么来的

3.1 三类 ByteBuf 的计数行为

独立 ByteBufcopy()readBytes(int length) 等):新建时计数器初始化为 1,使用完必须主动 release()

共享底层 buffer 且计数增加retainedSlice()decode() 内部调用等):底层共用同一块内存,但会调 retain() 使计数 +1,使用完也要主动 release()

共享底层 buffer 且计数不变duplicate()slice()order()):没有自己的计数器,共用父 ByteBuf 的计数,计数不会自动增加。如果要把这类衍生 ByteBuf 传给其他组件,必须先手动 retain(),再在接收方 release()

ByteBuf parent = ctx.alloc().directBuffer(512);
parent.writeBytes(...);

try {
    while (parent.isReadable(16)) {
        ByteBuf derived = parent.readSlice(16);
        derived.retain();   // 传出去之前先 retain
        process(derived);   // process 内部负责 release
    }
} finally {
    parent.release();
}

注意:duplicate() 写入数据会影响原始 ByteBuf,容易出问题,用的时候要小心。

4. 谁来释放?

基本原则:谁最后使用,谁负责释放

4.1 ChannelHandler 中的释放规则

Inbound 消息

最省心的方式是继承 SimpleChannelInboundHandler,它会在 channelRead 里自动帮你释放:

// SimpleChannelInboundHandler 源码
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            channelRead0(ctx, (I) msg);  // 子类实现业务逻辑
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);  // 自动释放
        }
    }
}

Outbound 消息:应用程序写出的 ByteBuf 由 Netty 负责释放,不需要手动处理。

5. 内存泄漏检测

Netty 内置了泄漏检测机制,通过 JVM 参数控制检测级别:

-Dio.netty.leakDetection.level=advanced
级别 说明
DISABLED 关闭检测,不推荐
SIMPLE 抽样 1%,只提示是否泄漏(默认)
ADVANCED 抽样 1%,提示泄漏位置
PARANOID 检测所有 ByteBuf,提示泄漏位置,性能开销大,适合排查阶段

出现泄漏时日志里会有 LEAK 关键字,找到对应代码手动 release(),或者用 ReferenceCountUtil.release(msg) 处理:

ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records:
Created at:
  io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:872)
  com.example.TwgMessageDecoder.decode(TwgMessageDecoder.java:76)

6. 核心 API

6.1 指针操作

方法 说明
readerIndex() 当前读指针位置
writerIndex() 当前写指针位置
markReaderIndex() 保存当前读指针
resetReaderIndex() 恢复到上次保存的读指针

6.2 数据读写

read/write 系列会移动指针,get/set 系列不会移动指针,只是按 index 直接操作。

6.3 内存管理

7. 实战示例

public static void main(String[] args) {
    ByteBuf buffer = UnpooledByteBufAllocator.DEFAULT.buffer(6, 10);
    printByteBufInfo("初始化 buffer(6, 10)", buffer);

    buffer.writeBytes(new byte[]{1, 2});
    printByteBufInfo("写入 2 字节", buffer);

    buffer.writeInt(100);
    printByteBufInfo("写入 int 100", buffer);

    buffer.writeBytes(new byte[]{3, 4, 5});
    printByteBufInfo("写入 3 字节", buffer);

    byte[] read = new byte[buffer.readableBytes()];
    buffer.readBytes(read);
    printByteBufInfo("读取所有字节后", buffer);

    System.out.println("getInt(2): " + buffer.getInt(2));   // 不移动指针
    buffer.setByte(1, 0);
    System.out.println("getByte(1): " + buffer.getByte(1)); // 不移动指针
    printByteBufInfo("get/set 操作后", buffer);
}

private static void printByteBufInfo(String step, ByteBuf buffer) {
    System.out.println("------ " + step + " -----");
    System.out.println("readerIndex: " + buffer.readerIndex());
    System.out.println("writerIndex: " + buffer.writerIndex());
    System.out.println("readableBytes: " + buffer.readableBytes());
    System.out.println("writableBytes: " + buffer.writableBytes());
    System.out.println("capacity: " + buffer.capacity());
    System.out.println("maxCapacity: " + buffer.maxCapacity());
}