From 93b982840a6beba8ba8a48c5c7b4645385349b07 Mon Sep 17 00:00:00 2001 From: Daniel Chen Date: Fri, 19 Jan 2024 18:46:09 -0800 Subject: [PATCH] SAMZA-2784: Remove excessive commit logs (#1695) --- .../samza/storage/blobstore/BlobStoreBackupManager.java | 8 ++++---- .../apache/samza/storage/blobstore/util/DirDiffUtil.java | 4 ++-- .../scala/org/apache/samza/container/TaskInstance.scala | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java index 997c5e6ca4..bf26eea6f7 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java @@ -236,7 +236,7 @@ public CompletableFuture> upload(CheckpointId checkpointId, CompletionStage snapshotIndexBlobIdFuture = snapshotIndexFuture .thenComposeAsync(si -> { - LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName); + LOG.debug("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName); return blobStoreUtil.putSnapshotIndex(si); }, executor); @@ -296,7 +296,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map removeTTLFuture = snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { - LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}", + LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}", snapshotIndexBlobId, taskName, storeName); return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata); }, executor); @@ -305,7 +305,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map cleanupRemoteSnapshotFuture = snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { - LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}", + LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}", snapshotIndexBlobId, taskName, storeName); return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata); }, executor); @@ -317,7 +317,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map { if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) { String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get(); - LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.", + LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.", blobId, taskName, storeName); return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata); } else { diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java index b89f42a31e..9b84ac7560 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java @@ -219,7 +219,7 @@ public static BiPredicate areSameFile(boolean compareLargeFileC if (!compareLargeFileChecksums && isLargeFile) { // Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations // which requires reading the entire file. - LOG.debug("Local file: {} and remote file: {} are same. " + + LOG.debug("Local file: {} and remote file: {} both present. " + "Skipping checksum calculation for large file of size: {}.", localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size()); return true; @@ -234,7 +234,7 @@ public static BiPredicate areSameFile(boolean compareLargeFileC boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum(); if (!areSameChecksum) { - LOG.warn("Local file: {} and remote file: {} are not same. " + + LOG.debug("Local file: {} and remote file: {} are not same. " + "Local checksum: {}. Remote checksum: {}", localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum()); } else { diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 89738e2de0..70d9ca3800 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -186,7 +186,7 @@ class TaskInstance( // WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for // tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may // be deleted in the background by the blob store, leading to data loss. - info("Cleaning up stale state from previous run for taskName: %s" format taskName) + debug("Cleaning up stale state from previous run for taskName: %s" format taskName) commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers) } @@ -474,7 +474,7 @@ class TaskInstance( new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] { override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = { // Perform cleanup on unused checkpoints - info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId)) + debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId)) val cleanUpStartTime = System.nanoTime() try { commitManager.cleanUp(checkpointId, uploadSCMs)