前言
学完了黑马netty,再来看rpc,很多东西迎刃而解
实体类
RpcMessage
就是用来表示消息的,
public class RpcMessage {
/**
* rpc message type
*/
private byte messageType;
/**
* serialization type
*/
private byte codec;
/**
* compress type
*/
private byte compress;
/**
* request id
*/
private int requestId;
/**
* request data
*/
private Object data;
}
RpcRequest
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 1905122041950251207L;
private String requestId;//RPC请求的唯一标识符
private String interfaceName;//RPC调用的接口名称
private String methodName;//要调用的方法名称
private Object[] parameters;//要传递给方法的参数数组
private Class<?>[] paramTypes;//与参数对应的参数类型数组
private String version;// 字段(服务版本)主要是为后续不兼容升级提供可能
private String group;//用于处理一个接口有多个类实现的情况。
public String getRpcServiceName() {
return this.getInterfaceName() + this.getGroup() + this.getVersion();
}
}
RpcResponse
public class RpcResponse<T> implements Serializable {
private static final long serialVersionUID = 715745410605631233L;
private String requestId;//RPC响应的唯一标识符
/**
* response code
*/
private Integer code;
/**
* response message
*/
private String message;
/**
* response body
*/
private T data;
public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(RpcResponseCodeEnum.SUCCESS.getCode());
response.setMessage(RpcResponseCodeEnum.SUCCESS.getMessage());
response.setRequestId(requestId);
if (null != data) {
response.setData(data);
}
return response;
}
public static <T> RpcResponse<T> fail(RpcResponseCodeEnum rpcResponseCodeEnum) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(rpcResponseCodeEnum.getCode());
response.setMessage(rpcResponseCodeEnum.getMessage());
return response;
}
}
success和fail都很简单,就是根据成功与否去设置消息的内容
Netty客户端
NettyRpcClient
@Slf4j
public final class NettyRpcClient implements RpcRequestTransport {
private final ServiceDiscovery serviceDiscovery;//服务发现的实例,用于查找服务的地址
private final UnprocessedRequests unprocessedRequests;//未处理的请求实例,用于存储未完成的RPC请求。
private final ChannelProvider channelProvider;//通道提供者的实例,用于管理和提供与服务器地址相关联的通道。
private final Bootstrap bootstrap;//Netty的Bootstrap实例,用于配置和初始化Netty客户端。
private final EventLoopGroup eventLoopGroup;//Netty的EventLoopGroup实例,用于处理事件循环。
public NettyRpcClient() {
// initialize resources such as EventLoopGroup, Bootstrap
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
// The timeout period of the connection.
// If this time is exceeded or the connection cannot be established, the connection fails.
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// If no data is sent to the server within 15 seconds, a heartbeat request is sent心跳机制
p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));//用于处理连接的空闲状态
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(new NettyRpcClientHandler());
}
});
this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension(ServiceDiscoveryEnum.ZK.getName());
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
}
/**
* connect server and get the channel ,so that you can send rpc message to server
*
* @param inetSocketAddress server address
* @return the channel
*/
@SneakyThrows
public Channel doConnect(InetSocketAddress inetSocketAddress) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}
@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
// build return value
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();//这个异步操作的结果是一个包含泛型类型为 Object 的 RpcResponse 对象。
// get server address
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);//使用 serviceDiscovery(服务发现)查找并获取RPC请求的服务地址。
// get server address related channel
Channel channel = getChannel(inetSocketAddress);//通过 getChannel 方法获取与指定服务器地址关联的 Channel。
if (channel.isActive()) {
// put unprocessed request
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);//将 RPC 请求放入未处理请求的集合中,以便后续处理响应。
RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
.codec(SerializationTypeEnum.HESSIAN.getCode())
.compress(CompressTypeEnum.GZIP.getCode())
.messageType(RpcConstants.REQUEST_TYPE).build();
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: [{}]", rpcMessage);
} else {
future.channel().close();
resultFuture.completeExceptionally(future.cause());
log.error("Send failed:", future.cause());
}
});
} else {
throw new IllegalStateException();
}
return resultFuture;
}
public Channel getChannel(InetSocketAddress inetSocketAddress) {
Channel channel = channelProvider.get(inetSocketAddress);
if (channel == null) {
channel = doConnect(inetSocketAddress);
channelProvider.set(inetSocketAddress, channel);
}
return channel;
}
public void close() {
eventLoopGroup.shutdownGracefully();
}
}
初始化的部分很简单 ,我这里是用了ExtensionLoader和SingletonFactory去获取对象,这个然后单说]
来看看sendRequests
public Object sendRpcRequest(RpcRequest rpcRequest) {
// build return value
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();//这个异步操作的结果是一个包含泛型类型为 Object 的 RpcResponse 对象。
// get server address
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);//使用 serviceDiscovery(服务发现)查找并获取RPC请求的服务地址。
// get server address related channel
Channel channel = getChannel(inetSocketAddress);//通过 getChannel 方法获取与指定服务器地址关联的 Channel。
if (channel.isActive()) {
// put unprocessed request
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);//将 RPC 请求放入未处理请求的集合中,以便后续处理响应。
RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
.codec(SerializationTypeEnum.HESSIAN.getCode())
.compress(CompressTypeEnum.GZIP.getCode())
.messageType(RpcConstants.REQUEST_TYPE).build();
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: [{}]", rpcMessage);
} else {
future.channel().close();
resultFuture.completeExceptionally(future.cause());
log.error("Send failed:", future.cause());
}
});
} else {
throw new IllegalStateException();
}
return resultFuture;//在这里不对响应进行处理,而是到unprocessedRequests的complete方法里面才处理,也就是说这里返回的resultFuture啥也没有
}
注意这里的resultFuture的用法,他是在unprocessedRequests里才处理的
unprocessedRequests
其实就是一个map,将对应的requeid和用来存储响应的resultFuture放进map里面
这样的好处就是:
如果在send发送完数据之后对channel进行手动阻塞等待返回Response,这样会导致Event Loop阻塞导致效率下降。优化后使用CompletableFuture包装Response实现线程同步,避免了Event Loop阻塞。
public class UnprocessedRequests {
public static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES=new ConcurrentHashMap<>();
public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {
UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);
}
public void complete(RpcResponse<Object> rpcResponse)
{
CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());
if(null!=future)
{
future.complete(rpcResponse);//在这个时候,future里面才存入了rpcResponse
}else
{
throw new IllegalArgumentException();
}
}
}
NettyRpcClientHandler
这个处理器被加到rpcclient的channel.pipeline()的最后面
自定义处理器,继承了ChannelInboundHandlerAdapter 说明它是一个入站处理器
重写了三个方法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {//这个时候已经是经过解码了的所以可以直接判断他的类型
try {
log.info("client receive msg: [{}]", msg);
if (msg instanceof RpcMessage) {//判断接收到的消息是否是RpcMessage类型
RpcMessage tmp = (RpcMessage) msg;
byte messageType = tmp.getMessageType();//获取RpcMessage中的messageType字段,用于确定消息的类型
if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
log.info("heart [{}]", tmp.getData());
} else if (messageType == RpcConstants.RESPONSE_TYPE) {
RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();
unprocessedRequests.complete(rpcResponse);
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}
将已经经过解码了的msg处理,如果是心跳消息,那么打印日志,不做其他处理,如果是响应的话,那么就把他转成Rpcrespnse对象,然后调用complete方法,来存到对应的future里面
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {//当触发了用户自定义事件时被调用
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {//写空闲状态表示一段时间内没有向服务器发送数据。,则发送心跳请求以维持与服务器的连接
log.info("write idle happen [{}]", ctx.channel().remoteAddress());
Channel channel = nettyRpcClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
rpcMessage.setData(RpcConstants.PING);
channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);///这个监听器的作用是,在发送心跳请求时如果出现问题,就关闭当前通道,以便后续可能重新建立连接。
}
} else {
super.userEventTriggered(ctx, evt);
}
}
触发了用户自定义事件去调用,就是如果一段时间没有向服务器发送数据,就发送一个心跳请求,去维持链接
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("client catch exception:", cause);
cause.printStackTrace();
ctx.close();
}
这个就是捕获异常的,没什么
ChannelProvider
管理channel的,每一个ip:port(也就是注册的服务)有一个channel
有三个方法
private final Map<String, Channel> channelMap;//使用 ConcurrentHashMap 作为存储通道映射关系的容器。ConcurrentHashMap 是线程安全的,适用于并发环境。
public ChannelProvider() {
channelMap = new ConcurrentHashMap<>();
}
public Channel get(InetSocketAddress inetSocketAddress) {
String key = inetSocketAddress.toString();
// determine if there is a connection for the corresponding address
if (channelMap.containsKey(key)) {
Channel channel = channelMap.get(key);
// if so, determine if the connection is available, and if so, get it directly
if (channel != null && channel.isActive()) {
return channel;
} else {
channelMap.remove(key);
}
}
return null;
}
get:获得channel
set:设置
public void set(InetSocketAddress inetSocketAddress, Channel channel){
String key=inetSocketAddress.toString();
channelMap.put(key,channel);
}
remove:删除,但是没用到
public void remove(InetSocketAddress inetSocketAddress) {
String key = inetSocketAddress.toString();
channelMap.remove(key);
log.info("Channel map size :[{}]", channelMap.size());
}
Netty服务端
NettyRpcServer
主要就是启动服务端,然后提供了一个手动注册zk服务的方法,当然我们以后是用注解的方式注册服务
然后显式的提供了bossgroup,workergroup和defauxxx,defaxxx就是用来处理耗时的事件
静态变量
public static final int PORT = 9998;
private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
public void registerService(RpcServiceConfig rpcServiceConfig) {
serviceProvider.publishService(rpcServiceConfig);
}//显式注册服务的方法
start
public void start() {
CustomShutdownHook.getCustomShutdownHook().clearAll();//这一行代码清理了之前设置的关闭钩子,确保在服务器关闭时资源能够得到释放。
String host = InetAddress.getLocalHost().getHostAddress();// 获取了服务器主机的 IP 地址。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);//创建了用于接收客户端连接的主事件循环组 bossGroup,参数 1 表示线程数为 1。
EventLoopGroup workerGroup = new NioEventLoopGroup();//创建了用于处理客户端请求的工作事件循环组 workerGroup,采用默认线程数。
DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
RuntimeUtil.cpus() * 2,
ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false)
);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
// 当客户端第一次进行请求的时候才会进行初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 30 秒之内没有收到客户端请求的话就关闭连接
ChannelPipeline p = ch.pipeline();
p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());//这个handler就会在serviceHandlerGroup上执行
}
});
// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(host, PORT).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("occur exception when start server:", e);
} finally {
log.error("shutdown bossGroup and workerGroup");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
serviceHandlerGroup.shutdownGracefully();
}
}
NettyRpcServerHandler
继承了 ChannelInboundHandlerAdapter,也是一个入站处理器
拥有一个RpcRequestHandler这个处理器就是去执行我们调用的方法的,严格来说不是什么handler,只是叫这个名字
重写了三个方法:
channelRead
发生读事件执行,这里就是判断客户端发来的是什么请求,是心跳请求返回pong,否则就是执行方法的请求,调用handler执行,然后把结果返回给客户端
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try
{
if(msg instanceof RpcMessage)
{
log.info("server receive msg: [{}] ", msg);
byte messageType = ((RpcMessage) msg).getMessageType();
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.HESSIAN.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
//如果是心跳请求,那么就返回个pong回去
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
rpcMessage.setData(RpcConstants.PONG);
}else {//否则就是有内容的了
RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();
// Execute the target method (the method the client needs to execute) and return the method result
Object result = rpcRequestHandler.handle(rpcRequest);//执行请求,获得结果
log.info(String.format("server get result: %s", result.toString()));
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
//把结果写到response里面
if(ctx.channel().isActive()&&ctx.channel().isWritable())
{
RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
rpcMessage.setData(rpcResponse);
}else//否则失败了,把失败的结果写到里面
{
RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
rpcMessage.setData(rpcResponse);
log.error("not writable now, message dropped");
}
}
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);//把结果写回给客户端
}
}finally {
ReferenceCountUtil.release(msg);
}
}
还有两个比较简单,不分析了
RpcRequestHandler
这个在handler包下,用于真正去调用方法,并不是pipeline里面的handler
@Slf4j
public class RpcRequestHandler {
private final ServiceProvider serviceProvider;
public RpcRequestHandler() {
serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
}
/**
* Processing rpcRequest: call the corresponding method, and then return the method
*/
public Object handle(RpcRequest rpcRequest) {
Object service = serviceProvider.getService(rpcRequest.getRpcServiceName());
return invokeTargetMethod(rpcRequest, service);
}
/**
* get method execution results
*
* @param rpcRequest client request
* @param service service object
* @return the result of the target method execution
*/
private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
Object result;
try {
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
result = method.invoke(service, rpcRequest.getParameters());
log.info("service:[{}] successful invoke method:[{}]", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
} catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
throw new RpcException(e.getMessage(), e);
}
return result;
}
}
这里理一理zk和服务端的关系:
每个服务提供者(服务端)都会在启动时向 ZooKeeper 注册自己的信息,例如服务名称、IP 地址、端口号等。这样,ZooKeeper 中就会维护一个服务实例的注册表,记录了所有可用的服务提供者及其相关信息。
客户端在需要调用某个服务时,会向 ZooKeeper 发起查询请求,获取相关服务的信息。然后,客户端可以选择一个可用的服务实例进行调用。
关于zK的,到了common里面细说
传输协议和编解码器
这部分看自学rpc之传输协议 | 说的 (gusmallwhite.github.io)即可
根之前一样,还是比较好理解 的
然后序列化用的是kryo序列化,这里看傻逼guide的就行
关于compress
项目里有个压缩的东西,爷没弄没搞明白
应该就是把东西压缩和解压缩
public interface Compress {
byte[] compress(byte[] bytes);
byte[] decompress(byte[] bytes);
}
GzipCompress给出了实现
使用gzip进行压缩
public class GzipCompress implements Compress {
private static final int BUFFER_SIZE = 1024 * 4;
@Override
public byte[] compress(byte[] bytes) {
if (bytes == null) {
throw new NullPointerException("bytes is null");
}
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out)) {
gzip.write(bytes);
gzip.flush();
gzip.finish();
return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException("gzip compress error", e);
}
}
@Override
public byte[] decompress(byte[] bytes) {
if (bytes == null) {
throw new NullPointerException("bytes is null");
}
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPInputStream gunzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
byte[] buffer = new byte[BUFFER_SIZE];
int n;
while ((n = gunzip.read(buffer)) > -1) {
out.write(buffer, 0, n);
}
return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException("gzip decompress error", e);
}
}
}
这东西就是一个工具,会用就行
所以总的流程是:我客户端发送之前,先根据我要调用 的方法,去zk上获得这个服务对应的ip:port,然后再根据这个ip:port获取对应的channel,
然后将请求发送给channel对面的服务端
服务端接收到了之后,就xxxx,然后执行handle,handle里面是method.invoke,然后结果result放到response里面
所以zk注册的其实是什么?我的服务名(也就是你要调用哪个类里的哪个方法)和地址(也就是ip:port)
明白了
我们看到handle里面
public Object handle(RpcRequest rpcRequest) {
Object service = serviceProvider.getService(rpcRequest.getRpcServiceName());
return invokeTargetMethod(rpcRequest, service);
}
先是获取了service对象
service其实就是一个Object:
public class RpcServiceConfig {
/**
* service version
*/
private String version = "";
/**
* when the interface has multiple implementation classes, distinguish by group
*/
private String group = "";
/**
* target service
*/
private Object service;
public String getRpcServiceName() {
return this.getServiceName() + this.getGroup() + this.getVersion();
}
public String getServiceName() {
return this.service.getClass().getInterfaces()[0].getCanonicalName();
}
}
那么问题来了
,service是什么时候实例化的呢,我们有提到, 这个项目是用注解实现自动注册服务的
那么我们看这里:
SpringBeanPostProcessor 类里面的
@SneakyThrows
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean.getClass().isAnnotationPresent(RpcService.class)) {
log.info("[{}] is annotated with [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());
// get RpcService annotation
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
// build RpcServiceProperties
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group(rpcService.group())
.version(rpcService.version())
.service(bean).build();
serviceProvider.publishService(rpcServiceConfig);
}
return bean;
}
也就是说,如果一个类上面有@RpcService注解,那么他就会通过service(bean).build(); 这个时候,service被实例化为当前类