自学rpc之Netty


前言

序列化

自定义序列化接口

就是一个接口

public interface Serializer {
    
    byte[] serialize(Object obj);
    /*
    	将给定的对象 obj 进行序列化,返回一个字节数组表示序列化后的数据。
    */

    <T> T deserialize(byte[] bytes, Class<T> clazz);
    /*
    	将给定的字节数组 bytes 进行反序列化,返回一个根据指定类 clazz 进行类型转换后的对象。这里使用了泛型 <T> 来表示反序列化后的具体类型。
    */
}

实现序列化接口

用kryo实现序列化,这个guide讲的还行,看他的就可以

public class KryoSerializer implements Serializer {
    /**
     * 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。
     * 所以,使用 ThreadLocal 存放 Kryo 对象
     */
    private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.register(RpcResponse.class);
        kryo.register(RpcRequest.class);
        kryo.setReferences(true);//默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true
        kryo.setRegistrationRequired(false);//默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true
        return kryo;
    });

    @Override
    public byte[] serialize(Object obj) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             Output output = new Output(byteArrayOutputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            // Object->byte:将对象序列化为byte数组
            kryo.writeObject(output, obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        } catch (Exception e) {
            throw new SerializeException("序列化失败");
        }
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
             Input input = new Input(byteArrayInputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            // byte->Object:从byte数组中反序列化出对对象
            Object o = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return clazz.cast(o);
        } catch (Exception e) {
            throw new SerializeException("反序列化失败");
        }
    }

}

编码器解码器

编码和解码,通常与序列化结合使用,我们来看看gpt的解释

序列化(Serialization)是将对象转换为字节流的过程。在 Java 中,对象通常是由一组字段(属性)组成的,而字节流是一种表示数据的二进制形式。序列化的目的是将对象转换为字节流的形式,以便在网络传输、持久化存储或跨平台通信中使用。

编码(Encoding)和解码(Decoding)是在网络通信中使用的过程。它们涉及将数据从一种形式转换为另一种形式,以便在网络中传输和处理。

编码是将数据从一种格式转换为另一种格式的过程,通常将数据转换为字节流的形式,以便在网络中传输。在网络通信中,编码的目的是将数据转换为网络传输所需的格式,例如将对象序列化为字节流、将文本编码为字节流等。

解码是将编码后的数据重新转换回原始格式的过程。在网络通信中,接收到的数据通常是经过编码的字节流。解码的目的是将字节流解析为原始的数据格式,例如将字节流反序列化为对象、将字节流解码为文本等。

在网络通信中,序列化和编码通常是结合使用的。序列化将对象转换为字节流,编码将字节流转换为网络传输所需的格式(如按照协议规定的格式进行编码)。在接收端,解码将网络传输的数据解析为原始的格式(如解码成对象),以便进行进一步的处理和使用。

我们用netty,netty提供了ByteBuf这种容器,更为方便地处理数据

所以我们的编码就是先序列化,然后再写进bytebuf对象中

解码就是从bytebuf对象中读,然后转成我们需要的对象

编码器

先序列化,然后再写进bytebuf对象中

@AllArgsConstructor
public class NettyKryoEncoder extends MessageToByteEncoder<Object> {//你要继承netty的这个类
    private final Serializer serializer;//序列化器
    private final Class<?> genericClass;//要编码的对象类型

    /**
     * 将对象转换为字节码然后写入到 ByteBuf 对象中
     */
    @Override//重写encoder方法
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) {
        if (genericClass.isInstance(o)) {
            // 1. 将对象转换为byte
            byte[] body = serializer.serialize(o);//先序列化
            // 2. 读取消息的长度
            int dataLength = body.length;
            // 3.写入消息对应的字节数组长度,writerIndex 加 4
            byteBuf.writeInt(dataLength);
            //4.将字节数组写入 ByteBuf 对象中
            byteBuf.writeBytes(body);//再写到bytebuf中
        }
    }
}

解码器

从bytebuf对象中读,然后转成我们需要的对象

@AllArgsConstructor
@Slf4j
public class NettyKryoDecoder extends ByteToMessageDecoder {

    private final Serializer serializer;
    private final Class<?> genericClass;

    /**
     * Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部
     */
    private static final int BODY_LENGTH = 4;

