基于 Netty 实现 Redis 客户端

  1. 需求分析
  2. RESP
  3. 编码
  4. 附完整代码

基于前面的学习总结,我已经掌握了 Netty 编程基础

现学以致用对 Netty 做实战开发

实现目标:开发一个类似 JRedis 的高性能的 Redis 客户端

需求分析

开发者觉得 Redis 快,有部分原因是它的报文协议设计

Redis 的网络传输有两好

  • 一是采用文本协议实现,方便了开发者阅读
  • 二是有着跟二进制协议一样的小体积,保证传输效率

我们先通过 这篇文章 搭建 Redis 服务器

搭建完成后,连接 Redis 服务器 telnet 192.168.2.88 6379

基于我们对 Redis 的学习,敲几个简单的命令进去分析分析

dbsize
:350
// : 号,代表返回整数
set shar 10000            
+OK
// + 号,代表执行成功
get shar 
$5
10000
// $ 号,代表后面是批量字符串,长度为5
keys shar*
*2
$7
sharlot
$4
shar
// * 号,代表返回数组
huawei
-ERR unknown command `huawei`, with args beginning with: 
// - 号,代表错误

RESP

上面我们做了简单的实验并对实验结果做了分析

其实这有个专业的说法,就是 RESP

什么是 RESP?

REdis Serialization Protocol,是基于 TCP的应用层协议,然后根据解析规则解析相应信息,Redis 的客户端和服务端之间采取 RESP 协议,作者主要考虑了以下几个点:

  • 容易实现
  • 解析快
  • 人类可读

RESP 可以序列化不同的数据类型,如整数,字符串,数组。还有一种特定的错误类型。请求从客户端发送到 Redis 服务器,作为表示要执行的命令的参数的字符串数组。Redis 使用特定于命令的数据类型进行回复。

RESP 是二进制安全的,不需要处理从一个进程传输到另一个进程的批量数据,因为它使用前缀长度来传输批量数据。

注意:RESP 虽然是为 Redis 设计的,但是同样也可以用于其他 C/S 的软件。Redis Cluster使用不同的二进制协议(gossip),以便在节点之间交换消息。

关于协议的具体描述,见官方文档

RESP 在 Redis 中用作请求-响应协议的方式如下:

客户端将命令作为 Bulk Strings 的 RESP 数组发送到 Redis 服务器。

服务器根据命令实现回复一种 RESP 类型。

在 RESP 中,某些数据的类型取决于第一个字节:

符号 含义
+ 代表简单字符串 Simple Strings
- 代表错误类型
: 代表整数
$ 代表 Bulk Strings
* 代表数组

此外,RESP 能够使用稍后指定的 Bulk Strings 或 Array 的特殊变体来表示Null值。
在 RESP 中,协议的不同部分始终以 “\r\n”(CRLF)结束。

编码

查看 Netty 源码,handler 包其实已经帮我们封装了一些常用的技术

这对于我们实现 Redis 客户端来说是非常有帮助的

核心代码

ch.pipeline().addLast("decoder", new RedisDecoder());
// 对应处理 RESP 中的 $ Bulk Strings
ch.pipeline().addLast("bulk-aggregator",new RedisBulkStringAggregator());
// 对应处理 RESP 中的 * 数组
ch.pipeline().addLast("array-aggregator",new RedisArrayAggregator());
ch.pipeline().addLast("encoder", new RedisEncoder());
ch.pipeline().addLast("dispatch", new MyRedisHandler());

这里要额外补充一个知识点:

addLast 方法是在 pipeline 末端追加 ChannelHandler

那么我们上面 encoder 在 dispatch 之前,会不会出问题呢?

不会的。encoder 属于 OutboundHandler,只有在我们发送命令给 Redis 时才会生效

附完整代码

package com.shar.netty.netty.redis;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.redis.*;
import io.netty.util.CharsetUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

public class RedisClient {

    private Channel channel;

    public void openConnection(String host, int port) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup(1))
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline().addLast("decoder", new RedisDecoder());
                        // 对应处理 RESP 中的 $ Bulk Strings
                        ch.pipeline().addLast("bulk-aggregator",new RedisBulkStringAggregator());
                        // 对应处理 RESP 中的 * 数组
                        ch.pipeline().addLast("array-aggregator",new RedisArrayAggregator());
                        ch.pipeline().addLast("encoder", new RedisEncoder());
                        ch.pipeline().addLast("dispatch", new MyRedisHandler());
                    }
                });
        channel = bootstrap.connect(host, port).sync().channel();
        System.out.println("连接成功");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        RedisClient client = new RedisClient();
        client.openConnection("192.168.2.197",6379);
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String s = in.readLine();
            System.out.print(">");
            client.channel.writeAndFlush(s);
        }
    }

    private class MyRedisHandler extends ChannelDuplexHandler {

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (!(msg instanceof String)) {
                ctx.write(msg);
                return;
            }
            String cmd = (String) msg;
            String[] commands = ((String) msg).split("\\s+");
            List<RedisMessage> children = new ArrayList<>(commands.length);
            for (String cmdString : commands) {
                children.add(new FullBulkStringRedisMessage(Unpooled.wrappedBuffer(cmdString.getBytes())));
            }
            RedisMessage request = new ArrayRedisMessage(children);
            ctx.write(request, promise);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            printAggregatedRedisResponse((RedisMessage) msg);
        }

        private  void printAggregatedRedisResponse(RedisMessage msg) {
            if (msg instanceof SimpleStringRedisMessage) {
                System.out.println(((SimpleStringRedisMessage) msg).content());
            } else if (msg instanceof ErrorRedisMessage) {
                System.out.println(((ErrorRedisMessage) msg).content());
            } else if (msg instanceof IntegerRedisMessage) {
                System.out.println(((IntegerRedisMessage) msg).value());
            } else if (msg instanceof FullBulkStringRedisMessage) {
                System.out.println(getString((FullBulkStringRedisMessage) msg));
            } else if (msg instanceof ArrayRedisMessage) {
                for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
                    printAggregatedRedisResponse(child);
                }
            } else {
                throw new CodecException("unknown message type: " + msg);
            }
        }

        private  String getString(FullBulkStringRedisMessage msg) {
            if (msg.isNull()) {
                return "(null)";
            }
            return msg.content().toString(CharsetUtil.UTF_8);
        }
    }
}

转载请注明来源。 欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。 可以在下面评论区评论,也可以邮件至 sharlot2050@foxmail.com。

文章标题:基于 Netty 实现 Redis 客户端

字数:1.3k

本文作者:夏来风

发布时间:2020-08-01, 22:58:34

原始链接:http://www.demo1024.com/blog/netty-RedisClient/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。