Skip to content

Commit

Permalink
Clear local logged stores if input checkpoints are empty
Browse files Browse the repository at this point in the history
  • Loading branch information
ajothomas committed Dec 18, 2024
1 parent 63c86b5 commit ea4ec8c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -274,7 +275,11 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
}
taskCheckpoints.put(taskName, taskCheckpoint);

// Only insert non-null checkpoints
if (taskCheckpoint != null) {
taskCheckpoints.put(taskName, taskCheckpoint);
}

Map<String, Set<String>> backendFactoryToStoreNames =
ContainerStorageManagerUtil.getBackendFactoryStoreNames(
Expand Down Expand Up @@ -308,6 +313,15 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
});

// if we have received no input checkpoints, it can only be due to two reasons:
// a) Samza job is new, so it has no previous checkpoints.
// b) The checkpoints were cleared.
// We should be able to safely clear local logged stores in either case
if (taskCheckpoints.isEmpty()) {
LOG.info("No checkpoints read. Attempting to clear logged stores.");
clearLoggedStores(loggedStoreBaseDirectory);
}

// Init all taskRestores and if successful, restores all the task stores concurrently
LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
CompletableFuture<Map<TaskName, Checkpoint>> initRestoreAndNewCheckpointFuture =
Expand Down Expand Up @@ -357,6 +371,19 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
return taskCheckpoints;
}

private static void clearLoggedStores(File loggedStoreBaseDir) {
final FileUtil fileUtil = new FileUtil();
final File[] storeDirs = loggedStoreBaseDir.listFiles();
if (storeDirs == null || storeDirs.length == 0) {
LOG.info("No stores to delete");
return;
}
for (File storeDir: storeDirs) {
LOG.info("Clearing store dir {} from logged stores.", storeDir);
fileUtil.rm(storeDir);
}
}

/**
* Get the {@link StorageEngine} instance with a given name for a given task.
* @param taskName the task name for which the storage engine is desired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -102,11 +103,10 @@
@RunWith(PowerMockRunner.class)
@PrepareForTest({ReflectionUtil.class, ContainerStorageManagerRestoreUtil.class})
public class TestContainerStorageManager {

private static final String STORE_NAME = "store";
private static final String SYSTEM_NAME = "kafka";
private static final String STREAM_NAME = "store-stream";
private static final File DEFAULT_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "store");
private static final File DEFAULT_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "store");
private static final File
DEFAULT_LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore");

Expand All @@ -116,6 +116,7 @@ public class TestContainerStorageManager {
private SamzaContainerMetrics samzaContainerMetrics;
private Map<TaskName, TaskModel> tasks;
private StandbyTestContext testContext;
private CheckpointManager checkpointManager;

private volatile int systemConsumerCreationCount;
private volatile int systemConsumerStartCount;
Expand Down Expand Up @@ -143,7 +144,7 @@ private void addMockedTask(String taskname, int changelogPartition) {
* Method to create a containerStorageManager with mocked dependencies
*/
@Before
public void setUp() throws InterruptedException {
public void setUp() throws InterruptedException, IOException {
taskRestoreMetricGauges = new HashMap<>();
this.tasks = new HashMap<>();
this.taskInstanceMetrics = new HashMap<>();
Expand Down Expand Up @@ -248,7 +249,7 @@ public Void answer(InvocationOnMock invocation) {
.thenReturn(
new scala.collection.immutable.Map.Map1(new SystemStream(SYSTEM_NAME, STREAM_NAME), systemStreamMetadata));

CheckpointManager checkpointManager = mock(CheckpointManager.class);
this.checkpointManager = mock(CheckpointManager.class);
when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>()));

SSPMetadataCache mockSSPMetadataCache = mock(SSPMetadataCache.class);
Expand Down Expand Up @@ -320,6 +321,40 @@ public void testParallelismAndMetrics() throws InterruptedException {
Assert.assertEquals("systemConsumerStartCount count should be 1", 1, this.systemConsumerStartCount);
}

/**
* This test will attempt to verify if logged stores are deleted if the input checkpoints are empty.
* */
@Test
@SuppressWarnings("ResultOfMethodCallIgnored")
public void testDeleteLoggedStoreOnNoCheckpoints() {
// reset the mock to reset the stubs in setup method
reset(this.checkpointManager);
// redo stubbing to return null checkpoints
when(this.checkpointManager.readLastCheckpoint(any())).thenReturn(null);
// create store under logged stores to demonstrate deletion
final File storeFile = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME);
// add contents to store
final File storeFilePartition = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME + File.separator + "Partition_0");
storeFilePartition.deleteOnExit();
storeFile.deleteOnExit();
try {
storeFile.mkdirs();
storeFilePartition.createNewFile();
Assert.assertTrue("Assert that stores are present prior to the test.", storeFile.exists());
Assert.assertTrue("Assert that store files are present prior to the test.", storeFilePartition.exists());
this.containerStorageManager.start();
this.containerStorageManager.shutdown();
Assert.assertFalse("Assert that stores are deleted after the test.", storeFile.exists());
Assert.assertFalse("Assert that store files are deleted after the test.", storeFile.exists());
} catch (Exception e) {
System.out.printf("File %s could not be created.", storeFile);
Assert.fail();
} finally {
storeFilePartition.delete();
storeFile.delete();
}
}

@Test
public void testNoConfiguredDurableStores() throws InterruptedException {
taskRestoreMetricGauges = new HashMap<>();
Expand Down

0 comments on commit ea4ec8c

Please sign in to comment.