    /**
     * 解码 ByteBuf 对象
     *
     * @param ctx 解码器关联的 ChannelHandlerContext 对象
     * @param in  "入站"数据,也就是 ByteBuf 对象
     * @param out 解码之后的数据对象需要添加到 out 对象里面
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {

        //1.byteBuf中写入的消息长度所占的字节数已经是4了,所以 byteBuf 的可读字节必须大于 4,
        if (in.readableBytes() >= BODY_LENGTH) {
            //2.标记当前readIndex的位置,以便后面重置readIndex 的时候使用
            in.markReaderIndex();
            //3.读取消息的长度
            //注意: 消息长度是encode的时候我们自己写入的,参见 NettyKryoEncoder 的encode方法
            int dataLength = in.readInt();
            //4.遇到不合理的情况直接 return
            if (dataLength < 0 || in.readableBytes() < 0) {
                log.error("data length or byteBuf readableBytes is not valid");
                return;
            }
            //5.如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return;
            }
            // 6.走到这里说明没什么问题了,可以序列化了
            byte[] body = new byte[dataLength];
            in.readBytes(body);//从对象中读
            // 将bytes数组转换为我们需要的对象
            Object obj = serializer.deserialize(body, genericClass);//反序列化
            out.add(obj);
            log.info("successful decode ByteBuf to Object");
        }
    }
}

我们注意到参数ChannelHandlerContext ctx,那么问题来了,这是什么玩意

简单来说,这就是

ChannelHandlerContext 是 Netty 中的一个关键对象,它表示了 ChannelHandler 和 ChannelPipeline 之间的上下文环境。每当有事件在 ChannelPipeline 中传播时,ChannelHandlerContext 负责管理和传递事件。

ChannelHandlerContext 提供了许多操作和方法,可以与 ChannelPipeline 进行交互,包括数据的读写、事件的触发、ChannelHandler 的添加和移除等。它是 ChannelHandler 在处理事件时的主要操作接口。

其他的,移步博客:ChannelHandlerContext,但是我还没写

传输实体类

刚刚我们已经了解了序列化,以及编码解码,现在终于可以开始了解如何用netty传输对象了

先定义请求和响应的类

请求

@AllArgsConstructor
@Getter
@NoArgsConstructor
@Builder
@ToString
public class RpcRequest {
    private String interfaceName;//要调用的接口名称
    private String methodName;//要调用的方法名称
}

响应

@AllArgsConstructor
@Getter
@NoArgsConstructor
@Builder
@ToString
public class RpcResponse {
   private String message;//响应内容
}

客户端

客户端中主要有一个用于向服务端发送消息的 sendMessage()方法,通过这个方法你可以将消息也就是RpcRequest 对象发送到服务端,并且你可以同步获取到服务端返回的结果也就是RpcResponse 对象。

先总的来看一下

public class NettyClient {
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    private final String host;
    private final int port;
    private static final Bootstrap b;

    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // 初始化相关资源比如 EventLoopGroup, Bootstrap
    static {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        b = new Bootstrap();
        KryoSerializer kryoSerializer = new KryoSerializer();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                // 连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
                //  如果 15 秒之内没有发送数据给服务端的话,就发送一次心跳请求
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        /*
                         自定义序列化编解码器
                         */
                        // RpcResponse -> ByteBuf
                        ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
                        // ByteBuf -> RpcRequest
                        ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
    }

    /**
     * 发送消息到服务端
     *
     * @param rpcRequest 消息体
     * @return 服务端返回的数据
     */
    public RpcResponse sendMessage(RpcRequest rpcRequest) {
        try {
            ChannelFuture f = b.connect(host, port).sync();
            logger.info("client connect  {}", host + ":" + port);
            Channel futureChannel = f.channel();
            logger.info("send message");
            if (futureChannel != null) {
                futureChannel.writeAndFlush(rpcRequest).addListener(future -> {
                    if (future.isSuccess()) {
                        logger.info("client send message: [{}]", rpcRequest.toString());
                    } else {
                        logger.error("Send failed:", future.cause());
                    }
                });
               //阻塞等待 ,直到Channel关闭
                futureChannel.closeFuture().sync();
               // 将服务端返回的数据也就是RpcResponse对象取出
                AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
                return futureChannel.attr(key).get();
            }
        } catch (InterruptedException e) {
            logger.error("occur exception when connect server:", e);
        }
        return null;
    }
}

细说初始化

private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);//日志类
private final String host;//主机名
private final int port;//端口
private static final Bootstrap b;
//用于启动和配置客户端的网络通信。

Bootstrap是Netty中的类,主要的两个作用:

初始化客户端的相关资源:通过 Bootstrap 可以创建和管理客户端的资源,例如 EventLoopGroup、Channel、ChannelPipeline 等。它提供了一系列方法来设置和配置这些资源,以满足特定的需求。

