再看rpc之网络传输


前言

学完了黑马netty,再来看rpc,很多东西迎刃而解

实体类

RpcMessage

就是用来表示消息的,

java
public class RpcMessage {

    /**
     * rpc message type
     */
    private byte messageType;
    /**
     * serialization type
     */
    private byte codec;
    /**
     * compress type
     */
    private byte compress;
    /**
     * request id
     */
    private int requestId;
    /**
     * request data
     */
    private Object data;

}

RpcRequest

java
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = 1905122041950251207L;
    private String requestId;//RPC请求的唯一标识符
    private String interfaceName;//RPC调用的接口名称
    private String methodName;//要调用的方法名称
    private Object[] parameters;//要传递给方法的参数数组
    private Class<?>[] paramTypes;//与参数对应的参数类型数组
    private String version;// 字段(服务版本)主要是为后续不兼容升级提供可能
    private String group;//用于处理一个接口有多个类实现的情况。

    public String getRpcServiceName() {
        return this.getInterfaceName() + this.getGroup() + this.getVersion();
    }
}

RpcResponse

java
public class RpcResponse<T> implements Serializable {

    private static final long serialVersionUID = 715745410605631233L;
    private String requestId;//RPC响应的唯一标识符
    /**
     * response code
     */
    private Integer code;
    /**
     * response message
     */
    private String message;
    /**
     * response body
     */
    private T data;

    public static <T> RpcResponse<T> success(T data, String requestId) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setCode(RpcResponseCodeEnum.SUCCESS.getCode());
        response.setMessage(RpcResponseCodeEnum.SUCCESS.getMessage());
        response.setRequestId(requestId);
        if (null != data) {
            response.setData(data);
        }
        return response;
    }

    public static <T> RpcResponse<T> fail(RpcResponseCodeEnum rpcResponseCodeEnum) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setCode(rpcResponseCodeEnum.getCode());
        response.setMessage(rpcResponseCodeEnum.getMessage());
        return response;
    }

}

success和fail都很简单,就是根据成功与否去设置消息的内容

Netty客户端

NettyRpcClient

java
@Slf4j
public final class NettyRpcClient implements RpcRequestTransport {
    private final ServiceDiscovery serviceDiscovery;//服务发现的实例,用于查找服务的地址
    private final UnprocessedRequests unprocessedRequests;//未处理的请求实例,用于存储未完成的RPC请求。
    private final ChannelProvider channelProvider;//通道提供者的实例,用于管理和提供与服务器地址相关联的通道。
    private final Bootstrap bootstrap;//Netty的Bootstrap实例,用于配置和初始化Netty客户端。
    private final EventLoopGroup eventLoopGroup;//Netty的EventLoopGroup实例,用于处理事件循环。

    public NettyRpcClient() {
        // initialize resources such as EventLoopGroup, Bootstrap
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                //  The timeout period of the connection.
                //  If this time is exceeded or the connection cannot be established, the connection fails.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline p = ch.pipeline();
                        // If no data is sent to the server within 15 seconds, a heartbeat request is sent心跳机制
                        p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));//用于处理连接的空闲状态
                        p.addLast(new RpcMessageEncoder());
                        p.addLast(new RpcMessageDecoder());
                        p.addLast(new NettyRpcClientHandler());
                    }
                });
        this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension(ServiceDiscoveryEnum.ZK.getName());
        this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
        this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
    }

    /**
     * connect server and get the channel ,so that you can send rpc message to server
     *
     * @param inetSocketAddress server address
     * @return the channel
     */
    @SneakyThrows
    public Channel doConnect(InetSocketAddress inetSocketAddress) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
                completableFuture.complete(future.channel());
            } else {
                throw new IllegalStateException();
            }
        });
        return completableFuture.get();
    }

    @Override
    public Object sendRpcRequest(RpcRequest rpcRequest) {
        // build return value
        CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();//这个异步操作的结果是一个包含泛型类型为 Object 的 RpcResponse 对象。
        // get server address
        InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);//使用 serviceDiscovery(服务发现)查找并获取RPC请求的服务地址。
        // get  server address related channel
        Channel channel = getChannel(inetSocketAddress);//通过 getChannel 方法获取与指定服务器地址关联的 Channel。
        if (channel.isActive()) {
            // put unprocessed request
            unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);//将 RPC 请求放入未处理请求的集合中,以便后续处理响应。
            RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
                    .codec(SerializationTypeEnum.HESSIAN.getCode())
                    .compress(CompressTypeEnum.GZIP.getCode())
                    .messageType(RpcConstants.REQUEST_TYPE).build();
            channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    log.info("client send message: [{}]", rpcMessage);
                } else {
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                    log.error("Send failed:", future.cause());
                }
            });
        } else {
            throw new IllegalStateException();
        }

        return resultFuture;
    }

    public Channel getChannel(InetSocketAddress inetSocketAddress) {
        Channel channel = channelProvider.get(inetSocketAddress);
        if (channel == null) {
            channel = doConnect(inetSocketAddress);
            channelProvider.set(inetSocketAddress, channel);
        }
        return channel;
    }

    public void close() {
        eventLoopGroup.shutdownGracefully();
    }
}

