自学rpc之传输协议


前言

自己看的

传输协议部分

为什么要有传输协议?

通过设计协议,我们定义需要传输哪些类型的数据, 并且还会规定每一种类型的数据应该占多少字节。这样我们在接收到二进制数据之后,就可以正确的解析出我们需要的数据。

通常一些标准的 RPC 协议包含下面这些内容:

● 魔数 : 通常是 4 个字节。这个魔数主要是为了筛选来到服务端的数据包,有了这个魔数之后,服务端首先取出前面四个字节进行比对,能够在第一时间识别出这个数据包并非是遵循自定义协议的,也就是无效数据包,为了安全考虑可以直接关闭连接以节省资源。
● 序列化器编号 :标识序列化的方式,比如是使用 Java 自带的序列化,还是 json,kryo 等序列化方式。
● 消息体长度 : 运行时计算出来。
● ……

所以本项目,作者采用的传输协议是:

*   0     1     2     3     4        5     6     7     8         9          10      11     12  13  14   15 16
*   +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+
*   |   magic   code        |version | full length         | messageType| codec|compress|    RequestId       |
*   +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
*   |                                                                                                       |
*   |                                         body                                                          |
*   |                                                                                                       |
*   |                                        ... ...                                                        |
*   +-------------------------------------------------------------------------------------------------------+
* 4B  magic code(魔法数)   1B version(版本)   4B full length(消息长度)    1B messageType(消息类型)
* 1B compress(压缩类型) 1B codec(序列化类型)    4B  requestId(请求的Id)

Magic Code(魔法数)(4字节):

  • 用于标识消息的开始,通常是一个固定的值,可以防止误解析非法数据为有效消息。这个魔数主要是为了筛选来到服务端的数据包,有了这个魔数之后,服务端首先取出前面四个字节进行比对,能够在第一时间识别出这个数据包并非是遵循自定义协议的,也就是无效数据包,为了安全考虑可以直接关闭连接以节省资源。

Version(版本)(1字节):

  • 表示消息协议的版本号,用于在未来协议更新时进行兼容性处理。

Full Length(消息长度)(4字节):

  • 表示整个消息的长度,包括消息头和消息体。用于解析时确定消息的边界。

Message Type(消息类型)(1字节):

  • 表示消息的类型,可能是请求消息、响应消息、心跳消息等。

Codec(序列化类型)(1字节):

  • 表示消息体采用的序列化方式,如JSON、Protobuf、Hessian等。

Compress(压缩类型)(1字节):

  • 表示消息体是否经过压缩,以及采用的压缩算法。

Request ID(请求的ID)(4字节):

  • 表示请求的唯一标识,用于将请求与响应进行关联。

Body(消息体):

  • 包含实际的业务数据,可以是序列化后的对象等。

编解码器

编解码器的作用主要是让我们在 Netty 进行网络传输所用的对象类型 ByteBuf 与 我们代码层面需要的业务对象之间转换。

这部分呢,其实在之前的编解码器博客已经分析过了

只不过这里要加上这个传输协议

RpcMessageDecoder

自定义解码器。负责处理”入站”消息,将 ByteBuf 消息格式的对象转换为我们需要的业务对象。

有一说一还是有点不一样的,接下来我们来分析代码

构造方法

一上来给我来了个看不懂的勾八东西

public RpcMessageDecoder() {
       // lengthFieldOffset: magic code is 4B, and version is 1B, and then full length. so value is 5
       // lengthFieldLength: full length is 4B. so value is 4
       // lengthAdjustment: full length include all data and read 9 bytes before, so the left length is (fullLength-9). so values is -9
       // initialBytesToStrip: we will check magic code and version manually, so do not strip any bytes. so values is 0
       this(RpcConstants.MAX_FRAME_LENGTH, 5, 4, -9, 0);
   }

   /**
    * @param maxFrameLength      Maximum frame length. It decide the maximum length of data that can be received.
    *                            If it exceeds, the data will be discarded.
    * @param lengthFieldOffset   Length field offset. The length field is the one that skips the specified length of byte.
    * @param lengthFieldLength   The number of bytes in the length field.
    * @param lengthAdjustment    The compensation value to add to the value of the length field
    * @param initialBytesToStrip Number of bytes skipped.
    *                            If you need to receive all of the header+body data, this value is 0
    *                            if you only want to receive the body data, then you need to skip the number of bytes consumed by the header.
    */
   public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
                            int lengthAdjustment, int initialBytesToStrip) {
       super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
   }

