kaka的gravatar头像
kaka 2018-03-13 15:16:29
dubbo负载均衡策略以及自定义负载均衡

最近在用dubbo做服务治理,用到了负载均衡,看了下dubbo的源码,整理下。

dubbo的负载均衡类图如下:

dubbo负载均衡策略以及自定义负载均衡

LoadBalance是顶层接口,提供了唯一的接口方法select,如下:

dubbo负载均衡策略以及自定义负载均衡

标注为@SPI的注解,只有标有@SPI注解的接口类才会查找扩展点的实现,dubbo依次从下面这三个路径读取扩展点文件:META-INF/dubbo/internal 、META-INF/dubbo/ 、META-INF/services/,其中dubbo内部实现的各种扩展文件都放在META-INF/dubbo/internal目录下面,如下定义

dubbo负载均衡策略以及自定义负载均衡

所以我们如果要动态扩展LoadBalance,只需要实现该接口,然后将全类名加入到扩展点即可。

AbstractLoadBalance:抽象类,实现了一些通用的权重计算方法,具体的负载均衡交给子类去实现doSelect方法,如下:

dubbo负载均衡策略以及自定义负载均衡

dubbo提供了四种负载均衡策略,如下:

dubbo负载均衡策略以及自定义负载均衡

下面一一介绍这四种负载均衡策略

1.RandomLoadBalance:按权重随机调用,这种方式是dubbo默认的负载均衡策略,源码如下:

实现思路很简单:如果服务多实例权重相同,则进行随机调用;如果权重不同,按照总权重取随机数

根据总权重数生成一个随机数,然后和具体服务实例的权重进行相减做偏移量,然后找出偏移量小于0的,比如随机数为10,某一个服务实例的权重为12,那么10-12=-2<0成立,则该服务被调用,这种策略在随机的情况下尽可能保证权重大的服务会被随机调用。

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int totalWeight = 0; // 总权重
        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(random.nextInt(length));
    }

2.RoundRobinLoadBalance:轮询,按公约后的权重设置轮询比率

实现思路:首先计算出多服务实例的最大最小权重,如果权重都一样(maxWeight=minWeight),则直接取模轮询;如果权重不一样,每一轮调用,都计算出一个基础的权重值,然后筛选出权重值大于基础权重值得invoker进行取模随机调用。

private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

private final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    int length = invokers.size(); // 总个数
    int maxWeight = 0; // 最大权重
    int minWeight = Integer.MAX_VALUE; // 最小权重
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        maxWeight = Math.max(maxWeight, weight); // 累计最大权重
        minWeight = Math.min(minWeight, weight); // 累计最小权重
    }
    if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
        AtomicPositiveInteger weightSequence = weightSequences.get(key);
        if (weightSequence == null) {
            weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
            weightSequence = weightSequences.get(key);
        }
        int currentWeight = weightSequence.getAndIncrement() % maxWeight;
        List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
        for (Invoker<T> invoker : invokers) { // 筛选权重大于当前权重基数的Invoker
            if (getWeight(invoker, invocation) > currentWeight) {
                weightInvokers.add(invoker);
            }
        }
        int weightLength = weightInvokers.size();
        if (weightLength == 1) {
            return weightInvokers.get(0);
        } else if (weightLength > 1) {
            invokers = weightInvokers;
            length = invokers.size();
        }
    }
    AtomicPositiveInteger sequence = sequences.get(key);
    if (sequence == null) {
        sequences.putIfAbsent(key, new AtomicPositiveInteger());
        sequence = sequences.get(key);
    }
    // 取模轮循
    return invokers.get(sequence.getAndIncrement() % length);
}

3.LeastActiveLoadBalance:最少活跃次数,dubbo框架自定义了一个Filter,用于计算服务被调用的次数,具体实现自己可以看源码

dubbo负载均衡策略以及自定义负载均衡

最小活跃次数思路:首先查找最小活跃数的服务并统计权重和出现的频次,如果最小活跃次数只出现一次,直接使用该服务;如果出现多次且权重不相同,则按照总权重数随机;如果出现多次且权重相同,则随机调用

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int leastActive = -1; // 最小的活跃数
        int leastCount = 0; // 相同最小活跃数的个数
        int[] leastIndexs = new int[length]; // 相同最小活跃数的下标
        int totalWeight = 0; // 总权重
        int firstWeight = 0; // 第一个权重,用于于计算是否相同
        boolean sameWeight = true; // 是否所有权重相同
        for (int i = 0; i < length; i++) {
        	Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
            if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
                leastActive = active; // 记录最小活跃数
                leastCount = 1; // 重新统计相同最小活跃数的个数
                leastIndexs[0] = i; // 重新记录最小活跃数下标
                totalWeight = weight; // 重新累计总权重
                firstWeight = weight; // 记录第一个权重
                sameWeight = true; // 还原权重相同标识
            } else if (active == leastActive) { // 累计相同最小的活跃数
                leastIndexs[leastCount ++] = i; // 累计相同最小活跃数下标
                totalWeight += weight; // 累计总权重
                // 判断所有权重是否一样
                if (sameWeight && i > 0 
                        && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        if (leastCount == 1) {
            // 如果只有一个最小则直接返回
            return invokers.get(leastIndexs[0]);
        }
        if (! sameWeight && totalWeight > 0) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offsetWeight = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

4.ConsistentHashLoadBalance:一致性hash

一致性Hash负载均衡涉及到两个主要的配置参数为hash.arguments 与hash.nodes。

hash.arguments : 当进行调用时候根据调用方法的哪几个参数生成key,并根据key来通过一致性hash算法来选择调用结点

hash.nodes: 为结点的副本数。

 @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //获取调用方法名称
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        //生成调用列表的hashcode
        int identityHashCode = System.identityHashCode(invokers);
        //根据方法名key获取一致性hash选择器
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        //选择节点
        return selector.select(invocation);
    }
private static final class ConsistentHashSelector<T> {

        private final TreeMap<Long, Invoker<T>> virtualInvokers;//虚拟节点

        private final int                       replicaNumber;//副本数
        
        private final int                       identityHashCode;//调用节点的hashcode
        
        private final int[]                     argumentIndex;//参数索引数组

        public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = System.identityHashCode(invokers);
            URL url = invokers.get(0).getUrl();
            //获取所配置的虚拟节点数,默认为160个
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i ++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            //创建虚拟节点,对每一个Invoker生成replicaNumber个虚拟节点并存放于virtualInvokers中
            for (Invoker<T> invoker : invokers) {
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(invoker.getUrl().toFullString() + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

      ..........省略..........

    }

5.自定义负载均衡策略

自定义类,只需要实现AbstractLoadBalance抽象类即可,然后将该类放入dubbo可发现的扩展点即可。


打赏

已有1人打赏

最代码官方的gravatar头像
最近浏览
水光浮藻  LV6 2021年5月14日
疯狂的阳仔  LV7 2020年9月9日
LikL9420  LV12 2020年1月16日
2252536772  LV21 2019年9月10日
liyang155  LV1 2019年7月4日
a807048932  LV5 2019年7月1日
miaoshi  LV16 2019年5月21日
haoran1234  LV1 2019年5月14日
w123520  LV2 2019年4月12日
michaelten  LV6 2019年3月5日
顶部 客服 微信二维码 底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友