抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

NETTY组件-CHANNEL

Channel接口

​ Netty中的Channel与Java NIO的概念一样,都是对一个实体或连接的抽象,但Netty提供了一套更加通用的API。就以网络套接字为例,在Java中OIO与NIO是截然不同的两套API,假设你之前使用的是OIO而又想更改为NIO实现,那么几乎需要重写所有代码。而在Netty中,只需要更改短短几行代码(更改Channel与EventLoop的实现类,如把OioServerSocketChannel替换为NioServerSocketChannel),就可以完成OIO与NIO(或其他)之间的转换。

​ 每个Channel最终都会被分配一个ChannelPipeline和ChannelConfig,前者持有所有负责处理入站与出站数据以及事件的ChannelHandler,后者包含了该Channel的所有配置设置,并且支持热更新,由于不同的传输类型可能具有其特别的配置,所以该类可能会实现为ChannelConfig的不同子类。

常用实现

Channel的产生是为了降低网络传输变成的复杂性,它是传入传出数据的载体,可以打开或者关闭,连接或断开。可以当做它是Socket的升级,大大降低了直接与 Socket 进行操作的复杂性。

Nmae Package Description
NIO io.netty.channel.socket.nio 以Java NIO为基础实现
OIO io.netty.channel.socket.oio 以java.net为基础实现,使用阻塞I/O模型
Epoll io.netty.channel.epoll 由JNI驱动epoll()实现的更高性能的非阻塞I/O,它只能使用在Linux
Local io.netty.channel.local 本地传输,在JVM内部通过管道进行通信
Embedded io.netty.channel.embedded 允许在不需要真实网络传输的环境下使用ChannelHandler,主要用于对ChannelHandler进行测试
选择合适的内置通信传输模式
  • NIO

    io.netty.channel.socket.nio 使用java.nio.channels 包作为基础——基于选择器的方式

  • Epoll

    io.netty.channel.epoll 由 JNI 驱动的 epoll()和非阻塞 IO。这个传输支持只有在Linux 上可用的多种特性,如SO_REUSEPORT,比NIO 传输更快,而且是完全非阻塞的。将NioEventLoopGroup替换为EpollEventLoopGroup , 并且将NioServerSocketChannel.class 替换为EpollServerSocketChannel.class 即可。

  • OIO

    io.netty.channel.socket.oio 使用java.net 包作为基础——使用阻塞流

  • Local

    io.netty.channel.local 可以在VM 内部通过管道进行通信的本地传输Local传输用于在同一个JVM中运行的客户端和服务器程序之间的异步通信,与服务器Channel相关联的SocketAddress并没有绑定真正的物理网络地址,它会被存储在注册表中,并在Channel关闭时注销。因此Local传输不会接受真正的网络流量,也就是说它不能与其他传输实现进行互操作。

  • Embedded

    io.netty.channel.embedded Embedded 传输,允许使用ChannelHandler 而又不需要一个真正的基于网络的传输。在测试ChannelHandler 实现时非常有用

    Embedded传输主要用于对ChannelHandler进行单元测试,ChannelHandler是用于处理消息的逻辑组件,Netty通过将入站消息与出站消息都写入到EmbeddedChannel中的方式(提供了write/readInbound()与write/readOutbound()来读写入站与出站消息)来实现对ChannelHandler的单元测试。

常用操作

​ 基本的I/O 操作(bind()、connect()、read()和write())依赖于底层网络传输所提供的原语。在基于Java 的网络编程中,其基本的构造是类Socket。Netty 的Channel 接口所提供的API,被用于所有的I/O 操作。大大地降低了直接使用Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。

​ 由于Channel 是独一无二的,所以为了保证顺序将Channel 声明为java.lang.Comparable 的一个子接口。因此,如果两个不同的Channel 实例都返回了相同的散列码,那么AbstractChannel 中的compareTo()方法的实现将会抛出一个Error。

生命周期状态

  • ChannelUnregistered :Channel 已经被创建,但还未注册到EventLoop

  • ChannelRegistered :Channel 已经被注册到了EventLoop

  • ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了

  • ChannelInactive :Channel 没有连接到远程节点

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline 中的ChannelHandler,其可以随后对它们做出响应。

