抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

NETTY组件-EventLoop

EventLoop 接口

​ 回想一下我们在NIO中是如何处理我们关心的事件的?在一个while循环中select出事件,然后依次处理每种事件。我们可以把它称为事件循环,这就是EventLoop。interface io.netty.channel. EventLoop 定义了Netty 的核心抽象,用于处理网络连接的生命周期中所发生的事件。

​ io.netty.util.concurrent 包构建在JDK 的java.util.concurrent 包上。而 io.netty.channel 包中的类,为了与Channel 的事件进行交互,扩展了这些接口/类。一个EventLoop 将由一个永远都不会改变的Thread 驱动,同时任务(Runnable 或者Callable)可以直接提交给EventLoop 实现,以立即执行或者调度执行。

架构设计

​ 为了最大限度地提供高性能和可维护性,Netty设计了一套强大又易用的线程模型。在一个网络框架中,最重要的能力是能够快速高效地处理在连接的生命周期内发生的各种事件,与之相匹配的程序构造被称为事件循环,Netty定义了接口EventLoop来负责这项工作。

​ 如果是经常用Java进行多线程开发的童鞋想必经常会使用到线程池,也就是Executor这套API。Netty就是从 Executor(java.util.concurrent)之上扩展了自己的EventExecutorGroup(io.netty.util.concurrent),同时为了与Channel的事件进行交互,还扩展了EventLoopGroup接口(io.netty.channel)。在io.netty.util.concurrent包下的EventExecutorXXX负责实现线程并发相关的工作,而在io.netty.channel包下的EventLoopXXX负责实现网络编程相关的工作(处理Channel中的事件)。

​ 在Netty的线程模型中,一个EventLoop将由一个永远不会改变的Thread驱动,而一个Channel一生只会使用一个EventLoop(但是一个EventLoop可能会被指派用于服务多个Channel),在Channel中的所有I/O操作和事件都由EventLoop中的线程处理,也就是说一个Channel的一生之中都只会使用到一个线程。不过在Netty3,只有入站事件会被EventLoop处理,所有出站事件都会由调用线程处理,这种设计导致了ChannelHandler的线程安全问题。Netty4简化了线程模型,通过在同一个线程处理所有事件,既解决了这个问题,还提供了一个更加简单的架构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", 2147483647));
private final Queue<Runnable> tailTasks;

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
this.tailTasks = (Queue)ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}

// 返回它所在的EventLoopGroup
public EventLoopGroup parent() {
return (EventLoopGroup)super.parent();
}

public EventLoop next() {
return (EventLoop)super.next();
}

// 注册Channel,这里ChannelPromise和Channel关联到了一起
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}

public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

/** @deprecated */
@Deprecated
public ChannelFuture register(Channel channel, ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
ObjectUtil.checkNotNull(channel, "channel");
channel.unsafe().register(this, promise);
return promise;
}

// 剩下这些函数都是用于调度任务
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (this.isShutdown()) {
reject();
}

if (!this.tailTasks.offer(task)) {
this.reject(task);
}

if (!(task instanceof LazyRunnable) && this.wakesUpForTask(task)) {
this.wakeup(this.inEventLoop());
}

}

final boolean removeAfterEventLoopIterationTask(Runnable task) {
return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}

protected void afterRunningAllTasks() {
this.runAllTasksFrom(this.tailTasks);
}

protected boolean hasTasks() {
return super.hasTasks() || !this.tailTasks.isEmpty();
}

public int pendingTasks() {
return super.pendingTasks() + this.tailTasks.size();
}

public int registeredChannels() {
return -1;
}
}

​ 根据配置和可用核心的不同,可能会创建多个EventLoop 实例用以优化资源的使用,并且单个EventLoop 可能会被指派用于服务多个Channel。

​ Netty的EventLoop在继承了ScheduledExecutorService的同时,只定义了一个方法,parent()。在Netty 4 中,所有的I/O操作和事件都由已经被分配给了EventLoop的那个Thread来处理。

​ 为了确保一个Channel的整个生命周期中的I/O事件会被一个EventLoop负责,Netty通过inEventLoop()方法来判断当前执行的线程的身份,确定它是否是分配给当前Channel以及它的EventLoop的那一个线程。如果当前(调用)线程正是EventLoop中的线程,那么所提交的任务将会被直接执行,否则,EventLoop将调度该任务以便稍后执行,并将它放入内部的任务队列(每个EventLoop都有它自己的任务队列,从SingleThreadEventLoop的源码就能发现很多用于调度内部任务队列的方法),在下次处理它的事件时,将会执行队列中的那些任务。这种设计可以让任何线程与Channel直接交互,而无需在ChannelHandler中进行额外的同步。

