Skip to content

Commit

Permalink
[server] Add ability to configure Helix InstanceOperation to UNKNOWN (#…
Browse files Browse the repository at this point in the history
…1229)

For a participant to auto-register with Helix without causing a rebalance every time a new participant joins the cluster (i.e. during deployment), we need to set the instance operation to UNKNOWN. Then these participants would be ENABLED in a batch, so it only rebalances once.
  • Loading branch information
kvargha authored Oct 15, 2024
1 parent a1d33e4 commit 776149e
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_ENABLE_PARALLEL_BATCH_GET;
import static com.linkedin.venice.ConfigKeys.SERVER_FORKED_PROCESS_JVM_ARGUMENT_LIST;
import static com.linkedin.venice.ConfigKeys.SERVER_GLOBAL_RT_DIV_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_HELIX_JOIN_AS_UNKNOWN;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_HEADER_TABLE_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INITIAL_WINDOW_SIZE;
Expand Down Expand Up @@ -338,6 +339,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
* number of worker threads for the netty listener. If not specified, netty uses twice cpu count.
*/
private final int nettyWorkerThreadCount;
private final boolean helixJoinAsUnknown;
private final int grpcWorkerThreadCount;

private final long databaseSyncBytesIntervalForTransactionalMode;
Expand Down Expand Up @@ -618,6 +620,7 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
.getInt(PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE, topicManagerMetadataFetcherConsumerPoolSize);
nettyGracefulShutdownPeriodSeconds = serverProperties.getInt(SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS, 30);
nettyWorkerThreadCount = serverProperties.getInt(SERVER_NETTY_WORKER_THREADS, 0);
helixJoinAsUnknown = serverProperties.getBoolean(SERVER_HELIX_JOIN_AS_UNKNOWN, false);
grpcWorkerThreadCount =
serverProperties.getInt(GRPC_SERVER_WORKER_THREAD_COUNT, Runtime.getRuntime().availableProcessors());

Expand Down Expand Up @@ -1101,6 +1104,10 @@ public int getNettyWorkerThreadCount() {
return nettyWorkerThreadCount;
}

public boolean isHelixJoinAsUnknownEnabled() {
return helixJoinAsUnknown;
}

public int getGrpcWorkerThreadCount() {
return grpcWorkerThreadCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -158,13 +161,34 @@ private ThreadPoolExecutor initHelixStateTransitionThreadPool(int size, String t
return helixStateTransitionThreadPool;
}

public HelixManagerProperty buildHelixManagerProperty(VeniceServerConfig config) {
InstanceConfig.Builder defaultInstanceConfigBuilder =
new InstanceConfig.Builder().setPort(Integer.toString(config.getListenerPort()));

// For a participant to auto-register with Helix without causing a rebalance everytime a new participant joins
// the cluster (i.e. during deployment), we need to set the instance operation to UNKNOWN. Then these participants
// would be ENABLED in a batch, so it only rebalances once.
if (config.isHelixJoinAsUnknownEnabled()) {
defaultInstanceConfigBuilder.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
}

return new HelixManagerProperty.Builder().setDefaultInstanceConfigBuilder(defaultInstanceConfigBuilder).build();
}

@Override
public boolean startInner() {
LOGGER.info("Attempting to start HelixParticipation service");
VeniceServerConfig config = veniceConfigLoader.getVeniceServerConfig();
HelixManagerProperty helixManagerProperty = buildHelixManagerProperty(config);
helixManager = new SafeHelixManager(
HelixManagerFactory.getZKHelixManager(clusterName, this.participantName, InstanceType.PARTICIPANT, zkAddress));
new ZKHelixManager(
clusterName,
this.participantName,
InstanceType.PARTICIPANT,
zkAddress,
null,
helixManagerProperty));

VeniceServerConfig config = veniceConfigLoader.getVeniceServerConfig();
leaderFollowerHelixStateTransitionThreadPool = initHelixStateTransitionThreadPool(
config.getMaxLeaderFollowerStateTransitionThreadNumber(),
"Venice-L/F-state-transition");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.linkedin.davinci.helix;

import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
import java.util.Arrays;
Expand Down Expand Up @@ -40,4 +43,15 @@ public void testRestAllInstanceCVStates() {
verify(mockAccessor).deleteReplicaStatus(resourceV2, 2);
verify(mockAccessor).deleteReplicaStatus(resourceV2, 3);
}

@Test
public void testUnknownHelixInstanceOperation() {
HelixParticipationService mockHelixParticipationService = mock(HelixParticipationService.class);
VeniceServerConfig mockServerConfig = mock(VeniceServerConfig.class);

when(mockServerConfig.isHelixJoinAsUnknownEnabled()).thenReturn(true);

doCallRealMethod().when(mockHelixParticipationService).buildHelixManagerProperty(mockServerConfig);
mockHelixParticipationService.buildHelixManagerProperty(mockServerConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ private ConfigKeys() {
public static final String SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS =
"server.netty.graceful.shutdown.period.seconds";
public static final String SERVER_NETTY_WORKER_THREADS = "server.netty.worker.threads";
/**
* Whether to join a Helix cluster in an UNKNOWN state
*/
public static final String SERVER_HELIX_JOIN_AS_UNKNOWN = "server.helix.join.as.unknown";

/**
* This config key is a misspelling. It is now considered deprecated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -352,4 +354,22 @@ public void testCloudConfig() {
task.call();
}
}

@Test
public void testHelixUnknownInstanceOperation() {
try (VeniceClusterWrapper venice = ServiceFactory.getVeniceCluster(0, 0, 0, 1);
HelixAsAServiceWrapper helixAsAServiceWrapper = startAndWaitForHAASToBeAvailable(venice.getZk().getAddress())) {
VeniceControllerWrapper controllerWrapper =
venice.addVeniceController(enableControllerAndStorageClusterHAASProperties);
Properties serverProperties = new Properties();
serverProperties.put(ConfigKeys.SERVER_HELIX_JOIN_AS_UNKNOWN, true);
venice.addVeniceServer(new Properties(), serverProperties);

HelixAdmin helixAdmin = controllerWrapper.getVeniceHelixAdmin().getHelixAdmin();
String clusterName = venice.getClusterName();
List<String> instances = helixAdmin.getInstancesInCluster(clusterName);
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, instances.get(0));
assertEquals(instanceConfig.getInstanceOperation().getOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
}
}
}

0 comments on commit 776149e

Please sign in to comment.