抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

NETTY组件-BYTEBUF

ByteBuf

网络传输的基本单位是字节,在Java NIO中提供了ByteBuffer作为字节缓冲区容器,但该类的API使用起来不太方便,所以Netty实现了ByteBuf作为其替代品。

ByteBuf的优点

  • 相比ByteBuffer使用起来更加简单。
  • 通过内置的复合缓冲区类型实现了透明的zero-copy。
  • 容量可以按需增长。
  • 读和写使用了不同的索引指针。
  • 支持链式调用。
  • 支持引用计数与池化。
  • 可以被用户自定义的缓冲区类型扩展。
  • 它可以被用户自定义的缓冲区类型扩展;

ByteBuffer的实现

在讨论ByteBuf之前,我们先需要了解一下ByteBuffer的实现,这样才能比较深刻地明白它们之间的区别。

ByteBuffer继承于abstract class Buffer(所以还有LongBuffer、IntBuffer等其他类型的实现),本质上它只是一个有限的线性的元素序列,包含了三个重要的属性。

  • Capacity:缓冲区中元素的容量大小,你只能将capacity个数量的元素写入缓冲区,一旦缓冲区已满就需要清理缓冲区才能继续写数据。
  • Position:指向下一个写入数据位置的索引指针,初始位置为0,最大为capacity-1。当写模式转换为读模式时,position需要被重置为0。
  • Limit:在写模式中,limit是可以写入缓冲区的最大索引,也就是说它在写模式中等价于缓冲区的容量。在读模式中,limit表示可以读取数据的最大索引。

Buffer的流程

由于Buffer中只维护了position一个索引指针,所以它在读写模式之间的切换需要调用一个flip()方法来重置指针。使用Buffer的流程一般如下:

  • 写入数据到缓冲区。
  • 调用flip()方法。
  • 从缓冲区中读取数据
  • 调用buffer.clear()或者buffer.compact()清理缓冲区,以便下次写入数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

// 分配一个48字节大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buf); // 读取数据到缓冲区
while (bytesRead != -1) {

buf.flip(); // 将position重置为0

while (buf.hasRemaining()) {
System.out.print((char) buf.get()); // 读取数据并输出到控制台
}

buf.clear(); // 清理缓冲区
bytesRead = inChannel.read(buf);
}
aFile.close();

Buffer中核心方法的实现也非常简单,主要就是在操作指针position。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Sets this buffer's mark at its position.
*
* @return This buffer
*/
public final Buffer mark() {
mark = position; // mark属性是用来标记当前索引位置的
return this;
}

// 将当前索引位置重置为mark所标记的位置
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}

// 翻转这个Buffer,将limit设置为当前索引位置,然后再把position重置为0
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

// 清理缓冲区
// 说是清理,也只是把postion与limit进行重置,之后再写入数据就会覆盖之前的数据了
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

// 返回剩余空间
public final int remaining() {
return limit - position;
}

Java NIO中的Buffer API操作的麻烦之处就在于读写转换需要手动重置指针。而ByteBuf没有这种繁琐性,它维护了两个不同的索引,一个用于读取,一个用于写入。当你从ByteBuf读取数据时,它的readerIndex将会被递增已经被读取的字节数,同样的,当你写入数据时,writerIndex则会递增。readerIndex的最大范围在writerIndex的所在位置,如果试图移动readerIndex超过该值则会触发异常。

ByteBuf中名称以read或write开头的方法将会递增它们其对应的索引,而名称以get或set开头的方法则不会。ByteBuf同样可以指定一个最大容量,试图移动writerIndex超过该值则会触发异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public byte readByte() {
this.checkReadableBytes0(1); // 检查readerIndex是否已越界
int i = this.readerIndex;
byte b = this._getByte(i);
this.readerIndex = i + 1; // 递增readerIndex
return b;
}

private void checkReadableBytes0(int minimumReadableBytes) {
this.ensureAccessible();
if (this.readerIndex > this.writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this}));
}
}

public ByteBuf writeByte(int value) {
this.ensureAccessible();
this.ensureWritable0(1); // 检查writerIndex是否会越过capacity
this._setByte(this.writerIndex++, value);
return this;
}

private void ensureWritable0(int minWritableBytes) {
if (minWritableBytes > this.writableBytes()) {
if (minWritableBytes > this.maxCapacity - this.writerIndex) {
throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this}));
} else {
int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity);
this.capacity(newCapacity);
}
}
}

// get与set只对传入的索引进行了检查,然后对其位置进行get或set
public byte getByte(int index) {
this.checkIndex(index);
return this._getByte(index);
}

public ByteBuf setByte(int index, int value) {
this.checkIndex(index);
this._setByte(index, value);
return this;
}

ByteBuf同样支持在堆内和堆外进行分配。在堆内分配也被称为支撑数组模式,它能在没有使用池化的情况下提供快速的分配和释放。

