在使用Netty时,都写过下面的代码,一开始菜跟懒,根本都不了解EventLoopGroup是个什么鬼东西。
1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
EventLoopGroup
根据以下类图可以得出几个结论

EventLoopGroup是一个支持定时的线程池- 从
EventExecutorGroup看出EventLoopGroup支持迭代功能,迭代的对象为EventExecutor(EventExecutor是一个特殊的EventExecutorGroup,可以大致理解EventExecutor是真正负责工作的,而EventExecutorGroup只是迭代(选择)出EventExecutor来工作,其实就是线程池和线程的关系,看完下面的具体实现懂了)
再看代码,EventLoopGroup将next()方法的对象进一步具体到了EventLoop,同时定义了几个Channel的register方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface EventLoopGroup extends EventExecutorGroup {
/**
* Return the next {@link EventLoop} to use
*/
@Override
EventLoop next();
/**
* Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
* will get notified once the registration was complete.
*/
ChannelFuture register(Channel channel);
/**
* Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
* {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
*/
ChannelFuture register(ChannelPromise promise);
/**
* Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
* will get notified once the registration was complete and also will get returned.
*
* @deprecated Use {@link #register(ChannelPromise)} instead.
*/
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
}
接着用最常使用的NioEventLoopGroup看看
NioEventLoopGroup

结合类图,我们从父类到子类讲。
AbstractEventExecutorGroup
先看AbstractEventExecutorGroup,它实现了线程池工作的所有方法,实现的也很简单,就是调用next()方法获取EventExecutor(放在NioEventLoopGroup里就是NioEventLoop),让它去执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return next().schedule(command, delay, unit);
}
......
}
MultithreadEventExecutorGroup
这个类最主要的方法有两个
-
构造方法,包含了整个线程池创建的核心,同时提供模板方法
newChild(Executor executor, Object... args)给子类创建不同的EventExecutor,具体创建逻辑放后面NioEventLoopGroup创建流程介绍。 -
其次就是实现了
next()方法,使用了EventExecutorChooserFactory.EventExecutorChooser去选择EventExecutor1 2 3 4
@Override public EventExecutor next() { return chooser.next(); }
MultithreadEventLoopGroup
这个类实现EventLoopGroup定义的方法
- 实现
EventLoopGroup#next()同时也是重写父类的next()方法,因为EventLoop是EventExecutor子类 - 实现了
EventLoopGroup中声明的Channel的register方法,实现的方法也一样,调用next()方法交由EventLoop进行register
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
......
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
}
NioEventLoopGroup
抛开构造方法,NioEventLoopGroup实现的就是一个newChild方法,创建具体的EventLoop (NioEventLoop)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
......
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
}
大概了解了类的关系后,接下来可以看看NioEventLoopGroup的创建过程了
NioEventLoopGroup创建
跟踪NioEventLoopGroup的构造方法,最终会来到MultithreadEventExecutorGroup构造方法中,主要干3几件事情
- 检查或使用
DefaultThreadFactory创建线程执行器(DefaultThreadFactory使用FastThreadLocalRunnable包装Runnable,使用FastThreadLocalThread执行任务) - 创建指定线程数量的
EventExecutor,这里用了模板方法,具体的创建交由子类完成,至于线程数量没设置的话默认是cpu线程数*2(见MultithreadEventLoopGroup#DEFAULT_EVENT_LOOP_THREADS) - 创建
EventExecutor选择器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
//1.
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//2.这里放到EventLoop的时候看
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//3.使用工厂方法创建EventExecutorChooser,DefaultEventExecutorChooserFactory根据EventExecutor数量创建不同的EventExecutorChooser
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
//复制一份EventExecutor只读集合
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
总结
EventLoopGroup其实就是个线程池,具体的工作会交由EventLoop去执行,它只负责选择EventLoop- 与Java线程池对比,它的线程数量是稳定的,同时
EventLoopGroup少了一些东西,任务队列、拒绝策略(其实并不是没有,只是不在EventLoopGroup中,而是在EventLoop中,可以回头看看NioEventLoopGroup#newChild)