Netty 线程模型与基础用法

前面几篇文章我们掌握了 nio 的使用,其实 Netty 就是在 java nio 的基础上做了一层封装

好比 mybatis 之于 java jdbc,简化了业务系统开发的难度

接下来我们从 Netty 的基本概念开始学习掌握

Reactor 模型

Netty 是基于 Reactor 框架实现的

什么是 Reactor 呢?

字面意思来看 Reactor 是反应堆的意思

通过一个或多个输入同时传递给服务处理器的服务请求的 事件驱动处理模式

服务端程序处理传入多路请求,并将它们同步反派给请求对应的处理线程

Reactor 模式也叫 Dispatcher 模式

Reactor 三种角色

  • Reactor 负责监听、分配事件,将 IO 事件反派给对应的 Handler
  • Acceptor 处理客户端新连接,并反派到处理器链
  • Handler 事件处理,解码、业务处理、编码

单 Reactor 单线程模型

该模型下所有请求建立、IO读写、业务处理都在一个线程中完成

如果在业务中处理中出现了耗时操作,就会导致所有请求全部处理延时

因为他们是有由一个线程同步处理的

是不是和 selector 很像?

前面文章所讲的心跳服务就是一个例子

结合前面的 EchoServer while(true) 代码块!

单 Reactor 多线程模型

为了防止业务处理导致阻塞,在多线程模型下会用一个线程池来异步处理业务,当处理完成后在回写给客户端

前面文章所讲的 Http 服务就是一个例子

主从 Reactor 多线程模型

单 Reactor 始终无法发挥现代服务器多核 CPU 的并行处理能力,

所以 Reactor 是可以有个的,并且有一主多从之分。

一个主 Reactor 仅处理连接,而多个子 Reactor 用于处理 IO 读写。然后交给线程池处理业务。

Tomcat 就是采用该模式实现

如果你知道 nginx ,上面的模式完全可以把主 Reactor 理解成 nginx 分钟均衡!

Netty 线程模型

Netty 的线程模型 就是上面 主从 Reactor 多线程模型

主 Reactor 即对应 Boss group 线程组,多个子 Reactor 对应 Worker Group 线程组。

主线程用于接收连接,并将建立好的连接注册到 Worker 组,当最后 IO 事件触发后由对应 Pipeline 进行处理。

EventLoop

它内部就是死循环。事件循环器,这里充当了Reactor的核心。每个EventLoop 都会包含一个Selector选择器,用于处理 IO事件,和一个 taskQueue 用于存储用户提交的任务。此 EventLoop 会用一个独有的线程,默认是不启动的,当有任务触发时就会启动,并一直轮询下去。

@Test
public void test() throws IOException {
    // 1.构建EventLoop
    System.out.println("当前ID:"+Thread.currentThread().getId());// 守护线程
    NioEventLoopGroup group=new NioEventLoopGroup(1);
    // 执行一个任务,在 NioEventLoopGroup 线程中执行
    group.execute(() -> System.out.println("hello event loop:"+Thread.currentThread().getId()));
    // 提交一个任务,在 NioEventLoopGroup 线程中执行
    group.submit(() -> System.out.println("submit:"+Thread.currentThread().getId()));
    // 测试用,阻塞掉,不阻塞掉 上面的 submit 可能没法执行:
    // 这是 java 基础知识点,主线程关闭后子线程会自动关闭
    System.in.read();
    // 安全关闭
    group.shutdownGracefully();
}

运行上面的代码并且打断点,得知 NioEventLoopGroup 中任务有额外的线程

注册管道

@Test
public void test2() throws IOException {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    NioDatagramChannel channel = new NioDatagramChannel();
    // 管道注册
    // 调用register方法最终会调用NIO当中的register方法进行注册
    group.register(channel);
    // bind在io线程中完成
    // 如果在管道注册之前执行bind,会报错:java.lang.IllegalStateException: channel not registered to an event loop
    ChannelFuture future = channel.bind(new InetSocketAddress(8080));
    future.addListener(future1 -> System.out.println("完成绑定"));
    // 业务处理
    channel.pipeline().addLast(new SimpleChannelInboundHandler() {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            //  ByteBuffer
            if (msg instanceof DatagramPacket) {
                DatagramPacket packet= (DatagramPacket) msg;
                System.out.println(packet.content().toString(Charset.defaultCharset()));
            }
        }
    });
    // 测试用,阻塞掉,不阻塞掉 上面的 submit 可能没法执行:
    // 这是 java 基础知识点,主线程关闭后子线程会自动关闭
    System.in.read();
}

Netty 核心组件

NioChannel 与 Channelhandler

package com.shar.netty.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

public class ChannelTest {