初始化的部分很简单 ,我这里是用了ExtensionLoader和SingletonFactory去获取对象,这个然后单说]

来看看sendRequests

java
public Object sendRpcRequest(RpcRequest rpcRequest) {
    // build return value
    CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();//这个异步操作的结果是一个包含泛型类型为 Object 的 RpcResponse 对象。
    // get server address
    InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);//使用 serviceDiscovery(服务发现)查找并获取RPC请求的服务地址。
    // get  server address related channel
    Channel channel = getChannel(inetSocketAddress);//通过 getChannel 方法获取与指定服务器地址关联的 Channel。
    if (channel.isActive()) {
        // put unprocessed request
        unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);//将 RPC 请求放入未处理请求的集合中,以便后续处理响应。
        RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
                .codec(SerializationTypeEnum.HESSIAN.getCode())
                .compress(CompressTypeEnum.GZIP.getCode())
                .messageType(RpcConstants.REQUEST_TYPE).build();
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                log.info("client send message: [{}]", rpcMessage);
            } else {
                future.channel().close();
                resultFuture.completeExceptionally(future.cause());
                log.error("Send failed:", future.cause());
            }
        });
    } else {
        throw new IllegalStateException();
    }

    return resultFuture;//在这里不对响应进行处理,而是到unprocessedRequests的complete方法里面才处理,也就是说这里返回的resultFuture啥也没有
}

注意这里的resultFuture的用法,他是在unprocessedRequests里才处理的

unprocessedRequests

其实就是一个map,将对应的requeid和用来存储响应的resultFuture放进map里面

这样的好处就是:

如果在send发送完数据之后对channel进行手动阻塞等待返回Response,这样会导致Event Loop阻塞导致效率下降。优化后使用CompletableFuture包装Response实现线程同步,避免了Event Loop阻塞。

java
public class UnprocessedRequests {
    public static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES=new ConcurrentHashMap<>();
    public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {
        UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);
    }

    public void complete(RpcResponse<Object> rpcResponse)
    {
        CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());
        if(null!=future)
        {
            future.complete(rpcResponse);//在这个时候,future里面才存入了rpcResponse
        }else
        {
            throw new IllegalArgumentException();
        }
    }
}

NettyRpcClientHandler

这个处理器被加到rpcclient的channel.pipeline()的最后面

自定义处理器,继承了ChannelInboundHandlerAdapter 说明它是一个入站处理器

重写了三个方法:

java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {//这个时候已经是经过解码了的所以可以直接判断他的类型
    try {
        log.info("client receive msg: [{}]", msg);
        if (msg instanceof RpcMessage) {//判断接收到的消息是否是RpcMessage类型
            RpcMessage tmp = (RpcMessage) msg;
            byte messageType = tmp.getMessageType();//获取RpcMessage中的messageType字段,用于确定消息的类型
            if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
                log.info("heart [{}]", tmp.getData());
            } else if (messageType == RpcConstants.RESPONSE_TYPE) {
                RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();
                unprocessedRequests.complete(rpcResponse);
            }
        }
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

将已经经过解码了的msg处理,如果是心跳消息,那么打印日志,不做其他处理,如果是响应的话,那么就把他转成Rpcrespnse对象,然后调用complete方法,来存到对应的future里面

java
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {//当触发了用户自定义事件时被调用

    if (evt instanceof IdleStateEvent) {
        IdleState state = ((IdleStateEvent) evt).state();
        if (state == IdleState.WRITER_IDLE) {//写空闲状态表示一段时间内没有向服务器发送数据。,则发送心跳请求以维持与服务器的连接
            log.info("write idle happen [{}]", ctx.channel().remoteAddress());
            Channel channel = nettyRpcClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress());
            RpcMessage rpcMessage = new RpcMessage();
            rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
            rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
            rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
            rpcMessage.setData(RpcConstants.PING);
            channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);///这个监听器的作用是,在发送心跳请求时如果出现问题,就关闭当前通道,以便后续可能重新建立连接。
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

触发了用户自定义事件去调用,就是如果一段时间没有向服务器发送数据,就发送一个心跳请求,去维持链接

java
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    log.error("client catch exception:", cause);
    cause.printStackTrace();
    ctx.close();
}

这个就是捕获异常的,没什么

ChannelProvider

管理channel的,每一个ip:port(也就是注册的服务)有一个channel

有三个方法

java
private final Map<String, Channel> channelMap;//使用 ConcurrentHashMap 作为存储通道映射关系的容器。ConcurrentHashMap 是线程安全的,适用于并发环境。

public ChannelProvider() {
    channelMap = new ConcurrentHashMap<>();
}

public Channel get(InetSocketAddress inetSocketAddress) {
    String key = inetSocketAddress.toString();
    // determine if there is a connection for the corresponding address
    if (channelMap.containsKey(key)) {
        Channel channel = channelMap.get(key);
        // if so, determine if the connection is available, and if so, get it directly
        if (channel != null && channel.isActive()) {
            return channel;
        } else {
            channelMap.remove(key);
        }
    }
    return null;
}

get:获得channel

set:设置

java
public void set(InetSocketAddress inetSocketAddress, Channel channel){
    String key=inetSocketAddress.toString();
    channelMap.put(key,channel);
}

remove:删除,但是没用到

java
public void remove(InetSocketAddress inetSocketAddress) {
    String key = inetSocketAddress.toString();
    channelMap.remove(key);
    log.info("Channel map size :[{}]", channelMap.size());
}

Netty服务端

NettyRpcServer

主要就是启动服务端,然后提供了一个手动注册zk服务的方法,当然我们以后是用注解的方式注册服务

然后显式的提供了bossgroup,workergroup和defauxxx,defaxxx就是用来处理耗时的事件

静态变量

java
public static final int PORT = 9998;

private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);

