自学和再看rpc之注册中心


前言

guide司马了

注册中心负责服务地址的注册与查找,相当于目录服务。 服务端启动的时候将服务名称及其对应的地址(ip+port)注册到注册中心,服务消费端根据服务名称找到对应的服务地址。有了服务地址之后,服务消费端就可以通过网络请求服务端了。

服务名称是什么,就是哪个类的哪个方法

第一遍自学和第二遍再看,发现其实差不多,主要第一遍的时候CuratorUtils类没看,这里看一看

ServiceDiscovery

@SPI
public interface ServiceDiscovery {
    /**
     * lookup service by rpcServiceName
     *
     * @param rpcRequest rpc service pojo
     * @return service address
     */
    InetSocketAddress lookupService(RpcRequest rpcRequest);
}

就是个普通的接口,这边是准备实现服务发现,就是找到这个服务,这个函数在NettyClient有用到,废话因为你得找到往哪里发请求啊

ServiceRegistry

@SPI
public interface ServiceRegistry {
    /**
     * register service
     *
     * @param rpcServiceName    rpc service name
     * @param inetSocketAddress service address
     */
    void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress);

}

注册服务,这个函数呢,一个是NettyServer有,这个是手动注册服务的

还有一个是在SpringBeanPostProcessor里面,这里是运用了扫描,然后自动注册服务,重点关注第二种

ZkServiceRegistryImpl

@Slf4j
public class ZkServiceRegistryImpl implements ServiceRegistry {

    @Override
    public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
        String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
        CuratorFramework zkClient = CuratorUtils.getZkClient();//获得Curator客户端
        CuratorUtils.createPersistentNode(zkClient, servicePath);//创建持久节点
    }
}

这里依赖了curatorutils类,这个类的东西,见对应文章

所谓注册服务,就是在zk里面创建一个对应的持久节点

当我们的服务被注册进 zookeeper 的时候,我们将完整的服务名称 rpcServiceName (class name+group+version)作为根节点 ,子节点是对应的服务地址(ip+端口号)。

如果我们要获得某个服务对应的地址的话,就直接根据完整的服务名称来获取到其下的所有子节点,然后通过具体的负载均衡策略取出一个就可以了。相关代码如下在 ZkServiceDiscovery.java中已经给出。

ZkServiceDiscoveryImpl

成员变量

private final LoadBalance loadBalance;

    public ZkServiceDiscoveryImpl() {
        this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(LoadBalanceEnum.LOADBALANCE.getName());
    }

负载均衡算法,这一部分看相关博客

lookupservice方法

主要作用是在给定的服务名称下查找可用的服务实例,使用负载均衡算法选择一个实例,最终返回该实例的网络地址。

这个负载均衡是 从给定的服务名称里面,选择一个合适的节点一个实例对应的有一个父节点,下面的子节点全是他对应的服务

@Override
public InetSocketAddress lookupService(RpcRequest rpcRequest) {
    String rpcServiceName = rpcRequest.getRpcServiceName();//获得名字
    CuratorFramework zkClient = CuratorUtils.getZkClient();//获得curator实例
    List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);//获取指定服务名称下的所有子节点,即服务实例列表
    if (CollectionUtil.isEmpty(serviceUrlList)) {
        throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
    }
    // load balancing
    String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);//通过负载均衡算法选择一个服务实例
    log.info("Successfully found the service address:[{}]", targetServiceUrl);
    String[] socketAddressArray = targetServiceUrl.split(":");//分为主机和端口号
    String host = socketAddressArray[0];//主机
    int port = Integer.parseInt(socketAddressArray[1]);//端口
    return new InetSocketAddress(host, port);//构成地址
}

负载均衡算法在loadbalance里,见别的博客

CuratorUtils

就是创建节点的工具类,包装了一些方法

静态变量

private static  final  int BASE_SLEEP_TIME=1000;
private static final int MAX_RETRIES=3;
public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc";//根目录
private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();//服务对应的地址,一个服务可以对应对个地址
private static final Set<String> REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet();//注册了的路径
private static CuratorFramework zkClient;//zk提供给我们的东西,用这个进行操作,这个就是zk客户端
private static final String DEFAULT_ZOOKEEPER_ADDRESS = "127.0.0.1:2181";//zk运行的地址

createPersistentNode

看名字也看得出来,就是创建持久节点

关键:zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);

其他代码只是做个检查罢了

public static void createPersistentNode(CuratorFramework zkClient,String path)//创建一个持久节点,参数就是工具和你要注册的节点的路径
{
    try
    {
        if(REGISTERED_PATH_SET.contains(path)||zkClient.checkExists().forPath(path)!=null)//判断是不是已经被注册或者已经有这个结点了
        {
            log.info("The node already exists. The node is:[{}]", path);
        }else
        {
            zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
            log.info("The node was created successfully. The node is:[{}]", path);
        }
        REGISTERED_PATH_SET.add(path);
    }catch (Exception e)
    {
        log.error("create persistent node for path [{}] fail", path);
    }
}

