前言
guide司马了
注册中心负责服务地址的注册与查找,相当于目录服务。 服务端启动的时候将服务名称及其对应的地址(ip+port)注册到注册中心,服务消费端根据服务名称找到对应的服务地址。有了服务地址之后,服务消费端就可以通过网络请求服务端了。
服务名称是什么,就是哪个类的哪个方法
第一遍自学和第二遍再看,发现其实差不多,主要第一遍的时候CuratorUtils类没看,这里看一看
ServiceDiscovery
@SPI
public interface ServiceDiscovery {
/**
* lookup service by rpcServiceName
*
* @param rpcRequest rpc service pojo
* @return service address
*/
InetSocketAddress lookupService(RpcRequest rpcRequest);
}
就是个普通的接口,这边是准备实现服务发现,就是找到这个服务,这个函数在NettyClient有用到,废话因为你得找到往哪里发请求啊
ServiceRegistry
@SPI
public interface ServiceRegistry {
/**
* register service
*
* @param rpcServiceName rpc service name
* @param inetSocketAddress service address
*/
void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress);
}
注册服务,这个函数呢,一个是NettyServer有,这个是手动注册服务的
还有一个是在SpringBeanPostProcessor里面,这里是运用了扫描,然后自动注册服务,重点关注第二种
ZkServiceRegistryImpl
@Slf4j
public class ZkServiceRegistryImpl implements ServiceRegistry {
@Override
public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
CuratorFramework zkClient = CuratorUtils.getZkClient();//获得Curator客户端
CuratorUtils.createPersistentNode(zkClient, servicePath);//创建持久节点
}
}
这里依赖了curatorutils类,这个类的东西,见对应文章
所谓注册服务,就是在zk里面创建一个对应的持久节点
当我们的服务被注册进 zookeeper 的时候,我们将完整的服务名称 rpcServiceName (class name+group+version)作为根节点 ,子节点是对应的服务地址(ip+端口号)。
如果我们要获得某个服务对应的地址的话,就直接根据完整的服务名称来获取到其下的所有子节点,然后通过具体的负载均衡策略取出一个就可以了。相关代码如下在 ZkServiceDiscovery.java中已经给出。
ZkServiceDiscoveryImpl
成员变量
private final LoadBalance loadBalance;
public ZkServiceDiscoveryImpl() {
this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(LoadBalanceEnum.LOADBALANCE.getName());
}
负载均衡算法,这一部分看相关博客
lookupservice方法
主要作用是在给定的服务名称下查找可用的服务实例,使用负载均衡算法选择一个实例,最终返回该实例的网络地址。
这个负载均衡是 从给定的服务名称里面,选择一个合适的节点,一个实例对应的有一个父节点,下面的子节点全是他对应的服务
@Override
public InetSocketAddress lookupService(RpcRequest rpcRequest) {
String rpcServiceName = rpcRequest.getRpcServiceName();//获得名字
CuratorFramework zkClient = CuratorUtils.getZkClient();//获得curator实例
List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);//获取指定服务名称下的所有子节点,即服务实例列表
if (CollectionUtil.isEmpty(serviceUrlList)) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
}
// load balancing
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);//通过负载均衡算法选择一个服务实例
log.info("Successfully found the service address:[{}]", targetServiceUrl);
String[] socketAddressArray = targetServiceUrl.split(":");//分为主机和端口号
String host = socketAddressArray[0];//主机
int port = Integer.parseInt(socketAddressArray[1]);//端口
return new InetSocketAddress(host, port);//构成地址
}
负载均衡算法在loadbalance里,见别的博客
CuratorUtils
就是创建节点的工具类,包装了一些方法
静态变量
private static final int BASE_SLEEP_TIME=1000;
private static final int MAX_RETRIES=3;
public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc";//根目录
private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();//服务对应的地址,一个服务可以对应对个地址
private static final Set<String> REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet();//注册了的路径
private static CuratorFramework zkClient;//zk提供给我们的东西,用这个进行操作,这个就是zk客户端
private static final String DEFAULT_ZOOKEEPER_ADDRESS = "127.0.0.1:2181";//zk运行的地址
createPersistentNode
看名字也看得出来,就是创建持久节点
关键:zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
其他代码只是做个检查罢了
public static void createPersistentNode(CuratorFramework zkClient,String path)//创建一个持久节点,参数就是工具和你要注册的节点的路径
{
try
{
if(REGISTERED_PATH_SET.contains(path)||zkClient.checkExists().forPath(path)!=null)//判断是不是已经被注册或者已经有这个结点了
{
log.info("The node already exists. The node is:[{}]", path);
}else
{
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
log.info("The node was created successfully. The node is:[{}]", path);
}
REGISTERED_PATH_SET.add(path);
}catch (Exception e)
{
log.error("create persistent node for path [{}] fail", path);
}
}
持久节点什么意思:当链接断开的时候,这个结点不被删除
getChildrenNodes
看名字就看得出来,获取某个节点的所有子节点路径
关键:result = zkClient.getChildren().forPath(servicePath);
public static List<String> getChildrenNodes(CuratorFramework zkClient,String rpcServiceName)//获取某个节点的所有子节点路径
{
if(SERVICE_ADDRESS_MAP.containsKey(rpcServiceName))//如果map里已经有了,那就直接返回就行
{
return SERVICE_ADDRESS_MAP.get(rpcServiceName);
}
//如果没有,我们自己创造
List<String>result=null;
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
try{
SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
result=zkClient.getChildren().forPath(servicePath);
registerWatcher(rpcServiceName, zkClient);//给这个结点注册监听器,注册了监听器之后,这个节点的子节点发生变化比如增加、减少或者更新的时候,你可以自定义回调操作。
}catch (Exception e)
{
log.error("get children nodes for path [{}] fail", servicePath);
}
return result;
}
注意还有一个增加监听器的操作,这个函数在下面
clearRegistry
这个也简单,就是删除节点,相当于清除注册
这里就不写了
getZkClient
获得zk客户端,这里就是给zkclient初始化
从配置文件读取zk地址
Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
最终要么使用默认地址,要么从配置文件读取
这部分是todo,我还没看
public static CuratorFramework getZkClient()
{
Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
//如果得到了并且是被启动了的 ,直接返回就行了
if(zkClient!=null&&zkClient.getState()== CuratorFrameworkState.STARTED)
{
return zkClient;
}
//设置重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
zkClient = CuratorFrameworkFactory.builder()
// the server to connect to (can be a server list)
.connectString(zookeeperAddress)
.retryPolicy(retryPolicy)
.build();
zkClient.start();//启动
try {
// wait 30s until connect to the zookeeper
if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
throw new RuntimeException("Time out waiting to connect to ZK!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return zkClient;
}
注意:
重试策略:ExponentialBackoffRetry是指数退避重试策略,这个然后背背八股
registerWatcher
注册监听器,这里设置的监听器回调事件是:
回调方法将会更新 SERVICE_ADDRESS_MAP,从而保持最新的服务地址列表。
public static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception{
String servicepath=ZK_REGISTER_ROOT_PATH+rpcServiceName;
PathChildrenCache pathChildrenCache=new PathChildrenCache(zkClient,servicepath,true);
PathChildrenCacheListener pathChildrenCacheListener=((curatorFramework, pathChildrenCacheEvent) -> {
List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicepath);//子节点出现变化了,那么就更新对应 的map
SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
});
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);//添加这个监听器
pathChildrenCache.start();
}
Provider
有这么一个provider包,里面提供了注册相关服务的接口
ServiceProvider
一个接口,还没是注册服务的接口,因为我们这里只实现了用zk当注册中心,所以只有一个实现类
public interface ServiceProvider {
/**
* @param rpcServiceConfig rpc service related attributes
*/
void addService(RpcServiceConfig rpcServiceConfig);
/**
* @param rpcServiceName rpc service name
* @return service object
*/
Object getService(String rpcServiceName);
/**
* @param rpcServiceConfig rpc service related attributes
*/
void publishService(RpcServiceConfig rpcServiceConfig);
}
ZkServiceProviderImp
实现了zk的对外接口,后面我们要干什么只需要通过单例模式获得这个类,然后用这个类的方法就行了
静态变量
private final Map<String, Object> serviceMap;//存储服务名称和对象
private final Set<String> registeredService;//存储已经注册了的服务的名称
private final ServiceRegistry serviceRegistry;//用来注册的工具
publishService
发布服务,其实就是调用了方法去把他注册上去
public void publishService(RpcServiceConfig rpcServiceConfig) {
try
{
String host= InetAddress.getLocalHost().getHostAddress();
this.addService(rpcServiceConfig);//加到map里
serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(),new InetSocketAddress(host,PORT));//调用方法注册,调用的这个方法做的事情其实就是新建了一个consistentnode然后把地址放进去
}catch (UnknownHostException e)
{
log.error("{}",e);
}
}
addService
只是放到数据结构里,不是真的注册了
public void addService(RpcServiceConfig rpcServiceConfig) {
String rpcServiceName=rpcServiceConfig.getServiceName();
if(registeredService.contains(rpcServiceName))
{
return;
}
registeredService.add(rpcServiceName);
serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());
}
getService
简单,不写了