Skip to content

Commit

Permalink
loadbalancer-experimental: remove random-subsetting API from LoadBala…
Browse files Browse the repository at this point in the history
…ncerBuilder

Motivation:

There are many forms of subsetting and I don't think we have sorted out
the best way to make that configurable from an API perspective. As we
make the LoadBalancerBuilder API non-experimental, we don't want to yet
commit to the existing API.

Modifications:

- Remove the configuration from LoadBalancerBuilder
- Extract the random subsetting logic into it's own abstraction to prep for future subsetting strategies

Result:

- Smaller API commitments
- More flexibility to experiment internally
  • Loading branch information
bryce-anderson committed Dec 6, 2024
1 parent 3a45a23 commit 10133e7
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -67,7 +66,6 @@
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.utils.internal.NumberUtils.ensureNonNegative;
import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import static java.lang.Integer.toHexString;
import static java.lang.System.identityHashCode;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -110,8 +108,8 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
private final Publisher<Object> eventStream;
private final SequentialCancellable discoveryCancellable = new SequentialCancellable();
private final ConnectionSelector<C> connectionSelector;
private final Subsetter subsetter;
private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;
private final int randomSubsetSize;
@Nullable
private final HealthCheckConfig healthCheckConfig;
private final HostPriorityStrategy priorityStrategy;
Expand All @@ -129,7 +127,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
* @param eventPublisher provides a stream of addresses to connect to.
* @param priorityStrategyFactory a builder of the {@link HostPriorityStrategy} to use with the load balancer.
* @param loadBalancingPolicy a factory of the initial host selector to use with this load balancer.
* @param randomSubsetSize the maximum number of health hosts to use when load balancing.
* @param subsetter a subset builder.
* @param connectionSelectorFactory factory of the connection pool strategy to use with this load balancer.
* @param connectionFactory a function which creates new connections.
* @param loadBalancerObserverFactory factory used to build a {@link LoadBalancerObserver} to use with this
Expand All @@ -145,7 +143,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final Function<String, HostPriorityStrategy> priorityStrategyFactory,
final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
final int randomSubsetSize,
final Subsetter subsetter,
final ConnectionSelector.ConnectionSelectorFactory<C> connectionSelectorFactory,
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final LoadBalancerObserverFactory loadBalancerObserverFactory,
Expand All @@ -163,7 +161,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
this.eventStream = fromSource(eventStreamProcessor)
.replay(1); // Allow for multiple subscribers and provide new subscribers with last signal.
this.connectionFactory = requireNonNull(connectionFactory);
this.randomSubsetSize = ensurePositive(randomSubsetSize, "randomSubsetSize");
this.subsetter = requireNonNull(subsetter, "subsetter");
this.loadBalancerObserver = requireNonNull(loadBalancerObserverFactory, "loadBalancerObserverFactory")
.newObserver(lbDescription);
this.healthCheckConfig = healthCheckConfig;
Expand Down Expand Up @@ -514,38 +512,11 @@ private void sequentialUpdateUsedHosts(List<PrioritizedHostImpl<ResolvedAddress,
host.loadBalancingWeight(host.serviceDiscoveryWeight());
}
nextHosts = priorityStrategy.prioritize(nextHosts);
nextHosts = makeSubset(nextHosts);
nextHosts = subsetter.subset(nextHosts);
this.hostSelector = hostSelector.rebuildWithHosts(nextHosts);
loadBalancerObserver.onHostSetChanged(Collections.unmodifiableList(nextHosts));
}

private List<PrioritizedHostImpl<ResolvedAddress, C>> makeSubset(
final List<PrioritizedHostImpl<ResolvedAddress, C>> nextHosts) {
if (nextHosts.size() <= randomSubsetSize) {
return nextHosts;
}

// We need to sort, and then return the list with the subsetSize number of healthy elements.
List<PrioritizedHostImpl<ResolvedAddress, C>> result = new ArrayList<>(nextHosts);
result.sort(Comparator.comparingLong(a -> a.randomSeed));

// We don't want to consider the unhealthy elements to be a part of our subset, so we're going to grow it
// to account for un-health endpoints. However, we need to know how many that is.
for (int i = 0, healthyCount = 0; i < result.size(); i++) {
if (result.get(i).isHealthy()) {
++healthyCount;
if (healthyCount == randomSubsetSize) {
// Trim elements after i to form the subset.
while (result.size() > i + 1) {
result.remove(result.size() - 1);
}
break;
}
}
}
return result;
}

@Override
public Single<C> selectConnection(final Predicate<C> selector, @Nullable final ContextMap context) {
return defer(() -> selectConnection0(selector, context, false).shareContextOnSubscribe());
Expand Down Expand Up @@ -700,6 +671,11 @@ static final class PrioritizedHostImpl<ResolvedAddress, C extends LoadBalancedCo
this.loadBalancingWeight = serviceDiscoveryWeight;
}

@Override
public long randomSeed() {
return randomSeed;
}