RpcMessageDecoder继承了Netty提供的LengthFieldBasedFrameDecoder

maxFrameLength: 最大帧长度,指定能够处理的帧的最大长度。如果帧的长度超过这个值,将抛出TooLongFrameException,表示数据帧太长,超出系统能够处理的范围,可能是恶意攻击或异常情况。

lengthFieldOffset: 长度字段偏移量,指定长度字段在帧中的偏移量。在这里,魔法数占4个字节,版本号占1个字节,然后是消息总长度字段,因此长度字段的偏移量是5。也就是说,下标5后面,是长度字段

lengthFieldLength: 长度字段的字节长度,指定长度字段占用几个字节。在这里,消息总长度字段占4个字节,所以长度字段的字节长度是4。

lengthAdjustment: 长度调整值,指定长度字段的补偿值,用于计算实际的消息长度。在这里,由于总长度字段包括所有数据,因此读取9个字节之前,剩下的长度是(fullLength-9),所以长度调整值是-9。为什么是减去9呢?因为在整个消息中,前面有4字节的魔法数,1字节的版本号,4字节的总长度字段,所以这9个字节是在计算消息长度时已经包括在内的,需要减去。举个例子,假设读取到的长度字段的值是20,那么实际的消息长度就是20 - 9 = 11字节。这样就可以正确计算出消息的实际长度,以便后续的解码过程。

initialBytesToStrip: 跳过的初始字节数,指定从解码帧中去掉的字节数,这些字节包括长度字段本身。在这里,由于将手动检查魔法数和版本号,所以不需要跳过任何字节,值是0。

decode

@Override
   protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
       Object decoded = super.decode(ctx, in);
       if (decoded instanceof ByteBuf) {
           ByteBuf frame = (ByteBuf) decoded;
           if (frame.readableBytes() >= RpcConstants.TOTAL_LENGTH) {
               try {
                   return decodeFrame(frame);
               } catch (Exception e) {
                   log.error("Decode frame error!", e);
                   throw e;
               } finally {
                   frame.release();
               }
           }

       }
       return decoded;
   }

覆盖了父类的decode方法

Object decoded = super.decode(ctx, in);: 这一行调用了父类LengthFieldBasedFrameDecoderdecode方法,获得解码后的结果。LengthFieldBasedFrameDecoder会根据长度字段的信息,将输入的ByteBuf切割成一帧一帧的消息。

if (decoded instanceof ByteBuf) {: 这里检查decoded是否是ByteBuf的实例。如果是,说明解码成功,获取到了一帧完整的消息。

ByteBuf frame = (ByteBuf) decoded;: 将解码后的结果强制转换为ByteBuf类型,方便后续处理。

if (frame.readableBytes() >= RpcConstants.TOTAL_LENGTH) {: 这里检查帧的可读字节数是否大于或等于RpcConstants.TOTAL_LENGTH,即消息的总长度。这是为了确保帧中包含了完整的消息。

return decodeFrame(frame);: 如果帧中包含完整的消息,调用decodeFrame方法进行进一步的解码。

frame.release();: 在finally块中释放frame,确保不会造成内存泄漏。

return decoded;: 如果帧中不包含完整的消息,则直接返回上一层解码器的结果,等待更多数据的到来。

decodeframe

将二进制数据解析成可读的 RpcMessage 对象

private Object decodeFrame(ByteBuf in) {
    // note: must read ByteBuf in order
    checkMagicNumber(in);
    checkVersion(in);
    int fullLength = in.readInt();
    // build RpcMessage object
    byte messageType = in.readByte();
    byte codecType = in.readByte();
    byte compressType = in.readByte();
    int requestId = in.readInt();
    RpcMessage rpcMessage = RpcMessage.builder()
            .codec(codecType)
            .requestId(requestId)
            .messageType(messageType).build();
    if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
        rpcMessage.setData(RpcConstants.PING);
        return rpcMessage;
    }
    if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
        rpcMessage.setData(RpcConstants.PONG);
        return rpcMessage;
    }
    int bodyLength = fullLength - RpcConstants.HEAD_LENGTH;
    if (bodyLength > 0) {
        byte[] bs = new byte[bodyLength];
        in.readBytes(bs);
        // decompress the bytes
        String compressName = CompressTypeEnum.getName(compressType);
        Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
                .getExtension(compressName);
        bs = compress.decompress(bs);
        // deserialize the object
        String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
        log.info("codec name: [{}] ", codecName);
        Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
                .getExtension(codecName);
        if (messageType == RpcConstants.REQUEST_TYPE) {
            RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class);
            rpcMessage.setData(tmpValue);
        } else {
            RpcResponse tmpValue = serializer.deserialize(bs, RpcResponse.class);
            rpcMessage.setData(tmpValue);
        }
    }
    return rpcMessage;

}

