前面我们探讨了如何获取某一个Dubbo的服务的提供者列表,本节我们探讨如何使用Dubbo的扩展,实现指定IP调用。
	 
	    动态指定IP调用Dubbo服务
	 
	    在Dubbo中集群容错策略Cluster是SPI扩展接口,DUbbo框架提供了丰富的集群容错策略实现,集群容错策略里面会先从RegistryDirectory获取所有服务提供者的Invoker列表,然后使用负载均衡策略从中选择一个Inovker来发起远程调用;基于此原理我们可以实现自己的集群容错策略,一开始还是使用RegistryDirectory获取所有服务提供者的Invoker列表,但是不进行负载均衡策略,而是使用我们在发起远程调用前指定的ip相匹配的Invoker来进行发远程调用;至于如何在每次调用前指定ip,可以使用RpcContext.getContext().set("ip", "30.10.67.231")来做。
	 
	    动态VPN基于上面思想本节我们就基于扩展接口实现指定IP调用功能,首先我们实现扩展接口Cluster:
	 
	    public class MyCluster implements Cluster{
	 
	    @Override
	 
	    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
	 
	    return new MyClusterInvoker(directory);
	 
	    }
	 
	    }
	 
	    然后我们看自己实现的MyClusterInvoker
	 
	    public class MyClusterInvoker<T> extends MyAbstractClusterInvoker<T> {
	 
	    public MyClusterInvoker(Directory<T> directory) {
	 
	    super(directory);
	 
	    }
	 
	    @Override
	 
	    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
	 
	    throws RpcException {
	 
	    // 1.查看是否设置了指定ip
	 
	    String ip = (String) RpcContext.getContext().get("ip");
	 
	    if (StringUtils.isBlank(ip)) {
	 
	    throw new RuntimeException("ip is blank ");
	 
	    }
	 
	    // 2.检查是否有可用invoker
	 
	    checkInvokers(invokers, invocation);
	 
	    // 3.根据指定ip获取对应invoker
	 
	    Invoker<T> invoked = invokers.stream().filter(invoker -> invoker.getUrl().getHost().equals(ip)).findFirst()
	 
	    .orElseThrow(new Supplier<RpcException>() {
	 
	    @Override
	 
	    public RpcException get() {
	 
	    return  new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER,
	 
	    "Failed to invoke the method " + invocation.getMethodName() + " in the service "
	 
	    + getInterface().getName() + ". No provider available for the service "
	 
	    + directory.getUrl().getServiceKey() + " from ip " + ip + " on the consumer "
	 
	    + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion()
	 
	    + ". Please check if the providers have been started and registered.");
	 
	    }
	 
	    });
	 
	    // 4.发起远程调用,失败则抛出异常
	 
	    try {
	 
	    return invoked.invoke(invocation);
	 
	    } catch (Throwable e) {
	 
	    if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
	 
	    throw (RpcException) e;
	 
	    }
	 
	    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
	 
	    "Fail invoke providers " + (invoked != null ? invoked.getUrl() : "") + " "
	 
	    + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers
	 
	    + " for service " + getInterface().getName() + " method " + invocation.getMethodName()
	 
	    + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()
	 
	    + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
	 
	    e.getCause() != null ? e.getCause() : e);
	 
	    }
	 
	    }
	 
	    }
	 
	    如上代码1,我们从RpcContext.getContext()获取了属性值ip,如果改值不为空,说明指定了ip。
	 
	    代码2则检查是否有可用的服务提供者,如果没有则抛出异常。
	 
	    代码3遍历invokers列表查找指定IP对应的Invoker,如果没有指定IP对应的Invoker,则抛出异常。
	 
	    代码4 具体使用选择的invoker发起远程调用。
	 
	    注意我们还修改了框架的AbstractClusterInvoker为MyAbstractClusterInvoker:
	 
	    public Result invoke(final Invocation invocation) throws RpcException {
	 
	    checkWhetherDestroyed();
	 
	    // binding attachments into invocation.
	 
	    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
	 
	    if (contextAttachments != null && contextAttachments.size() != 0) {
	 
	    ((RpcInvocation) invocation).addAttachments(contextAttachments);
	 
	    }
	 
	    List<Invoker<T>> invokers = list(invocation);
	 
	    LoadBalance loadbalance = null;//initLoadBalance(invokers, invocation);
	 
	    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
	 
	    return doInvoke(invocation, invokers, loadbalance);
	 
	    }
	 
	    这里我们把 LoadBalance loadbalance = initLoadBalance(invokers, invocation);
	 
	    修改为了 LoadBalance loadbalance = null;因为我们不需要负载均衡了。
	 
	    扩展实现写好后,要把扩展实现配置到下面文件
	 
	    image.png
	 
	    然后在消费端调用时候进行下面设置就可以指定ip调用了,比如在demo的consumer模块的APiConsumerSetIpcall类:
	 
	    public class APiConsumerSetIpcall {
	 
	    public static void main(String[] args) throws InterruptedException {
	 
	    // 1.创建服务引用对象实例
	 
	    ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
	 
	    ...
	 
	    // 5.设置自定义负载均衡策略与集群容错策略(以便实现指定ip)
	 
	    referenceConfig.setCluster("myCluster");
	 
	    // 7.引用服务
	 
	    GreetingService greetingService = referenceConfig.get();
	 
	    //8.获取地址列表
	 
	    ZookeeperIpList zk = new ZookeeperIpList();
	 
	    zk.init("127.0.0.1:2181", "dubbo", "com.books.dubbo.demo.api.GreetingService:1.0.0", "dubbo");
	 
	    //9.指定ip调用
	 
	    for (String ip : zk.getIpList()) {
	 
	    RpcContext.getContext().set("ip", ip);
	 
	    System.out.println(greetingService.sayHello("world" + ip));
	 
	    }
	 
	    }
	 
	    }
	 
	    如上代码(8)创建ZookeeperIpList对象并初始化,代码9则调用ZookeeperIpList的getIpList()方法获取所有服务提供者地址列表,然后轮询指定ip进行调用,也就是我们实现了动态ip指定路由调用,每次发起远程调用前,可以指定那一台服务提供方来提供服务。