2019独角兽企业重金招聘Python工程师标准>>>
dubbo版本2.5.3
我们这里以zookeeper作为注册中心为例说明。
这里说的集群,可以理解为,一个接口服务对应有多个提供者。
在dubbo的调用方(reference)看来,每个提供方(service)对应一个invoker。
关于一个调用方对应多个提供方的场景大概包括三大类:
1,者调者订阅一个注册中心,此注册中心,同一个服务有多个提供者(以不同机器,端口,版本等发布的服务)
2,者调者订阅多个注册中心的服务,每个注册中心都有引用的服务的提供者(一个或者多个)。
3,调用方,通过url配置,提供多个提供者地址,多个地址以分号隔开。
1,2是同一类场景,3是直连场景,这两中场景是互斥,也就是用户配置了reference的url属性,dubbo就不会再订阅注册中心。
下面通过代码分析下,这三种场景的集群容错
客户端订阅可以看ReferenceConfig类的createProxy方法里以下代码
if (isJvmRefer) {//引用本地服务,只返回一个exporter不会有集群。URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);invoker = refprotocol.refer(interfaceClass, url);if (logger.isInfoEnabled()) {logger.info("Using injvm service " + interfaceClass.getName());}} else {//应用远程服务if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URLString[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);if (us != null && us.length > 0) {//用户自定多个直连服务for (String u : us) {URL url = URL.valueOf(u);if (url.getPath() == null || url.getPath().length() == 0) {url = url.setPath(interfaceName);}if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));} else {urls.add(ClusterUtils.mergeUrl(url, map));}}}} else { // 通过注册中心配置拼装URLList<URL> us = loadRegistries(false);if (us != null && us.size() > 0) {//用户自定多个注册中心for (URL u : us) {URL monitorUrl = loadMonitor(u);if (monitorUrl != null) {map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));}urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));}}if (urls == null || urls.size() == 0) {throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");}}if (urls.size() == 1) {//调用方订阅一个注册中心,或者自定一个直连服务(直连的这种情况不考虑集群,只有一个提供者)//一个注册中心时,这个refprotocol自适应后是RegistryProtocol//一个直连者时,这个refprotocol自适应后是DubboProtocol(如果是duboo协议)invoker = refprotocol.refer(interfaceClass, urls.get(0));} else {//调用方订阅多个注册中心,或者多个直连地址List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();URL registryURL = null;for (URL url : urls) {invokers.add(refprotocol.refer(interfaceClass, url));if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {registryURL = url; // 用了最后一个registry url}}if (registryURL != null) { // 有 注册中心协议的URL// 对有注册中心的Cluster 只用 AvailableCluster 容错策略// 对于订阅多个注册中心的,这里其实有两层的容错机制,只是第一层,被强制设置为AvailableCluster 容错策略URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers));//cluster是通过spi机制注入的自适应adaptive实现,场景2执行逻辑} else { // 不是 注册中心的URLinvoker = cluster.join(new StaticDirectory(invokers));//cluster是通过spi机制注入的自适应adaptive实现,场景3执行逻辑}}}
通过代码我们看到,对于场景1,引用一个注册中心的场景,会执行
invoker = refprotocol.refer(interfaceClass, urls.get(0));代码
通过代码调试,可以发现,refprotocol.refer会调用RegistryProtocol的refer方法最终进入doRefer方法,
会执行如下代码
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);directory.setRegistry(registry);directory.setProtocol(protocol);URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());if (! Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));return cluster.join(directory);//cluster是通过spi机制注入的自适应adaptive实现。此时的directory是RegistryDirectory类型}
三种场景实际上,都执行了dubbo SPI机制生成的adaptive的Cluster实现代码
通过dubbo打印日志,可以看到adaptive的Cluster实现代码如下
package com.alibaba.dubbo.rpc.cluster;import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();String extName = url.getParameter("cluster", "failover");//可以看到,通过url里的cluster键值获取容错机制,url中没有指定cluster键值,dubbo默认是用failover集群容错策略if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);return extension.join(arg0);}
}
到此,我们可以看到对于多注册中心的,第一层容错机制被强制设置为available,
然后第二层,就和单个注册中心多服务提供者集群容错机制一样了,即默认为failover容错机制。这里看下这两种容错机制的代码实现
1,failover容错机制
通过spi机制我们找到Cluster failover扩展FailoverCluster类是这样实现的
public class FailoverCluster implements Cluster {public final static String NAME = "failover";public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new FailoverClusterInvoker<T>(directory);}}
接着看FailoverClusterInvoker类,先看它的父类AbstractClusterInvoker,这个类实现了Invoker接口:
public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();LoadBalance loadbalance;//这里是获取负载均衡策略List<Invoker<T>> invokers = list(invocation);if (invokers != null && invokers.size() > 0) {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));} else {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);}RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);return doInvoke(invocation, invokers, loadbalance);//回调子类的doInvoke方法
}
然后再回到子类看doInvoke方法:
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);public FailoverClusterInvoker(Directory<T> directory) {super(directory);}@SuppressWarnings({ "unchecked", "rawtypes" })public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyinvokers = invokers;checkInvokers(copyinvokers, invocation);//获取重试次数 +1是因为第一次调用不算重试次数int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;if (len <= 0) {len = 1;}// retry loop.RpcException le = null; // last exception.List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.Set<String> providers = new HashSet<String>(len);for (int i = 0; i < len; i++) {//重试时,进行重新选择,避免重试时invoker列表已发生变化.//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变if (i > 0) {checkWhetherDestroyed();copyinvokers = list(invocation);//重新检查一下checkInvokers(copyinvokers, invocation);}//这里是通过负载均衡策略获取下一个服务提供者Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);invoked.add(invoker);RpcContext.getContext().setInvokers((List)invoked);try {Result result = invoker.invoke(invocation);if (le != null && logger.isWarnEnabled()) {logger.warn("Although retry the method " + invocation.getMethodName()+ " in the service " + getInterface().getName()+ " was successful by the provider " + invoker.getUrl().getAddress()+ ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size()+ ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion() + ". Last error is: "+ le.getMessage(), le);}return result;} catch (RpcException e) {if (e.isBiz()) { // biz exception.如果是业务异常,则不再重试,直接抛出异常throw e;}le = e;} catch (Throwable e) {le = new RpcException(e.getMessage(), e);} finally {providers.add(invoker.getUrl().getAddress());//记录调用失败信息}}throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "+ invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "+ Version.getVersion() + ". Last error is: "+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);}}
通过代码可以看到,
failvoer集群容错机制,总的逻辑是,以方法重复次数为限制,每次调用如果失败,
就利用负责均衡策略获取下一个提供者(invoker),直到调用成功,或者最后方法超限,抛出异常,
其中中间如果有业务异常,则不再重试,直接抛出异常。
2,available集群容错机制,我们找到AvailableCluster类,它只有一个方法
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {//它没通过扩展AbstractClusterInvoker抽象类,而是直接实现它,它没用负载均衡策略,而是简单选择一个可达的服务return new AbstractClusterInvoker<T>(directory) {public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {for (Invoker<T> invoker : invokers) {if (invoker.isAvailable()) {//获取第一个,可达的服务提供方,return invoker.invoke(invocation);}}throw new RpcException("No provider available in " + invokers);}};}
通过代码可以看到,
available集群容错机制,则是简单的调用第一个可到达的服务。都不可达是,抛出异常
最后
dubbo本身还有其他集群容错的扩展实现,这里https://my.oschina.net/u/146130/blog/1563305