​ 从性能上来考虑,千万不要将一个需要长时间来运行的任务放入到任务队列中,它会影响到该队列中的其他任务的执行。解决方案是使用一个专门的EventExecutor来执行它(ChannelPipeline提供了带有EventExecutorGroup参数的addXXX()方法,该方法可以将传入的ChannelHandler绑定到你传入的EventExecutor之中),这样它就会在另一条线程中执行,与其他任务隔离。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
.....

public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
} else {
boolean inEventLoop = this.inEventLoop();
if (inEventLoop) {
this.addTask(task);
} else {
this.startThread();
this.addTask(task);
if (this.isShutdown() && this.removeTask(task)) {
reject();
}
}

if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
this.wakeup(inEventLoop);
}

}
}

public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}

.....
}

​ EventLoopGroup负责管理和分配EventLoop(创建EventLoop和为每个新创建的Channel分配EventLoop),根据不同的传输类型,EventLoop的创建和分配方式也不同。例如,使用NIO传输类型,EventLoopGroup就会只使用较少的EventLoop(一个EventLoop服务于多个Channel),这是因为NIO基于I/O多路复用,一个线程可以处理多个连接,而如果使用的是OIO,那么新创建一个Channel(连接)就需要分配一个EventLoop(线程)。

任务调度

​ 偶尔,你将需要调度一个任务以便稍后(延迟)执行或者周期性地执行。例如,你可能想要注册一个在客户端已经连接了5 分钟之后触发的任务。一个常见的用例是,发送心跳消息到远程节点,以检查连接是否仍然还活着。如果没有响应,你便知道可以关闭该Channel 了。

线程管理

​ 在内部,当提交任务到如果(当前)调用线程正是支撑EventLoop 的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。

线程的分配

服务于Channel 的I/O 和事件的EventLoop 则包含在EventLoopGroup 中。

​ 异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个Channel 所共享。这使得可以通过尽可能少量的Thread 来支撑大量的Channel,而不是每个Channel 分配一个Thread。EventLoopGroup 负责为每个新创建的Channel 分配一个EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的EventLoop可能会被分配给多个Channel。

​ 一旦一个Channel 被分配给一个EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的Thread)。请牢记这一点,因为它可以使你从担忧你的ChannelHandler 实现中的线程安全和同步问题中解脱出来。

​ 需要注意,EventLoop 的分配方式对ThreadLocal 的使用的影响。因为一个EventLoop 通常会被用于支撑多个Channel,所以对于所有相关联的Channel 来说,ThreadLocal 都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而,在一些无状态的上下文中,它仍然可以被用于在多个Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。

和其他组件的关系

​ Netty 基于事件驱动模型,使用不同的事件来通知我们状态的改变或者操作状态的改变。它定义了在整个连接的生命周期里当有事件发生的时候处理的核心抽象。

​ Channel 为Netty 网络操作抽象类,EventLoop 主要是为Channel 处理 I/O 操作,两者配合参与 I/O 操作。
下图是Channel、EventLoop、Thread、EventLoopGroup之间的关系(摘自《Netty In Action》):

Netty提供的EventLoop结合了JDK的并发编程和Channel的事件,能够帮助用户实现周期性任务调度任务,类层次如下

​ 当一个连接到达时,Netty 就会注册一个 Channel,然后从 EventLoopGroup 中分配一个 EventLoop 绑定到这个Channel上,在该Channel的整个生命周期中都是有这个绑定的 EventLoop 来服务的。所以有如下约定俗成的关系(非常重要):

  • 一个 EventLoopGroup 包含一个或多个 EventLoop。
  • 一个 EventLoop 在它的生命周期内只能与一个Thread绑定。
  • 所有有 EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理。
  • 一个 Channel 在它的生命周期内只能注册与一个 EventLoop。
  • 一个 EventLoop 可被分配至一个或多个 Channel 。

当一个连接到达时,Netty 就会注册一个 Channel,然后从 EventLoopGroup 中分配一个 EventLoop 绑定到这个Channel上,在该Channel的整个生命周期中都是有这个绑定的 EventLoop 来服务的。

评论