设置客户端的连接参数和事件处理器:Bootstrap 允许我们设置客户端的连接参数,例如远程服务器的地址和端口、连接超时时间等。同时,通过 handler() 方法可以设置客户端的事件处理器,用于处理各种网络事件,例如连接建立、数据读写、异常处理等。

public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
//就是构造函数
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();//处理客户端的io操作

EventLoopGroup 是用于处理所有的事件和 I/O 操作的线程池。它负责管理和分配网络事件给相应的 EventLoop,并处理事件的执行、I/O 的读写等操作。

b = new Bootstrap();
KryoSerializer kryoSerializer = new KryoSerializer();

这两句就是简单的初始化

b.group(eventLoopGroup)//设置客户端的事件循环组,用于管理客户端的 I/O 操作。
                .channel(NioSocketChannel.class)//设置channel
                .handler(new LoggingHandler(LogLevel.INFO))//设置日志handler
                // 连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
                //  如果 15 秒之内没有发送数据给服务端的话,就发送一次心跳请求
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        /*
                         自定义序列化编解码器
                         */
                        // RpcResponse -> ByteBuf
                        ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
                        // ByteBuf -> RpcRequest
                        ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });

详细说说这个:

handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        /*
                         自定义序列化编解码器
                         */
                        // RpcResponse -> ByteBuf
                        ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
                        // ByteBuf -> RpcRequest
                        ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
  • SocketChannel 是 Netty 中的一个特定类型的通道,用于在客户端与服务器之间进行网络通信。

  • ChannelPipeline 是每个 Channel 对象所关联的处理器链。通过 pipeline,可以按照特定的顺序添加、移除、替换处理器,以构建一个处理器链。数据在进入和离开通道时,会依次经过处理器链中的每个处理器,进行相应的处理和转换操作。

    体现到客户端,就是先解码服务端的响应,再编码自己的请求,最后执行这个NettyClientHandler()的

NettyClientHandler()后面再说

sendmessage

public RpcResponse sendMessage(RpcRequest rpcRequest) {
        try {
            ChannelFuture f = b.connect(host, port).sync();//连接服务器,ChannelFuture 表示一个异步的 I/O 操作的结果
            logger.info("client connect  {}", host + ":" + port);//记录
            Channel futureChannel = f.channel();//获取channel
            logger.info("send message");
            if (futureChannel != null) {
                futureChannel.writeAndFlush(rpcRequest).addListener(future -> {
                    //讲请求写到通道里,并且设置监听器,实现异步
                    if (future.isSuccess()) {
                        logger.info("client send message: [{}]", rpcRequest.toString());
                    } else {
                        logger.error("Send failed:", future.cause());
                    }
                });
               //阻塞等待 ,直到Channel关闭
                futureChannel.closeFuture().sync();
               // 将服务端返回的数据也就是RpcResponse对象取出
                //先讲字符串"rpcResponse"转换为AttributeKey类型
                AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
                //然后返回attribute中,key对应的值,也就是RpcResponse
                return futureChannel.attr(key).get();
            }
        } catch (InterruptedException e) {
            logger.error("occur exception when connect server:", e);
        }
        return null;
    }
}

AttributeMap

public interface AttributeMap {

    <T> Attribute<T> attr(AttributeKey<T> key);

    <T> boolean hasAttr(AttributeKey<T> key);
}

类似于map,你可以通过把AttributeKey传到attr里面或者Attribute

Channel 实现了 AttributeMap 接口,这样也就表明它存在了AttributeMap 相关的属性。 每个 Channel上的AttributeMap属于共享数据。AttributeMap 的结构,和Map很像,我们可以把 key 看作是AttributeKey,value 看作是Attribute,我们可以根据 AttributeKey找到对应的Attribute。

NettyClientHandler()

自定义的Handler,用于将服务端的返回结果保存到 AttributeMap 上,便于后面取出

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            RpcResponse rpcResponse = (RpcResponse) msg;//转成RpcResponse
            logger.info("client receive msg: [{}]", rpcResponse.toString());
            // 声明一个 AttributeKey 对象
            AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");//先把string转成key,再保存
            // 将服务端的返回结果保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源
            // AttributeMap的key是AttributeKey,value是Attribute
            ctx.channel().attr(key).set(rpcResponse);
            ctx.channel().close();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("client caught exception", cause);
        ctx.close();
    }
}

服务端

server

先来看代码