    @Test
    public void test() throws IOException {

        // 初始化操作与原生NIO类似,都是打开管道、注册选择器最后绑定端口。但有一点要说明
        // NioChannel当中所有操作都是在EventLoop中完成的,所以在绑定端口之前必须先注册。
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup work = new NioEventLoopGroup();
        NioServerSocketChannel channel=new NioServerSocketChannel();
        boss.register(channel);
        channel.bind(new InetSocketAddress(8080));// 提交任务到 EventLoop

        // 原生NIO 是直接遍历选择集然后处理读写事件,
        // 在Netty中直接处理读写是不安全的也不推荐的,
        // 而是采用ChannelHandler来间接的处理读写事件。
        // 一般情况下读写是有多个步骤的。
        // Netty中提供了Pipeline来组织这些ChannelHandler。
        // Pipeline是一个链表容器,可以通过addFirst、addLast 在首尾增加Handler。
        channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                // accept
                System.out.println(msg);
                System.out.println("已建立新连接");
                handlerAccept(work, msg);
            }
        });
        System.in.read();

    }

    // 注册新管道,并初始化它
    // 获取新管道(NioSocketChannel) ,需要重亲注册至EventLoop,才能接收消息。
    // 简单起见直接注册到与期父Channel相同的EventLoop中
    // (也可以用一个专门的EventLoop组来处理子管道事件)。
    // 接下来同样要初始化子管道。重写channelRead0 方法来收取消息。
    private void handlerAccept(NioEventLoopGroup group, Object msg) {
        NioSocketChannel channel= (NioSocketChannel) msg;
        EventLoop loop = group.next();
        loop.register(channel);
        channel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                System.out.println(msg.toString(Charset.defaultCharset()));
            }
        });
    }
}

work group 有多少线程呢? 线程数 = 逻辑CPU核数*2

补充:通常来说,逻辑CPU核心数 = 物理CPU核心数*2

ServerBootstrap

上述的案例当中,即需自己初始化父 Channel,又需要初始化子管道,比较复杂。

好在 Netty 提供了一个 ServerBootStrap,

它进一步封装上述的注册、绑定、初始化等操作,已简化对 Netty API 的调用。

通过一个 Http 服务实现来 Netty ServerBootStrap 的常规用法

package com.shar.netty.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

public class BootstrapTest {

    public void open(int port) {

        ServerBootstrap bootstrap = new ServerBootstrap();
        // 通过ServerBootstrap 可直接设定服务的线程组
        // 其中boss 用于处理NioSeverSocketChannl中的Accept事件
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // 而Work组用于处理IO读写,以及用户提交的异步任务
        EventLoopGroup work = new NioEventLoopGroup(8);
        bootstrap.group(boss, work)
                // 通过指定Channel class 来告诉ServerBootstrap 要维护一个什么样的管道。
                .channel(NioServerSocketChannel.class)
                // 然后就可以初始化子管道的Pipeline了,为其绑定对应的处理Handler即可
                // 我们目标是实现一个Http服务,对应的三个基本操作是:解码、业务处理、编码
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        // 编解码是Http协议通用处理,Netty已自带处理器,直接添加可
                        ch.pipeline().addLast("decode", new HttpRequestDecoder());
                        ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
                        // 业务处理需要手动编写
                        ch.pipeline().addLast("servlet", new MyServlet());
                        // 编解码是Http协议通用处理,Netty已自带处理器,直接添加可
                        ch.pipeline().addFirst("encode", new HttpResponseEncoder());
                    }
                });
        // 绑定到一个端口上
        ChannelFuture future = bootstrap.bind(port);
        future.addListener(future1 -> System.out.println("注册成功"));

    }
    // 若要自定义业务处理
    // 通过实现 SimpleChannelInboundHandler 可直接处理读事件用来接收客户端的请求,
    // 接下来构造一个 Response 并设置状态码、响应头,以及响应消息即可完成一个简单的Http服务
    private class MyServlet extends SimpleChannelInboundHandler {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof FullHttpRequest) {
                FullHttpRequest request= (FullHttpRequest) msg;
                System.out.println("url"+request.uri());
                System.out.println(request.content().toString(Charset.defaultCharset()));

                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
                response.content().writeBytes("hello".getBytes());
                ChannelFuture future = ctx.writeAndFlush(response);
                future.addListener(ChannelFutureListener.CLOSE);
            }
           if (msg instanceof HttpRequest) {
                HttpRequest request = (HttpRequest) msg;
                System.out.println("当前请求:" + request.uri());
            }
            if (msg instanceof HttpContent) {
                ByteBuf content = ((HttpContent) msg).content();
                OutputStream out = new FileOutputStream("c://javatest/02.杭州 (Live)_李志_备份.wav", true);
                content.readBytes(out, content.readableBytes());
                out.close();
            }
            if (msg instanceof LastHttpContent) {
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
                response.content().writeBytes("上传完毕".getBytes());
                ChannelFuture future = ctx.writeAndFlush(response);
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new BootstrapTest().open(8080);
        System.in.read();
    }
}

转载请注明来源。 欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。 可以在下面评论区评论,也可以邮件至 sharlot2050@foxmail.com。

文章标题:Netty 线程模型与基础用法

字数:2.2k

本文作者:夏来风

发布时间:2020-07-19, 10:07:58

原始链接:http://www.demo1024.com/blog/netty-base/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。