最重要的方法

  • eventLoop: 返回分配给Channel 的EventLoop

  • pipeline: 返回分配给Channel 的ChannelPipeline

  • isActive: 如果Channel 是活动的,则返回true。活动的意义可能依赖于底层的传输。例如,一个Socket 传输一旦连接到了远程节点便是活动的,而一个Datagram 传输一旦被打开便是活动的。

  • localAddress: 返回本地的SokcetAddress

  • remoteAddress: 返回远程的SocketAddress

  • write: 将数据写到远程节点。这个数据将被传递给ChannelPipeline,并且排队直到它被冲刷

  • flush: 将之前已写的数据冲刷到底层传输,如一个Socket

  • writeAndFlush: 一个简便的方法,等同于调用write()并接着调用flush()

ChannelFuture 接口

​ Netty 中所有的I/O 操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了ChannelFuture 接口,其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。

  1. 可以将ChannelFuture看作是将来要执行的操作的结果占位符,什么时候被执行,不知道。但肯定会被执行
  2. 属于同一个Channel的操作(回调函数)都被保证将按照注册的顺序执行。

比如有直接阻塞获取的方式:

1
2
/*绑定到端口,阻塞等待直到连接完成*/
ChannelFuture f = b.bind().sync();

异步监听的方式

1
2
3
4
5
6
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
}

ChannelHandler接口

​ 从应用程序开发人员的角度来看,Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 的方法是由网络事件触发的。事实上,ChannelHandler 可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,例如各种编解码,或者处理转换过程中所抛出的异常。

​ 举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处理。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler 直接冲刷数据然后输出到对端。应用程序的业务逻辑通常实现在一个或者多个ChannelInboundHandler 中。

​ 这种类型的ChannelHandler 接收入站事件和数据,这些数据随后将会被应用程序的业务逻辑所处理。

​ ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器,该类是基于事件驱动的,它会响应相关的事件然后去调用其关联的回调函数,例如当一个新的连接被建立时,ChannelHandler的channelActive()方法将会被调用。

​ 关于入站消息和出站消息的数据流向定义,如果以客户端为主视角来说的话,那么从客户端流向服务器的数据被称为出站,反之为入站。

​ 入站事件是可能被入站数据或者相关的状态更改而触发的事件,包括:连接已被激活、连接失活、读取入站数据、用户事件、发生异常等。

​ 出站事件是未来将会触发的某个动作的结果的事件,这些动作包括:打开或关闭远程节点的连接、将数据写(或冲刷)到套接字。

ChannelHandler的作用

ChannelHandler的主要用途包括:

  • 对入站与出站数据的业务逻辑处理
  • 记录日志
  • 将数据从一种格式转换为另一种格式,实现编解码器。以一次HTTP协议(或者其他应用层协议)的流程为例,数据在网络传输时的单位为字节,当客户端发送请求到服务器时,服务器需要通过解码器(处理入站消息)将字节解码为协议的消息内容,服务器在发送响应的时候(处理出站消息),还需要通过编码器将消息内容编码为字节。
  • 捕获异常
  • 提供Channel生命周期内的通知,如Channel活动时与非活动时

​ Netty中到处都充满了异步与事件驱动,而回调函数正是用于响应事件之后的操作。由于异步会直接返回一个结果,所以Netty提供了ChannelFuture(实现了java.util.concurrent.Future)来作为异步调用返回的占位符,真正的结果会在未来的某个时刻完成,到时候就可以通过ChannelFuture对其进行访问,每个Netty的出站I/O操作都将会返回一个ChannelFuture。

ChannelFutureListener接口

Netty还提供了ChannelFutureListener接口来监听ChannelFuture是否成功,并采取对应的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Channel channel = ...
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 6666));
// 注册一个监听器
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// do something....
} else {
// 输出错误信息
Throwable cause = future.cause();
cause.printStackTrace();
// do something....
}
}
});
ChannelFutureListener实现

ChannelFutureListener接口中还提供了几个简单的默认实现,方便我们使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
// 在Future完成时关闭
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
// 如果失败则关闭
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(!future.isSuccess()) {
future.channel().close();
}

}
};
// 将异常信息传递给下一个ChannelHandler
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}

}
};
}

ChannelHandler接口定义了对它生命周期进行监听的回调函数,在ChannelHandler被添加到ChannelPipeline或者被移除时都会调用这些函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext var1) throws Exception;

void handlerRemoved(ChannelHandlerContext var1) throws Exception;

/** @deprecated */
@Deprecated
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

// 该注解表明这个ChannelHandler可被其他线程复用
@Inherited
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface Sharable {
}
}