逐步分析

checkMagicNumber(in);//检查魔术
checkVersion(in);//检查版本
int fullLength = in.readInt();//传输协议里吗,消息长度四个字节,正好一个int
// build RpcMessage object
byte messageType = in.readByte();//一个字节
byte codecType = in.readByte();//一个字节
byte compressType = in.readByte();//一个字节
int requestId = in.readInt();//四个字节

分别于传输协议对应

构建 RpcMessage 对象

RpcMessage rpcMessage = RpcMessage.builder()
        .codec(codecType)
        .requestId(requestId)
        .messageType(messageType).build();

利用上述解析到的信息构建了 RpcMessage 对象。这里应该是使用了构建者模式,根据解析得到的信息初始化 RpcMessage

处理心跳消息

if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
    rpcMessage.setData(RpcConstants.PING);
    return rpcMessage;
}
if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
    rpcMessage.setData(RpcConstants.PONG);
    return rpcMessage;
}

处理非心跳消息的数据解析

int bodyLength = fullLength - RpcConstants.HEAD_LENGTH;//计算消息体的长度。
if (bodyLength > 0) {
    byte[] bs = new byte[bodyLength];
    in.readBytes(bs);//从 ByteBuf 中读取相应长度的字节数据
    // 解压缩数据
    String compressName = CompressTypeEnum.getName(compressType);//获取压缩类型名称
    
    Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
            .getExtension(compressName);//获取压缩算法的实例
    bs = compress.decompress(bs);//按这个实例去解压缩
   
    // deserialize the object
    String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
    log.info("codec name: [{}] ", codecName);
    Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
            .getExtension(codecName);
    
    if (messageType == RpcConstants.REQUEST_TYPE) {
        RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class);
        rpcMessage.setData(tmpValue);
    } else {
        RpcResponse tmpValue = serializer.deserialize(bs, RpcResponse.class);
        rpcMessage.setData(tmpValue);
    }
}

ExtensionLoader.getExtensionLoader(Compress.class).getExtension(compressName):这里使用了扩展加载器模式。ExtensionLoader 是一个通用的扩展加载器,用于加载各种扩展点的实现。在这里,加载了实现了 Compress 接口的压缩算法的具体实现类。compressName 是之前获取的压缩算法类型的名称,通过加载器获取对应的压缩算法的实例。

SerializationTypeEnum.getName(rpcMessage.getCodec()):通过 SerializationTypeEnum 中的方法获取给定序列化类型的名称。SerializationTypeEnum 应该是一个枚举,其中包含了各种序列化算法的类型。

ExtensionLoader.getExtensionLoader(Serializer.class).getExtension(codecName):这里同样使用了扩展加载器模式。ExtensionLoader 加载了实现了 Serializer 接口的序列化算法的具体实现类。codecName 是之前获取的序列化算法类型的名称,通过加载器获取对应的序列化算法的实例。

这个extensionloader,后面再说,你只要知道他现在能够获得对应的实例化对象就行了

根据消息类型的判断:

  • 如果消息类型是请求类型 (RpcConstants.REQUEST_TYPE),则调用序列化算法的 deserialize 方法将接收到的字节数组 bs 反序列化为 RpcRequest 对象,并将其设置为 rpcMessage 的数据部分。
  • 如果消息类型不是请求类型,即为响应类型,则调用序列化算法的 deserialize 方法将接收到的字节数组 bs 反序列化为 RpcResponse 对象,并将其设置为 rpcMessage 的数据部分。

checkverison

private void checkVersion(ByteBuf in) {
    // read the version and compare
    byte version = in.readByte();
    if (version != RpcConstants.VERSION) {
        throw new RuntimeException("version isn't compatible" + version);
    }
}

就是读取version,并且检查

checkMagicNumber

private void checkMagicNumber(ByteBuf in) {
    // read the first 4 bit, which is the magic number, and compare
    int len = RpcConstants.MAGIC_NUMBER.length;
    byte[] tmp = new byte[len];
    in.readBytes(tmp);
    for (int i = 0; i < len; i++) {
        if (tmp[i] != RpcConstants.MAGIC_NUMBER[i]) {
            throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(tmp));
        }
    }
}