持久节点什么意思:当链接断开的时候,这个结点不被删除

getChildrenNodes

看名字就看得出来,获取某个节点的所有子节点路径

关键:result = zkClient.getChildren().forPath(servicePath);

public static List<String> getChildrenNodes(CuratorFramework zkClient,String rpcServiceName)//获取某个节点的所有子节点路径
{
    if(SERVICE_ADDRESS_MAP.containsKey(rpcServiceName))//如果map里已经有了,那就直接返回就行
    {
        return SERVICE_ADDRESS_MAP.get(rpcServiceName);
    }
    //如果没有,我们自己创造
    List<String>result=null;
    String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
    try{
        SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
        result=zkClient.getChildren().forPath(servicePath);
        registerWatcher(rpcServiceName, zkClient);//给这个结点注册监听器,注册了监听器之后,这个节点的子节点发生变化比如增加、减少或者更新的时候,你可以自定义回调操作。
    }catch (Exception e)
    {
        log.error("get children nodes for path [{}] fail", servicePath);
    }
    return  result;
}

注意还有一个增加监听器的操作,这个函数在下面

clearRegistry

这个也简单,就是删除节点,相当于清除注册

这里就不写了

getZkClient

获得zk客户端,这里就是给zkclient初始化

从配置文件读取zk地址

 Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;

最终要么使用默认地址,要么从配置文件读取

这部分是todo,我还没看

public static CuratorFramework getZkClient()
{
    Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
    String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
    //如果得到了并且是被启动了的 ,直接返回就行了
    if(zkClient!=null&&zkClient.getState()== CuratorFrameworkState.STARTED)
    {
        return  zkClient;
    }
    //设置重试策略
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
    zkClient = CuratorFrameworkFactory.builder()
            // the server to connect to (can be a server list)
            .connectString(zookeeperAddress)
            .retryPolicy(retryPolicy)
            .build();
    zkClient.start();//启动
    try {
        // wait 30s until connect to the zookeeper
        if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
            throw new RuntimeException("Time out waiting to connect to ZK!");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return zkClient;
}

注意:

重试策略:ExponentialBackoffRetry是指数退避重试策略,这个然后背背八股

registerWatcher

注册监听器,这里设置的监听器回调事件是:

回调方法将会更新 SERVICE_ADDRESS_MAP,从而保持最新的服务地址列表。

public static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception{
    String servicepath=ZK_REGISTER_ROOT_PATH+rpcServiceName;
    PathChildrenCache pathChildrenCache=new PathChildrenCache(zkClient,servicepath,true);

    PathChildrenCacheListener pathChildrenCacheListener=((curatorFramework, pathChildrenCacheEvent) -> {
        List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicepath);//子节点出现变化了,那么就更新对应 的map
        SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
    });
    pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);//添加这个监听器
    pathChildrenCache.start();
}

Provider

有这么一个provider包,里面提供了注册相关服务的接口

ServiceProvider

一个接口,还没是注册服务的接口,因为我们这里只实现了用zk当注册中心,所以只有一个实现类

public interface ServiceProvider {

    /**
     * @param rpcServiceConfig rpc service related attributes
     */
    void addService(RpcServiceConfig rpcServiceConfig);

    /**
     * @param rpcServiceName rpc service name
     * @return service object
     */
    Object getService(String rpcServiceName);

    /**
     * @param rpcServiceConfig rpc service related attributes
     */
    void publishService(RpcServiceConfig rpcServiceConfig);

}

ZkServiceProviderImp

实现了zk的对外接口,后面我们要干什么只需要通过单例模式获得这个类,然后用这个类的方法就行了

静态变量

private final Map<String, Object> serviceMap;//存储服务名称和对象
    private final Set<String> registeredService;//存储已经注册了的服务的名称
    private final ServiceRegistry serviceRegistry;//用来注册的工具

publishService

发布服务,其实就是调用了方法去把他注册上去

public void publishService(RpcServiceConfig rpcServiceConfig) {
    try
    {
        String host= InetAddress.getLocalHost().getHostAddress();
        this.addService(rpcServiceConfig);//加到map里
        serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(),new InetSocketAddress(host,PORT));//调用方法注册,调用的这个方法做的事情其实就是新建了一个consistentnode然后把地址放进去
    }catch (UnknownHostException e)
    {
        log.error("{}",e);
    }
}

addService

只是放到数据结构里,不是真的注册了

public void addService(RpcServiceConfig rpcServiceConfig) {
    String rpcServiceName=rpcServiceConfig.getServiceName();
    if(registeredService.contains(rpcServiceName))
    {
        return;
    }
    registeredService.add(rpcServiceName);
    serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
    log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());
}

getService

简单,不写了


  目录