diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index a2b3b51fba..4b995117a3 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -54,6 +54,7 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskInstanceCollector; import org.apache.samza.util.Clock; +import org.apache.samza.util.FileUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -274,7 +275,11 @@ private Map restoreStores() throws InterruptedException { taskCheckpoint = checkpointManager.readLastCheckpoint(taskName); LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName); } - taskCheckpoints.put(taskName, taskCheckpoint); + + // Only insert non-null checkpoints + if (taskCheckpoint != null) { + taskCheckpoints.put(taskName, taskCheckpoint); + } Map> backendFactoryToStoreNames = ContainerStorageManagerUtil.getBackendFactoryStoreNames( @@ -308,6 +313,15 @@ private Map restoreStores() throws InterruptedException { taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames); }); + // if we have received no input checkpoints, it can only be due to two reasons: + // a) Samza job is new, so it has no previous checkpoints. + // b) The checkpoints were cleared. + // We should be able to safely clear local logged stores in either case + if (taskCheckpoints.isEmpty()) { + LOG.info("No checkpoints read. Attempting to clear logged stores."); + clearLoggedStores(loggedStoreBaseDirectory); + } + // Init all taskRestores and if successful, restores all the task stores concurrently LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints); CompletableFuture> initRestoreAndNewCheckpointFuture = @@ -357,6 +371,19 @@ private Map restoreStores() throws InterruptedException { return taskCheckpoints; } + private static void clearLoggedStores(File loggedStoreBaseDir) { + final FileUtil fileUtil = new FileUtil(); + final File[] storeDirs = loggedStoreBaseDir.listFiles(); + if (storeDirs == null || storeDirs.length == 0) { + LOG.info("No stores to delete"); + return; + } + for (File storeDir: storeDirs) { + LOG.info("Clearing store dir {} from logged stores.", storeDir); + fileUtil.rm(storeDir); + } + } + /** * Get the {@link StorageEngine} instance with a given name for a given task. * @param taskName the task name for which the storage engine is desired. diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java index b8996d4565..a79a3bfa04 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -102,7 +103,6 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({ReflectionUtil.class, ContainerStorageManagerRestoreUtil.class}) public class TestContainerStorageManager { - private static final String STORE_NAME = "store"; private static final String SYSTEM_NAME = "kafka"; private static final String STREAM_NAME = "store-stream"; @@ -116,6 +116,7 @@ public class TestContainerStorageManager { private SamzaContainerMetrics samzaContainerMetrics; private Map tasks; private StandbyTestContext testContext; + private CheckpointManager checkpointManager; private volatile int systemConsumerCreationCount; private volatile int systemConsumerStartCount; @@ -143,7 +144,7 @@ private void addMockedTask(String taskname, int changelogPartition) { * Method to create a containerStorageManager with mocked dependencies */ @Before - public void setUp() throws InterruptedException { + public void setUp() throws InterruptedException, IOException { taskRestoreMetricGauges = new HashMap<>(); this.tasks = new HashMap<>(); this.taskInstanceMetrics = new HashMap<>(); @@ -248,7 +249,7 @@ public Void answer(InvocationOnMock invocation) { .thenReturn( new scala.collection.immutable.Map.Map1(new SystemStream(SYSTEM_NAME, STREAM_NAME), systemStreamMetadata)); - CheckpointManager checkpointManager = mock(CheckpointManager.class); + this.checkpointManager = mock(CheckpointManager.class); when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>())); SSPMetadataCache mockSSPMetadataCache = mock(SSPMetadataCache.class); @@ -320,6 +321,40 @@ public void testParallelismAndMetrics() throws InterruptedException { Assert.assertEquals("systemConsumerStartCount count should be 1", 1, this.systemConsumerStartCount); } + /** + * This test will attempt to verify if logged stores are deleted if the input checkpoints are empty. + * */ + @Test + @SuppressWarnings("ResultOfMethodCallIgnored") + public void testDeleteLoggedStoreOnNoCheckpoints() { + // reset the mock to reset the stubs in setup method + reset(this.checkpointManager); + // redo stubbing to return null checkpoints + when(this.checkpointManager.readLastCheckpoint(any())).thenReturn(null); + // create store under logged stores to demonstrate deletion + final File storeFile = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME); + // add contents to store + final File storeFilePartition = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME + File.separator + "Partition_0"); + storeFilePartition.deleteOnExit(); + storeFile.deleteOnExit(); + try { + storeFile.mkdirs(); + storeFilePartition.createNewFile(); + Assert.assertTrue("Assert that stores are present prior to the test.", storeFile.exists()); + Assert.assertTrue("Assert that store files are present prior to the test.", storeFilePartition.exists()); + this.containerStorageManager.start(); + this.containerStorageManager.shutdown(); + Assert.assertFalse("Assert that stores are deleted after the test.", storeFile.exists()); + Assert.assertFalse("Assert that store files are deleted after the test.", storeFilePartition.exists()); + } catch (Exception e) { + System.out.printf("File %s could not be created.", storeFile); + Assert.fail(); + } finally { + storeFilePartition.delete(); + storeFile.delete(); + } + } + @Test public void testNoConfiguredDurableStores() throws InterruptedException { taskRestoreMetricGauges = new HashMap<>();