nio 03 实战演练

长连接与短连接

TCP 本身并没有长短连接的区别,长短与否,完全取决于我们怎么用它。

短连接:每次通信时,创建 Socket;一次通信结束,调用 socket.close()。这就是一般意义上的短连接,短连接的好处是管理起来比较简单,存在的连接都是可用的连接,不需要额外的控制手段。

长连接:每次通信完毕后,不会关闭连接,这样就可以做到连接的复用。长连接的好处便是省去了创建连接的耗时。

短连接和长连接的优势,分别是对方的劣势

想要图简单,不追求高性能,使用短连接合适,这样我们就不需要操心连接状态的管理;

想要追求性能,使用长连接,我们就需要担心各种问题:比如端对端连接的维护,连接的保活。

长连接还常常被用来做数据的推送,我们大多数时候对通信的认知还是 request/response 模型,但 TCP 双工通信的性质决定了它还可以被用来做双向通信。在长连接之下,可以很方便的实现 push 模型。

短连接没有太多东西可以讲,所以实战环节我们将目光聚焦在长连接的一些问题上。

心跳服务设计与实现

在类似RPC(远程过程调用)场景中为了保证传输的效率,通常情况下会采用长链接,而长链接的保持即通过定时心跳实现。类似场景在消息推送服务中也是非常常见。所以心跳服务是网络编程中的基础且普遍的应用。

我们将从 “长连接心跳服务” 作为实战切入点,心跳需求是这样的:

由客户端定时发送给服务端,服务端作出响应。因为一直要发送,所以发送的消息和返回的消息的体量必须足够小,只要能标识心跳事件即可。具体消息格式设计要根据所使用的应用协议而定。

服务端代码实现

package com.shar.netty.echo;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

// 心跳服务器
public class EchoServer {

    @Test
    public void serverTest() throws IOException {
        // 打开 socket 管道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定 8080  端口
        serverSocketChannel.bind(new InetSocketAddress(8080));
        // 设定为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 打开 多路复用器
        Selector selector = Selector.open();
        // 将管道注册到多路复用器
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        try {
            dispatch(selector, serverSocketChannel);
        } catch (Throwable e) {
            e.printStackTrace();
        }finally {
            System.out.println("关闭");
        }
    }

    private void dispatch(Selector selector , ServerSocketChannel serverSocketChannel) throws IOException {
        while (true) {
            // 更新键的状态
            int count = selector.select();
            // 获取已更新的键集合
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                // 键是否有效
                if (!key.isValid()) {
                    continue;
                }
                // 管道是否处于被访问状态
                // 注意,这里和上面的 SelectionKey.OP_ACCEPT 对应
                // 也就是 ServerSocketChannel 才会进入此代码片段
                // SocketChannel 不会进入,因为我们下面注册的是 SelectionKey.OP_READ
                else if(key.isAcceptable()){
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                // 管道是否处于可读状态
                else if(key.isReadable()){
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 定义缓存空间并且读取数据
                    ByteBuffer buffer = ByteBuffer.allocate(64);
                    channel.read(buffer);
                    // 传输结束
                    if (buffer.hasRemaining() && buffer.get(0)==4) {
                        channel.close();
                        System.out.println("关闭管道:"+channel);
                        break;
                    }
                    // 在客户端发过来的内容后面继续写入数据
                    buffer.put(String.valueOf(System.currentTimeMillis()).getBytes());
                    // 写入管道,回传给客户端
                    buffer.flip();
                    channel.write(buffer);
                }
            }
        }
    }
}

客户端代码实现

package com.shar.netty.echo;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// 心跳客户端
public class EchoClient {

    @Test
    public void test() throws IOException, InterruptedException {

        // 创建管道
        SocketChannel channel = SocketChannel.open();
        // 设定为非阻塞模式
        channel.configureBlocking(false);
        // 打开选择器
        Selector selector = Selector.open();
        // 把socket通道注册到选择器中
        channel.register(selector, SelectionKey.OP_CONNECT);
        // 连接到指定 ip port
        boolean connect = channel.connect(new InetSocketAddress("127.0.0.1", 8080));
        while (true) {
            // 更新键的状态
            selector.select();
            // 获取已更新的键集
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                // 判断键是否有效
                if (!key.isValid()) {
                    continue;
                }
                // 管道是否处于连接就绪状态
                if (key.isConnectable()) {
                    System.out.println("是否连接:"+channel.isConnected());
                    channel.finishConnect();
                    // 监听写事件
                    key.interestOps(SelectionKey.OP_WRITE);
                }
                // 管道是否处于可写状态
                else if(key.isWritable()){
                    // 心跳写入
                    channel.write(ByteBuffer.wrap("heartbeat".getBytes()));
                    // 监听读事件
                    key.interestOps(SelectionKey.OP_READ);
                }
                // 管道是否处于可读状态
                else if (key.isReadable()) {
                    ByteBuffer buffer = ByteBuffer.allocate(64);
                    channel.read(buffer);
                    buffer.flip();
                    System.out.println(new String(buffer.array(), 0, buffer.limit()));
                    key.interestOps(SelectionKey.OP_WRITE);
                    Thread.sleep(2000);
                }
            }
        }
    }
}