public void registerService(RpcServiceConfig rpcServiceConfig) {
    serviceProvider.publishService(rpcServiceConfig);
}//显式注册服务的方法

start

java
public void start() {
    CustomShutdownHook.getCustomShutdownHook().clearAll();//这一行代码清理了之前设置的关闭钩子,确保在服务器关闭时资源能够得到释放。
    String host = InetAddress.getLocalHost().getHostAddress();// 获取了服务器主机的 IP 地址。
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);//创建了用于接收客户端连接的主事件循环组 bossGroup,参数 1 表示线程数为 1。
    EventLoopGroup workerGroup = new NioEventLoopGroup();//创建了用于处理客户端请求的工作事件循环组 workerGroup,采用默认线程数。

    DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
            RuntimeUtil.cpus() * 2,
            ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false)
    );
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 是否开启 TCP 底层心跳机制
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
                .option(ChannelOption.SO_BACKLOG, 128)
                .handler(new LoggingHandler(LogLevel.INFO))
                // 当客户端第一次进行请求的时候才会进行初始化
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // 30 秒之内没有收到客户端请求的话就关闭连接
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                        p.addLast(new RpcMessageEncoder());
                        p.addLast(new RpcMessageDecoder());
                        p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());//这个handler就会在serviceHandlerGroup上执行
                    }
                });

        // 绑定端口,同步等待绑定成功
        ChannelFuture f = b.bind(host, PORT).sync();
        // 等待服务端监听端口关闭
        f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.error("occur exception when start server:", e);
    } finally {
        log.error("shutdown bossGroup and workerGroup");
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        serviceHandlerGroup.shutdownGracefully();
    }
}

NettyRpcServerHandler

继承了 ChannelInboundHandlerAdapter,也是一个入站处理器

拥有一个RpcRequestHandler这个处理器就是去执行我们调用的方法的,严格来说不是什么handler,只是叫这个名字

重写了三个方法:

channelRead

发生读事件执行,这里就是判断客户端发来的是什么请求,是心跳请求返回pong,否则就是执行方法的请求,调用handler执行,然后把结果返回给客户端

java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try
    {
        if(msg instanceof RpcMessage)
        {
            log.info("server receive msg: [{}] ", msg);
            byte messageType = ((RpcMessage) msg).getMessageType();
            RpcMessage rpcMessage = new RpcMessage();
            rpcMessage.setCodec(SerializationTypeEnum.HESSIAN.getCode());
            rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
         //如果是心跳请求,那么就返回个pong回去
            if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
                rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
                rpcMessage.setData(RpcConstants.PONG);
            }else {//否则就是有内容的了
                RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();
                // Execute the target method (the method the client needs to execute) and return the method result
                Object result = rpcRequestHandler.handle(rpcRequest);//执行请求,获得结果
                log.info(String.format("server get result: %s", result.toString()));
                rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
                //把结果写到response里面
                if(ctx.channel().isActive()&&ctx.channel().isWritable())
                {
                    RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
                    rpcMessage.setData(rpcResponse);
                }else//否则失败了,把失败的结果写到里面
                {
                    RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
                    rpcMessage.setData(rpcResponse);
                    log.error("not writable now, message dropped");
                }
            }
            ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);//把结果写回给客户端
        }
    }finally {
        ReferenceCountUtil.release(msg);
    }
}