入站消息与出站消息由其对应的接口ChannelInboundHandler与ChannelOutboundHandler负责,这两个接口定义了监听Channel的生命周期的状态改变事件的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public interface ChannelInboundHandler extends ChannelHandler {
// 当channel被注册到EventLoop时被调用
void channelRegistered(ChannelHandlerContext var1) throws Exception;

// 当channel已经被创建,但还未注册到EventLoop(或者从EventLoop中注销)被调用
void channelUnregistered(ChannelHandlerContext var1) throws Exception;

// 当channel处于活动状态(连接到远程节点)被调用
void channelActive(ChannelHandlerContext var1) throws Exception;

// 当channel处于非活动状态(没有连接到远程节点)被调用
void channelInactive(ChannelHandlerContext var1) throws Exception;

// 当从channel读取数据时被调用
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;

// 当channel的上一个读操作完成时被调用
void channelReadComplete(ChannelHandlerContext var1) throws Exception;

// 当ChannelInboundHandler.fireUserEventTriggered()方法被调用时被调用
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;

// 当channel的可写状态发生改变时被调用
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;

// 当处理过程中发生异常时被调用
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}

package io.netty.channel;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.net.SocketAddress;

public interface ChannelOutboundHandler extends ChannelHandler {

// 当请求将Channel绑定到一个地址时被调用
// ChannelPromise是ChannelFuture的一个子接口,定义了如setSuccess(),setFailure()等方法
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;

// 当请求将Channel连接到远程节点时被调用
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;

// 当请求将Channel从远程节点断开时被调用
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

// 当请求关闭Channel时被调用
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

// 当请求将Channel从它的EventLoop中注销时被调用
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

// 当请求从Channel读取数据时被调用
void read(ChannelHandlerContext var1) throws Exception;

// 当请求通过Channel将数据写到远程节点时被调用
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;

// 当请求通过Channel将缓冲中的数据冲刷到远程节点时被调用
void flush(ChannelHandlerContext var1) throws Exception;
}

​ 通过实现ChannelInboundHandler或者ChannelOutboundHandler就可以完成用户自定义的应用逻辑处理程序,不过Netty已经帮你实现了一些基本操作,用户只需要继承并扩展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter来作为自定义实现的起始点。

​ ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter都继承于ChannelHandlerAdapter,该抽象类简单实现了ChannelHandler接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public ChannelHandlerAdapter(){
}

// 该方法不允许将此ChannelHandler共享复用
protected void ensureNotSharable(){
if(this.isSharable()){
throw new IllegalStateException("ChannelHandler "+this.getClass().getName()+" is not allowed to be shared");
}
}

// 使用反射判断实现类有没有@Sharable注解,以确认该类是否为可共享复用的
public boolean isSharable(){
Class clazz=this.getClass();
Map cache=InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable=(Boolean)cache.get(clazz);
if(sharable==null){
sharable=Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class));
cache.put(clazz,sharable);
}

return sharable.booleanValue();
}

public void handlerAdded(ChannelHandlerContext ctx)throws Exception{
}

public void handlerRemoved(ChannelHandlerContext ctx)throws Exception{
}

public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)throws Exception{
ctx.fireExceptionCaught(cause);
}
}

ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter默认只是简单地将请求传递给ChannelPipeline中的下一个ChannelHandler,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public ChannelInboundHandlerAdapter() {
}

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}

public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
public ChannelOutboundHandlerAdapter() {
}

public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}

public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}

public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}

public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}

public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}

public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}

​ 对于处理入站消息,另外一种选择是继承SimpleChannelInboundHandler,它是Netty的一个继承于ChannelInboundHandlerAdapter的抽象类,并在其之上实现了自动释放资源的功能。

​ 我们在了解ByteBuf时就已经知道了Netty使用了一套自己实现的引用计数算法来主动释放资源,假设你的ChannelHandler继承于ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter,那么你就有责任去管理你所分配的ByteBuf,一般来说,一个消息对象(ByteBuf)已经被消费(或丢弃)了,并且不会传递给ChannelHandler链中的下一个处理器(如果该消息到达了实际的传输层,那么当它被写入或Channel关闭时,都会被自动释放),那么你就需要去手动释放它。通过一个简单的工具类ReferenceCountUtil的release方法,就可以做到这一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 这个泛型为消息对象的类型
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean autoRelease;

protected SimpleChannelInboundHandler() {
this(true);
}

protected SimpleChannelInboundHandler(boolean autoRelease) {
this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
this.autoRelease = autoRelease;
}

protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
this(inboundMessageType, true);
}

protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
this.matcher = TypeParameterMatcher.get(inboundMessageType);
this.autoRelease = autoRelease;
}

public boolean acceptInboundMessage(Object msg) throws Exception {
return this.matcher.match(msg);
}

// SimpleChannelInboundHandler只是替你做了ReferenceCountUtil.release()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;

try {
if (this.acceptInboundMessage(msg)) {
this.channelRead0(ctx, msg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (this.autoRelease && release) {
ReferenceCountUtil.release(msg);
}

}

}

// 这个方法才是我们需要实现的方法
protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;
}

// ReferenceCountUtil中的源码,release方法对消息对象的类型进行判断然后调用它的release()方法
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted) msg).release() : false;
}

ChannelHandler 的生命周期

​ 接口 ChannelHandler 定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline 中或者被从ChannelPipeline 中移除时会调用这些操作。这些方法中的每一个都接受一个ChannelHandlerContext 参数。

  • handlerAdded :当把ChannelHandler 添加到ChannelPipeline 中时被调用

  • handlerRemoved : 当从ChannelPipeline 中移除ChannelHandler 时被调用

  • exceptionCaught : 当处理过程中在ChannelPipeline 中有错误产生时被调用

Netty 定义了下面两个重要的ChannelHandler 子接口:

  • ChannelInboundHandler——处理入站数据以及各种状态变化;

  • ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。

ChannelInboundHandler

下面列出了接口 ChannelInboundHandler 的生命周期方法。这些方法将会在数据被接收时或者与其对应的Channel 状态发生改变时被调用。正如我们前面所提到的,这些方法和Channel 的生命周期密切相关。

  • channelRegistered

    当Channel 已经注册到它的EventLoop 并且能够处理I/O 时被调用

  • channelUnregistered

    当Channel 从它的EventLoop 注销并且无法处理任何I/O 时被调用

  • channelActive

    当Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪

  • channelInactive

    当Channel 离开活动状态并且不再连接它的远程节点时被调用

  • channelReadComplete

    当Channel上的一个读操作完成时被调用

  • channelRead

    当从Channel 读取数据时被调用

  • ChannelWritabilityChanged

    当Channel 的可写状态发生改变时被调用。可以通过调用Channel 的isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置

  • userEventTriggered

    当ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用。

ChannelOutboundHandler接口

出站操作和数据将由ChannelOutboundHandler 处理。它的方法将被Channel、Channel-

Pipeline 以及ChannelHandlerContext 调用。

所有由ChannelOutboundHandler 本身所定义的方法:

  • bind(ChannelHandlerContext,SocketAddress,ChannelPromise)

    当请求将Channel 绑定到本地地址时被调用

  • connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)

    当请求将Channel 连接到远程节点时被调用

  • disconnect(ChannelHandlerContext,ChannelPromise)

    当请求将Channel 从远程节点断开时被调用

  • close(ChannelHandlerContext,ChannelPromise)

    当请求关闭Channel 时被调用

  • deregister(ChannelHandlerContext,ChannelPromise)

    当请求将Channel 从它的EventLoop 注销时被调用

  • read(ChannelHandlerContext)

    当请求从Channel 读取更多的数据时被调用

  • flush(ChannelHandlerContext)

    当请求通过Channel 将入队数据冲刷到远程节点时被调用

  • write(ChannelHandlerContext,Object,ChannelPromise)

    当请求通过Channel 将数据写到远程节点时被调用

ChannelHandler的适配器

​ 有一些适配器类可以将编写自定义的ChannelHandler 所需要的工作降到最低限度,因为它们提供了定义在对应接口中的所有方法的默认实现。因为你有时会忽略那些不感兴趣的事件,所以Netty提供了抽象基类ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter。

​ 你可以使用ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter类作为自己的ChannelHandler 的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler 的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler 的方法。

​ ChannelHandlerAdapter 还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline。

ChannelPipeline 接口

​ 为了模块化与解耦合,不可能由一个ChannelHandler来完成所有应用逻辑,所以Netty采用了拦截器链的设计。ChannelPipeline就是用来管理ChannelHandler实例链的容器,它的职责就是保证实例链的流动。

​ 每一个新创建的Channel都将会被分配一个新的ChannelPipeline,这种关联关系是永久性的,一个Channel一生只能对应一个ChannelPipeline。

​ 当Channel 被创建时,它将会被自动地分配一个新的ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个ChannelPipeline,也不能分离其当前的。在Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

​ 使得事件流经ChannelPipeline 是ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个ChannelHandler。它们的执行顺序是由它们被添加的顺序所决定的。