Host<ResolvedAddress, C> delegate() {
return delegate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@
import java.util.function.Function;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import static java.util.Objects.requireNonNull;

final class DefaultLoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedConnection>
implements LoadBalancerBuilder<ResolvedAddress, C> {

private final String id;
private int randomSubsetSize = Integer.MAX_VALUE;
private LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy = defaultLoadBalancingPolicy();

@Nullable
Expand All @@ -58,12 +56,6 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> randomSubsetSize(int randomSubsetSize) {
this.randomSubsetSize = ensurePositive(randomSubsetSize, "randomSubsetSize");
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
@Nullable LoadBalancerObserverFactory loadBalancerObserverFactory) {
Expand Down Expand Up @@ -93,7 +85,7 @@ public LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backg

@Override
public LoadBalancerFactory<ResolvedAddress, C> build() {
return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, randomSubsetSize, loadBalancerObserverFactory,
return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, loadBalancerObserverFactory,
connectionSelectorFactory, outlierDetectorConfig, getExecutor());
}

Expand All @@ -102,22 +94,19 @@ static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends LoadBal

private final String id;
private final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy;
private final int subsetSize;
@Nullable
private final LoadBalancerObserverFactory loadBalancerObserverFactory;
private final ConnectionSelectorFactory<C> connectionSelectorFactory;
private final OutlierDetectorConfig outlierDetectorConfig;
private final Executor executor;

DefaultLoadBalancerFactory(final String id, final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
final int subsetSize,
@Nullable final LoadBalancerObserverFactory loadBalancerObserverFactory,
final ConnectionSelectorFactory<C> connectionSelectorFactory,
final OutlierDetectorConfig outlierDetectorConfig,
final Executor executor) {
this.id = requireNonNull(id, "id");
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");
this.subsetSize = ensurePositive(subsetSize, "subsetSize");
this.loadBalancerObserverFactory = loadBalancerObserverFactory;
this.outlierDetectorConfig = requireNonNull(outlierDetectorConfig, "outlierDetectorConfig");
this.connectionSelectorFactory = requireNonNull(
Expand Down Expand Up @@ -167,7 +156,7 @@ public LoadBalancer<C> newLoadBalancer(
new XdsOutlierDetector<>(executor, outlierDetectorConfig, lbDescription);
}
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
DefaultHostPriorityStrategy::new, loadBalancingPolicy, subsetSize,
DefaultHostPriorityStrategy::new, loadBalancingPolicy, new RandomSubsetter(Integer.MAX_VALUE),
connectionSelectorFactory, connectionFactory,
loadBalancerObserverFactory, healthCheckConfig, outlierDetectorFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> randomSubsetSize(int randomSubsetSize) {
delegate = delegate.randomSubsetSize(randomSubsetSize);
return this;
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
@Nullable LoadBalancerObserverFactory loadBalancerObserverFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,6 @@ public interface LoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedConn
LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(
LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy);

/**
* Set the random host subset size for the load balancer.
* This is valuable for limiting the number of outgoing connections when calling services that have
* a very high replica count. It does so by selecting the specified number of hosts randomly from the total host
* set and routing traffic only to these hosts.
* @param randomSubsetSize the maximum number of healthy hosts to establish connections to
* @return {@code this}
*/
LoadBalancerBuilder<ResolvedAddress, C> randomSubsetSize(int randomSubsetSize);

/**
* Set the {@link LoadBalancerObserverFactory} to use with this load balancer.
* @param loadBalancerObserverFactory the {@link LoadBalancerObserverFactory} to use, or {@code null} to not use an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
*/
interface PrioritizedHost {

/**
* A random seed given to a host on it's initial creation.
* @return a random seed given to a host on it's initial creation.
*/
long randomSeed();

/**
* The current priority of the host.
* @return the current priority of the host.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;

/**
* A {@link Subsetter} that takes a random subset of the provided hosts.
*/
final class RandomSubsetter implements Subsetter {

private final int randomSubsetSize;

RandomSubsetter(int randomSubsetSize) {
this.randomSubsetSize = ensurePositive(randomSubsetSize, "randomSubsetSize");
}

@Override
public <T extends PrioritizedHost> List<T> subset(List<T> nextHosts) {
if (nextHosts.size() <= randomSubsetSize) {
return nextHosts;
}

// We need to sort, and then return the list with the subsetSize number of healthy elements.
List<T> result = new ArrayList<>(nextHosts);
result.sort(Comparator.comparingLong(PrioritizedHost::randomSeed));

// We don't want to consider the unhealthy elements to be a part of our subset, so we're going to grow it
// to account for un-health endpoints. However, we need to know how many that is.
for (int i = 0, healthyCount = 0; i < result.size(); i++) {
if (result.get(i).isHealthy()) {
++healthyCount;
if (healthyCount == randomSubsetSize) {
// Trim elements after i to form the subset.
while (result.size() > i + 1) {
result.remove(result.size() - 1);
}
break;
}
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import java.util.List;

/**
* An abstraction for picking a subset of hosts to use for load balancing purposes.
*/
interface Subsetter {

/**
* Subset the provided host list into the list of hosts that will be used for load balancing.
* @param hosts the eligible list of hosts.
* @return the list of hosts that should be used for load balancing.
* @param <T> the type of the {@link PrioritizedHost}
*/
<T extends PrioritizedHost> List<T> subset(List<T> hosts);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
Expand Down Expand Up @@ -280,6 +281,7 @@ private static Matcher<Double> approxEqual(double expected) {
private static class TestPrioritizedHost implements PrioritizedHost {

private final String address;
private final long randomSeed = ThreadLocalRandom.current().nextLong();

private boolean isHealthy = true;
private int priority;
Expand All @@ -293,6 +295,11 @@ String address() {
return address;
}

@Override
public long randomSeed() {
return randomSeed;
}

@Override
public int priority() {
return priority;
Expand Down
Loading

0 comments on commit 10133e7

Please sign in to comment.