就是一位一位去比较魔术对不对

RpcMessageEncoder

@Slf4j
public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {
    private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);

    @Override
    protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) {
        try {
            out.writeBytes(RpcConstants.MAGIC_NUMBER);
            out.writeByte(RpcConstants.VERSION);
            // leave a place to write the value of full length
            out.writerIndex(out.writerIndex() + 4);
            byte messageType = rpcMessage.getMessageType();
            out.writeByte(messageType);
            out.writeByte(rpcMessage.getCodec());
            out.writeByte(CompressTypeEnum.GZIP.getCode());
            out.writeInt(ATOMIC_INTEGER.getAndIncrement());
            // build full length
            byte[] bodyBytes = null;
            int fullLength = RpcConstants.HEAD_LENGTH;
            // if messageType is not heartbeat message,fullLength = head length + body length
            if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE
                    && messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
                // serialize the object
                String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
                log.info("codec name: [{}] ", codecName);
                Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
                        .getExtension(codecName);
                bodyBytes = serializer.serialize(rpcMessage.getData());
                // compress the bytes
                String compressName = CompressTypeEnum.getName(rpcMessage.getCompress());
                Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
                        .getExtension(compressName);
                bodyBytes = compress.compress(bodyBytes);
                fullLength += bodyBytes.length;
            }

            if (bodyBytes != null) {
                out.writeBytes(bodyBytes);
            }
            int writeIndex = out.writerIndex();
            out.writerIndex(writeIndex - fullLength + RpcConstants.MAGIC_NUMBER.length + 1);
            out.writeInt(fullLength);
            out.writerIndex(writeIndex);
        } catch (Exception e) {
            log.error("Encode request error!", e);
        }

    }
}

ATOMIC_INTEGER 用于为每个请求生成一个唯一的标识。每次调用 getAndIncrement 时,都会返回当前值,并将其递增。因此,每个请求都能够获得不同的唯一标识。

encode方法

用于将 RpcMessage 对象编码成字节流以便发送到网络。以下是主要步骤的解释:

@Override
protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) {
    try {
        out.writeBytes(RpcConstants.MAGIC_NUMBER);//写入魔术
        out.writeByte(RpcConstants.VERSION);//写入版本
        // leave a place to write the value of full length
        out.writerIndex(out.writerIndex() + 4);//预留四个byte的地方给总长度
        byte messageType = rpcMessage.getMessageType();//得到消息类型
        out.writeByte(messageType);//写入信息类型
        out.writeByte(rpcMessage.getCodec());//写入序列化类型
        out.writeByte(CompressTypeEnum.GZIP.getCode());//写入压缩方式
        out.writeInt(ATOMIC_INTEGER.getAndIncrement());//把唯一的,自增的int值,作为唯一标识
        // build full length
        byte[] bodyBytes = null;//存放消息体,也便于计算长度
        int fullLength = RpcConstants.HEAD_LENGTH;//初始化消息的总长度,初始值为头部长度。
        // 如果不是心跳消息,总长度等于头+消息体
        if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE
                && messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
            // serialize the object
            String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
            log.info("codec name: [{}] ", codecName);
            Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
                    .getExtension(codecName);//还是一样的,用extensionloadre获取具体的序列化对象,
            bodyBytes = serializer.serialize(rpcMessage.getData());//然后进行序列化
            // compress the bytes
            String compressName = CompressTypeEnum.getName(rpcMessage.getCompress());
            Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
                    .getExtension(compressName);
            bodyBytes = compress.compress(bodyBytes);//压缩
            fullLength += bodyBytes.length;//加上到总长度
        }

        if (bodyBytes != null) {
            out.writeBytes(bodyBytes);
        }
        //写入bytebuf中
        int writeIndex = out.writerIndex();//获取当前的写入位置。
        out.writerIndex(writeIndex - fullLength + RpcConstants.MAGIC_NUMBER.length + 1);//移动到预留空间的位置。
        out.writeInt(fullLength);//把总长度的值写到对应的地方
        out.writerIndex(writeIndex);//还原写入位置。
    } catch (Exception e) {
        log.error("Encode request error!", e);
    }

}

总结

总的来说,编码就是把消息,按照协议,进行序列化和压缩之后,写到bytebuf之中

解码就是从bytebuf中读取,然后按照协议分析是否正确并且得到消息体,然后解压后和反序列化


  目录