diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index f46a823e31..2bcd8eb467 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -45,6 +45,7 @@ import org.apache.samza.context.JobContext; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskMode; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerdeManager; import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory; @@ -192,6 +193,20 @@ public ContainerStorageManager( this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers( activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config); + // The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage of the store + // directories. The stores itself does not need to be created but the store directory paths need to be set to be + // able to monitor them, once they're created and in use. + Set storesToCreate = + ContainerStorageManagerUtil.getNonSideInputNonInMemoryStores(storageEngineFactories, sideInputStoreNames, config); + for (String storeName : storesToCreate) { + for (Map.Entry task : containerModel.getTasks().entrySet()) { + File storeDirPath = ContainerStorageManagerUtil.getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, + sideInputStoreNames, task.getKey(), task.getValue(), storageManagerUtil, loggedStoreBaseDirectory, + nonLoggedStoreBaseDirectory); + storeDirectoryPaths.add(storeDirPath.toPath()); + } + } + JobConfig jobConfig = new JobConfig(config); int restoreThreadPoolSize = Math.min( diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java index a221552612..2402e42943 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java @@ -79,7 +79,6 @@ public static Map> createTaskStores(Set> taskStores = new HashMap<>(); - StorageConfig storageConfig = new StorageConfig(config); // iterate over each task and each storeName for (Map.Entry task : containerModel.getTasks().entrySet()) { @@ -90,15 +89,8 @@ public static Map> createTaskStores(Set storeBackupManagers = storageConfig.getStoreBackupFactories(storeName); - // A store is considered durable if it is backed by a changelog or another backupManager factory - boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty(); - boolean isSideInput = sideInputStoreNames.contains(storeName); - // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir - // for non logged stores - File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory; - File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName, - taskModel.getTaskMode()); + File storeDirectory = getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, + sideInputStoreNames, taskName, taskModel, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory); storeDirectoryPaths.add(storeDirectory.toPath()); // if taskInstanceMetrics are specified use those for store metrics, @@ -412,4 +404,32 @@ public static Set getSideInputStoreNames( } return sideInputStores; } + + public static Set getNonSideInputNonInMemoryStores(Map> storageEngineFactories, + Set sideInputStoreNames, Config config) { + Set inMemoryStoreNames = + ContainerStorageManagerUtil.getInMemoryStoreNames(storageEngineFactories, config); + Set nonSideInputStoreNames = + storageEngineFactories.keySet().stream().filter(storeName -> !sideInputStoreNames.contains(storeName)) + .collect(Collectors.toSet()); + Set storeNames = nonSideInputStoreNames.stream() + .filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet()); + return storeNames; + } + + public static File getStoreDirPath(String storeName, Config config, Map activeTaskChangelogSystemStreams, + Set sideInputStoreNames, TaskName taskName, TaskModel taskModel, StorageManagerUtil storageManagerUtil, + File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory) { + StorageConfig storageConfig = new StorageConfig(config); + List storeBackupManagers = storageConfig.getStoreBackupFactories(storeName); + // A store is considered durable if it is backed by a changelog or another backupManager factory + boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty(); + boolean isSideInput = sideInputStoreNames.contains(storeName); + // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir + // for non logged stores + File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory; + File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName, + taskModel.getTaskMode()); + return storeDirectory; + } }