Http 服务设计与实现

前面设计的心跳服务端所有读写包括业务处理都是在一个线程中同步完成的

因为心跳业务操作没有几乎没有任何阻塞延时操作,以单个线程足够了。

如果中间的业务处理是一个非常耗时操作如:查询数据库。这将导致所有请求都被会阻塞。

所以我们应该将耗时会采用异步线程池来完成

接下来一起设计一个Http服务并实现

流程说明

  • 客户端发起请求,并根据协议封装请求头与请求参数。
  • 服务端接受连接
  • 读取请求数据包
  • 将数据包 decode 解码成 HttpRequest 对象
  • 业务处理(异步),并将结果封装成 HttpResponse
  • 基于协议编码 HttpResponse
  • 将编码后的数据写入管道

代码实现

package com.shar.netty.http;

import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class SimpleHttpServer {
    private final Selector selector;
    int port;
    volatile boolean run = false;
    HttpServlet servlet;
    // 定义线程池大小
    ExecutorService executor = Executors.newFixedThreadPool(5);
    public SimpleHttpServer(int port, HttpServlet servlet) throws IOException {
        this.port = port;
        this.servlet = servlet;
        // 打开通道
        ServerSocketChannel listenerChannel = ServerSocketChannel.open();
        // 打开多路复用选择器
        selector = Selector.open();
        // 绑定端口
        listenerChannel.bind(new InetSocketAddress(port));
        // 设置非阻塞
        listenerChannel.configureBlocking(false);
        // 注册监听 accept 事件
        listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public Thread start() {
        run = true;
        Thread thread = new Thread(() -> {
            try {
                while (run) {
                    dispatch();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "selector-io");
        thread.start();
        return thread;
    }

    public void stop(int delay) {
        run = false;
    }

    private void dispatch() throws IOException {
        //
        int select = selector.select(2000);
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        SelectionKey key;
        while (iterator.hasNext()) {
            key = iterator.next();
            iterator.remove();
            if (key.isAcceptable()) {
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = channel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            } else if (key.isReadable()) {
                final SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                final ByteArrayOutputStream out = new ByteArrayOutputStream();
                // 将管道中到数据分批次读入缓冲空间
                while (channel.read(buffer) >0) {
                    buffer.flip();
                    // 数据写入 输出流
                    out.write(buffer.array(), 0, buffer.limit());
                    buffer.clear();
                }
                if (out.size() <= 0) {
                    channel.close();
                    continue;
                }
                System.out.println("当前通道:"+channel);
                // 数据缓冲完毕,开启业务线程
                executor.submit(() -> {
                    // 模拟 request\response 机制
                    try {
                        Request request = decode(out.toByteArray());
                        Response response = new Response();
                        if (request.method.equalsIgnoreCase("GET")) {
                            servlet.doGet(request, response);
                        } else {
                            servlet.doPost(request, response);
                        }
                        renderResponse(request, response);
                        channel.write(ByteBuffer.wrap(encode(response)));
                    } catch (Throwable e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }


    // 解码Http服务
    private Request decode(byte[] bytes) throws IOException {
        Request request = new Request();
        BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
        String firstLine = reader.readLine();
        String[] split = firstLine.trim().split(" ");
        request.method = split[0];
        request.url = split[1];
        request.version = split[2];

        //读取请求头
        Map<String, String> heads = new HashMap<>();
        while (true) {
            String line = reader.readLine();
            System.out.println(line);
            if (line.trim().equals("")) {
                break;
            }
            String[] split1 = line.split(":");
            heads.put(split1[0], split1[1]);
        }
        request.heads = heads;
        request.params = getUrlParams(request.url);
        //读取请求体
        request.body = reader.readLine();
        return request;

    }

    private void renderResponse(Request request, Response response){
        response.code = 200;
        response.headers = new HashMap<>();
        if (request.params.containsKey("short")) {
            response.headers.put("Connection", "close");
        } else if (request.params.containsKey("long")) {
            response.headers.put("Connection", "keep-alive");
            response.headers.put("Keep-Alive", "timeout=30,max=300");
        }
        response.headers.put("Content-Type", "text/html;charset=UTF-8");
    }

    //编码Http 服务
    private byte[] encode(Response response) {

        StringBuilder builder = new StringBuilder(512);
        builder.append("HTTP/1.1").append(" ").append(response.code).append(" ").append(Code.msg(response.code))
                .append("\r\n");

        if (response.body != null && response.body.length() != 0) {
            builder
                    .append("Content-Length: ").append(response.body.getBytes().length)
                    .append("\r\n");
        }
        if (response.headers != null && !response.headers.isEmpty()) {
            String headStr = response.headers.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue())
                    .collect(Collectors.joining("\r\n"));
            builder.append(headStr+"\r\n");
        }

        builder.append("\r\n").append(response.body);
        System.out.println("--------------------");
        System.out.println(builder.toString());
        System.out.println("--------------------");
        return builder.toString().getBytes();
    }

    public static class Code {

        public static final int HTTP_OK = 200;

        static String msg (int code) {

            switch (code) {
                case HTTP_OK:
                    return "OK";
                default:
                    return null;
            }
        }
    }

    public abstract static class HttpServlet {

        abstract void doGet(Request request, Response response);

        abstract void doPost(Request request, Response response);
    }

    public static class Request {
        Map<String, String> heads;
        String url;
        String method;
        String version;
        String body;    //请求内容
        Map<String, String> params;
    }

    public static class Response {
        Map<String, String> headers;
        int code;
        String body; //返回结果
    }


    private static Map getUrlParams(String url) {
        Map<String, String> map = new HashMap<>();
        url = url.replace("?", ";");
        if (!url.contains(";")) {
            return map;
        }
        if (url.split(";").length > 0) {
            String[] arr = url.split(";")[1].split("&");
            for (String s : arr) {
                if (s.contains("=")) {
                    String key = s.split("=")[0];
                    String value = s.split("=")[1];
                    map.put(key, value);
                }else {
                    map.put(s,null);
                }
            }
            return map;

        } else {
            return map;
        }
    }
}

Tomcat源码IO模型线程分析

前面设计的 Http 服务通过异步处理业务,提高了 IO 线程的效率。但 IO 线程始终只有一个无法发挥现代服务器多核 CPU 并行处理能力。像 Tomcat、Jobss 等生产级的 Http 服务容器 ,会构造多个 IO 线程同时运行,每个 IO 线程中都独占一个 Selector(选择器)。另外接收连接与 IO 读写也会分开,用专门线程处理 OP_ACCEPT 事件,工作内容包括:接收连接、验证连接安全性、检测连接是否超出限定值、将超出的数量添加至等待对列。

连接线程

调用 accept() 获得一个新管道,然后注册至 IO 线程当中的选择器。请注意这里不能直接拿 IO 线程中的 Selector 进行注册,因为注册(register)和刷新(select)选择器是一个同步操作。如果在非 IO 线程中调用 register,会导致阻塞发生。所以在 Tomcat 当中 ,会将新管道的注册操作封装成 PollEvent(注册事件)添加至 IO 线程的任务对列。由 IO 线程在刷新之后执行。

IO线程

处理读取事件,并提交给业务线程池处理业务。Tomcat 这里的面的细节会很多,整个思路跟上面的Http服务是类似的,就不在这里赘述。有机会在跟大家分析Tomcat源码时讲解。

总结

  • 基于心跳服务我们学到 选持器的监听事件是可以进轮转的。
  • 基于TCP的事件必须进行消费,否则会一直触发该事件,造成死循环
  • 耗时操作不能在IO线程中完成,因为所有的IO操作都是同步的。
  • 为了更高效利用服务器多核CPU特性, IO线程可以有多个,并且都有独立的选择器
  • 所有的IO操作都必须在IO线程完成(如注册、刷新),否则会导致资源锁争抢和阻塞。

转载请注明来源。 欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。 可以在下面评论区评论,也可以邮件至 sharlot2050@foxmail.com。

文章标题:nio 03 实战演练

字数:2.9k

本文作者:夏来风

发布时间:2020-07-18, 10:00:42

原始链接:http://www.demo1024.com/blog/nio-demo/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。