1
2
3
4
5
6
7
8
ByteBuf heapBuf = Unpooled.copiedBuffer(bytes);
if (heapBuf.hasArray()) { // 判断是否有一个支撑数组
byte[] array = heapBuf.array();
// 计算第一个字节的偏移量
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
int length = heapBuf.readableBytes(); // 获得可读字节
handleArray(array, offset, length); // 调用你的处理方法
}

另一种模式为堆外分配,Java NIO ByteBuffer类在JDK1.4时就已经允许JVM实现通过JNI调用来在堆外分配内存(调用malloc()函数在JVM堆外分配内存),这主要是为了避免额外的缓冲区复制操作。

1
2
3
4
5
6
7
8
ByteBuf directBuf = Unpooled.directBuffer(capacity);
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
// 将字节复制到数组中
directBuf.getBytes(directBuf.readerIndex(), array);
handleArray(array, 0, length);
}

ByteBuf还支持第三种模式,它被称为复合缓冲区,为多个ByteBuf提供了一个聚合视图。在这个视图中,你可以根据需要添加或者删除ByteBuf实例,ByteBuf的子类CompositeByteBuf实现了该模式。

一个适合使用复合缓冲区的场景是HTTP协议,通过HTTP协议传输的消息都会被分成两部分——头部和主体,如果这两部分由应用程序的不同模块产生,将在消息发送时进行组装,并且该应用程序还会为多个消息复用相同的消息主体,这样对于每个消息都将会创建一个新的头部,产生了很多不必要的内存操作。使用CompositeByteBuf是一个很好的选择,它消除了这些额外的复制,以帮助你复用这些消息。

1
2
3
4
5
6
7
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ....;
ByteBuf bodyBuf = ....;
messageBuf.addComponents(headerBuf,bodyBuf);
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}

CompositeByteBuf透明的实现了zero-copy,zero-copy其实就是避免数据在两个内存区域中来回的复制。从操作系统层面上来讲,zero-copy指的是避免在内核态与用户态之间的数据缓冲区复制(通过mmap避免),而Netty中的zero-copy更偏向于在用户态中的数据操作的优化,就像使用CompositeByteBuf来复用多个ByteBuf以避免额外的复制,也可以使用wrap()方法来将一个字节数组包装成ByteBuf,又或者使用ByteBuf的slice()方法把它分割为多个共享同一内存区域的ByteBuf,这些都是为了优化内存的使用率。

那么如何创建ByteBuf呢?在上面的代码中使用到了Unpooled,它是Netty提供的一个用于创建与分配ByteBuf的工具类,建议都使用这个工具类来创建你的缓冲区,不要自己去调用构造函数。经常使用的是wrappedBuffer()与copiedBuffer(),它们一个是用于将一个字节数组或ByteBuffer包装为一个ByteBuf,一个是根据传入的字节数组与ByteBuffer/ByteBuf来复制出一个新的ByteBuf。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 通过array.clone()来复制一个数组进行包装
public static ByteBuf copiedBuffer(byte[] array) {
return array.length == 0 ? EMPTY_BUFFER : wrappedBuffer((byte[]) array.clone());
}

// 默认是堆内分配
public static ByteBuf wrappedBuffer(byte[] array) {
return (ByteBuf) (array.length == 0 ? EMPTY_BUFFER : new UnpooledHeapByteBuf(ALLOC, array, array.length));
}

// 也提供了堆外分配的方法
private static final ByteBufAllocator ALLOC;

public static ByteBuf directBuffer(int initialCapacity) {
return ALLOC.directBuffer(initialCapacity);
}

相对底层的分配方法是使用ByteBufAllocator,Netty实现了PooledByteBufAllocator和UnpooledByteBufAllocator,前者使用了jemalloc(一种malloc()的实现)来分配内存,并且实现了对ByteBuf的池化以提高性能。后者分配的是未池化的ByteBuf,其分配方式与之前讲的一致。

1
2
3
4
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
ByteBuf buffer = allocator.directBuffer();
do something.......

​ 为了优化内存使用率,Netty提供了一套手动的方式来追踪不活跃对象,像UnpooledHeapByteBuf这种分配在堆内的对象得益于JVM的GC管理,无需额外操心,而UnpooledDirectByteBuf是在堆外分配的,它的内部基于DirectByteBuffer,DirectByteBuffer会先向Bits类申请一个额度(Bits还拥有一个全局变量totalCapacity,记录了所有DirectByteBuffer总大小),每次申请前都会查看是否已经超过-XX:MaxDirectMemorySize所设置的上限,如果超限就会尝试调用Sytem.gc(),以试图回收一部分内存,然后休眠100毫秒,如果内存还是不足,则只能抛出OOM异常。堆外内存的回收虽然有了这么一层保障,但为了提高性能与使用率,主动回收也是很有必要的。由于Netty还实现了ByteBuf的池化,像PooledHeapByteBuf和PooledDirectByteBuf就必须依赖于手动的方式来进行回收(放回池中)。

