[admin-tool] Add a cluster batch processing framework command and a system store empty push task #1254

Open · wants to merge 3 commits into base: main
@@ -116,6 +116,7 @@
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import java.io.BufferedReader;
import java.io.Console;
@@ -124,6 +125,7 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
@@ -142,6 +144,10 @@
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -584,6 +590,9 @@ public static void main(String[] args) throws Exception {
case DUMP_HOST_HEARTBEAT:
dumpHostHeartbeat(cmd);
break;
case CLUSTER_BATCH_TASK:
runClusterCommand(cmd);
break;
default:
StringJoiner availableCommands = new StringJoiner(", ");
for (Command c: Command.values()) {
@@ -848,6 +857,83 @@ private static void deleteStore(CommandLine cmd) throws IOException {
printObject(response);
}

private static void runClusterCommand(CommandLine cmd) {
String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.CLUSTER_BATCH_TASK);
String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.CLUSTER_BATCH_TASK);
String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.CLUSTER_BATCH_TASK);
int parallelism = Integer.parseInt(getOptionalArgument(cmd, Arg.THREAD_COUNT, "1"));
System.out.println(
"[**** Cluster Command Params ****] Cluster: " + clusterName + ", Task: " + task + ", Checkpoint: "
+ checkpointFile + ", Parallelism: " + parallelism);

// Create child data center controller client map.
ChildAwareResponse childAwareResponse = controllerClient.listChildControllers(clusterName);
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName, childAwareResponse);

// Fetch the cluster's store list from the parent region.
Map<String, Boolean> progressMap = new VeniceConcurrentHashMap<>();
MultiStoreResponse clusterStoreResponse = controllerClient.queryStoreList(false);
if (clusterStoreResponse.isError()) {
throw new VeniceException("Unable to fetch cluster store list: " + clusterStoreResponse.getError());
}
for (String storeName: clusterStoreResponse.getStores()) {
progressMap.put(storeName, Boolean.FALSE);
}

// Load progress from the checkpoint file. If the file does not exist, a new one will be created during checkpointing.
try {
Path checkpointFilePath = Paths.get(checkpointFile);
if (!Files.exists(checkpointFilePath.toAbsolutePath())) {
System.out.println(
"Checkpoint file path does not exist, will create a new checkpoint file: "
+ checkpointFilePath.toAbsolutePath());
} else {
List<String> fileLines = Files.readAllLines(checkpointFilePath);
for (String line: fileLines) {
String[] fields = line.split(",");
String storeName = fields[0];
// For now, the status is a simple boolean; more states can be added later to support retries.
boolean status = fields.length > 1 && Boolean.parseBoolean(fields[1]);
progressMap.put(storeName, status);
}
}
} catch (IOException e) {
throw new VeniceException(e);
}
List<String> taskList =
progressMap.entrySet().stream().filter(e -> !e.getValue()).map(Map.Entry::getKey).collect(Collectors.toList());

// Validate the task type. For now there is only one task; this logic can be extended as more tasks are added.
Supplier<Function<String, Boolean>> functionSupplier;
if (SystemStorePushTask.TASK_NAME.equals(task)) {
functionSupplier = () -> new SystemStorePushTask(controllerClient, controllerClientMap, clusterName);
} else {
System.out.println("Undefined task: " + task);
return;
}

// Create thread pool and start parallel processing.
ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
List<Future<?>> futureList = new ArrayList<>();
for (int i = 0; i < parallelism; i++) {
ClusterTaskRunner clusterTaskRunner =
new ClusterTaskRunner(progressMap, checkpointFile, taskList, functionSupplier.get());
futureList.add(executorService.submit(clusterTaskRunner));
}
for (int i = 0; i < parallelism; i++) {
try {
futureList.get(i).get();
System.out.println("Cluster task completed for thread : " + i);
} catch (InterruptedException | ExecutionException e) {
System.out.println(e.getMessage());
executorService.shutdownNow();
}
}
executorService.shutdownNow();
}

private static void backfillSystemStores(CommandLine cmd) {
String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.BACKFILL_SYSTEM_STORES);
String systemStoreType = getRequiredArgument(cmd, Arg.SYSTEM_STORE_TYPE, Command.BACKFILL_SYSTEM_STORES);
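
Note on the checkpoint format consumed and produced by runClusterCommand above: the file is plain text with one store per line, in the form "storeName,status". A hypothetical example (store names are illustrative):

    store_a,true
    store_b,false
    store_c

A line without a status field is parsed as false, so only stores whose recorded status is false end up on the task list; re-running the command with the same checkpoint file resumes where the previous run stopped.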
@@ -226,7 +226,10 @@ public enum Arg {
SYSTEM_STORE_TYPE(
"system-store-type", "sst", true,
"Type of system store to backfill. Supported types are davinci_push_status_store and meta_store"
), RETRY("retry", "r", false, "Retry this operation"),
), TASK_NAME("task-name", "tn", true, "Name of the task to run via the cluster batch command. Supported tasks: [PushSystemStore]."),
CHECKPOINT_FILE("checkpoint-file", "cf", true, "Checkpoint file path for the cluster batch command."),
THREAD_COUNT("thread-count", "tc", true, "Number of threads to use. Defaults to 1 if not specified."),
RETRY("retry", "r", false, "Retry this operation"),
DISABLE_LOG("disable-log", "dl", false, "Disable logs from internal classes. Only print command output on console"),
STORE_VIEW_CONFIGS(
"storage-view-configs", "svc", true,
@@ -0,0 +1,100 @@
package com.linkedin.venice;

import com.linkedin.venice.exceptions.VeniceException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
* This class is a simple runnable which keeps fetching tasks from a list and executing them. Task fetching
* and progress tracking / checkpointing are thread-safe, so multiple instances can run in parallel.
*/
public class ClusterTaskRunner implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(ClusterTaskRunner.class);
private static final String TASK_LOG_PREFIX = "[**** TASK INFO ****]";

private static final ReentrantLock LOCK = new ReentrantLock();
private static final AtomicInteger INDEX = new AtomicInteger(-1);
private final List<String> taskList;
private final Function<String, Boolean> storeRunnable;
private final Map<String, Boolean> progressMap;
private final String checkpointFile;

public ClusterTaskRunner(
Map<String, Boolean> progressMap,
String checkpointFile,
List<String> taskList,
Function<String, Boolean> storeRunnable) {
this.taskList = taskList;
this.storeRunnable = storeRunnable;
this.progressMap = progressMap;
this.checkpointFile = checkpointFile;
}

@Override
public void run() {
while (true) {
int fetchedTaskIndex = INDEX.incrementAndGet();
if (fetchedTaskIndex >= taskList.size()) {
LOGGER.info("Cannot find new store from queue, will exit.");
break;
}
String store = taskList.get(fetchedTaskIndex);
try {
LOGGER.info("{} Running store job: {} for store: {}", TASK_LOG_PREFIX, fetchedTaskIndex + 1, store);
boolean result = storeRunnable.apply(store);
if (result) {
LOGGER.info(
"{} Complete store task for job: {}/{} store: {}",
TASK_LOG_PREFIX,
fetchedTaskIndex + 1,
taskList.size(),
store);
progressMap.put(store, true);
} else {
LOGGER.info(
"{} Failed store task for job: {}/{} store: {}",
TASK_LOG_PREFIX,
fetchedTaskIndex + 1,
taskList.size(),
store);
}
// Periodically update the checkpoint file.
if ((fetchedTaskIndex % 100) == 0) {
LOGGER.info("{} Preparing to checkpoint status at index {}", TASK_LOG_PREFIX, fetchedTaskIndex);
checkpoint(checkpointFile);
}
} catch (Exception e) {
LOGGER.warn("{} Caught exception: {}. Will continue with the next store.", TASK_LOG_PREFIX, e.getMessage());
}
}
// Perform one final checkpoint before exiting the runnable.
checkpoint(checkpointFile);
}

public void checkpoint(String checkpointFile) {
LOCK.lock();
try {
LOGGER.info("Updating checkpoint...");

List<String> status =
progressMap.entrySet().stream().map(e -> e.getKey() + "," + e.getValue()).collect(Collectors.toList());
Files.write(Paths.get(checkpointFile), status);
LOGGER.info("Updated checkpoint...");

} catch (IOException e) {
throw new VeniceException(e);
} finally {
LOCK.unlock();
}
}
}
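
To make the moving parts concrete, below is a minimal, hypothetical harness for ClusterTaskRunner (not part of this PR): three worker threads drain a shared task list, a no-op store function stands in for SystemStorePushTask, and progress is checkpointed to a temp file.

    import com.linkedin.venice.ClusterTaskRunner;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.function.Function;

    public class ClusterTaskRunnerDemo {
      public static void main(String[] args) throws Exception {
        List<String> taskList = Arrays.asList("store_a", "store_b", "store_c");
        Map<String, Boolean> progressMap = new ConcurrentHashMap<>();
        taskList.forEach(store -> progressMap.put(store, Boolean.FALSE));
        String checkpointFile = Files.createTempFile("cluster-task-checkpoint", ".txt").toString();

        // Stand-in for SystemStorePushTask: report success for every store.
        Function<String, Boolean> alwaysSucceed = store -> true;

        int parallelism = 3;
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
          futures.add(pool.submit(new ClusterTaskRunner(progressMap, checkpointFile, taskList, alwaysSucceed)));
        }
        for (Future<?> future: futures) {
          future.get();
        }
        pool.shutdownNow();

        // Each line of the resulting checkpoint file is "storeName,true|false".
        Files.readAllLines(Paths.get(checkpointFile)).forEach(System.out::println);
      }
    }

One consequence of INDEX and LOCK being static is that the task cursor and checkpoint lock are shared across every ClusterTaskRunner in the JVM, so concurrent runners must be constructed over the same taskList and checkpoint file, which is exactly how runClusterCommand wires them up.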
@@ -12,6 +12,7 @@
import static com.linkedin.venice.Arg.BATCH_GET_LIMIT;
import static com.linkedin.venice.Arg.BLOB_TRANSFER_ENABLED;
import static com.linkedin.venice.Arg.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOUR;
import static com.linkedin.venice.Arg.CHECKPOINT_FILE;
import static com.linkedin.venice.Arg.CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED;
import static com.linkedin.venice.Arg.CHUNKING_ENABLED;
import static com.linkedin.venice.Arg.CLIENT_DECOMPRESSION_ENABLED;
@@ -127,6 +128,8 @@
import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.Arg.TASK_NAME;
import static com.linkedin.venice.Arg.THREAD_COUNT;
import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.Arg.URL;
@@ -201,6 +204,10 @@ public enum Command {
"backfill-system-stores", "Create system stores of a given type for user stores in a cluster",
new Arg[] { URL, CLUSTER, SYSTEM_STORE_TYPE }
),
CLUSTER_BATCH_TASK(
"cluster-batch-task", "Run specific task against all user stores in a cluster in parallel",
new Arg[] { URL, CLUSTER, TASK_NAME, CHECKPOINT_FILE }, new Arg[] { THREAD_COUNT }
),
SET_VERSION(
"set-version", "Set the version that will be served", new Arg[] { URL, STORE, VERSION }, new Arg[] { CLUSTER }
), ADD_SCHEMA("add-schema", "", new Arg[] { URL, STORE, VALUE_SCHEMA }, new Arg[] { CLUSTER }),
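
Putting the pieces together, and assuming the admin tool's usual one-flag-per-command invocation style, a run of the new command would look roughly like this (jar name, URL, and file path are placeholders; --thread-count is optional and defaults to 1):

    java -jar venice-admin-tool-all.jar --cluster-batch-task \
        --url <parent-controller-url> \
        --cluster <cluster-name> \
        --task-name PushSystemStore \
        --checkpoint-file /tmp/cluster-task-checkpoint \
        --thread-count 4

Re-running the same command with the same checkpoint file skips stores already marked true and retries the rest.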