自学rpc之优化


前言

所谓的优化就是加上了CompletableFuture

分析

我们看之前的代码

这里是手动阻塞等待,不行

然后这边要获得就是

这里就是把atrributemap绑定到channel里了

不行,每次都要手动阻塞

优化-存

修改之后,用哪个什么什么future

@Override
    public Object sendRpcRequest(RpcRequest rpcRequest) {
        // build return value
        CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();//这个异步操作的结果是一个包含泛型类型为 Object 的 RpcResponse 对象。
        // get server address
        InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);//使用 serviceDiscovery(服务发现)查找并获取RPC请求的服务地址。
        // get  server address related channel
        Channel channel = getChannel(inetSocketAddress);//通过 getChannel 方法获取与指定服务器地址关联的 Channel。
        if (channel.isActive()) {
            // put unprocessed request
            unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);//将 RPC 请求放入未处理请求的集合中,以便后续处理响应。
            RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
                    .codec(SerializationTypeEnum.HESSIAN.getCode())
                    .compress(CompressTypeEnum.GZIP.getCode())
                    .messageType(RpcConstants.REQUEST_TYPE).build();
            channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    log.info("client send message: [{}]", rpcMessage);
                } else {
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                    log.error("Send failed:", future.cause());
                }
            });
        } else {
            throw new IllegalStateException();
        }

        return resultFuture;
    }

我们看啊,它先是,创建了一个CompletableFuture对象,这个对象又存储了RpcResponse,也就是说你的响应会存到这个里面

CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();//

然后他把这个放到了未处理的map里

unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture)

这个类的代码: 其实就是包装了一个map,加上一个方法调用future.complete

public class UnprocessedRequests {
    private static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>();

    public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {
        UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);
    }

    public void complete(RpcResponse<Object> rpcResponse) {
        CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());
        if (null != future) {
            future.complete(rpcResponse);//调用 future.complete(rpcResponse),将 rpcResponse 设置为 future 的结果。此时,与 future 相关的任何代码,例如使用 future.get() 或注册回调,都将得到这个完成的 RPC 响应作为结果。
        } else {
            throw new IllegalStateException();
        }
    }
}

然后直接返回了

return resultFuture;

这个resultFuture里就有response了

优化-处理

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        log.info("client receive msg: [{}]", msg);
        if (msg instanceof RpcMessage) {//判断接收到的消息是否是RpcMessage类型
            RpcMessage tmp = (RpcMessage) msg;
            byte messageType = tmp.getMessageType();//获取RpcMessage中的messageType字段,用于确定消息的类型
            if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
                log.info("heart [{}]", tmp.getData());
            } else if (messageType == RpcConstants.RESPONSE_TYPE) {
                RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();
                unprocessedRequests.complete(rpcResponse);
            }
        }
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

注意看这里:

unprocessedRequests.complete(rpcResponse);

看上面的代码,他先会根据rpcResponse获取对应的requestId,然后根据id获取他之前存的future对象,然后进行complete

这个complete其实是调用了

future.complete(rpcResponse);

当调用 future.complete(rpcResponse) 后,后续通过 future.get() 获取的值就是 rpcResponse

就像这样:

CompletableFuture<RpcResponse> completableFuture = (CompletableFuture<RpcResponse>) clientTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();

好在哪:
如果在send发送完数据之后对channel进行手动阻塞等待返回Response,这样会导致Event Loop阻塞导致效率下降。优化后使用CompletableFuture包装Response实现线程同步,避免了Event Loop阻塞。

评价

guide司马了,等老子学完netty再来写清晰一点


  目录