Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Admintool #1343

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,9 @@ public static void main(String[] args) throws Exception {
case DUMP_HOST_HEARTBEAT:
dumpHostHeartbeat(cmd);
break;
case CREATE_REAL_TIME_TOPIC:
createRealTimeTopic(cmd);
break;
default:
StringJoiner availableCommands = new StringJoiner(", ");
for (Command c: Command.values()) {
Expand Down Expand Up @@ -1176,6 +1179,7 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
genericParam(cmd, Arg.TARGET_SWAP_REGION, s -> s, p -> params.setTargetRegionSwap(p), argSet);
integerParam(cmd, Arg.TARGET_SWAP_REGION_WAIT_TIME, p -> params.setTargetRegionSwapWaitTime(p), argSet);
booleanParam(cmd, Arg.DAVINCI_HEARTBEAT_REPORTED, p -> params.setIsDavinciHeartbeatReported(p), argSet);
genericParam(cmd, Arg.REAL_TIME_TOPIC_NAME, s -> s, params::setRealTimeTopicName, argSet);

/**
* {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true.
Expand Down Expand Up @@ -3396,4 +3400,10 @@ private static PubSubConsumerAdapter getConsumer(
.create(new VeniceProperties(consumerProps), false, pubSubMessageDeserializer, "admin-tool-topic-dumper");
}

/**
 * Handles the {@code CREATE_REAL_TIME_TOPIC} command: reads the store name and the partition count from the
 * command line, asks the controller client to create a real-time topic, and prints the controller's response.
 *
 * NOTE(review): the partition count is read with {@code getOptionalArgument}, so whatever that helper yields
 * for an omitted flag is forwarded unchanged to the controller client -- confirm that the controller route
 * accepts a request without a partition count.
 *
 * @param cmd parsed command line; must carry {@link Arg#STORE}
 */
private static void createRealTimeTopic(CommandLine cmd) {
  final String store = getRequiredArgument(cmd, Arg.STORE, Command.CREATE_REAL_TIME_TOPIC);
  final String partitionCount = getOptionalArgument(cmd, Arg.PARTITION_COUNT);
  printSuccess(controllerClient.createRealTimeTopic(store, partitionCount));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ public enum Arg {
),
DAVINCI_HEARTBEAT_REPORTED(
"dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats"
), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config");
), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"),
REAL_TIME_TOPIC_NAME("real-time-topic-name", "rttn", true, "Create and set a new real time topic");

private final String argName;
private final String first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,10 @@ public enum Command {
"dump-host-heartbeat",
"Dump all heartbeat belong to a certain storage node. You can use topic/partition to filter specific resource, and you can choose to filter resources that are lagging.",
new Arg[] { SERVER_URL, KAFKA_TOPIC_NAME }, new Arg[] { PARTITION, LAG_FILTER_ENABLED }
),
CREATE_REAL_TIME_TOPIC(
"create-real-time-topic", "Create a real time topic for an existing store", new Arg[] { URL, STORE },
new Arg[] { CLUSTER, PARTITION_COUNT }
);

private final String commandName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,11 @@ public StoppableNodeStatusResponse getAggregatedHealthStatus(
requestString.getBytes());
}

/**
 * Sends a {@code CREATE_REAL_TIME_TOPIC} request to the controller for the given store.
 *
 * @param storeName store name, sent as the {@code NAME} query parameter
 * @param partitionNum partition count, sent as the {@code PARTITION_COUNT} query parameter
 * @return the controller's reply, deserialized as a {@link PartitionResponse}
 */
public PartitionResponse createRealTimeTopic(String storeName, String partitionNum) {
  return request(
      ControllerRoute.CREATE_REAL_TIME_TOPIC,
      newParams().add(NAME, storeName).add(PARTITION_COUNT, partitionNum),
      PartitionResponse.class);
}

public MultiNodesStatusResponse listInstancesStatuses(boolean enableReplicas) {
QueryParams params = newParams();
if (enableReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ public enum ControllerRoute {
DELETE_UNUSED_VALUE_SCHEMAS(
"/delete_unused_value_schemas", HttpMethod.POST, Arrays.asList(CLUSTER, NAME),
ControllerApiConstants.VALUE_SCHEMA_IDS
), GET_INUSE_SCHEMA_IDS("/get_inuse_schema_ids", HttpMethod.GET, Arrays.asList(CLUSTER, NAME));
), GET_INUSE_SCHEMA_IDS("/get_inuse_schema_ids", HttpMethod.GET, Arrays.asList(CLUSTER, NAME)),
CREATE_REAL_TIME_TOPIC("/create_real_time_topic", HttpMethod.POST, Arrays.asList(NAME, PARTITION_COUNT));

private final String path;
private final HttpMethod httpMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,29 @@ private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, Stri
return Strings.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName;
}

/**
 * Derives the name of the next real-time topic from the current one.
 *
 * <p>The input must end with {@code Version.REAL_TIME_TOPIC_SUFFIX}. If the part before that suffix ends with
 * {@code _v<digits>}, the number is incremented by one; otherwise {@code _v2} is appended. The real-time suffix
 * is then re-attached.
 *
 * <p>Only a trailing, purely numeric {@code _v<digits>} marker is treated as a version. A {@code _v} that merely
 * occurs inside the store name (for example {@code my_video_store}) is left untouched, instead of being split on
 * and fed to {@link Integer#parseInt(String)}.
 *
 * NOTE(review): a store whose own name ends with {@code _v<digits>} cannot be told apart from an already
 * versioned topic name -- confirm whether such store names are possible.
 *
 * @param oldRealTimeTopicName current real-time topic name
 * @return the next real-time topic name
 * @throws IllegalArgumentException if the name is {@code null}, does not end with the real-time suffix, or
 *         carries a numeric version that does not fit in an {@code int}
 */
public static String createNewRealTimeTopicName(String oldRealTimeTopicName) {
  if (oldRealTimeTopicName == null || !oldRealTimeTopicName.endsWith(Version.REAL_TIME_TOPIC_SUFFIX)) {
    throw new IllegalArgumentException("Invalid old name format");
  }

  final String versionMarker = "_v";
  String base =
      oldRealTimeTopicName.substring(0, oldRealTimeTopicName.length() - Version.REAL_TIME_TOPIC_SUFFIX.length());

  // Look only at the LAST marker so that "_v" inside the store name is never mistaken for the version.
  int markerIndex = base.lastIndexOf(versionMarker);
  String versionDigits = markerIndex < 0 ? "" : base.substring(markerIndex + versionMarker.length());
  boolean hasNumericVersion =
      !versionDigits.isEmpty() && versionDigits.chars().allMatch(c -> c >= '0' && c <= '9');

  String newBase;
  if (hasNumericVersion) {
    // Existing version marker: bump it.
    int nextVersion = Integer.parseInt(versionDigits) + 1;
    newBase = base.substring(0, markerIndex) + versionMarker + nextVersion;
  } else {
    // No version marker yet: the unversioned topic counts as version 1, so start with version 2.
    newBase = base + versionMarker + "2";
  }

  return newBase + Version.REAL_TIME_TOPIC_SUFFIX;
}

private static class TimeUnitInfo {
String suffix;
int multiplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.InstanceStatus;
import com.linkedin.venice.meta.Store;
Expand Down Expand Up @@ -1120,6 +1121,66 @@ public void testCleanupInstanceCustomizedStates() {
}
}

/**
 * Calls the CREATE_REAL_TIME_TOPIC route between two version pushes and verifies that the second version
 * reports a different real-time topic name than the first, and that the new topic exists.
 *
 * The HTTP client is managed by try-with-resources so it is closed even when a step or assertion fails.
 */
@Test(timeOut = TEST_TIMEOUT)
public void testCreateRealTimeTopicCall() throws IOException, ExecutionException, InterruptedException {
  String clusterName = venice.getClusterNames()[0];
  String storeName = Utils.getUniqueString("testCreateRealTimeTopicCall");

  try (CloseableHttpAsyncClient httpClient = HttpClientUtils.getMinimalHttpClient(1, 1, Optional.empty())) {
    httpClient.start();

    VeniceHelixAdmin childControllerAdmin =
        venice.getChildRegions().get(0).getRandomController().getVeniceHelixAdmin();
    childControllerAdmin.createStore(clusterName, storeName, "test", "\"string\"", "\"string\"");

    // Configure hybrid rewind/lag settings and incremental push on the store, then create the first version.
    UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
    updateStoreParams.setIncrementalPushEnabled(true)
        .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
        .setNumVersionsToPreserve(3)
        .setHybridRewindSeconds(1000)
        .setHybridOffsetLagThreshold(1000);

    childControllerAdmin.updateStore(clusterName, storeName, updateStoreParams);
    childControllerAdmin.incrementVersionIdempotent(clusterName, storeName, "1", 1, 1);

    // API call with all fields
    List<NameValuePair> params = new ArrayList<>();
    params.add(new BasicNameValuePair(ControllerApiConstants.CLUSTER, clusterName));
    params.add(new BasicNameValuePair(ControllerApiConstants.NAME, storeName));
    params.add(new BasicNameValuePair(ControllerApiConstants.PARTITION_COUNT, "8"));

    makeRealTimeTopicCall(httpClient, params);

    // Create a second version after the route call and push to the store.
    childControllerAdmin.incrementVersionIdempotent(clusterName, storeName, "2", 1, 1);
    this.controllerClient
        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);

    List<Version> versions = childControllerAdmin.getStore(clusterName, storeName).getVersions();
    Assert.assertEquals(versions.size(), 2);

    String oldRealTimeTopicName = Utils.getRealTimeTopicName(versions.get(0));
    String newRealTimeTopicName = Utils.getRealTimeTopicName(versions.get(1));

    Assert.assertNotEquals(oldRealTimeTopicName, newRealTimeTopicName);
    Assert.assertTrue(
        childControllerAdmin.getTopicManager()
            .containsTopic(childControllerAdmin.getPubSubTopicRepository().getTopic(newRealTimeTopicName)));
  }
}

/**
 * POSTs the given form parameters to the CREATE_REAL_TIME_TOPIC route of the first child region's controller
 * and asserts that the controller answers with HTTP 200.
 *
 * @param httpClient started async HTTP client used to issue the request
 * @param params form parameters sent as the URL-encoded request entity
 */
private void makeRealTimeTopicCall(HttpAsyncClient httpClient, List<NameValuePair> params)
    throws IOException, ExecutionException, InterruptedException {
  final HttpPost post = new HttpPost(
      venice.getChildRegions().get(0).getControllerConnectString()
          + ControllerRoute.CREATE_REAL_TIME_TOPIC.getPath());
  post.setEntity(new UrlEncodedFormEntity(params));
  HttpResponse httpResponse = httpClient.execute(post, null).get();

  // Read the body before asserting so that a failure reports what the controller actually answered.
  String responseString =
      httpResponse.getEntity() == null ? "" : IOUtils.toString(httpResponse.getEntity().getContent());
  Assert.assertEquals(
      httpResponse.getStatusLine().getStatusCode(),
      200,
      "Unexpected status from " + ControllerRoute.CREATE_REAL_TIME_TOPIC.getPath() + ": " + responseString);
}

private void deleteStore(String storeName) {
parentControllerClient.enableStoreReadWrites(storeName, false);
parentControllerClient.deleteStore(storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4799,7 +4799,8 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
Optional<Long> hybridTimeLagThreshold = params.getHybridTimeLagThreshold();
Optional<DataReplicationPolicy> hybridDataReplicationPolicy = params.getHybridDataReplicationPolicy();
Optional<BufferReplayPolicy> hybridBufferReplayPolicy = params.getHybridBufferReplayPolicy();
Optional<String> realTimeTopicName = params.getRealTimeTopicName();
Optional<String> realTimeTopicName = Optional.empty(); // to update real time topic name, there is a separate admin
// tool command
Optional<Boolean> accessControlled = params.getAccessControlled();
Optional<CompressionStrategy> compressionStrategy = params.getCompressionStrategy();
Optional<Boolean> clientDecompressionEnabled = params.getClientDecompressionEnabled();
Expand Down Expand Up @@ -6447,7 +6448,7 @@ private void throwStoreAlreadyExists(String clusterName, String storeName) {
throw new VeniceStoreAlreadyExistsException(storeName, clusterName);
}

private void throwStoreDoesNotExist(String clusterName, String storeName) {
public static void throwStoreDoesNotExist(String clusterName, String storeName) {
String errorMessage = "Store:" + storeName + " does not exist in cluster:" + clusterName;
LOGGER.error(errorMessage);
throw new VeniceNoStoreException(storeName, clusterName);
Expand All @@ -6462,7 +6463,7 @@ private void throwClusterNotInitialized(String clusterName) {
throw new VeniceNoClusterException(clusterName);
}

private void logAndThrow(String msg) {
/**
 * Logs the given message at INFO level and then throws a {@link VeniceException} carrying the same message.
 * This method never returns normally.
 *
 * @param msg text used both for the log entry and for the exception
 * @throws VeniceException always
 */
public static void logAndThrow(String msg) {
LOGGER.info(msg);
throw new VeniceException(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE;
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPLETE_MIGRATION;
import static com.linkedin.venice.controllerapi.ControllerRoute.CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_REAL_TIME_TOPIC;
import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_STORAGE_PERSONA;
import static com.linkedin.venice.controllerapi.ControllerRoute.ClUSTER_HEALTH_INSTANCES;
import static com.linkedin.venice.controllerapi.ControllerRoute.DATA_RECOVERY;
Expand Down Expand Up @@ -650,6 +651,10 @@ public boolean startInner() throws Exception {
CLEANUP_INSTANCE_CUSTOMIZED_STATES.getPath(),
new VeniceParentControllerRegionStateHandler(admin, clusterRoutes.cleanupInstanceCustomizedStates(admin)));

httpService.post(
CREATE_REAL_TIME_TOPIC.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.createRealTimeTopic(admin)));

httpService.awaitInitialization(); // Wait for server to be initialized
Exception e = initFailure.get();
if (e != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.venice.controller.server;

import static com.linkedin.venice.controller.VeniceHelixAdmin.logAndThrow;
import static com.linkedin.venice.controller.VeniceHelixAdmin.throwStoreDoesNotExist;
import static com.linkedin.venice.controller.server.VeniceRouteHandler.ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER_DEST;
Expand All @@ -11,6 +13,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.OPERATION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.OWNER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.PARTITION_COUNT;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.PARTITION_DETAIL_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_OPERATION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_WRITE_OPERATION;
Expand All @@ -29,6 +32,7 @@
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE;
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPLETE_MIGRATION;
import static com.linkedin.venice.controllerapi.ControllerRoute.CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_REAL_TIME_TOPIC;
import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_ALL_VERSIONS;
import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_KAFKA_TOPIC;
import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_STORE;
Expand Down Expand Up @@ -59,6 +63,8 @@
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.AdminCommandExecutionTracker;
import com.linkedin.venice.controller.HelixVeniceClusterResources;
import com.linkedin.venice.controller.VeniceControllerClusterConfig;
import com.linkedin.venice.controller.kafka.TopicCleanupService;
import com.linkedin.venice.controllerapi.ClusterStaleDataAuditResponse;
import com.linkedin.venice.controllerapi.ControllerResponse;
Expand Down Expand Up @@ -87,6 +93,7 @@
import com.linkedin.venice.exceptions.ResourceStillExistsException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.RegionPushDetails;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataAudit;
Expand All @@ -100,6 +107,7 @@
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.systemstore.schemas.StoreProperties;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1097,4 +1105,65 @@ public Route getHeartbeatFromSystemStore(Admin admin) {
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
};
}

/**
 * Builds the handler for the {@code CREATE_REAL_TIME_TOPIC} route.
 *
 * <p>For an allow-listed caller the handler derives the next real-time topic name from the store's current one,
 * and, while holding the store write lock, creates that topic with the requested partition count and records
 * the new name in the store's hybrid config. If the topic already exists the handler returns without changes.
 *
 * NOTE(review): this handler answers with a {@link StoreResponse}, while the controller client deserializes
 * the reply of this route as a {@code PartitionResponse} -- confirm the two are meant to differ.
 *
 * @param admin admin used to resolve cluster resources, topics and the store repository
 * @return the Spark route for real-time topic creation
 */
public Route createRealTimeTopic(Admin admin) {
  return new VeniceRouteHandler<>(StoreResponse.class) {
    @Override
    public void internalHandle(Request request, StoreResponse veniceResponse) {
      if (!isAllowListUser(request)) {
        // The message names this route's operation (it previously mentioned topic compaction policy).
        veniceResponse.setError("Access Denied!! Only admins can create a real time topic!");
        veniceResponse.setErrorType(ErrorType.BAD_REQUEST);
        return;
      }
      AdminSparkServer.validateParams(request, CREATE_REAL_TIME_TOPIC.getParams(), admin);
      try {
        String clusterName = request.queryParams(CLUSTER);
        String storeName = request.queryParams(NAME);
        int partitionCount = Integer.parseInt(request.queryParams(PARTITION_COUNT));
        String oldRealTimeTopicName = admin.getRealTimeTopic(clusterName, storeName);
        String newRealTimeTopicName = Utils.createNewRealTimeTopicName(oldRealTimeTopicName);
        PubSubTopic newRealTimeTopic = admin.getPubSubTopicRepository().getTopic(newRealTimeTopicName);

        HelixVeniceClusterResources resources = admin.getHelixVeniceClusterResources(clusterName);
        try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) {
          // The topic might be created by another thread already. Check before creating.
          // NOTE(review): this early return also skips the store config update below -- confirm that is intended.
          if (admin.getTopicManager().containsTopic(newRealTimeTopic)) {
            return;
          }
          ReadWriteStoreRepository repository = resources.getStoreMetadataRepository();
          Store store = repository.getStore(storeName);
          if (store == null) {
            throwStoreDoesNotExist(clusterName, storeName);
          }
          if (!store.isHybrid() && !store.isWriteComputationEnabled() && !store.isSystemStore()) {
            logAndThrow("Store " + storeName + " is not hybrid, refusing to return a realtime topic");
          }

          // Reuse the resources resolved above instead of looking them up a second time.
          VeniceControllerClusterConfig clusterConfig = resources.getConfig();
          admin.getTopicManager()
              .createTopic(
                  newRealTimeTopic,
                  partitionCount,
                  clusterConfig.getKafkaReplicationFactorRTTopics(),
                  store.getRetentionTime(),
                  false,
                  // Note: do not enable RT compaction! Might make jobs in Online/Offline model stuck
                  clusterConfig.getMinInSyncReplicasRealTimeTopics(),
                  false);
          // TODO: if there is an online version from a batch push before this store was hybrid then we won't start
          // replicating to it. A new version must be created.
          LOGGER.warn(
              "Creating real time topic per topic request for store: {}. "
                  + "Buffer replay won't start for any existing versions",
              storeName);
          // Point the store at the new topic and persist the change.
          store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName);
          repository.updateStore(store);
        }

      } catch (PubSubTopicDoesNotExistException e) {
        veniceResponse.setError("Topic does not exist!! Message: " + e.getMessage());
      }
    }
  };
}
}
Loading