Linux就该这么学
kaka的gravatar头像
kaka2018-03-13 15:16:29
dubbo负载均衡策略以及自定义负载均衡

最近在用dubbo做服务治理,用到了负载均衡,看了下dubbo的源码,整理下。

dubbo的负载均衡类图如下:

dubbo负载均衡策略以及自定义负载均衡

LoadBalance是顶层接口,提供了唯一的接口方法select,如下:

dubbo负载均衡策略以及自定义负载均衡

标注为@SPI的注解,只有标有@SPI注解的接口类才会查找扩展点的实现,dubbo依次从下面这三个路径读取扩展点文件:META-INF/dubbo/internal 、META-INF/dubbo/ 、META-INF/services/,其中dubbo内部实现的各种扩展文件都放在META-INF/dubbo/internal目录下面,如下定义

dubbo负载均衡策略以及自定义负载均衡

所以我们如果要动态扩展LoadBalance,只需要实现该接口,然后将全类名加入到扩展点即可。

AbstractLoadBalance:抽象类,实现了一些通用的权重计算方法,具体的负载均衡交给子类去实现doSelect方法,如下:

dubbo负载均衡策略以及自定义负载均衡

dubbo提供了四种负载均衡策略,如下:

dubbo负载均衡策略以及自定义负载均衡

下面一一介绍这四种负载均衡策略

1.RandomLoadBalance:按权重随机调用,这种方式是dubbo默认的负载均衡策略,源码如下:

实现思路很简单:如果服务多实例权重相同,则进行随机调用;如果权重不同,按照总权重取随机数

根据总权重数生成一个随机数,然后和具体服务实例的权重进行相减做偏移量,然后找出偏移量小于0的,比如随机数为10,某一个服务实例的权重为12,那么10-12=-2<0成立,则该服务被调用,这种策略在随机的情况下尽可能保证权重大的服务会被随机调用。

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int totalWeight = 0; // 总权重
        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(random.nextInt(length));
    }

2.RoundRobinLoadBalance:轮询,按公约后的权重设置轮询比率

实现思路:首先计算出多服务实例的最大最小权重,如果权重都一样(maxWeight=minWeight),则直接取模轮询;如果权重不一样,每一轮调用,都计算出一个基础的权重值,然后筛选出权重值大于基础权重值得invoker进行取模随机调用。

private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

private final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    int length = invokers.size(); // 总个数
    int maxWeight = 0; // 最大权重
    int minWeight = Integer.MAX_VALUE; // 最小权重
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        maxWeight = Math.max(maxWeight, weight); // 累计最大权重
        minWeight = Math.min(minWeight, weight); // 累计最小权重
    }
    if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
        AtomicPositiveInteger weightSequence = weightSequences.get(key);
        if (weightSequence == null) {
            weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
            weightSequence = weightSequences.get(key);
        }
        int currentWeight = weightSequence.getAndIncrement() % maxWeight;
        List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
        for (Invoker<T> invoker : invokers) { // 筛选权重大于当前权重基数的Invoker
            if (getWeight(invoker, invocation) > currentWeight) {
                weightInvokers.add(invoker);
            }
        }
        int weightLength = weightInvokers.size();
        if (weightLength == 1) {
            return weightInvokers.get(0);
        } else if (weightLength > 1) {
            invokers = weightInvokers;
            length = invokers.size();
        }
    }
    AtomicPositiveInteger sequence = sequences.get(key);
    if (sequence == null) {
        sequences.putIfAbsent(key, new AtomicPositiveInteger());
        sequence = sequences.get(key);
    }
    // 取模轮循
    return invokers.get(sequence.getAndIncrement() % length);
}

3.LeastActiveLoadBalance:最少活跃次数,dubbo框架自定义了一个Filter,用于计算服务被调用的次数,具体实现自己可以看源码

dubbo负载均衡策略以及自定义负载均衡

最小活跃次数思路:首先查找最小活跃数的服务并统计权重和出现的频次,如果最小活跃次数只出现一次,直接使用该服务;如果出现多次且权重不相同,则按照总权重数随机;如果出现多次且权重相同,则随机调用

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int leastActive = -1; // 最小的活跃数
        int leastCount = 0; // 相同最小活跃数的个数
        int[] leastIndexs = new int[length]; // 相同最小活跃数的下标
        int totalWeight = 0; // 总权重
        int firstWeight = 0; // 第一个权重,用于于计算是否相同
        boolean sameWeight = true; // 是否所有权重相同
        for (int i = 0; i < length; i++) {
        	Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
            if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
                leastActive = active; // 记录最小活跃数
                leastCount = 1; // 重新统计相同最小活跃数的个数
                leastIndexs[0] = i; // 重新记录最小活跃数下标
                totalWeight = weight; // 重新累计总权重
                firstWeight = weight; // 记录第一个权重
                sameWeight = true; // 还原权重相同标识
            } else if (active == leastActive) { // 累计相同最小的活跃数
                leastIndexs[leastCount ++] = i; // 累计相同最小活跃数下标
                totalWeight += weight; // 累计总权重
                // 判断所有权重是否一样
                if (sameWeight && i > 0 
                        && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        if (leastCount == 1) {
            // 如果只有一个最小则直接返回
            return invokers.get(leastIndexs[0]);
        }
        if (! sameWeight && totalWeight > 0) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offsetWeight = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

4.ConsistentHashLoadBalance:一致性hash

一致性Hash负载均衡涉及到两个主要的配置参数为hash.arguments 与hash.nodes。

hash.arguments : 当进行调用时候根据调用方法的哪几个参数生成key,并根据key来通过一致性hash算法来选择调用结点

hash.nodes: 为结点的副本数。

 @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //获取调用方法名称
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        //生成调用列表的hashcode
        int identityHashCode = System.identityHashCode(invokers);
        //根据方法名key获取一致性hash选择器
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        //选择节点
        return selector.select(invocation);
    }
private static final class ConsistentHashSelector<T> {

        private final TreeMap<Long, Invoker<T>> virtualInvokers;//虚拟节点

        private final int                       replicaNumber;//副本数
        
        private final int                       identityHashCode;//调用节点的hashcode
        
        private final int[]                     argumentIndex;//参数索引数组

        public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = System.identityHashCode(invokers);
            URL url = invokers.get(0).getUrl();
            //获取所配置的虚拟节点数,默认为160个
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i ++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            //创建虚拟节点,对每一个Invoker生成replicaNumber个虚拟节点并存放于virtualInvokers中
            for (Invoker<T> invoker : invokers) {
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(invoker.getUrl().toFullString() + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

      ..........省略..........

    }

5.自定义负载均衡策略

自定义类,只需要实现AbstractLoadBalance抽象类即可,然后将该类放入dubbo可发现的扩展点即可。


打赏

已有1人打赏

最代码官方的gravatar头像

分享到:

最近浏览
二十八画 LV58月6日
月亮星星
jerrylost LV27月30日
星星星星
zhuminghui LV57月26日
月亮星星
xiaoxinxin5202 LV17月20日
星星
ewf_momo LV157月12日
月亮月亮月亮星星星星星星
灰色灬海枫 LV27月8日
星星星星
451561 LV37月3日
星星星星星星
你说的我懂 LV26月13日
星星星星
sjjsjjs LV66月13日
月亮星星星星
没有梦想的咸鱼 LV126月1日
月亮月亮月亮
顶部客服微信二维码底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友