Netty的主从Reactor多线程模型 :BossGroup 处理连接请求,WorkerGroup 处理I/O操作。
Netty 一般需要两个必须的线程组 + 一个可选的自定义业务线程池来完成工作:
BossGroup :与客户端建立连接;
WorkerGroup :处理已连接的 I/O 事件(读写、编解码);
自定义业务线程池 :执行耗时的业务逻辑(如数据库操作、复杂计算);
大致的工作流程:
监听端口 :BossGroup 中的线程会绑定服务端端口(如 8080),持续监听客户端的 TCP 连接请求(三次握手);
接收连接 :当客户端完成 TCP 三次握手后,连接会进入内核的 “已完成连接队列”,BossGroup 的线程调用 accept() 获取该连接,生成代表连接的 Channel 对象;
转交连接 :将 Channel 注册到 WorkerGroup 中某个 Worker 线程(EventLoop)的 Selector 上,此后该连接的所有 I/O 事件都由这个 Worker 线程负责。
转交业务 :如有耗时业务,Worker 线程则把该业务给业务线程池,Worker 线程继续处理 I/O 事件。
BossGroup BossGroup 的工作非常轻量(仅处理连接建立),因此线程数不需要太多。实际开发中通常直接使用 new NioEventLoopGroup(1) ——1 个线程足够应对大部分场景。
WorkerGroup WorkerGroup 是 Netty 处理 I/O 事件的核心,它的职责是处理已建立连接的所有网络事件 ,包括:
读取客户端发送的数据;
对数据进行编解码(如 JSON 转对象、协议解析);
将处理后的数据写回客户端。
WorkerGroup 的线程数通常设置为CPU 核心数 × 2 (Netty 的默认值)。
整个 WorkerGroup 的结构大概如下:
1 2 3 4 5 6 7 8 9 WorkerGroup(工作线程组:管理所有 Worker,负责连接分配) ├─ Worker 1(EventLoop 线程:处理I/O事件,驱动 Selector) │ └─ Selector 1(多路复用器:监听I/O事件,仅负责"检测"不处理) │ ├─ Channel 1(连接通道:对应客户端A的TCP连接,传输数据) │ ├─ Channel 2(连接通道:对应客户端B的TCP连接,传输数据) │ └─ ... ├─ Worker 2(EventLoop线程:处理I/O事件,驱动Selector) │ └─ ... └─ ...
WorkerGroup 负责把 Channel 交给不太忙的的 Worker 线程,所以每个 Worker 线程可能负责大量的 Channel。
每个 Worker 线程本质是 EventLoop 线程,循环调用 Selector.select() 监听多个 Channel 的 I/O 事件, Worker 线程从 Selector.select() 拿到已经准备好了的 I/O 事件和对应的 Channel 集合,然后 Worker 线程逐个处理这些事件。
处理完这些事件后,再次进入循环调用 Selector.select() 重复上一步。
EventLoop 是指单个线程以无限循环的方式不断按一定的流程处理任务。
当 Worker 线程处理就绪事件时,如果事件处理非常耗时(比如超过 1 秒,甚至几分钟),会引发一系列连锁性的严重性能问题,本质原因是单线程循环被阻塞,导致后续所有依赖该线程的操作无法执行 。
举个例子: Worker 线程解析到一个 I/O 事件,这是一个耗时1分钟的业务请求。那么同一批还未处理的事件/需要由下轮循环才能获取的就绪事件都需要等很久才被处理,可能会导致对应的客户端一直没有收到响应,从而超时断开连接、发送冗余请求等。
这一系列的连锁反应会使整个服务器的吞吐量暴跌,出现假死(进程没崩溃,但大部分客户端的请求无响应)。
Q: 耗时的工作总得处理啊,怎么办? A: 那就再创建一个业务线程池,把耗时的工作由 WorkerGroup 转交给他就行了。
业务线程池 当 WorkerGroup 完成数据的读取和编解码后,就需要处理具体的业务逻辑了(如校验数据、操作数据库、调用 RPC 服务)。 这些操作往往耗时较长(毫秒级甚至秒级),如果放在 Worker 线程中执行,会阻塞 I/O 处理。因此需要一个专门的业务线程池来承载这些重活。
线程数量分配:
CPU 密集型:以 “CPU 核心数” 为基准;
I/O 密集型:以 “CPU 核心数 ×2” 为基准。
基于WebSocket的Netty服务器 示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 @Component @Slf4j public class NettyServer { @Value("${netty.websocket.port:8080}") private int port; @Value("${netty.websocket.path:/ws}") private String webSocketPath; @Value("${netty.websocket.max-frame-size:65536}") private int maxFrameSize; @Value("${netty.websocket.idle-timeout:60}") private int idleTimeout; @Autowired private ExecutorService businessExecutor; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private Channel serverChannel; @Autowired private ObjectProvider<WebSocketFrameHandler> webSocketFrameHandlerProvider; @PostConstruct public void start () { bossGroup = new NioEventLoopGroup (1 ); workerGroup = new NioEventLoopGroup (); try { ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024 ) .option(ChannelOption.SO_REUSEADDR, true ) .childOption(ChannelOption.SO_KEEPALIVE, true ) .childOption(ChannelOption.TCP_NODELAY, true ) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec ()); pipeline.addLast(new HttpObjectAggregator (maxFrameSize)); pipeline.addLast(new WebSocketServerProtocolHandler ( webSocketPath, null , true , maxFrameSize, false , true )); pipeline.addLast(new IdleStateHandler ( idleTimeout, 0 , 0 , TimeUnit.SECONDS )); WebSocketFrameHandler frameHandler = webSocketFrameHandlerProvider.getObject(); frameHandler.setBusinessExecutor(businessExecutor); pipeline.addLast(frameHandler); } }); serverChannel = bootstrap.bind(port).sync().channel(); log.info("Netty WebSocket server started at ws://localhost:{}{}" , port, webSocketPath); } catch (InterruptedException e) { log.error("Netty server start interrupted" , e); Thread.currentThread().interrupt(); stop(); } catch (Exception e) { log.error("Failed to start Netty server" , e); stop(); } } @PreDestroy public void stop () { log.info("Shutting down Netty WebSocket server..." ); if (serverChannel != null && serverChannel.isActive()) { serverChannel.close().addListener(future -> { if (future.isSuccess()) { log.info("Server channel closed successfully" ); } else { log.error("Server channel closed failed" , future.cause()); } }); } if (workerGroup != null ) { workerGroup.shutdownGracefully(1 , 5 , TimeUnit.SECONDS) .addListener(future -> log.info("Worker group shutdown completed" )); } if (bossGroup != null ) { bossGroup.shutdownGracefully(1 , 5 , TimeUnit.SECONDS) .addListener(future -> log.info("Boss group shutdown completed" )); } log.info("Netty WebSocket server stopped" ); } public int getBoundPort () { if (serverChannel != null && serverChannel.isActive()) { return ((InetSocketAddress) serverChannel.localAddress()).getPort(); } return -1 ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Component @Scope("prototype") @Slf4j public class WebSocketFrameHandler extends SimpleChannelInboundHandler <WebSocketFrame> { private ExecutorService businessExecutor; public void setBusinessExecutor (ExecutorService businessExecutor) { this .businessExecutor = businessExecutor; } @Override protected void channelRead0 (ChannelHandlerContext ctx, WebSocketFrame msg) { msg.retain(); businessExecutor.submit(() -> { try { if (msg instanceof TextWebSocketFrame) { String text = ((TextWebSocketFrame) msg).text(); String result = processBusiness(text); ctx.executor().execute(() -> { if (ctx.channel().isActive()) { ctx.writeAndFlush(new TextWebSocketFrame (result)); } }); } } catch (Exception e) { log.error("Business process failed" , e); } finally { ReferenceCountUtil.release(msg); } }); } }
基于TCP的Netty服务器 这个是纯TCP的服务器,之前做测试的时候用的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Component public class NettyServer { private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private Channel serverChannel; private final int port = 8080 ; @PostConstruct private void start () throws InterruptedException { bossGroup = new NioEventLoopGroup (1 ); workerGroup = new NioEventLoopGroup (); ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline() .addLast(new StringDecoder ()) .addLast(new StringEncoder ()) .addLast(new MyServerHandler ()) ; } }) .option(ChannelOption.SO_BACKLOG, 128 ) .childOption(ChannelOption.SO_KEEPALIVE, true ); serverChannel = bootstrap.bind(port).sync().channel(); System.out.println("Netty server started on port: " + port); } @PreDestroy private void stop () { if (serverChannel != null ) { serverChannel.close(); } if (workerGroup != null ) { workerGroup.shutdownGracefully(); } if (bossGroup != null ) { bossGroup.shutdownGracefully(); } System.out.println("Netty server stopped" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class MyServerHandler extends SimpleChannelInboundHandler <String> { @Override protected void channelRead0 (ChannelHandlerContext ctx, String msg) { System.out.println("Received: " + msg); ctx.writeAndFlush("Echo: " + msg); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
测试: 终端使用telnet连上TCP连接:
1 2 3 4 5 6 7 8 $ telnet 127.0.0.1 8080 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]' . my fork Echo: my fork my spoon Echo: my spoon
Netty服务端日志:
1 2 Received: my fork Received: my spoon
Netty服务端接收到Msg经过解码编码后进行回应,并记录日志。