Netty's Master-Slave Reactor Multi-Threading Model: the BossGroup handles connection requests, the WorkerGroup handles I/O operations
A Netty server generally needs three groups of threads:
BossGroup: establishes connections with clients;
WorkerGroup: handles I/O events on established connections (reads/writes, encoding/decoding);
Custom business thread pool: executes time-consuming business logic (e.g. database operations, complex computation).
The workflow in detail:
Listen on the port: a BossGroup thread binds the server port (e.g. 8080) and keeps listening for client TCP connection requests (three-way handshakes);
Accept the connection: once a client completes the TCP three-way handshake, the connection enters the kernel's completed-connection (accept) queue; a BossGroup thread calls accept() to take it and creates a Channel object representing the connection;
Hand off the connection: the Channel is registered with the Selector of one WorkerGroup thread (EventLoop); from then on, all I/O events for that connection are handled by that worker thread.
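A minimal sketch of how this division of labor looks in code, assuming Netty 4's NIO transport (the empty initializer is just a placeholder; the complete servers appear later in this post):

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ReactorSketch {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);  // steps 1-2: listen + accept
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // step 3: per-connection I/O
        try {
            new ServerBootstrap()
                    .group(bossGroup, workerGroup) // boss accepts, workers serve
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // Runs once per accepted connection, after the Channel
                            // has been registered with a worker EventLoop's Selector.
                        }
                    })
                    .bind(8080).sync()             // step 1: bind the listening port
                    .channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```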
BossGroup: the BossGroup's work is very lightweight (it only establishes connections), so it does not need many threads. In practice, new NioEventLoopGroup(1) is the usual choice, and a single thread is enough for the vast majority of scenarios.
WorkerGroup: the WorkerGroup is the core of Netty's I/O handling. Its job is to handle all network events on established connections, including:
reading data sent by clients;
encoding/decoding the data (e.g. JSON-to-object conversion, protocol parsing);
writing the processed data back to the client.
Q: Why is the WorkerGroup called a "conveyor belt"? A: Each thread (EventLoop) in the WorkerGroup is bound to one Selector (multiplexer) and monitors the I/O events of all the Channels it manages. Because I/O operations are non-blocking (built on NIO's select mechanism), a worker thread never blocks waiting for I/O readiness; it efficiently switches to whichever Channel is ready next, like a conveyor belt that keeps moving data without ever standing idle.
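A small illustrative handler (my own sketch, not part of the servers below) that makes this binding visible: a Channel stays pinned to one EventLoop for its whole lifetime, so its callbacks always run on that same thread:

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class LoopAffinityHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Always true inside a handler callback: a Channel's events are
        // dispatched on the single EventLoop it was registered with.
        assert ctx.channel().eventLoop().inEventLoop();
        ctx.fireChannelRead(msg); // pass the message along the pipeline
    }
}
```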
Q: Why must the WorkerGroup reject time-consuming operations? A: WorkerGroup threads are dedicated to I/O; time-consuming business logic (e.g. database queries, remote service calls) must never run on them. The reason is simple: if a worker thread is tied up by a long-running task, the I/O events of every Channel it manages go unserviced, eventually causing connection timeouts and a drop in throughput.
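A minimal sketch of the rule, with businessExecutor and slowDatabaseCall as hypothetical stand-ins (the WebSocket handler further down follows this same offloading pattern):

```java
import java.util.concurrent.ExecutorService;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class OffloadingHandler extends SimpleChannelInboundHandler<String> {
    private final ExecutorService businessExecutor; // hypothetical business pool

    public OffloadingHandler(ExecutorService businessExecutor) {
        this.businessExecutor = businessExecutor;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        // BAD: calling slowDatabaseCall(msg) right here would block this
        // EventLoop and stall every Channel it manages.

        // GOOD: offload the slow work, then hand the write back to the EventLoop.
        businessExecutor.submit(() -> {
            String reply = slowDatabaseCall(msg); // hypothetical slow operation
            ctx.channel().eventLoop().execute(() -> ctx.writeAndFlush(reply));
        });
    }

    private String slowDatabaseCall(String msg) {
        return "ok: " + msg; // stand-in for a real database query
    }
}
```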
The WorkerGroup's thread count is usually set to CPU cores × 2 (Netty's default).
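For reference, a short sketch of the equivalent explicit sizing: the no-arg NioEventLoopGroup constructor defaults to available processors × 2 (overridable via the io.netty.eventLoopThreads system property):

```java
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class GroupSizing {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup(1);  // accepts connections only
        EventLoopGroup worker = new NioEventLoopGroup(); // no-arg: defaults to cores * 2
        // Explicit equivalent of the default:
        EventLoopGroup sized = new NioEventLoopGroup(
                Runtime.getRuntime().availableProcessors() * 2);
        boss.shutdownGracefully();
        worker.shutdownGracefully();
        sized.shutdownGracefully();
    }
}
```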
Business thread pool: once the WorkerGroup has read and decoded the data, the actual business logic has to run (validating data, hitting the database, calling RPC services). These operations are often slow (milliseconds or even seconds), and running them on a worker thread would block I/O processing, so a dedicated business thread pool is needed to carry this heavy work.
Thread-count guidelines (a sketch of such a pool follows this list):
CPU-bound workloads: use the CPU core count as the baseline;
I/O-bound workloads: use CPU core count × 2 as the baseline.
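A minimal sketch of such a pool as a Spring bean, assuming a @Configuration class in the same application; the bean name businessExecutor matches the ExecutorService injected into the WebSocket server below, while the queue bound and rejection policy are illustrative choices, not requirements:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BusinessExecutorConfig {

    @Bean(destroyMethod = "shutdown")
    public ExecutorService businessExecutor() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cores * 2, cores * 2,                  // fixed size: I/O-bound baseline
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),       // bounded queue caps memory growth
                new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure when saturated
    }
}
```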
A WebSocket-based Netty server, for example:
```java
@Component
@Slf4j
public class NettyServer {

    @Value("${netty.websocket.port:8080}")
    private int port;

    @Value("${netty.websocket.path:/ws}")
    private String webSocketPath;

    @Value("${netty.websocket.max-frame-size:65536}")
    private int maxFrameSize;

    @Value("${netty.websocket.idle-timeout:60}")
    private int idleTimeout;

    @Autowired
    private ExecutorService businessExecutor;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    @Autowired
    private ObjectProvider<WebSocketFrameHandler> webSocketFrameHandlerProvider;

    @PostConstruct
    public void start() {
        bossGroup = new NioEventLoopGroup(1);   // boss: accepts connections
        workerGroup = new NioEventLoopGroup();  // workers: per-connection I/O
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HTTP codec + aggregator for the WebSocket handshake
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new HttpObjectAggregator(maxFrameSize));
                            // Handles the upgrade handshake, ping/pong and close frames
                            pipeline.addLast(new WebSocketServerProtocolHandler(
                                    webSocketPath, null, true, maxFrameSize, false, true));
                            // Fires an IdleStateEvent after idleTimeout seconds without reads
                            pipeline.addLast(new IdleStateHandler(
                                    idleTimeout, 0, 0, TimeUnit.SECONDS));
                            // Prototype-scoped handler: one instance per connection
                            WebSocketFrameHandler frameHandler =
                                    webSocketFrameHandlerProvider.getObject();
                            frameHandler.setBusinessExecutor(businessExecutor);
                            pipeline.addLast(frameHandler);
                        }
                    });
            serverChannel = bootstrap.bind(port).sync().channel();
            log.info("Netty WebSocket server started at ws://localhost:{}{}", port, webSocketPath);
        } catch (InterruptedException e) {
            log.error("Netty server start interrupted", e);
            Thread.currentThread().interrupt();
            stop();
        } catch (Exception e) {
            log.error("Failed to start Netty server", e);
            stop();
        }
    }

    @PreDestroy
    public void stop() {
        log.info("Shutting down Netty WebSocket server...");
        if (serverChannel != null && serverChannel.isActive()) {
            serverChannel.close().addListener(future -> {
                if (future.isSuccess()) {
                    log.info("Server channel closed successfully");
                } else {
                    log.error("Server channel close failed", future.cause());
                }
            });
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully(1, 5, TimeUnit.SECONDS)
                    .addListener(future -> log.info("Worker group shutdown completed"));
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully(1, 5, TimeUnit.SECONDS)
                    .addListener(future -> log.info("Boss group shutdown completed"));
        }
        log.info("Netty WebSocket server stopped");
    }

    public int getBoundPort() {
        if (serverChannel != null && serverChannel.isActive()) {
            return ((InetSocketAddress) serverChannel.localAddress()).getPort();
        }
        return -1;
    }
}
```
```java
@Component
@Scope("prototype") // one handler instance per connection (the handler is stateful)
@Slf4j
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private ExecutorService businessExecutor;

    public void setBusinessExecutor(ExecutorService businessExecutor) {
        this.businessExecutor = businessExecutor;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) {
        // Retain the frame: SimpleChannelInboundHandler releases it when this
        // method returns, but the business thread still needs it.
        msg.retain();
        businessExecutor.submit(() -> {
            try {
                if (msg instanceof TextWebSocketFrame) {
                    String text = ((TextWebSocketFrame) msg).text();
                    String result = processBusiness(text);
                    // Hand the write back to the channel's EventLoop thread
                    ctx.executor().execute(() -> {
                        if (ctx.channel().isActive()) {
                            ctx.writeAndFlush(new TextWebSocketFrame(result));
                        }
                    });
                }
            } catch (Exception e) {
                log.error("Business process failed", e);
            } finally {
                // Balance the retain() above, whatever the frame type
                ReferenceCountUtil.release(msg);
            }
        });
    }

    // Placeholder for the actual business logic (DB access, RPC calls, ...)
    private String processBusiness(String text) {
        return "Echo: " + text;
    }
}
```
A TCP-based Netty server. This is a plain TCP server I used earlier for testing:
```java
@Component
public class NettyServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;
    private final int port = 8080;

    @PostConstruct
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                .addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(new MyServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        serverChannel = bootstrap.bind(port).sync().channel();
        System.out.println("Netty server started on port: " + port);
    }

    @PreDestroy
    private void stop() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        System.out.println("Netty server stopped");
    }
}
```
```java
public class MyServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println("Received: " + msg);
        ctx.writeAndFlush("Echo: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```
Testing: connect to the TCP port with telnet from a terminal:
```
$ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
my fork
Echo: my fork
my spoon
Echo: my spoon
```
Netty server log output:
```
Received: my fork
Received: my spoon
```
The Netty server decodes each received message, logs it, and writes back an encoded echo response.