​ Netty使用了引用计数器的方式来追踪那些不活跃的对象。引用计数的接口为ReferenceCounted,它的思想很简单,只要ByteBuf对象的引用计数大于0,就保证该对象不会被释放回收,可以通过手动调用release()与retain()方法来操作该对象的引用计数值递减或递增。用户也可以通过自定义一个ReferenceCounted的实现类,以满足自定义的规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package io.netty.buffer;

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
// 由于ByteBuf的实例对象会非常多,所以这里没有将refCnt包装为AtomicInteger
// 而是使用一个全局的AtomicIntegerFieldUpdater来负责操作refCnt
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
// 每个ByteBuf的初始引用值都为1
private volatile int refCnt = 1;

public int refCnt() {
return this.refCnt;
}

protected final void setRefCnt(int refCnt) {
this.refCnt = refCnt;
}

public ByteBuf retain() {
return this.retain0(1);
}

// 引用计数值递增increment,increment必须大于0
public ByteBuf retain(int increment) {
return this.retain0(ObjectUtil.checkPositive(increment, "increment"));
}

public static int checkPositive(int i, String name) {
if (i <= 0) {
throw new IllegalArgumentException(name + ": " + i + " (expected: > 0)");
} else {
return i;
}
}

// 使用CAS操作不断尝试更新值
private ByteBuf retain0(int increment) {
int refCnt;
int nextCnt;
do {
refCnt = this.refCnt;
nextCnt = refCnt + increment;
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
} while (!refCntUpdater.compareAndSet(this, refCnt, nextCnt));

return this;
}

public boolean release() {
return this.release0(1);
}

public boolean release(int decrement) {
return this.release0(ObjectUtil.checkPositive(decrement, "decrement"));
}

private boolean release0(int decrement) {
int refCnt;
do {
refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
} while (!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));

if (refCnt == decrement) {
this.deallocate();
return true;
} else {
return false;
}
}

protected abstract void deallocate();
}

分配

堆缓冲区

​ 最常用的ByteBuf 模式是将数据存储在JVM 的堆空间中。这种模式被称为支撑数组(backing array),它能在没有使用池化的情况下提供快速的分配和释放。可以由hasArray()来判断检查ByteBuf 是否由数组支撑。如果不是,则这是一个直接缓冲区

直接缓冲区

直接缓冲区是另外一种ByteBuf 模式。

直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。

ByteBufAllocator

Netty 通过interface ByteBufAllocator分配我们所描述过的任意类型的ByteBuf 实例。

名称 描述
buffer() 返回一个基于堆或者直接内存存储的ByteBuf
heapBuffer() 返回一个基于堆内存存储的ByteBuf
directBuffer() 返回一个基于直接内存存储的ByteBuf
compositeBuffer() 返回一个可以通过添加最大到指定数目的基于堆的或者直接内存存储的缓冲区来扩展的CompositeByteBuf
ioBuffer() 返回一个用于套接字的I/O 操作的ByteBuf,当所运行的环境具有sun.misc.Unsafe 支持时,返回基于直接内存存储的ByteBuf,否则返回基于堆内存存储的ByteBuf;当指定使用PreferHeapByteBufAllocator 时,则只会返回基于堆内存存储的ByteBuf。

可以通过Channel(每个都可以有一个不同的ByteBufAllocator 实例)或者绑定到ChannelHandler 的ChannelHandlerContext 获取一个到ByteBufAllocator 的引用。

Netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocatorUnpooled-ByteBufAllocator。前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。后者的实现不池化ByteBuf实例,并且在每次它被调用时都会返回一个新的实例。

Netty4.1默认使用了PooledByteBufAllocator。

Unpooled缓冲区

Netty 提供了一个简单的称为Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例。

  • buffer()

    返回一个未池化的基于堆内存存储的ByteBuf

  • directBuffer()

    返回一个未池化的基于直接内存存储的ByteBuf

  • wrappedBuffer()

    返回一个包装了给定数据的ByteBuf

  • copiedBuffer()

    返回一个复制了给定数据的ByteBuf

  • Unpooled

    该类还可用于ByteBuf 同样可用于那些并不需要Netty 的其他组件的非网络项目。

随机访问索引/顺序访问索引/读写操作

如同在普通的Java 字节数组中一样,ByteBuf 的索引是从零开始的:第一个字节的索引是0,最后一个字节的索引总是capacity() - 1。使用那些需要一个索引值参数(随机访问,也即是数组下标)的方法(的其中)之一来访问数据既不会改变readerIndex 也不会改变writerIndex。如果有需要,也可以通过调用readerIndex(index)或者writerIndex(index)来手动移动这两者。顺序访问通过索引访问

