读 Netty NioEventLoopGroup

"读 Netty NioEventLoopGroup"

Posted by tablesheep on May 5, 2022

在使用Netty时,都写过下面的代码,一开始菜跟懒,根本都不了解EventLoopGroup是个什么鬼东西。

1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

EventLoopGroup

根据以下类图可以得出几个结论

EventLoopGroup

  • EventLoopGroup是一个支持定时的线程池
  • EventExecutorGroup看出EventLoopGroup支持迭代功能,迭代的对象为EventExecutorEventExecutor是一个特殊的EventExecutorGroup,可以大致理解EventExecutor是真正负责工作的,而EventExecutorGroup只是迭代(选择)出EventExecutor来工作,其实就是线程池和线程的关系,看完下面的具体实现懂了)

再看代码,EventLoopGroupnext()方法的对象进一步具体到了EventLoop,同时定义了几个Channelregister方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     */
    @Override
    EventLoop next();

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
     * will get notified once the registration was complete.
     */
    ChannelFuture register(Channel channel);

    /**
     * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
     * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
     */
    ChannelFuture register(ChannelPromise promise);

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
     * will get notified once the registration was complete and also will get returned.
     *
     * @deprecated Use {@link #register(ChannelPromise)} instead.
     */
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

接着用最常使用的NioEventLoopGroup看看

NioEventLoopGroup

NioEventLoopGroup

结合类图,我们从父类到子类讲。

AbstractEventExecutorGroup

先看AbstractEventExecutorGroup,它实现了线程池工作的所有方法,实现的也很简单,就是调用next()方法获取EventExecutor(放在NioEventLoopGroup里就是NioEventLoop),让它去执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
    @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return next().submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return next().schedule(command, delay, unit);
    }
    ......
}

MultithreadEventExecutorGroup

这个类最主要的方法有两个

  • 构造方法,包含了整个线程池创建的核心,同时提供模板方法newChild(Executor executor, Object... args)给子类创建不同的EventExecutor,具体创建逻辑放后面NioEventLoopGroup创建流程介绍。

  • 其次就是实现了next()方法,使用了EventExecutorChooserFactory.EventExecutorChooser去选择EventExecutor

    1
    2
    3
    4
    
    @Override
    public EventExecutor next() {
        return chooser.next();
    }
    

MultithreadEventLoopGroup

这个类实现EventLoopGroup定义的方法

  • 实现EventLoopGroup#next()同时也是重写父类的next()方法,因为EventLoopEventExecutor子类
  • 实现了EventLoopGroup中声明的Channelregister方法,实现的方法也一样,调用next()方法交由EventLoop进行register
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
......

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }

}

NioEventLoopGroup

抛开构造方法,NioEventLoopGroup实现的就是一个newChild方法,创建具体的EventLoopNioEventLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

   ......

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        SelectorProvider selectorProvider = (SelectorProvider) args[0];
        SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
        EventLoopTaskQueueFactory taskQueueFactory = null;
        EventLoopTaskQueueFactory tailTaskQueueFactory = null;

        int argsLength = args.length;
        if (argsLength > 3) {
            taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
        }
        if (argsLength > 4) {
            tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
        }
        return new NioEventLoop(this, executor, selectorProvider,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }
}

大概了解了类的关系后,接下来可以看看NioEventLoopGroup的创建过程了

NioEventLoopGroup创建

跟踪NioEventLoopGroup的构造方法,最终会来到MultithreadEventExecutorGroup构造方法中,主要干3几件事情

  1. 检查或使用DefaultThreadFactory创建线程执行器(DefaultThreadFactory使用FastThreadLocalRunnable包装Runnable,使用FastThreadLocalThread执行任务)
  2. 创建指定线程数量的EventExecutor,这里用了模板方法,具体的创建交由子类完成,至于线程数量没设置的话默认是cpu线程数*2(见MultithreadEventLoopGroup#DEFAULT_EVENT_LOOP_THREADS
  3. 创建EventExecutor选择器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");

    //1.
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            //2.这里放到EventLoop的时候看
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    //3.使用工厂方法创建EventExecutorChooser,DefaultEventExecutorChooserFactory根据EventExecutor数量创建不同的EventExecutorChooser
    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    //复制一份EventExecutor只读集合
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

总结

  • EventLoopGroup其实就是个线程池,具体的工作会交由EventLoop去执行,它只负责选择EventLoop
  • 与Java线程池对比,它的线程数量是稳定的,同时EventLoopGroup少了一些东西,任务队列、拒绝策略(其实并不是没有,只是不在EventLoopGroup中,而是在EventLoop中,可以回头看看NioEventLoopGroup#newChild