public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    private final int port;

    private NettyServer(int port) {
        this.port = port;
    }

    private void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        KryoSerializer kryoSerializer = new KryoSerializer();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // 是否开启 TCP 底层心跳机制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));//先解码
                            ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));//再编码
                            ch.pipeline().addLast(new NettyServerHandler());//自定义的用来处理信息
                        }
                    });

            // 绑定端口,同步等待绑定成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error("occur exception when start server:", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

没啥好说的,跟客户端差不多只不过不需要sendmessage

自定义handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private static final AtomicInteger atomicInteger = new AtomicInteger(1);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            RpcRequest rpcRequest = (RpcRequest) msg;
            logger.info("server receive msg: [{}] ,times:[{}]", rpcRequest, atomicInteger.getAndIncrement());
            RpcResponse messageFromServer = RpcResponse.builder().message("message from server").build();
            ChannelFuture f = ctx.writeAndFlush(messageFromServer);//写进channel里,其实是写进bytebuf中了
            f.addListener(ChannelFutureListener.CLOSE);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("server catch exception",cause);
        ctx.close();
    }
}

总的来说,流程(GPT给的

下面是一个描述客户端给服务端发送请求并接收响应的流程,包括数据结构、序列化和使用 Attribute 存储上下文数据的过程:

  1. 客户端准备请求数据:
  • 客户端根据业务逻辑创建请求对象(例如 RpcRequest)。
  • 请求对象包含了需要发送给服务端的数据,例如请求的方法名、参数等。
  1. 序列化请求数据:
  • 客户端将请求对象进行序列化,将其转换为字节序列以便在网络中传输。
  • 常见的序列化方式包括使用 JSON、Protobuf、MessagePack 等。
  1. 将序列化后的数据发送给服务端:
  • 客户端通过网络将序列化后的数据发送给服务端。
  • 客户端使用的网络通信框架(例如 Netty)将数据写入到通道(Channel)的发送缓冲区(ByteBuf)中。
  1. 服务端接收请求数据:
  • 服务端通过网络接收到客户端发送的请求数据。
  • 服务端的网络通信框架(例如 Netty)将接收到的数据存储在接收缓冲区(ByteBuf)中。
  1. 反序列化请求数据:
  • 服务端将接收到的字节序列进行反序列化,将其转换为请求对象(例如 RpcRequest)。
  • 服务端使用相同的序列化方式进行反序列化操作,以还原请求对象的结构和内容。
  1. 处理请求:
  • 服务端根据请求对象中的数据,执行相应的业务逻辑。
  • 在处理请求的过程中,服务端可能会使用一些上下文数据,例如用户身份信息、权限检查等。
  1. 序列化响应数据:
  • 服务端根据业务逻辑创建响应对象(例如 RpcResponse)。
  • 响应对象包含了需要发送给客户端的数据,例如响应结果、错误信息等。
  • 服务端将响应对象进行序列化,将其转换为字节序列以便在网络中传输。
  1. 将序列化后的响应数据发送给客户端:
  • 服务端通过网络将序列化后的响应数据发送给客户端。
  • 服务端的网络通信框架将数据写入到通道的发送缓冲区中。
  1. 客户端接收响应数据:
  • 客户端通过网络接收到服务端发送的响应数据。
  • 客户端的网络通信框架将接收到的数据存储在接收缓冲区中。
  1. 反序列化响应数据:
  • 客户端将接收到的字节序列进行反序列化,将其转换为响应对象(例如 RpcResponse)。
  • 客户端使用相同的序列化方式进行反序列化操作,以还原响应对象的结构和内容。
  1. 使用响应数据:
  • 客户端根据响应对象中的数据,进行相应的处理,例如获取响应结果、处理错误信息等。
  1. 使用 Attribute 存储上下文数据:
  • 在客户端和服务端的处理过程中,可能需要存储一些上下文数据,例如用户身份、请求 ID 等。
  • 通过使用 Attribute,客户端和服务端可以将这些上下文数据与通道(Channel)进行关联。
  • 客户端和服务端可以使用 Channelattr(key).set(value) 方法将数据存储到 Attribute 中。
  • 存储在 Attribute 中的数据可以在客户端和服务端的不同处理方法中方便地获取和使用,而无需显式传递参数。

总结起来,客户端通过序列化请求数据并发送给服务端。服务端接收请求数据,处理请求并生成响应数据,然后将响应数据序列化并发送给客户端。客户端接收响应数据并进行处理。在整个过程中,通过使用 Attribute 存储上下文数据,可以方便地在客户端和服务端之间传递额外的信息和上下文。


  目录