再看rpc之负载均衡算法


前言

这个负载均衡算法是用来干什么的

用来当服务发现时,从zk里面选择一个合适的节点,然后把这个结点存储的ip:port给返回掉

LoadBalance接口

里面就一个方法,就是选择一个ip:port

AbstractLoadBalance

一个抽象类,实现了LoadBalance接口,实现了selectServiceAddress方法

@Override
public String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
    if (CollectionUtil.isEmpty(serviceAddresses)) {
        return null;
    }
    if (serviceAddresses.size() == 1) {
        return serviceAddresses.get(0);
    }
    return doSelect(serviceAddresses, rpcRequest);
}

这里的逻辑是,如果你这个地址列表里面只有一个ip:port,那么就直接返回这个唯一的ip:port就行了

你知道这个List serviceAddresses是哪来的吗?还记得我们注册的时候有一个map,存储了一个父节点和对应所有的子节点的list,就是那个list

注意这里有个doSelect,这才是真正的选择逻辑

LoadBalancer包

里面包含了两个负载均衡算法

RandomLoadBalance

看名字就是随机负载均衡算法,就是随便选一个

public class RandomLoadBalance extends AbstractLoadBalance {
    @Override
    protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
        Random random = new Random();
        return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
    }
}

ConsistentHashLoadBalance

一致性哈希算法,大名鼎鼎

里面有一个内部类ConsistentHashSelector

private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();

每一个服务有对应的一致性选择器

ConsistentHashSelector

静态变量

private final TreeMap<Long, String> virtualInvokers;//一个红黑树,用于存储哈希值和对应的虚拟节点
        private final int identityHashCode;//一致性哈希选择器的标识哈希码。这个标识哈希码的作用是在后续检测哈希环是否需要更新时使用。

我们先来看几个方法

md5

通过 MD5 算法计算给定键的哈希值。这里使用了 Java 提供的 MessageDigest 类来进行 MD5 计算。

static byte[] md5(String key)//md5算法计算哈希值
{
    MessageDigest md;
    try{
        md=MessageDigest.getInstance("MD5");//获取md5算法的实例
        byte[]bytes=key.getBytes(StandardCharsets.UTF_8);//设置字符集
        md.update(bytes);//把bytes传到md里面
    }catch (NoSuchAlgorithmException e)
    {
        throw new IllegalStateException(e.getMessage(), e);
    }
    return md.digest();//计算得哈希值
}

hash

刚刚md5函数返回的是一个bytes[],hash做的就是把他转为一个long类型的哈希值

static long hash(byte[] digest, int idx) {
    return ((long) (digest[3 + idx * 4] & 255) << 24 |
            (long) (digest[2 + idx * 4] & 255) << 16 |
            (long) (digest[1 + idx * 4] & 255) << 8 |
            (long) (digest[idx * 4] & 255)) & 4294967295L;
}

没啥好说的。。我也懒得细究

select

调用selectForKey方法选择一个虚拟节点

public String select(String rpcServiceKey) {
           byte[] digest = md5(rpcServiceKey);
           return selectForKey(hash(digest, 0));
       }

selectForKey

由给定的值返回一个虚拟节点,一致性哈希算法的核心部分

public String selectForKey(long hashCode)
       {
           Map.Entry<Long,String>entry=virtualInvokers.tailMap(hashCode,true).firstEntry();//tailmap返回key大于等于hashcode的键值对然后.firstEntry就是返回第一个大于等于他的
           if(entry==null)//我们并没有一个实际的环,所以如果hashcode比所有的key都大,那么我们就返回所有entry里的第一个entry
           {
               entry=virtualInvokers.firstEntry();
           }
           return  entry.getValue();//返回虚拟节点对应的服务提供者
       }

构造函数

用于初始化虚拟节点的哈希环

ConsistentHashSelector(List<String> invokers, int replicaNumber/*每个实际节点对应虚拟节点的数量*/, int identityHashCode) {
    this.virtualInvokers = new TreeMap<>();//用于存储虚拟节点和他对应的位置,也就是哈希值
    this.identityHashCode = identityHashCode;//保存一下这个东西,其实没啥用

    for (String invoker : invokers) {
        for (int i = 0; i < replicaNumber / 4; i++) {//一轮注册四个虚拟节点
            byte[] digest = md5(invoker + i);//+i是为了区分不同的虚拟节点
            for (int h = 0; h < 4; h++) {//每个md5哈希值创造4个虚拟节点
                long m = hash(digest, h);//随着h的变化,每个虚拟节点其实是对应的digest的不同字节片段的哈希值
                virtualInvokers.put(m, invoker);//放到红黑树里面,就相当于注册虚拟节点了
            }
        }
    }
}

doSelect方法

看完上面的就都很好理解了

@Override
   protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
       int identityHashCode = System.identityHashCode(serviceAddresses);
       // build rpc service name by rpcRequest
       String rpcServiceName = rpcRequest.getRpcServiceName();
       ConsistentHashSelector selector = selectors.get(rpcServiceName);
       // check for updates,不一致那么就更新,比如有过节点添加删除的情况
       if (selector == null || selector.identityHashCode != identityHashCode) {
           selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
           selector = selectors.get(rpcServiceName);
       }
       return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));//调用selector返回
   }

  目录