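Spring Cloud LoadBalancer Source Code Walkthrough
This article walks through the spring-cloud-loadbalancer module, which provides client-side load balancing for microservices. Load balancing requires a service registry: when several instances are registered under the same service name, a load balancer is needed to spread requests across them.
DNS is a useful analogy. A domain name may resolve to several IP addresses, just as one microservice may have several instances in the registry. After a DNS lookup returns multiple addresses, the client has to pick one to connect to, and how it picks is a matter of policy. A DNS client typically takes the first address, whereas the load balancer for microservices uses round-robin so that all instances are used evenly.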
Blocking load-balancer API
Start with the type hierarchy.
The core interface of the load balancer is LoadBalancerClient, which extends ServiceInstanceChooser.
    public interface ServiceInstanceChooser {

        ServiceInstance choose(String serviceId);

    }

    public interface LoadBalancerClient extends ServiceInstanceChooser {

        <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

        <T> T execute(String serviceId, ServiceInstance serviceInstance,
                LoadBalancerRequest<T> request) throws IOException;

        URI reconstructURI(ServiceInstance instance, URI original);

    }
LoadBalancerRequest looks like this: given a service instance, it produces the result.
    public interface LoadBalancerRequest<T> {

        T apply(ServiceInstance instance) throws Exception;

    }
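The core method is therefore choose(String serviceId), which resolves a service id string to a service instance. The default implementation of LoadBalancerClient is shown below, with the non-essential parts removed. Its logic is simple: call choose to obtain a service instance, then call apply on the request object to get the result.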
    public class BlockingLoadBalancerClient implements LoadBalancerClient {

        // Field and constructor restored so the excerpt is self-consistent.
        private final LoadBalancerClientFactory loadBalancerClientFactory;

        public BlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory) {
            this.loadBalancerClientFactory = loadBalancerClientFactory;
        }

        @Override
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            // Step 1: resolve the service id to a concrete instance.
            ServiceInstance serviceInstance = choose(serviceId);
            return execute(serviceId, serviceInstance, request);
        }

        @Override
        public <T> T execute(String serviceId, ServiceInstance serviceInstance,
                LoadBalancerRequest<T> request) throws IOException {
            // Step 2: run the request against that instance (exception handling omitted in this excerpt).
            return request.apply(serviceInstance);
        }

        @Override
        public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
            return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
        }

        @Override
        public ServiceInstance choose(String serviceId) {
            ReactiveLoadBalancer<ServiceInstance> loadBalancer =
                    loadBalancerClientFactory.getInstance(serviceId);
            if (loadBalancer == null) {
                return null;
            }
            // Block on the reactive result to obtain a plain value.
            Response<ServiceInstance> loadBalancerResponse =
                    Mono.from(loadBalancer.choose()).block();
            if (loadBalancerResponse == null) {
                return null;
            }
            return loadBalancerResponse.getServer();
        }

    }
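As you can see, the core method choose delegates its work to a ReactiveLoadBalancer. This is a common pattern: the blocking method is implemented on top of the non-blocking one. Next, let's see how the non-blocking interface is defined.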
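The delegation pattern can be illustrated without Spring or Reactor. The following is only a minimal sketch: it assumes CompletableFuture as a stand-in for Mono and a plain String as a stand-in for ServiceInstance, and the names BlockingDelegationSketch, AsyncChooser and BlockingChooser are hypothetical, not part of Spring Cloud.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-ins for illustration; this is not the Spring Cloud API.
final class BlockingDelegationSketch {

    /** Non-blocking contract: returns immediately with a future result. */
    interface AsyncChooser {
        CompletableFuture<String> choose(String serviceId);
    }

    /** Blocking facade that contains no selection logic of its own. */
    static final class BlockingChooser {
        private final AsyncChooser delegate;

        BlockingChooser(AsyncChooser delegate) {
            this.delegate = delegate;
        }

        /** Waits for the asynchronous result, similar in spirit to Mono.block(). */
        String choose(String serviceId) {
            return delegate.choose(serviceId).join();
        }

        /** Mirrors execute(serviceId, request): choose first, then apply the request. */
        <T> T execute(String serviceId, Function<String, T> request) {
            return request.apply(choose(serviceId));
        }
    }

    public static void main(String[] args) {
        // The asynchronous side decides which instance to use.
        AsyncChooser async = id -> CompletableFuture.supplyAsync(() -> id + "@10.0.0.1:8080");
        BlockingChooser blocking = new BlockingChooser(async);
        System.out.println(blocking.execute("order-service", instance -> "called " + instance));
    }
}
```

The point of the design is that the selection logic exists only once, on the asynchronous side; the blocking facade merely waits for the answer.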
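Non-blocking API
The above is the blocking style. A scenario like this one, consisting purely of network requests with no database writes, lends itself well to an asynchronous design. Here is the skeleton of the non-blocking API.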
    public interface ReactorLoadBalancer<T> extends ReactiveLoadBalancer<T> {

        Mono<Response<T>> choose(Request request);

        default Mono<Response<T>> choose() {
            return choose(REQUEST);
        }

    }
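The non-blocking API is even simpler. It has a single choose method that returns a Mono<Response<T>>, where the type parameter T is the result we are after.
What we need is to select one service instance for a given service id and return it; that is exactly what choose is expected to do.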
Implementation
The above are the load-balancing interfaces defined by Spring Cloud. The concrete implementations all live in the spring-cloud-loadbalancer dependency.
Open the module's spring.factories file.
It is short and registers only three auto-configuration classes, which we go through one by one.
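1. LoadBalancerCacheAutoConfiguration
As the name suggests, this class deals with caching. The service information fetched from the registry has to be cached, since fetching it again on every request is not practical. The class creates a LoadBalancerCacheManager whose backing implementation is either Caffeine or Evictor, depending on what is on the classpath.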
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(Caffeine.class)
    protected static class CaffeineLoadBalancerCacheManagerConfiguration {

        @Bean(autowireCandidate = false)
        @ConditionalOnMissingBean
        LoadBalancerCacheManager caffeineLoadBalancerCacheManager(
                LoadBalancerCacheProperties cacheProperties) {
            return new CaffeineBasedLoadBalancerCacheManager(cacheProperties);
        }

    }
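Conceptually, a condition such as @ConditionalOnClass(Caffeine.class) comes down to asking whether a class can be loaded. The sketch below shows that idea in plain Java. It is an illustration under assumptions, not Spring's condition-evaluation code: ClasspathProbeSketch, isPresent and pickCacheBackend are hypothetical names, and the returned labels are made up for the example.

```java
// Hypothetical illustration of choosing an implementation by classpath contents.
final class ClasspathProbeSketch {

    /** Returns true if the named class can be loaded; the class is not initialized. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathProbeSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Prefers Caffeine when its main class is available, otherwise falls back. */
    static String pickCacheBackend() {
        return isPresent("com.github.benmanes.caffeine.cache.Caffeine") ? "caffeine" : "evictor";
    }

    public static void main(String[] args) {
        System.out.println("cache backend: " + pickCacheBackend());
    }
}
```

Passing false as the second argument of Class.forName avoids running static initializers just to answer a yes/no question.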
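2. BlockingLoadBalancerClientAutoConfiguration
This class is also simple. It creates the BlockingLoadBalancerClient bean and passes it a LoadBalancerClientFactory. Because the blocking logic is delegated to the non-blocking side, there is not much logic here.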
    @Bean
    @ConditionalOnBean(LoadBalancerClientFactory.class)
    @Primary
    public BlockingLoadBalancerClient blockingLoadBalancerClient(
            LoadBalancerClientFactory loadBalancerClientFactory) {
        return new BlockingLoadBalancerClient(loadBalancerClientFactory);
    }
This blocking client delegates all of its logic to the non-blocking one.
    public ServiceInstance choose(String serviceId) {
        ReactiveLoadBalancer<ServiceInstance> loadBalancer =
                loadBalancerClientFactory.getInstance(serviceId);
        if (loadBalancer == null) {
            return null;
        }
        Response<ServiceInstance> loadBalancerResponse =
                Mono.from(loadBalancer.choose()).block();
        if (loadBalancerResponse == null) {
            return null;
        }
        return loadBalancerResponse.getServer();
    }
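3. LoadBalancerAutoConfiguration
This auto-configuration creates the LoadBalancerClientFactory bean. LoadBalancerClientFactory is a NamedContextFactory, which can create child application contexts that have the current context as their parent.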
    @ConditionalOnMissingBean
    @Bean
    public LoadBalancerClientFactory loadBalancerClientFactory() {
        LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
        clientFactory.setConfigurations(
                this.configurations.getIfAvailable(Collections::emptyList));
        return clientFactory;
    }
The constructor is shown below. By default it registers LoadBalancerClientConfiguration in every child context.
    public class LoadBalancerClientFactory
            extends NamedContextFactory<LoadBalancerClientSpecification>
            implements ReactiveLoadBalancer.Factory<ServiceInstance> {

        public static final String NAMESPACE = "loadbalancer";

        public static final String PROPERTY_NAME = NAMESPACE + ".client.name";

        public LoadBalancerClientFactory() {
            super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
        }

        public String getName(Environment environment) {
            return environment.getProperty(PROPERTY_NAME);
        }

        @Override
        public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
            return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
        }

    }
The method to focus on is getInstance, inherited from NamedContextFactory:
    public <T> T getInstance(String name, Class<T> type) {
        AnnotationConfigApplicationContext context = getContext(name);
        if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
                type).length > 0) {
            return context.getBean(type);
        }
        return null;
    }
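Obtaining the child context: every child context that has been created is cached in a map, so a context is never created twice for the same name.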
    protected AnnotationConfigApplicationContext getContext(String name) {
        if (!this.contexts.containsKey(name)) {
            synchronized (this.contexts) {
                if (!this.contexts.containsKey(name)) {
                    this.contexts.put(name, createContext(name));
                }
            }
        }
        return this.contexts.get(name);
    }
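The "create at most once per name" behaviour of getContext can also be expressed with ConcurrentHashMap.computeIfAbsent. The following is a sketch of that alternative, not the way NamedContextFactory is written; NamedCacheSketch is a hypothetical name and a String stands in for the application context.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration: a per-name cache that builds each value at most once.
final class NamedCacheSketch<V> {

    private final Map<String, V> contexts = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    NamedCacheSketch(Function<String, V> factory) {
        this.factory = factory;
    }

    /** Returns the cached value for the name, creating it on first access. */
    V get(String name) {
        return contexts.computeIfAbsent(name, factory);
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        NamedCacheSketch<String> cache = new NamedCacheSketch<>(
                name -> "context-" + name + "#" + created.incrementAndGet());
        System.out.println(cache.get("order-service"));
        System.out.println(cache.get("order-service")); // same object, factory not called again
        System.out.println("created: " + created.get());
    }
}
```

One caveat with this variant: the mapping function passed to computeIfAbsent must not modify the same map, so it is unsuitable if creating one context could trigger the creation of another. That may be one reason to prefer explicit double-checked locking as in the excerpt above.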
Creating a child context:
    protected AnnotationConfigApplicationContext createContext(String name) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        if (this.configurations.containsKey(name)) {
            for (Class<?> configuration : this.configurations.get(name)
                    .getConfiguration()) {
                context.register(configuration);
            }
        }
        for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
            if (entry.getKey().startsWith("default.")) {
                for (Class<?> configuration : entry.getValue().getConfiguration()) {
                    context.register(configuration);
                }
            }
        }
        context.register(PropertyPlaceholderAutoConfiguration.class,
                this.defaultConfigType);
        context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
                this.propertySourceName,
                Collections.<String, Object>singletonMap(this.propertyName, name)));
        if (this.parent != null) {
            context.setParent(this.parent);
            context.setClassLoader(this.parent.getClassLoader());
        }
        context.setDisplayName(generateDisplayName(name));
        context.refresh();
        return context;
    }
Below is the LoadBalancerClientConfiguration class. A child context is created for each serviceId, and each child context processes this class once and registers the resulting beans.
    public class LoadBalancerClientConfiguration {

        private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;

        @Bean
        @ConditionalOnMissingBean
        public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
                Environment environment,
                LoadBalancerClientFactory loadBalancerClientFactory) {
            String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
            return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name,
                    ServiceInstanceListSupplier.class), name);
        }

        @Configuration(proxyBeanMethods = false)
        @ConditionalOnReactiveDiscoveryEnabled
        @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
        public static class ReactiveSupportConfiguration {

            @Bean
            @ConditionalOnBean(ReactiveDiscoveryClient.class)
            @ConditionalOnMissingBean
            @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations",
                    havingValue = "default", matchIfMissing = true)
            public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
                    ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withDiscoveryClient()
                        .withCaching().build(context);
            }

            @Bean
            @ConditionalOnBean(ReactiveDiscoveryClient.class)
            @ConditionalOnMissingBean
            @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations",
                    havingValue = "zone-preference")
            public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
                    ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withDiscoveryClient()
                        .withZonePreference().withCaching().build(context);
            }

            @Bean
            @ConditionalOnBean(ReactiveDiscoveryClient.class)
            @ConditionalOnMissingBean
            @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations",
                    havingValue = "health-check")
            public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceListSupplier(
                    ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withDiscoveryClient()
                        .withHealthChecks().withCaching().build(context);
            }

            @Bean
            @ConditionalOnBean(ReactiveDiscoveryClient.class)
            @ConditionalOnMissingBean
            public ServiceInstanceSupplier discoveryClientServiceInstanceSupplier(
                    ReactiveDiscoveryClient discoveryClient, Environment env,
                    ApplicationContext context) {
                DiscoveryClientServiceInstanceSupplier delegate =
                        new DiscoveryClientServiceInstanceSupplier(discoveryClient, env);
                ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                        .getBeanProvider(LoadBalancerCacheManager.class);
                if (cacheManagerProvider.getIfAvailable() != null) {
                    return new CachingServiceInstanceSupplier(delegate,
                            cacheManagerProvider.getIfAvailable());
                }
                return delegate;
            }

        }

        @Configuration(proxyBeanMethods = false)
        @ConditionalOnBlockingDiscoveryEnabled
        @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER + 1)
        public static class BlockingSupportConfiguration {

            @Bean
            @ConditionalOnBean(DiscoveryClient.class)
            @ConditionalOnMissingBean
            @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations",
                    havingValue = "default", matchIfMissing = true)
            public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
                    ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient()
                        .withCaching().build(context);
            }

            @Bean
            @ConditionalOnBean(DiscoveryClient.class)
            @ConditionalOnMissingBean
            @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations",
                    havingValue = "zone-preference")
            public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
                    ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient()
                        .withZonePreference().withCaching().build(context);
            }

            @Bean
            @ConditionalOnBean(DiscoveryClient.class)
            @ConditionalOnMissingBean
            @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations",
                    havingValue = "health-check")
            public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceListSupplier(
                    ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient()
                        .withHealthChecks().withCaching().build(context);
            }

            @Bean
            @ConditionalOnBean(DiscoveryClient.class)
            @ConditionalOnMissingBean
            public ServiceInstanceSupplier discoveryClientServiceInstanceSupplier(
                    DiscoveryClient discoveryClient, Environment env,
                    ApplicationContext context) {
                DiscoveryClientServiceInstanceSupplier delegate =
                        new DiscoveryClientServiceInstanceSupplier(discoveryClient, env);
                ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                        .getBeanProvider(LoadBalancerCacheManager.class);
                if (cacheManagerProvider.getIfAvailable() != null) {
                    return new CachingServiceInstanceSupplier(delegate,
                            cacheManagerProvider.getIfAvailable());
                }
                return delegate;
            }

        }

    }
Now let's look at how RoundRobinLoadBalancer works. It obtains a List<ServiceInstance> from serviceInstanceListSupplierProvider and uses the getInstanceResponse method to pick one service instance from the list in round-robin order.
    public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

        public Mono<Response<ServiceInstance>> choose(Request request) {
            if (serviceInstanceListSupplierProvider != null) {
                ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                        .getIfAvailable(NoopServiceInstanceListSupplier::new);
                return supplier.get().next().map(this::getInstanceResponse);
            }
            ServiceInstanceSupplier supplier = this.serviceInstanceSupplier
                    .getIfAvailable(NoopServiceInstanceSupplier::new);
            return supplier.get().collectList().map(this::getInstanceResponse);
        }

        private Response<ServiceInstance> getInstanceResponse(
                List<ServiceInstance> instances) {
            int pos = Math.abs(this.position.incrementAndGet());
            ServiceInstance instance = instances.get(pos % instances.size());
            return new DefaultResponse(instance);
        }

    }
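The selection step can be isolated into a few lines of plain Java. The sketch below is a hypothetical stand-alone version (RoundRobinSketch is not a Spring Cloud class). It deliberately differs from the trimmed excerpt above in two assumed details: it returns an empty Optional for an empty instance list, and it clears the sign bit instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative and would yield a negative index once the counter overflows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin selection; not the Spring Cloud class.
final class RoundRobinSketch<T> {

    private final AtomicInteger position;

    /** The seed is exposed only so that counter overflow can be demonstrated. */
    RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    /** Picks the next instance in rotation, or empty if there is nothing to pick. */
    Optional<T> choose(List<T> instances) {
        if (instances.isEmpty()) {
            return Optional.empty(); // avoids "% 0" on an empty list
        }
        // Clearing the sign bit keeps the value non-negative after the counter wraps.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return Optional.of(instances.get(pos % instances.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> loadBalancer = new RoundRobinSketch<>(0);
        List<String> instances = Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3");
        for (int i = 0; i < 4; i++) {
            System.out.println(loadBalancer.choose(instances).get());
        }
    }
}
```

Because the counter is incremented before it is used, the rotation starts at the second element of the list; over many calls every instance is still chosen equally often.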
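The default ServiceInstanceListSupplier is produced by the ServiceInstanceListSupplier.builder helper. The final build call receives the current context, so everything it needs, such as the DiscoveryClient and the cache, can be looked up from that context.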
    public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
            ConfigurableApplicationContext context) {
        return ServiceInstanceListSupplier.builder().withDiscoveryClient()
                .withCaching().build(context);
    }
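A chain such as withDiscoveryClient().withCaching() reads like a stack of decorators: a base supplier that queries the registry, wrapped by one that remembers the answer. The sketch below illustrates only that wrapping idea in plain Java. SupplierChainSketch and its withCaching method are hypothetical, strings stand in for service instances, and unlike a real cache this one never expires its entry.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of a caching decorator around an instance-list supplier.
final class SupplierChainSketch {

    /** Decorator: asks the delegate once, then serves the remembered list. */
    static <T> Supplier<List<T>> withCaching(Supplier<List<T>> delegate) {
        return new Supplier<List<T>>() {
            private List<T> cached; // null until the first load

            @Override
            public synchronized List<T> get() {
                if (cached == null) {
                    cached = delegate.get();
                }
                return cached;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger registryCalls = new AtomicInteger();
        // Stand-in for a discovery-client lookup against the registry.
        Supplier<List<String>> discovery = () -> {
            registryCalls.incrementAndGet();
            return Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080");
        };
        Supplier<List<String>> supplier = withCaching(discovery);
        supplier.get();
        supplier.get();
        System.out.println("registry calls: " + registryCalls.get()); // prints "registry calls: 1"
    }
}
```

Each decorator exposes the same interface as the supplier it wraps, which is what lets a builder stack them in any combination.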
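The @LoadBalancerClient annotation
When LoadBalancerClientFactory is created, note the call to setConfigurations. The configurations passed to it are injected automatically. So where are they created, and where do they take effect? The @LoadBalancerClient annotation is the convenient way to contribute them.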
    @ConditionalOnMissingBean
    @Bean
    public LoadBalancerClientFactory loadBalancerClientFactory() {
        LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
        clientFactory.setConfigurations(
                this.configurations.getIfAvailable(Collections::emptyList));
        return clientFactory;
    }
The annotation takes a name that indicates which child context the configuration should be registered in.
    public @interface LoadBalancerClient {

        @AliasFor("name")
        String value() default "";

        @AliasFor("value")
        String name() default "";

        Class<?>[] configuration() default {};

    }
The annotation imports LoadBalancerClientConfigurationRegistrar, which wraps the annotation's name and configuration attributes in a LoadBalancerClientSpecification and registers it as a bean definition.
    @Override
    public void registerBeanDefinitions(AnnotationMetadata metadata,
            BeanDefinitionRegistry registry) {
        Map<String, Object> attrs = metadata
                .getAnnotationAttributes(LoadBalancerClients.class.getName(), true);
        if (attrs != null && attrs.containsKey("value")) {
            AnnotationAttributes[] clients = (AnnotationAttributes[]) attrs.get("value");
            for (AnnotationAttributes client : clients) {
                registerClientConfiguration(registry, getClientName(client),
                        client.get("configuration"));
            }
        }
        if (attrs != null && attrs.containsKey("defaultConfiguration")) {
            String name;
            if (metadata.hasEnclosingClass()) {
                name = "default." + metadata.getEnclosingClassName();
            }
            else {
                name = "default." + metadata.getClassName();
            }
            registerClientConfiguration(registry, name,
                    attrs.get("defaultConfiguration"));
        }
        Map<String, Object> client = metadata
                .getAnnotationAttributes(LoadBalancerClient.class.getName(), true);
        String name = getClientName(client);
        if (name != null) {
            registerClientConfiguration(registry, name, client.get("configuration"));
        }
    }

    private static void registerClientConfiguration(BeanDefinitionRegistry registry,
            Object name, Object configuration) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder
                .genericBeanDefinition(LoadBalancerClientSpecification.class);
        builder.addConstructorArgValue(name);
        builder.addConstructorArgValue(configuration);
        registry.registerBeanDefinition(name + ".LoadBalancerClientSpecification",
                builder.getBeanDefinition());
    }
When a child context is created, the configurations registered under its name are looked up and registered in it.
    protected AnnotationConfigApplicationContext createContext(String name) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // Configurations registered under this exact client name.
        if (this.configurations.containsKey(name)) {
            for (Class<?> configuration : this.configurations.get(name)
                    .getConfiguration()) {
                context.register(configuration);
            }
        }
        // Configurations registered under a "default." name apply to every client.
        for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
            if (entry.getKey().startsWith("default.")) {
                for (Class<?> configuration : entry.getValue().getConfiguration()) {
                    context.register(configuration);
                }
            }
        }
        // ... (rest of the method omitted)
    }
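In short, @LoadBalancerClient contributes a name together with configuration classes, and those configuration classes end up registered in the child context identified by that name.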
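The lookup rule in createContext can be modelled in plain Java to make the registration order visible. This is a simplified sketch under assumptions: ChildContextConfigSketch and configurationsFor are hypothetical names, configuration classes are represented by their names as strings, and the PropertyPlaceholderAutoConfiguration registration is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of which configurations a named child context receives.
final class ChildContextConfigSketch {

    /**
     * Returns configuration names in registration order: those for the exact
     * client name, then every "default." entry, then the factory default.
     */
    static List<String> configurationsFor(String name, Map<String, List<String>> specs,
            String defaultConfigType) {
        List<String> registered = new ArrayList<>();
        List<String> own = specs.get(name);
        if (own != null) {
            registered.addAll(own);
        }
        for (Map.Entry<String, List<String>> entry : specs.entrySet()) {
            if (entry.getKey().startsWith("default.")) {
                registered.addAll(entry.getValue());
            }
        }
        registered.add(defaultConfigType);
        return registered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> specs = new LinkedHashMap<>();
        specs.put("order-service", Arrays.asList("OrderLoadBalancerConfig"));
        specs.put("default.com.example.App", Arrays.asList("SharedLoadBalancerConfig"));
        System.out.println(configurationsFor("order-service", specs,
                "LoadBalancerClientConfiguration"));
    }
}
```

Client-specific configuration comes first and the built-in default last, which fits with the beans in LoadBalancerClientConfiguration being declared with @ConditionalOnMissingBean so that user-supplied beans can take precedence.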
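To summarize how the load balancer resolves a service id to a service instance:
1. A LoadBalancerClientFactory is created; it is a factory for child contexts.
2. When a bean is requested from LoadBalancerClientFactory, the child context is created on demand, and during creation the configuration classes are processed (both the one passed in the constructor and those contributed by the @LoadBalancerClient annotation).
3. The ReactiveLoadBalancer is obtained from the factory (once the configuration classes have been processed, this bean exists in the child context).
4. A BlockingLoadBalancerClient is created for the blocking API; it delegates its logic to the ReactiveLoadBalancer.
With that, both the blocking and the non-blocking load-balancer clients can be obtained from the container.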
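Summary
spring-cloud-loadbalancer is a small library that supports both a blocking and a non-blocking API, with the blocking one delegating to the non-blocking one. It works by using a DiscoveryClient to obtain the list of service instances for a given service id and then choosing one of them in round-robin order, with optional caching.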