Skip to content

Commit

Permalink
Skip disabled system store
Browse files Browse the repository at this point in the history
  • Loading branch information
sixpluszero committed Dec 6, 2024
1 parent 014e265 commit bd3bc34
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ public static void main(String[] args) throws Exception {
break;
case DUMP_HOST_HEARTBEAT:
dumpHostHeartbeat(cmd);
case RUN_CLUSTER_COMMAND:
case CLUSTER_BATCH_TASK:
runClusterCommand(cmd);
break;
default:
Expand Down Expand Up @@ -852,9 +852,9 @@ private static void deleteStore(CommandLine cmd) throws IOException {
}

private static void runClusterCommand(CommandLine cmd) {
String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.RUN_CLUSTER_COMMAND);
String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.RUN_CLUSTER_COMMAND);
String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.RUN_CLUSTER_COMMAND);
String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.CLUSTER_BATCH_TASK);
String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.CLUSTER_BATCH_TASK);
String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.CLUSTER_BATCH_TASK);
int parallelism = Integer.parseInt(getOptionalArgument(cmd, Arg.THREAD_COUNT, "1"));
System.out.println(
"[**** Cluster Command Params ****] Cluster: " + clusterName + ", Task: " + task + ", Checkpoint: "
Expand All @@ -876,11 +876,13 @@ private static void runClusterCommand(CommandLine cmd) {

// Load progress from the checkpoint file. If the file does not exist, a new one will be created during checkpointing.
try {
Path filePath = Paths.get(checkpointFile).toAbsolutePath();
if (!Files.exists(filePath)) {
System.out.println("Checkpoint file path does not exist, will create a new checkpoint file: " + filePath);
Path checkpointFilePath = Paths.get(checkpointFile);
if (!Files.exists(checkpointFilePath.toAbsolutePath())) {
System.out.println(
"Checkpoint file path does not exist, will create a new checkpoint file: "
+ checkpointFilePath.toAbsolutePath());
} else {
List<String> fileLines = Files.readAllLines(Paths.get(checkpointFile));
List<String> fileLines = Files.readAllLines(checkpointFilePath);
for (String line: fileLines) {
String storeName = line.split(",")[0];
// For now, it is a boolean to start with; we can add more states later to support retry.
Expand All @@ -897,9 +899,9 @@ private static void runClusterCommand(CommandLine cmd) {
List<String> taskList =
progressMap.entrySet().stream().filter(e -> !e.getValue()).map(Map.Entry::getKey).collect(Collectors.toList());

// Validate task type.
// Validate task type. For now, we only have one task; if we add more tasks in the future, we can extend this logic.
Supplier<Function<String, Boolean>> functionSupplier;
if ("PushSystemStore".equals(task)) {
if (SystemStorePushTask.TASK_NAME.equals(task)) {
functionSupplier = () -> new SystemStorePushTask(controllerClient, controllerClientMap, clusterName);
} else {
System.out.println("Undefined task: " + task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@
import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
import static com.linkedin.venice.Arg.TASK_NAME;
import static com.linkedin.venice.Arg.THREAD_COUNT;
import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.Arg.URL;
import static com.linkedin.venice.Arg.VALUE_SCHEMA;
Expand Down Expand Up @@ -204,8 +204,8 @@ public enum Command {
"backfill-system-stores", "Create system stores of a given type for user stores in a cluster",
new Arg[] { URL, CLUSTER, SYSTEM_STORE_TYPE }
),
RUN_CLUSTER_COMMAND(
"run-cluster-command", "Run specific task for all user stores in a cluster",
CLUSTER_BATCH_TASK(
"cluster-batch-task", "Run specific task against all user stores in a cluster in parallel",
new Arg[] { URL, CLUSTER, TASK_NAME, CHECKPOINT_FILE }, new Arg[] { THREAD_COUNT }
),
SET_VERSION(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* It will also skip the empty push to a store which is being migrated and is in the destination cluster.
*/
public class SystemStorePushTask implements Function<String, Boolean> {
public static final String TASK_NAME = "PushSystemStore";
private static final Logger LOGGER = LogManager.getLogger(SystemStorePushTask.class);
private static final int JOB_POLLING_RETRY_COUNT = 200;
private static final int JOB_POLLING_RETRY_PERIOD_IN_SECONDS = 5;
Expand Down Expand Up @@ -65,6 +66,19 @@ public Boolean apply(String storeName) {

for (VeniceSystemStoreType type: SYSTEM_STORE_TYPE) {
String systemStoreName = type.getSystemStoreName(storeName);
/**
* In the current implementation, a push to a system store will flip the flag to true, which can cause unexpected
* behavior in the store. Here, we skip the system store push if the system store is disabled.
*/
boolean isSystemStoreEnabled = VeniceSystemStoreType.META_STORE.equals(type)
? storeResponse.getStore().isStoreMetaSystemStoreEnabled()
: storeResponse.getStore().isDaVinciPushStatusStoreEnabled();
if (!isSystemStoreEnabled) {
LOGGER.warn(
"{} System store: {} is disabled. Will skip the push.",
SYSTEM_STORE_PUSH_TASK_LOG_PREFIX,
systemStoreName);
}
VersionResponse response = parentControllerClient.getStoreLargestUsedVersion(clusterName, systemStoreName);
if (response.isError()) {
LOGGER.error(
Expand Down

0 comments on commit bd3bc34

Please sign in to comment.