From cefe2b55a239d44df0713bc370c069b522670ff1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BA=AA=E5=8D=93=E5=BF=97?= Date: Tue, 14 Nov 2023 01:00:23 +0800 Subject: [PATCH] feat(lb): deterministic subsetting algorithm (#1289) --- .../spring-cloud-commons/loadbalancer.adoc | 28 +++ .../loadbalancer/LoadBalancerProperties.java | 48 +++++ .../LoadBalancerClientConfiguration.java | 28 +++ .../ServiceInstanceListSupplierBuilder.java | 11 ++ .../SubsetServiceInstanceListSupplier.java | 93 ++++++++++ ...SubsetServiceInstanceListSupplierTest.java | 174 ++++++++++++++++++ 6 files changed, 382 insertions(+) create mode 100644 spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SubsetServiceInstanceListSupplier.java create mode 100644 spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/SubsetServiceInstanceListSupplierTest.java diff --git a/docs/modules/ROOT/pages/spring-cloud-commons/loadbalancer.adoc b/docs/modules/ROOT/pages/spring-cloud-commons/loadbalancer.adoc index 045d8ddc6..b8be4c338 100644 --- a/docs/modules/ROOT/pages/spring-cloud-commons/loadbalancer.adoc +++ b/docs/modules/ROOT/pages/spring-cloud-commons/loadbalancer.adoc @@ -379,6 +379,34 @@ For `WebClient`, you need to implement and define `LoadBalancerClientRequestTran If multiple transformers are defined, they are applied in the order in which Beans are defined. Alternatively, you can use `LoadBalancerRequestTransformer.DEFAULT_ORDER` or `LoadBalancerClientRequestTransformer.DEFAULT_ORDER` to specify the order. +[[loadbalancer-subset]] +== Spring Cloud LoadBalancer Subset + +`SubsetServiceInstanceListSupplier` implements a https://sre.google/sre-book/load-balancing-datacenter/[deterministic subsetting algorithm] to select a limited number of instances in the `ServiceInstanceListSupplier` delegates hierarchy. + +You can configure it either by setting the value of `spring.cloud.loadbalancer.configurations` to `subset` or by providing your own `ServiceInstanceListSupplier` bean -- for example: + +[[subset-custom-loadbalancer-configuration-example]] +[source,java,indent=0] +---- +public class CustomLoadBalancerConfiguration { + + @Bean + public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier( + ConfigurableApplicationContext context) { + return ServiceInstanceListSupplier.builder() + .withDiscoveryClient() + .withSubset() + .withCaching() + .build(context); + } + } +---- + +TIP: By default, each service instance is assigned a unique `instanceId`, and different `instanceId` values often select different subsets. Normally, you need not pay attention to it. However, if you need to have multiple instances select the same subset, you can set it with `spring.cloud.loadbalancer.subset.instance-id` (which supports placeholders). + +TIP: By default, the size of the subset is set to 100. You can also set it with `spring.cloud.loadbalancer.subset.size`. + [[spring-cloud-loadbalancer-starter]] == Spring Cloud LoadBalancer Starter diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java index dd93f89af..53965a344 100644 --- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java +++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java @@ -29,6 +29,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer; +import org.springframework.cloud.commons.util.IdUtils; +import org.springframework.core.env.PropertyResolver; import org.springframework.http.HttpMethod; import org.springframework.util.LinkedCaseInsensitiveMap; @@ -40,6 +42,7 @@ * * @author Olga Maciaszek-Sharma * @author Gandhimathi Velusamy + * @author Zhuozhi Ji * @since 2.2.1 */ public class LoadBalancerProperties { @@ -85,6 +88,12 @@ public class LoadBalancerProperties { */ private boolean callGetWithRequestOnDelegates = true; + /** + * Properties for + * {@link org.springframework.cloud.loadbalancer.core.SubsetServiceInstanceListSupplier}. + */ + private Subset subset = new Subset(); + public HealthCheck getHealthCheck() { return healthCheck; } @@ -142,6 +151,14 @@ public boolean isCallGetWithRequestOnDelegates() { return callGetWithRequestOnDelegates; } + public Subset getSubset() { + return subset; + } + + public void setSubset(Subset subset) { + this.subset = subset; + } + public void setCallGetWithRequestOnDelegates(boolean callGetWithRequestOnDelegates) { this.callGetWithRequestOnDelegates = callGetWithRequestOnDelegates; } @@ -490,4 +507,35 @@ public void setEnabled(boolean enabled) { } + public static class Subset { + + /** + * Instance id of deterministic subsetting. If not set, + * {@link IdUtils#getDefaultInstanceId(PropertyResolver)} will be used. + */ + private String instanceId = ""; + + /** + * Max subset size of deterministic subsetting. + */ + private int size = 100; + + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + public int getSize() { + return size; + } + + public void setSize(int size) { + this.size = size; + } + + } + } diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java index f0aae50c8..8e7a56834 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/annotation/LoadBalancerClientConfiguration.java @@ -143,6 +143,15 @@ public ServiceInstanceListSupplier weightedServiceInstanceListSupplier(Configura .build(context); } + @Bean + @ConditionalOnBean(ReactiveDiscoveryClient.class) + @ConditionalOnMissingBean + @Conditional(SubsetConfigurationCondition.class) + public ServiceInstanceListSupplier subsetServiceInstanceListSupplier(ConfigurableApplicationContext context) { + return ServiceInstanceListSupplier.builder().withDiscoveryClient().withSubset().withCaching() + .build(context); + } + } @Configuration(proxyBeanMethods = false) @@ -208,6 +217,15 @@ public ServiceInstanceListSupplier weightedServiceInstanceListSupplier(Configura .build(context); } + @Bean + @ConditionalOnBean(DiscoveryClient.class) + @ConditionalOnMissingBean + @Conditional(SubsetConfigurationCondition.class) + public ServiceInstanceListSupplier subsetServiceInstanceListSupplier(ConfigurableApplicationContext context) { + return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withSubset().withCaching() + .build(context); + } + } @Configuration(proxyBeanMethods = false) @@ -353,4 +371,14 @@ public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) } + static class SubsetConfigurationCondition implements Condition { + + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + return LoadBalancerEnvironmentPropertyUtils.equalToForClientOrDefault(context.getEnvironment(), + "configurations", "subset"); + } + + } + } diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java index 74ee2e386..ff40abf8a 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplierBuilder.java @@ -35,6 +35,7 @@ import org.springframework.cloud.loadbalancer.config.LoadBalancerZoneConfig; import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.env.PropertyResolver; import org.springframework.http.HttpStatus; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -301,6 +302,16 @@ public ServiceInstanceListSupplierBuilder withHints() { return this; } + public ServiceInstanceListSupplierBuilder withSubset() { + DelegateCreator creator = (context, delegate) -> { + PropertyResolver resolver = context.getBean(PropertyResolver.class); + LoadBalancerClientFactory factory = context.getBean(LoadBalancerClientFactory.class); + return new SubsetServiceInstanceListSupplier(delegate, resolver, factory); + }; + creators.add(creator); + return this; + } + /** * Support {@link ServiceInstanceListSupplierBuilder} can be added to the expansion * implementation of {@link ServiceInstanceListSupplier} by this method. diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SubsetServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SubsetServiceInstanceListSupplier.java new file mode 100644 index 000000000..26df97748 --- /dev/null +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/SubsetServiceInstanceListSupplier.java @@ -0,0 +1,93 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.loadbalancer.core; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import reactor.core.publisher.Flux; + +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.loadbalancer.LoadBalancerProperties; +import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer; +import org.springframework.cloud.commons.util.IdUtils; +import org.springframework.core.env.PropertyResolver; +import org.springframework.util.StringUtils; + +/** + * A {@link ServiceInstanceListSupplier} implementation that uses + * deterministic + * subsetting algorithm to limit the number of instances provided by delegate. + * + * @author Zhuozhi Ji + * @since 4.1.0 + */ +public class SubsetServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier { + + private final String instanceId; + + private final int size; + + public SubsetServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, PropertyResolver resolver, + ReactiveLoadBalancer.Factory factory) { + super(delegate); + LoadBalancerProperties properties = factory.getProperties(getServiceId()); + this.instanceId = resolveInstanceId(properties, resolver); + this.size = properties.getSubset().getSize(); + } + + @Override + public Flux> get() { + return delegate.get().map(instances -> { + if (instances.size() <= size) { + return instances; + } + + instances = new ArrayList<>(instances); + + int instanceId = this.instanceId.hashCode() & Integer.MAX_VALUE; + int count = instances.size() / size; + int round = instanceId / count; + + Random random = new Random(round); + Collections.shuffle(instances, random); + + int bucket = instanceId % count; + int start = bucket * size; + return instances.subList(start, start + size); + }); + } + + private static String resolveInstanceId(LoadBalancerProperties properties, PropertyResolver resolver) { + String instanceId = properties.getSubset().getInstanceId(); + if (StringUtils.hasText(instanceId)) { + return resolver.resolvePlaceholders(properties.getSubset().getInstanceId()); + } + return IdUtils.getDefaultInstanceId(resolver); + } + + public String getInstanceId() { + return instanceId; + } + + public int getSize() { + return size; + } + +} diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/SubsetServiceInstanceListSupplierTest.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/SubsetServiceInstanceListSupplierTest.java new file mode 100644 index 000000000..71772e20a --- /dev/null +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/SubsetServiceInstanceListSupplierTest.java @@ -0,0 +1,174 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.loadbalancer.core; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.client.DefaultServiceInstance; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.loadbalancer.LoadBalancerProperties; +import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer; +import org.springframework.cloud.commons.util.IdUtils; +import org.springframework.mock.env.MockEnvironment; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.springframework.cloud.loadbalancer.core.LoadBalancerTestUtils.buildLoadBalancerClientFactory; + +/** + * Tests for {@link SubsetServiceInstanceListSupplier} + * + * @author Zhuozhi Ji + */ +class SubsetServiceInstanceListSupplierTest { + + private final DiscoveryClientServiceInstanceListSupplier delegate = mock( + DiscoveryClientServiceInstanceListSupplier.class); + + private MockEnvironment env; + + @BeforeEach + public void setup() { + env = new MockEnvironment(); + } + + @AfterEach + public void destroy() { + env = null; + } + + @Test + void shouldResolvePlaceholderWhenInstanceIdSet() { + env.setProperty("foo", "bar"); + when(delegate.getServiceId()).thenReturn("test"); + SubsetServiceInstanceListSupplier supplier = new SubsetServiceInstanceListSupplier(delegate, env, + factory("${foo}", 100)); + + assertThat(supplier.getInstanceId()).isEqualTo("bar"); + } + + @Test + void shouldUseIdUtilsWhenInstanceIdNotSet() { + SubsetServiceInstanceListSupplier supplier = new SubsetServiceInstanceListSupplier(delegate, env, + factory("", 100)); + + when(delegate.getServiceId()).thenReturn("test"); + assertThat(supplier.getInstanceId()).isEqualTo(IdUtils.getDefaultInstanceId(env)); + } + + @Test + void shouldReturnEmptyWhenDelegateReturnedEmpty() { + when(delegate.getServiceId()).thenReturn("test"); + when(delegate.get()).thenReturn(Flux.just(Collections.emptyList())); + SubsetServiceInstanceListSupplier supplier = new SubsetServiceInstanceListSupplier(delegate, env, + factory("foobar", 100)); + + List serviceInstances = Objects.requireNonNull(supplier.get().blockFirst()); + assertThat(serviceInstances).isEmpty(); + } + + @Test + void shouldReturnSublistWithGivenSubsetSize() { + List instances = IntStream.range(0, 101) + .mapToObj(i -> new DefaultServiceInstance(Integer.toString(i), "test", "host" + i, 8080, false, null)) + .collect(Collectors.toList()); + + when(delegate.getServiceId()).thenReturn("test"); + when(delegate.get()).thenReturn(Flux.just(instances)); + SubsetServiceInstanceListSupplier supplier = new SubsetServiceInstanceListSupplier(delegate, env, + factory("foobar", 5)); + + List serviceInstances = Objects.requireNonNull(supplier.get().blockFirst()); + assertThat(serviceInstances).hasSize(5); + } + + @Test + void shouldReturnRawWhenLessThanSubsetSize() { + List instances = IntStream.range(0, 101) + .mapToObj(i -> new DefaultServiceInstance(Integer.toString(i), "test", "host" + i, 8080, false, null)) + .collect(Collectors.toList()); + + when(delegate.getServiceId()).thenReturn("test"); + when(delegate.get()).thenReturn(Flux.just(instances)); + SubsetServiceInstanceListSupplier supplier = new SubsetServiceInstanceListSupplier(delegate, env, + factory("foobar", 1000)); + + List serviceInstances = Objects.requireNonNull(supplier.get().blockFirst()); + assertThat(serviceInstances).hasSize(101); + } + + @Test + void shouldReturnSameSublistForSameInstanceId() { + List instances = IntStream.range(0, 101) + .mapToObj(i -> new DefaultServiceInstance(Integer.toString(i), "test", "host" + i, 8080, false, null)) + .collect(Collectors.toList()); + + when(delegate.getServiceId()).thenReturn("test"); + when(delegate.get()).thenReturn(Flux.just(instances)); + + SubsetServiceInstanceListSupplier supplier1 = new SubsetServiceInstanceListSupplier(delegate, env, + factory("foobar", 5)); + List serviceInstances1 = Objects.requireNonNull(supplier1.get().blockFirst()); + + SubsetServiceInstanceListSupplier supplier2 = new SubsetServiceInstanceListSupplier(delegate, env, + factory("foobar", 5)); + List serviceInstances2 = Objects.requireNonNull(supplier2.get().blockFirst()); + + assertThat(serviceInstances1).isEqualTo(serviceInstances2); + } + + @Test + void shouldReturnDifferentSublistForDifferentInstanceId() { + List instances = IntStream.range(0, 101) + .mapToObj(i -> new DefaultServiceInstance(Integer.toString(i), "test", "host" + i, 8080, false, null)) + .collect(Collectors.toList()); + + when(delegate.getServiceId()).thenReturn("test"); + when(delegate.get()).thenReturn(Flux.just(instances)); + + SubsetServiceInstanceListSupplier supplier1 = new SubsetServiceInstanceListSupplier(delegate, env, + factory("foobar1", 5)); + List serviceInstances1 = Objects.requireNonNull(supplier1.get().blockFirst()); + + SubsetServiceInstanceListSupplier supplier2 = new SubsetServiceInstanceListSupplier(delegate, env, + factory("foobar2", 5)); + List serviceInstances2 = Objects.requireNonNull(supplier2.get().blockFirst()); + + assertThat(serviceInstances1).isNotEqualTo(serviceInstances2); + } + + ReactiveLoadBalancer.Factory factory(String instanceId, int size) { + LoadBalancerProperties properties = new LoadBalancerProperties(); + LoadBalancerProperties.Subset subset = new LoadBalancerProperties.Subset(); + subset.setInstanceId(instanceId); + subset.setSize(size); + properties.setSubset(subset); + + return buildLoadBalancerClientFactory("test", properties); + } + +}