​ 入站和出站ChannelHandler 可以被安装到同一个ChannelPipeline中。如果一个消息或者任何其他的入站事件被读取,那么它会从ChannelPipeline 的头部开始流动,最终,数据将会到达ChannelPipeline 的尾端,届时,所有处理就都结束了。

​ 数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从ChannelOutboundHandler 链的尾端开始流动,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层,这里显示为Socket。通常情况下,这将触发一个写操作。

​ 如果将两个类别的ChannelHandler都混合添加到同一个ChannelPipeline 中会发生什么。虽然ChannelInboundHandle 和ChannelOutboundHandle 都扩展自ChannelHandler,但是Netty 能区分ChannelInboundHandler实现和ChannelOutboundHandler 实现,并确保数据只会在具有相同定向类型的两个ChannelHandler 之间传递。

​ 一个入站事件被触发时,它会先从ChannelPipeline的最左端(头部)开始一直传播到ChannelPipeline的最右端(尾部),而出站事件正好与入站事件顺序相反(从最右端一直传播到最左端)。这个顺序是定死的,Netty总是将ChannelPipeline的入站口作为头部,而将出站口作为尾部。在事件传播的过程中,ChannelPipeline会判断下一个ChannelHandler的类型是否和事件的运动方向相匹配,如果不匹配,就跳过该ChannelHandler并继续检查下一个(保证入站事件只会被ChannelInboundHandler处理),一个ChannelHandler也可以同时实现ChannelInboundHandler与ChannelOutboundHandler,它在入站事件与出站事件中都会被调用。

​ 在阅读ChannelHandler的源码时,发现很多方法需要一个ChannelHandlerContext类型的参数,该接口是ChannelPipeline与ChannelHandler之间相关联的关键。ChannelHandlerContext可以通知ChannelPipeline中的当前ChannelHandler的下一个ChannelHandler,还可以动态地改变当前ChannelHandler在ChannelPipeline中的位置(通过调用ChannelPipeline中的各种方法来修改)。

​ ChannelHandlerContext负责了在同一个ChannelPipeline中的ChannelHandler与其他ChannelHandler之间的交互,每个ChannelHandlerContext都对应了一个ChannelHandler。在DefaultChannelPipeline的源码中,已经表现的很明显了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class DefaultChannelPipeline implements ChannelPipeline {
.........
// 头部节点和尾部节点的引用变量
// ChannelHandlerContext在ChannelPipeline中是以链表的形式组织的
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
.........

// 添加一个ChannelHandler到链表尾部
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return this.addLast((EventExecutorGroup) null, name, handler);
}

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检查ChannelHandler是否为一个共享对象(@Sharable)
// 如果该ChannelHandler没有@Sharable注解,并且是已被添加过的那么就抛出异常
checkMultiplicity(handler);
// 返回一个DefaultChannelHandlerContext,注意该对象持有了传入的ChannelHandler
newCtx = this.newContext(group, this.filterName(name, handler), handler);
this.addLast0(newCtx);
// 如果当前ChannelPipeline没有被注册,那么就先加到未决链表中
if (!this.registered) {
newCtx.setAddPending();
this.callHandlerCallbackLater(newCtx, true);
return this;
}

// 否则就调用ChannelHandler中的handlerAdded()
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
public void run() {
DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
}
});
return this;
}
}

this.callHandlerAdded0(newCtx);
return this;
}

// 将新的ChannelHandlerContext插入到尾部与尾部之前的节点之间
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = this.tail.prev;
newCtx.prev = prev;
newCtx.next = this.tail;
prev.next = newCtx;
this.tail.prev = newCtx;
}
.....
}

​ ChannelHandlerContext还定义了许多与Channel和ChannelPipeline重合的方法(像read()、write()、connect()这些用于出站的方法或者如fireChannelXXXX()这样用于入站的方法),不同之处在于调用Channel或者ChannelPipeline上的这些方法,它们将会从头沿着整个ChannelHandler实例链进行传播,而调用位于ChannelHandlerContext上的相同方法,则会从当前所关联的ChannelHandler开始,且只会传播给实例链中的下一个ChannelHandler。而且,事件之间的移动(从一个ChannelHandler到下一个ChannelHandler)也是通过ChannelHandlerContext中的方法调用完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class DefaultChannelPipeline implements ChannelPipeline {

public final ChannelPipeline fireChannelRead(Object msg) {
// 注意这里将头节点传入了进去
AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
return this;
}

}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}

}