读/写操作
  • get()和set()操作

从给定的索引开始,并且保持索引不变;get+数据字长(bool.byte,int,short,long,bytes)

  • read()和write()操作

    从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整。

其他操作
  • isReadable()

    如果至少有一个字节可供读取,则返回true

  • isWritable()

    如果至少有一个字节可被写入,则返回true

  • readableBytes()

    返回可被读取的字节数

  • writableBytes()

    返回可被写入的字节数

  • capacity()

    返回ByteBuf 可容纳的字节数。在此之后,它会尝试再次扩展直到达到maxCapacity()

  • maxCapacity()

    返回ByteBuf 可以容纳的最大字节数

  • hasArray()

    如果ByteBuf 由一个字节数组支撑,则返回true

  • array()

    如果 ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个UnsupportedOperationException 异常

可丢弃字节

​ 为可丢弃字节的分段包含了已经被读过的字节。通过调用discardRead-Bytes()方法,可以丢弃它们并回收空间。这个分段的初始大小为0,存储在readerIndex 中,会随着read 操作的执行而增加(get*操作不会移动readerIndex)。

​ 缓冲区上调用discardReadBytes()方法后,可丢弃字节分段中的空间已经变为可写的了。频繁地调用discardReadBytes()方法以确保可写分段的最大化,但是请注意,这将极有可能会导致内存复制,因为可读字节必须被移动到缓冲区的开始位置。建议只在有真正需要的时候才这样做,例如,当内存非常宝贵的时候。

可读字节

​ ByteBuf 的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的readerIndex 值为0。

可写字节

​ 可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的writerIndex 的默认值为0。任何名称以write 开头的操作都将从当前的writerIndex 处开始写数据,并将它增加已经写入的字节数。

索引管理

​ 调用markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex()来标记和重置ByteBuf 的readerIndex 和writerIndex。

​ 也可以通过调用readerIndex(int)或者writerIndex(int)来将索引移动到指定位置。试图将任何一个索引设置到一个无效的位置都将导致一个IndexOutOfBoundsException。

​ 可以通过调用clear()方法来将readerIndex 和writerIndex 都设置为0。注意,这并不会清除内存中的内容。

查找操作

在ByteBuf中有多种可以用来确定指定值的索引的方法。最简单的是使用indexOf()方法。

较复杂的查找可以通过调用forEach Byte()。

代码展示了一个查找回车符(\r)的例子。

1
2
ByteBuf byteBuf = ...;
int index = byteBuf.forEachByte(ByteProcessor.FIND_CR);

派生缓冲区

派生缓冲区为ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方法被创建的:

  • duplicate();

  • slice();

  • slice(int, int);

  • Unpooled.unmodifiableBuffer(…);

  • order(ByteOrder);

  • readSlice(int)。

    ​ 每个这些方法都将返回一个新的ByteBuf 实例,它具有自己的读索引、写索引和标记索引。其内部存储和JDK 的ByteBuffer 一样也是共享的。

    ByteBuf 复制 如果需要一个现有缓冲区的真实副本,请使用copy()或者copy(int, int)方法。不同于派生缓冲区,由这个调用所返回的ByteBuf 拥有独立的数据副本。

引用计数

​ 引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。Netty 在第4 版中为ByteBuf引入了引用计数技术, interface ReferenceCounted。

工具类

ByteBufUtil 提供了用于操作ByteBuf 的静态的辅助方法。因为这个API 是通用的,并且和池化无关,所以这些方法已然在分配类的外部实现。

​ 这些静态方法中最有价值的可能就是hexdump()方法,它以十六进制的表示形式打印ByteBuf 的内容。这在各种情况下都很有用,例如,出于调试的目的记录ByteBuf 的内容。十六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的版本还可以很容易地转换回实际的字节表示。

​ 另一个有用的方法是boolean equals(ByteBuf, ByteBuf),它被用来判断两个ByteBuf实例的相等性。

资源释放

​ 当某个ChannelInboundHandler 的实现重写channelRead()方法时,它要负责显式地释放与池化的ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法ReferenceCountUtil.release()

​ Netty 将使用WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用SimpleChannelInboundHandler,SimpleChannelInboundHandler 会自动释放资源。

  1. 对于入站请求,Netty的EventLoo在处理Channel的读操作时进行分配ByteBuf,对于这类ByteBuf,需要我们自行进行释放,有三种方式,或者使用SimpleChannelInboundHandler,或者在重写channelRead()方法使用ReferenceCountUtil.release()或者使用ctx.fireChannelRead继续向后传递。

  2. 对于出站请求,不管ByteBuf是否由我们的业务创建的,当调用了write或者writeAndFlush方法后,Netty会自动替我们释放,不需要我们业务代码自行释放。

评论