前言
自己看的
传输协议部分
为什么要有传输协议?
通过设计协议,我们定义需要传输哪些类型的数据, 并且还会规定每一种类型的数据应该占多少字节。这样我们在接收到二进制数据之后,就可以正确的解析出我们需要的数据。
通常一些标准的 RPC 协议包含下面这些内容:
● 魔数 : 通常是 4 个字节。这个魔数主要是为了筛选来到服务端的数据包,有了这个魔数之后,服务端首先取出前面四个字节进行比对,能够在第一时间识别出这个数据包并非是遵循自定义协议的,也就是无效数据包,为了安全考虑可以直接关闭连接以节省资源。
● 序列化器编号 :标识序列化的方式,比如是使用 Java 自带的序列化,还是 json,kryo 等序列化方式。
● 消息体长度 : 运行时计算出来。
● ……
所以本项目,作者采用的传输协议是:
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+
* | magic code |version | full length | messageType| codec|compress| RequestId |
* +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* | |
* | body |
* | |
* | ... ... |
* +-------------------------------------------------------------------------------------------------------+
* 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型)
* 1B compress(压缩类型) 1B codec(序列化类型) 4B requestId(请求的Id)
Magic Code(魔法数)(4字节):
- 用于标识消息的开始,通常是一个固定的值,可以防止误解析非法数据为有效消息。这个魔数主要是为了筛选来到服务端的数据包,有了这个魔数之后,服务端首先取出前面四个字节进行比对,能够在第一时间识别出这个数据包并非是遵循自定义协议的,也就是无效数据包,为了安全考虑可以直接关闭连接以节省资源。
Version(版本)(1字节):
- 表示消息协议的版本号,用于在未来协议更新时进行兼容性处理。
Full Length(消息长度)(4字节):
- 表示整个消息的长度,包括消息头和消息体。用于解析时确定消息的边界。
Message Type(消息类型)(1字节):
- 表示消息的类型,可能是请求消息、响应消息、心跳消息等。
Codec(序列化类型)(1字节):
- 表示消息体采用的序列化方式,如JSON、Protobuf、Hessian等。
Compress(压缩类型)(1字节):
- 表示消息体是否经过压缩,以及采用的压缩算法。
Request ID(请求的ID)(4字节):
- 表示请求的唯一标识,用于将请求与响应进行关联。
Body(消息体):
- 包含实际的业务数据,可以是序列化后的对象等。
编解码器
编解码器的作用主要是让我们在 Netty 进行网络传输所用的对象类型 ByteBuf 与 我们代码层面需要的业务对象之间转换。
这部分呢,其实在之前的编解码器博客已经分析过了
只不过这里要加上这个传输协议
RpcMessageDecoder
自定义解码器。负责处理”入站”消息,将 ByteBuf 消息格式的对象转换为我们需要的业务对象。
有一说一还是有点不一样的,接下来我们来分析代码
构造方法
一上来给我来了个看不懂的勾八东西
public RpcMessageDecoder() {
// lengthFieldOffset: magic code is 4B, and version is 1B, and then full length. so value is 5
// lengthFieldLength: full length is 4B. so value is 4
// lengthAdjustment: full length include all data and read 9 bytes before, so the left length is (fullLength-9). so values is -9
// initialBytesToStrip: we will check magic code and version manually, so do not strip any bytes. so values is 0
this(RpcConstants.MAX_FRAME_LENGTH, 5, 4, -9, 0);
}
/**
* @param maxFrameLength Maximum frame length. It decide the maximum length of data that can be received.
* If it exceeds, the data will be discarded.
* @param lengthFieldOffset Length field offset. The length field is the one that skips the specified length of byte.
* @param lengthFieldLength The number of bytes in the length field.
* @param lengthAdjustment The compensation value to add to the value of the length field
* @param initialBytesToStrip Number of bytes skipped.
* If you need to receive all of the header+body data, this value is 0
* if you only want to receive the body data, then you need to skip the number of bytes consumed by the header.
*/
public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
RpcMessageDecoder继承了Netty提供的LengthFieldBasedFrameDecoder
maxFrameLength
: 最大帧长度,指定能够处理的帧的最大长度。如果帧的长度超过这个值,将抛出TooLongFrameException
,表示数据帧太长,超出系统能够处理的范围,可能是恶意攻击或异常情况。
lengthFieldOffset
: 长度字段偏移量,指定长度字段在帧中的偏移量。在这里,魔法数占4个字节,版本号占1个字节,然后是消息总长度字段,因此长度字段的偏移量是5。也就是说,下标5后面,是长度字段
lengthFieldLength
: 长度字段的字节长度,指定长度字段占用几个字节。在这里,消息总长度字段占4个字节,所以长度字段的字节长度是4。
lengthAdjustment
: 长度调整值,指定长度字段的补偿值,用于计算实际的消息长度。在这里,由于总长度字段包括所有数据,因此读取9个字节之前,剩下的长度是(fullLength-9)
,所以长度调整值是-9。为什么是减去9呢?因为在整个消息中,前面有4字节的魔法数,1字节的版本号,4字节的总长度字段,所以这9个字节是在计算消息长度时已经包括在内的,需要减去。举个例子,假设读取到的长度字段的值是20,那么实际的消息长度就是20 - 9 = 11字节。这样就可以正确计算出消息的实际长度,以便后续的解码过程。
initialBytesToStrip
: 跳过的初始字节数,指定从解码帧中去掉的字节数,这些字节包括长度字段本身。在这里,由于将手动检查魔法数和版本号,所以不需要跳过任何字节,值是0。
decode
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
Object decoded = super.decode(ctx, in);
if (decoded instanceof ByteBuf) {
ByteBuf frame = (ByteBuf) decoded;
if (frame.readableBytes() >= RpcConstants.TOTAL_LENGTH) {
try {
return decodeFrame(frame);
} catch (Exception e) {
log.error("Decode frame error!", e);
throw e;
} finally {
frame.release();
}
}
}
return decoded;
}
覆盖了父类的decode方法
Object decoded = super.decode(ctx, in);
: 这一行调用了父类LengthFieldBasedFrameDecoder
的decode
方法,获得解码后的结果。LengthFieldBasedFrameDecoder
会根据长度字段的信息,将输入的ByteBuf
切割成一帧一帧的消息。
if (decoded instanceof ByteBuf) {
: 这里检查decoded
是否是ByteBuf
的实例。如果是,说明解码成功,获取到了一帧完整的消息。
ByteBuf frame = (ByteBuf) decoded;
: 将解码后的结果强制转换为ByteBuf
类型,方便后续处理。
if (frame.readableBytes() >= RpcConstants.TOTAL_LENGTH) {
: 这里检查帧的可读字节数是否大于或等于RpcConstants.TOTAL_LENGTH
,即消息的总长度。这是为了确保帧中包含了完整的消息。
return decodeFrame(frame);
: 如果帧中包含完整的消息,调用decodeFrame
方法进行进一步的解码。
frame.release();
: 在finally
块中释放frame
,确保不会造成内存泄漏。
return decoded;
: 如果帧中不包含完整的消息,则直接返回上一层解码器的结果,等待更多数据的到来。
decodeframe
将二进制数据解析成可读的 RpcMessage
对象
private Object decodeFrame(ByteBuf in) {
// note: must read ByteBuf in order
checkMagicNumber(in);
checkVersion(in);
int fullLength = in.readInt();
// build RpcMessage object
byte messageType = in.readByte();
byte codecType = in.readByte();
byte compressType = in.readByte();
int requestId = in.readInt();
RpcMessage rpcMessage = RpcMessage.builder()
.codec(codecType)
.requestId(requestId)
.messageType(messageType).build();
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
rpcMessage.setData(RpcConstants.PING);
return rpcMessage;
}
if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
rpcMessage.setData(RpcConstants.PONG);
return rpcMessage;
}
int bodyLength = fullLength - RpcConstants.HEAD_LENGTH;
if (bodyLength > 0) {
byte[] bs = new byte[bodyLength];
in.readBytes(bs);
// decompress the bytes
String compressName = CompressTypeEnum.getName(compressType);
Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
.getExtension(compressName);
bs = compress.decompress(bs);
// deserialize the object
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
log.info("codec name: [{}] ", codecName);
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
if (messageType == RpcConstants.REQUEST_TYPE) {
RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class);
rpcMessage.setData(tmpValue);
} else {
RpcResponse tmpValue = serializer.deserialize(bs, RpcResponse.class);
rpcMessage.setData(tmpValue);
}
}
return rpcMessage;
}
逐步分析
checkMagicNumber(in);//检查魔术
checkVersion(in);//检查版本
int fullLength = in.readInt();//传输协议里吗,消息长度四个字节,正好一个int
// build RpcMessage object
byte messageType = in.readByte();//一个字节
byte codecType = in.readByte();//一个字节
byte compressType = in.readByte();//一个字节
int requestId = in.readInt();//四个字节
分别于传输协议对应
构建 RpcMessage 对象
RpcMessage rpcMessage = RpcMessage.builder()
.codec(codecType)
.requestId(requestId)
.messageType(messageType).build();
利用上述解析到的信息构建了 RpcMessage
对象。这里应该是使用了构建者模式,根据解析得到的信息初始化 RpcMessage
。
处理心跳消息
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
rpcMessage.setData(RpcConstants.PING);
return rpcMessage;
}
if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
rpcMessage.setData(RpcConstants.PONG);
return rpcMessage;
}
处理非心跳消息的数据解析
int bodyLength = fullLength - RpcConstants.HEAD_LENGTH;//计算消息体的长度。
if (bodyLength > 0) {
byte[] bs = new byte[bodyLength];
in.readBytes(bs);//从 ByteBuf 中读取相应长度的字节数据
// 解压缩数据
String compressName = CompressTypeEnum.getName(compressType);//获取压缩类型名称
Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
.getExtension(compressName);//获取压缩算法的实例
bs = compress.decompress(bs);//按这个实例去解压缩
// deserialize the object
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
log.info("codec name: [{}] ", codecName);
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
if (messageType == RpcConstants.REQUEST_TYPE) {
RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class);
rpcMessage.setData(tmpValue);
} else {
RpcResponse tmpValue = serializer.deserialize(bs, RpcResponse.class);
rpcMessage.setData(tmpValue);
}
}
ExtensionLoader.getExtensionLoader(Compress.class).getExtension(compressName)
:这里使用了扩展加载器模式。ExtensionLoader
是一个通用的扩展加载器,用于加载各种扩展点的实现。在这里,加载了实现了 Compress
接口的压缩算法的具体实现类。compressName
是之前获取的压缩算法类型的名称,通过加载器获取对应的压缩算法的实例。
SerializationTypeEnum.getName(rpcMessage.getCodec())
:通过 SerializationTypeEnum
中的方法获取给定序列化类型的名称。SerializationTypeEnum
应该是一个枚举,其中包含了各种序列化算法的类型。
ExtensionLoader.getExtensionLoader(Serializer.class).getExtension(codecName)
:这里同样使用了扩展加载器模式。ExtensionLoader
加载了实现了 Serializer
接口的序列化算法的具体实现类。codecName
是之前获取的序列化算法类型的名称,通过加载器获取对应的序列化算法的实例。
这个extensionloader,后面再说,你只要知道他现在能够获得对应的实例化对象就行了
根据消息类型的判断:
- 如果消息类型是请求类型 (
RpcConstants.REQUEST_TYPE
),则调用序列化算法的deserialize
方法将接收到的字节数组bs
反序列化为RpcRequest
对象,并将其设置为rpcMessage
的数据部分。 - 如果消息类型不是请求类型,即为响应类型,则调用序列化算法的
deserialize
方法将接收到的字节数组bs
反序列化为RpcResponse
对象,并将其设置为rpcMessage
的数据部分。
checkverison
private void checkVersion(ByteBuf in) {
// read the version and compare
byte version = in.readByte();
if (version != RpcConstants.VERSION) {
throw new RuntimeException("version isn't compatible" + version);
}
}
就是读取version,并且检查
checkMagicNumber
private void checkMagicNumber(ByteBuf in) {
// read the first 4 bit, which is the magic number, and compare
int len = RpcConstants.MAGIC_NUMBER.length;
byte[] tmp = new byte[len];
in.readBytes(tmp);
for (int i = 0; i < len; i++) {
if (tmp[i] != RpcConstants.MAGIC_NUMBER[i]) {
throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(tmp));
}
}
}
就是一位一位去比较魔术对不对
RpcMessageEncoder
@Slf4j
public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);
@Override
protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) {
try {
out.writeBytes(RpcConstants.MAGIC_NUMBER);
out.writeByte(RpcConstants.VERSION);
// leave a place to write the value of full length
out.writerIndex(out.writerIndex() + 4);
byte messageType = rpcMessage.getMessageType();
out.writeByte(messageType);
out.writeByte(rpcMessage.getCodec());
out.writeByte(CompressTypeEnum.GZIP.getCode());
out.writeInt(ATOMIC_INTEGER.getAndIncrement());
// build full length
byte[] bodyBytes = null;
int fullLength = RpcConstants.HEAD_LENGTH;
// if messageType is not heartbeat message,fullLength = head length + body length
if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE
&& messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
// serialize the object
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
log.info("codec name: [{}] ", codecName);
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
bodyBytes = serializer.serialize(rpcMessage.getData());
// compress the bytes
String compressName = CompressTypeEnum.getName(rpcMessage.getCompress());
Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
.getExtension(compressName);
bodyBytes = compress.compress(bodyBytes);
fullLength += bodyBytes.length;
}
if (bodyBytes != null) {
out.writeBytes(bodyBytes);
}
int writeIndex = out.writerIndex();
out.writerIndex(writeIndex - fullLength + RpcConstants.MAGIC_NUMBER.length + 1);
out.writeInt(fullLength);
out.writerIndex(writeIndex);
} catch (Exception e) {
log.error("Encode request error!", e);
}
}
}
ATOMIC_INTEGER
用于为每个请求生成一个唯一的标识。每次调用 getAndIncrement
时,都会返回当前值,并将其递增。因此,每个请求都能够获得不同的唯一标识。
encode方法
用于将 RpcMessage
对象编码成字节流以便发送到网络。以下是主要步骤的解释:
@Override
protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) {
try {
out.writeBytes(RpcConstants.MAGIC_NUMBER);//写入魔术
out.writeByte(RpcConstants.VERSION);//写入版本
// leave a place to write the value of full length
out.writerIndex(out.writerIndex() + 4);//预留四个byte的地方给总长度
byte messageType = rpcMessage.getMessageType();//得到消息类型
out.writeByte(messageType);//写入信息类型
out.writeByte(rpcMessage.getCodec());//写入序列化类型
out.writeByte(CompressTypeEnum.GZIP.getCode());//写入压缩方式
out.writeInt(ATOMIC_INTEGER.getAndIncrement());//把唯一的,自增的int值,作为唯一标识
// build full length
byte[] bodyBytes = null;//存放消息体,也便于计算长度
int fullLength = RpcConstants.HEAD_LENGTH;//初始化消息的总长度,初始值为头部长度。
// 如果不是心跳消息,总长度等于头+消息体
if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE
&& messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
// serialize the object
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
log.info("codec name: [{}] ", codecName);
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);//还是一样的,用extensionloadre获取具体的序列化对象,
bodyBytes = serializer.serialize(rpcMessage.getData());//然后进行序列化
// compress the bytes
String compressName = CompressTypeEnum.getName(rpcMessage.getCompress());
Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
.getExtension(compressName);
bodyBytes = compress.compress(bodyBytes);//压缩
fullLength += bodyBytes.length;//加上到总长度
}
if (bodyBytes != null) {
out.writeBytes(bodyBytes);
}
//写入bytebuf中
int writeIndex = out.writerIndex();//获取当前的写入位置。
out.writerIndex(writeIndex - fullLength + RpcConstants.MAGIC_NUMBER.length + 1);//移动到预留空间的位置。
out.writeInt(fullLength);//把总长度的值写到对应的地方
out.writerIndex(writeIndex);//还原写入位置。
} catch (Exception e) {
log.error("Encode request error!", e);
}
}
总结
总的来说,编码就是把消息,按照协议,进行序列化和压缩之后,写到bytebuf之中
解码就是从bytebuf中读取,然后按照协议分析是否正确并且得到消息体,然后解压后和反序列化