还有两个比较简单,不分析了

RpcRequestHandler

这个在handler包下,用于真正去调用方法,并不是pipeline里面的handler

java
@Slf4j
public class RpcRequestHandler {
    private final ServiceProvider serviceProvider;

    public RpcRequestHandler() {
        serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
    }

    /**
     * Processing rpcRequest: call the corresponding method, and then return the method
     */
    public Object handle(RpcRequest rpcRequest) {
        Object service = serviceProvider.getService(rpcRequest.getRpcServiceName());
        return invokeTargetMethod(rpcRequest, service);
    }

    /**
     * get method execution results
     *
     * @param rpcRequest client request
     * @param service    service object
     * @return the result of the target method execution
     */
    private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
        Object result;
        try {
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            result = method.invoke(service, rpcRequest.getParameters());
            log.info("service:[{}] successful invoke method:[{}]", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
        } catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
            throw new RpcException(e.getMessage(), e);
        }
        return result;
    }
}

这里理一理zk和服务端的关系:

每个服务提供者(服务端)都会在启动时向 ZooKeeper 注册自己的信息,例如服务名称、IP 地址、端口号等。这样,ZooKeeper 中就会维护一个服务实例的注册表,记录了所有可用的服务提供者及其相关信息。

客户端在需要调用某个服务时,会向 ZooKeeper 发起查询请求,获取相关服务的信息。然后,客户端可以选择一个可用的服务实例进行调用。

关于zK的,到了common里面细说

传输协议和编解码器

这部分看自学rpc之传输协议 | 说的 (gusmallwhite.github.io)即可

根之前一样,还是比较好理解 的

然后序列化用的是kryo序列化,这里看傻逼guide的就行

关于compress

项目里有个压缩的东西,爷没弄没搞明白

应该就是把东西压缩和解压缩

java
public interface Compress {
    byte[] compress(byte[] bytes);


    byte[] decompress(byte[] bytes);
}

GzipCompress给出了实现

使用gzip进行压缩

java
public class GzipCompress implements Compress {


    private static final int BUFFER_SIZE = 1024 * 4;

    @Override
    public byte[] compress(byte[] bytes) {
        if (bytes == null) {
            throw new NullPointerException("bytes is null");
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
             GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(bytes);
            gzip.flush();
            gzip.finish();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("gzip compress error", e);
        }
    }

    @Override
    public byte[] decompress(byte[] bytes) {
        if (bytes == null) {
            throw new NullPointerException("bytes is null");
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
             GZIPInputStream gunzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            byte[] buffer = new byte[BUFFER_SIZE];
            int n;
            while ((n = gunzip.read(buffer)) > -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("gzip decompress error", e);
        }
    }
}

这东西就是一个工具,会用就行

所以总的流程是:我客户端发送之前,先根据我要调用 的方法,去zk上获得这个服务对应的ip:port,然后再根据这个ip:port获取对应的channel,

然后将请求发送给channel对面的服务端

服务端接收到了之后,就xxxx,然后执行handle,handle里面是method.invoke,然后结果result放到response里面

所以zk注册的其实是什么?我的服务名(也就是你要调用哪个类里的哪个方法)和地址(也就是ip:port)

明白了

我们看到handle里面

java
public Object handle(RpcRequest rpcRequest) {
     Object service = serviceProvider.getService(rpcRequest.getRpcServiceName());
     return invokeTargetMethod(rpcRequest, service);
 }

先是获取了service对象

service其实就是一个Object:

java
public class RpcServiceConfig {
    /**
     * service version
     */
    private String version = "";
    /**
     * when the interface has multiple implementation classes, distinguish by group
     */
    private String group = "";

    /**
     * target service
     */
    private Object service;

    public String getRpcServiceName() {
        return this.getServiceName() + this.getGroup() + this.getVersion();
    }

    public String getServiceName() {
        return this.service.getClass().getInterfaces()[0].getCanonicalName();
    }
}

那么问题来了

,service是什么时候实例化的呢,我们有提到, 这个项目是用注解实现自动注册服务的

那么我们看这里:

none
SpringBeanPostProcessor 类里面的
java
@SneakyThrows
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    if (bean.getClass().isAnnotationPresent(RpcService.class)) {
        log.info("[{}] is annotated with  [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());
        // get RpcService annotation
        RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
        // build RpcServiceProperties
        RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
                .group(rpcService.group())
                .version(rpcService.version())
                .service(bean).build();
        serviceProvider.publishService(rpcServiceConfig);
    }
    return bean;
}

也就是说,如果一个类上面有@RpcService注解,那么他就会通过service(bean).build(); 这个时候,service被实例化为当前类