Skip to content

Commit

Permalink
[admin-tool] Add a cluster batch processing framework command and a s…
Browse files Browse the repository at this point in the history
…ystem store empty push task
  • Loading branch information
sixpluszero committed Oct 21, 2024
1 parent 9a3f892 commit 53d2c05
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import java.io.BufferedReader;
import java.io.Console;
Expand All @@ -122,6 +123,7 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.time.Duration;
Expand All @@ -142,6 +144,10 @@
import java.util.Set;
import java.util.StringJoiner;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -573,6 +579,9 @@ public static void main(String[] args) throws Exception {
case EXTRACT_VENICE_ZK_PATHS:
extractVeniceZKPaths(cmd);
break;
case RUN_CLUSTER_COMMAND:
runClusterCommand(cmd);
break;
default:
StringJoiner availableCommands = new StringJoiner(", ");
for (Command c: Command.values()) {
Expand Down Expand Up @@ -837,6 +846,81 @@ private static void deleteStore(CommandLine cmd) throws IOException {
printObject(response);
}

/**
 * Runs a named batch task (currently only "PushSystemStore") against every user store in a cluster,
 * in parallel, with progress persisted to a checkpoint file so an interrupted run can be resumed.
 *
 * Required args: cluster, task-name, checkpoint-file. Optional: thread-count (defaults to 1).
 * Throws VeniceException if the cluster store list cannot be fetched or the checkpoint file
 * cannot be read.
 */
private static void runClusterCommand(CommandLine cmd) {
  String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.RUN_CLUSTER_COMMAND);
  String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.RUN_CLUSTER_COMMAND);
  String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.RUN_CLUSTER_COMMAND);
  int parallelism = Integer.parseInt(getOptionalArgument(cmd, Arg.THREAD_COUNT, "1"));
  System.out.println(
      "[**** Cluster Command Params ****] Cluster: " + clusterName + ", Task: " + task + ", Checkpoint: "
          + checkpointFile + ", Parallelism: " + parallelism);

  // Create child data center controller client map.
  ChildAwareResponse childAwareResponse = controllerClient.listChildControllers(clusterName);
  Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName, childAwareResponse);

  // Fetch the cluster store list from the parent region; every store starts as "not done".
  Map<String, Boolean> progressMap = new VeniceConcurrentHashMap<>();
  MultiStoreResponse clusterStoreResponse = controllerClient.queryStoreList(false);
  if (clusterStoreResponse.isError()) {
    throw new VeniceException("Unable to fetch cluster store list: " + clusterStoreResponse.getError());
  }
  for (String storeName: clusterStoreResponse.getStores()) {
    progressMap.put(storeName, Boolean.FALSE);
  }

  // Load progress from checkpoint file. If file does not exist, it will create new one during checkpointing.
  try {
    Path filePath = Paths.get(checkpointFile).toAbsolutePath();
    if (!Files.exists(filePath)) {
      System.out.println("Checkpoint file path does not exist, will create a new checkpoint file: " + filePath);
    } else {
      for (String line: Files.readAllLines(filePath)) {
        // Each line is "storeName[,status]". For now the status is a boolean to start with;
        // more states can be added later to support retry.
        String[] fields = line.split(",");
        boolean status = fields.length > 1 && Boolean.parseBoolean(fields[1]);
        progressMap.put(fields[0], status);
      }
    }
  } catch (IOException e) {
    throw new VeniceException(e);
  }
  // Only stores not yet marked done in the checkpoint remain in the work list.
  List<String> taskList =
      progressMap.entrySet().stream().filter(e -> !e.getValue()).map(Map.Entry::getKey).collect(Collectors.toList());

  // Validate task type.
  Supplier<Function<String, Boolean>> functionSupplier;
  if ("PushSystemStore".equals(task)) {
    functionSupplier = () -> new SystemStorePushTask(controllerClient, controllerClientMap, clusterName);
  } else {
    System.out.println("Undefined task: " + task);
    return;
  }

  // Create thread pool and start parallel processing. Each runner pulls work from the shared task list.
  ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
  List<Future<?>> futureList = new ArrayList<>();
  for (int i = 0; i < parallelism; i++) {
    ClusterTaskRunner clusterTaskRunner =
        new ClusterTaskRunner(progressMap, checkpointFile, taskList, functionSupplier.get());
    futureList.add(executorService.submit(clusterTaskRunner));
  }
  for (int i = 0; i < parallelism; i++) {
    try {
      futureList.get(i).get();
      System.out.println("Cluster task completed for thread : " + i);
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
      System.out.println(e.getMessage());
      executorService.shutdownNow();
    } catch (ExecutionException e) {
      System.out.println(e.getMessage());
      executorService.shutdownNow();
    }
  }
  executorService.shutdownNow();
}