private void invokeChannelRead(Object msg) {
if (this.invokeHandler()) {
try {
((ChannelInboundHandler) this.handler()).channelRead(this, msg);
} catch (Throwable var3) {
this.notifyHandlerException(var3);
}
} else {
// 寻找下一个ChannelHandler
this.fireChannelRead(msg);
}

}

public ChannelHandlerContext fireChannelRead(Object msg) {
invokeChannelRead(this.findContextInbound(), msg);
return this;
}

private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;

do {
ctx = ctx.next;
} while (!ctx.inbound); // 直到找到一个ChannelInboundHandler

return ctx;
}

}

ChannelPipeline上的方法

  • addFirst、addBefore、addAfter、addLast

    将一个ChannelHandler 添加到ChannelPipeline 中

  • remove

    将一个ChannelHandler 从ChannelPipeline 中移除

  • replace

    将ChannelPipeline 中的一个ChannelHandler 替换为另一个ChannelHandler

  • get

    通过类型或者名称返回ChannelHandler

  • context

    返回和ChannelHandler 绑定的ChannelHandlerContext

  • names

    返回ChannelPipeline 中所有ChannelHandler 的名称

ChannelPipeline 的API 公开了用于调用入站和出站操作的附加方法。

ChannelHandlerContext

​ 通过使用作为参数传递到每个方法的ChannelHandlerContext,事件可以被传递给当前ChannelHandler 链中的下一个ChannelHandler。虽然这个对象可以被用于获取底层的Channel,但是它主要还是被用于写出站数据。

​ ChannelHandlerContext 代表了ChannelHandler 和ChannelPipeline 之间的关联,每当有ChannelHandler 添加到ChannelPipeline 中时,都会创建ChannelHandler-Context。ChannelHandlerContext 的主要功能是管理它所关联的ChannelHandler 和在同一个ChannelPipeline 中的其他ChannelHandler 之间的交互。

​ ChannelHandlerContext 有很多的方法,其中一些方法也存在于Channel 和Channel-Pipeline 本身上,但是有一点重要的不同。如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个(入站下一个,出站上一个)能够处理该事件的ChannelHandler。

ChannelHandlerContext的API
  • alloc

    返回和这个实例相关联的Channel 所配置的ByteBufAllocator

  • bind

    绑定到给定的SocketAddress,并返回ChannelFuture

  • channel

    返回绑定到这个实例的Channel

  • close

    关闭Channel,并返回ChannelFuture

  • connect

    连接给定的SocketAddress,并返回ChannelFuture

  • deregister

    从之前分配的EventExecutor 注销,并返回ChannelFuture

  • disconnect

    从远程节点断开,并返回ChannelFuture

  • executor

    返回调度事件的EventExecutor

  • fireChannelActive

    触发对下一个ChannelInboundHandler 上的channelActive()方法(已连接)的调用

  • fireChannelInactive

    触发对下一个ChannelInboundHandler 上的channelInactive()方法(已关闭)的调用

  • fireChannelRead

    触发对下一个ChannelInboundHandler 上的channelRead()方法(已接收的消息)的调用

  • fireChannelReadComplete

    触发对下一个ChannelInboundHandler 上的channelReadComplete()方法的调用

  • fireChannelRegistered

    触发对下一个ChannelInboundHandler 上的fireChannelRegistered()方法的调用

  • fireChannelUnregistered

    触发对下一个ChannelInboundHandler 上的fireChannelUnregistered()方法的调用

  • fireChannelWritabilityChanged

    触发对下一个ChannelInboundHandler 上的fireChannelWritabilityChanged()方法的调用

  • fireExceptionCaught

    触发对下一个ChannelInboundHandler 上的fireExceptionCaught(Throwable)方法的调用

  • fireUserEventTriggered

    触发对下一个ChannelInboundHandler 上的fireUserEventTriggered(Object evt)方法的调用

  • handler

    返回绑定到这个实例的ChannelHandler

  • isRemoved

    如果所关联的ChannelHandler 已经被从ChannelPipeline中移除则返回true

  • name

    返回这个实例的唯一名称

  • pipeline

    返回这个实例所关联的ChannelPipeline

  • read

    将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个channelRead事件,并(在最后一个消息被读取完成后)通知ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法

当使用ChannelHandlerContext 的API 的时候,有以下两点:

  • ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;

  • 如同我们在本节开头所解释的一样,相对于其他类的同名方法,ChannelHandler Context的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

评论