private static void backfillSystemStores(CommandLine cmd) {
String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.BACKFILL_SYSTEM_STORES);
String systemStoreType = getRequiredArgument(cmd, Arg.SYSTEM_STORE_TYPE, Command.BACKFILL_SYSTEM_STORES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ public enum Arg {
SYSTEM_STORE_TYPE(
"system-store-type", "sst", true,
"Type of system store to backfill. Supported types are davinci_push_status_store and meta_store"
), RETRY("retry", "r", false, "Retry this operation"),
), TASK_NAME("task-name", "tn", true, "Name of the task for cluster command. Supported command [PushSystemStore]."),
CHECKPOINT_FILE("checkpoint-file", "cf", true, "Checkpoint file path for cluster command."),
THREAD_COUNT("thread-count", "tc", true, "Number of threads to execute. 1 if not specified"),
RETRY("retry", "r", false, "Retry this operation"),
DISABLE_LOG("disable-log", "dl", false, "Disable logs from internal classes. Only print command output on console"),
STORE_VIEW_CONFIGS(
"storage-view-configs", "svc", true,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.linkedin.venice;

import com.linkedin.venice.exceptions.VeniceException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
 * A simple runnable that keeps fetching tasks from a shared list and executing them. Task fetching
 * and progress tracking / checkpointing are thread-safe, so several instances can run in parallel
 * against the same task list.
 */
public class ClusterTaskRunner implements Runnable {
  private static final Logger LOGGER = LogManager.getLogger(ClusterTaskRunner.class);
  private static final String TASK_LOG_PREFIX = "[**** TASK INFO ****]";
  // Number of completed tasks between periodic checkpoint writes.
  private static final int CHECKPOINT_INTERVAL = 100;

  // LOCK and INDEX are intentionally static: all runner instances created for one command invocation
  // coordinate through them. INDEX hands out unique task indices across threads; LOCK serializes
  // checkpoint-file writes. NOTE(review): being static, INDEX persists across invocations within the
  // same JVM — fine for a one-shot CLI run, but verify if this class is ever reused in-process.
  private static final ReentrantLock LOCK = new ReentrantLock();
  private static final AtomicInteger INDEX = new AtomicInteger(-1);
  private final List<String> taskList;
  private final Function<String, Boolean> storeRunnable;
  private final Map<String, Boolean> progressMap;
  private final String checkpointFile;

  public ClusterTaskRunner(
      Map<String, Boolean> progressMap,
      String checkpointFile,
      List<String> taskList,
      Function<String, Boolean> storeRunnable) {
    this.taskList = taskList;
    this.storeRunnable = storeRunnable;
    this.progressMap = progressMap;
    this.checkpointFile = checkpointFile;
  }

  /**
   * Pulls store names off the shared list until it is exhausted, applying the task function to each.
   * Successful stores are marked done in the progress map; a per-store failure (false result or
   * exception) leaves the store unmarked so a later run can retry it. Progress is checkpointed
   * periodically and once more before returning.
   */
  @Override
  public void run() {
    while (true) {
      int fetchedTaskIndex = INDEX.incrementAndGet();
      if (fetchedTaskIndex >= taskList.size()) {
        LOGGER.info("Cannot find new store from queue, will exit.");
        break;
      }
      String store = taskList.get(fetchedTaskIndex);
      try {
        LOGGER.info("{} Running store job: {} for store: {}", TASK_LOG_PREFIX, fetchedTaskIndex + 1, store);
        boolean result = storeRunnable.apply(store);
        if (result) {
          LOGGER.info(
              "{} Complete store task for job: {}/{} store: {}",
              TASK_LOG_PREFIX,
              fetchedTaskIndex + 1,
              taskList.size(),
              store);
          progressMap.put(store, true);
        } else {
          LOGGER.warn(
              "{} Failed store task for job: {}/{} store: {}",
              TASK_LOG_PREFIX,
              fetchedTaskIndex + 1,
              taskList.size(),
              store);
        }
        // Periodically update the checkpoint file.
        if ((fetchedTaskIndex % CHECKPOINT_INTERVAL) == 0) {
          LOGGER.info("{} Preparing to checkpoint status at index {}", TASK_LOG_PREFIX, fetchedTaskIndex);
          checkpoint(checkpointFile);
        }
      } catch (Exception e) {
        // A single store failure should not kill the whole runner; log with the stack trace and move on.
        LOGGER.error("{} Caught exception while processing store: {}. Will continue.", TASK_LOG_PREFIX, store, e);
      }
    }
    // Perform one final checkpointing before exiting the runnable.
    checkpoint(checkpointFile);
  }

  /**
   * Writes the current progress map to the checkpoint file as "storeName,status" lines, overwriting
   * any previous content. Writes are serialized by a static lock so concurrent runners never
   * interleave output in the same file.
   *
   * @param checkpointFile path of the checkpoint file to write
   * @throws VeniceException if the file cannot be written
   */
  public void checkpoint(String checkpointFile) {
    // Acquire the lock BEFORE the try so the finally-unlock only runs when the lock is actually held.
    LOCK.lock();
    try {
      LOGGER.info("Updating checkpoint...");
      List<String> status =
          progressMap.entrySet().stream().map(e -> e.getKey() + "," + e.getValue()).collect(Collectors.toList());
      Files.write(Paths.get(checkpointFile), status);
      LOGGER.info("Updated checkpoint...");
    } catch (IOException e) {
      throw new VeniceException(e);
    } finally {
      LOCK.unlock();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static com.linkedin.venice.Arg.BATCH_GET_LIMIT;
import static com.linkedin.venice.Arg.BLOB_TRANSFER_ENABLED;
import static com.linkedin.venice.Arg.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOUR;
import static com.linkedin.venice.Arg.CHECKPOINT_FILE;
import static com.linkedin.venice.Arg.CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED;
import static com.linkedin.venice.Arg.CHUNKING_ENABLED;
import static com.linkedin.venice.Arg.CLIENT_DECOMPRESSION_ENABLED;
Expand Down Expand Up @@ -119,6 +120,8 @@
import static com.linkedin.venice.Arg.STORE_TYPE;
import static com.linkedin.venice.Arg.STORE_VIEW_CONFIGS;
import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE;
import static com.linkedin.venice.Arg.TASK_NAME;
import static com.linkedin.venice.Arg.THREAD_COUNT;
import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.Arg.URL;
import static com.linkedin.venice.Arg.VALUE_SCHEMA;
Expand Down Expand Up @@ -192,6 +195,10 @@ public enum Command {
"backfill-system-stores", "Create system stores of a given type for user stores in a cluster",
new Arg[] { URL, CLUSTER, SYSTEM_STORE_TYPE }
),
RUN_CLUSTER_COMMAND(
"run-cluster-command", "Run specific task for all user stores in a cluster",
new Arg[] { URL, CLUSTER, TASK_NAME, CHECKPOINT_FILE }, new Arg[] { THREAD_COUNT }
),
SET_VERSION(
"set-version", "Set the version that will be served", new Arg[] { URL, STORE, VERSION }, new Arg[] { CLUSTER }
), ADD_SCHEMA("add-schema", "", new Arg[] { URL, STORE, VALUE_SCHEMA }, new Arg[] { CLUSTER }),
Expand Down
Loading

0 comments on commit 53d2c05

Please sign in to comment.