SAMZA-2784: Remove excessive commit logs #1695

Merged 1 commit on Jan 20, 2024
@@ -236,7 +236,7 @@ public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId,
CompletionStage<String> snapshotIndexBlobIdFuture =
snapshotIndexFuture
.thenComposeAsync(si -> {
LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
LOG.debug("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
return blobStoreUtil.putSnapshotIndex(si);
}, executor);

@@ -296,7 +296,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 1. remove TTL of index blob and all of its files and sub-dirs marked for retention
CompletionStage<Void> removeTTLFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata);
}, executor);
@@ -305,7 +305,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 2. delete the files/subdirs marked for deletion in the snapshot index.
CompletionStage<Void> cleanupRemoteSnapshotFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata);
}, executor);
@@ -317,7 +317,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get();
LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
blobId, taskName, storeName);
return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata);
} else {
@@ -219,7 +219,7 @@ public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileC
if (!compareLargeFileChecksums && isLargeFile) {
// Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations
// which requires reading the entire file.
LOG.debug("Local file: {} and remote file: {} are same. " +
LOG.debug("Local file: {} and remote file: {} both present. " +
"Skipping checksum calculation for large file of size: {}.",
localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size());
return true;
Expand All @@ -234,7 +234,7 @@ public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileC

boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum();
if (!areSameChecksum) {
LOG.warn("Local file: {} and remote file: {} are not same. " +
LOG.debug("Local file: {} and remote file: {} are not same. " +
"Local checksum: {}. Remote checksum: {}",
localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
} else {
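For context on the areSameFile hunks above, here is a self-contained sketch of the comparison these log lines describe; the class name, CRC32 choice, and size threshold are illustrative assumptions, not the actual Samza BlobStoreUtil code. When large-file checksums are not compared, a matching size is treated as equality (RocksDB SST files are immutable after creation); otherwise checksums are compared and, after this change, a mismatch is logged at DEBUG rather than WARN.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileComparisonSketch {
  private static final Logger LOG = LoggerFactory.getLogger(FileComparisonSketch.class);
  // Illustrative cutoff for "large" files; the real threshold lives in Samza's code, not here.
  private static final long LARGE_FILE_BYTES = 1L << 20;

  /** Decides whether a local file can be treated as identical to its remote (uploaded) copy. */
  static boolean areSameFile(Path localFile, long remoteSize, long remoteChecksum,
      boolean compareLargeFileChecksums) throws IOException {
    long localSize = Files.size(localFile);
    if (localSize != remoteSize) {
      return false;
    }
    if (!compareLargeFileChecksums && localSize >= LARGE_FILE_BYTES) {
      // Immutable SST files: a matching size is considered sufficient, so skip the expensive full read.
      LOG.debug("Local file: {} and remote copy both present. Skipping checksum for size: {}.",
          localFile, localSize);
      return true;
    }
    CRC32 crc32 = new CRC32();
    crc32.update(Files.readAllBytes(localFile));
    boolean sameChecksum = crc32.getValue() == remoteChecksum;
    if (!sameChecksum) {
      // The equivalent log line was downgraded from WARN to DEBUG in this PR.
      LOG.debug("Checksum mismatch for {}: local {} remote {}.", localFile, crc32.getValue(), remoteChecksum);
    }
    return sameChecksum;
  }
}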
@@ -186,7 +186,7 @@ class TaskInstance(
// WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
// tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
// be deleted in the background by the blob store, leading to data loss.
info("Cleaning up stale state from previous run for taskName: %s" format taskName)
debug("Cleaning up stale state from previous run for taskName: %s" format taskName)
commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
}

@@ -474,7 +474,7 @@ class TaskInstance(
new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] {
override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
// Perform cleanup on unused checkpoints
info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
val cleanUpStartTime = System.nanoTime()
try {
commitManager.cleanUp(checkpointId, uploadSCMs)
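The WARNING comment in the TaskInstance hunk is the constraint behind keeping these cleanUp calls while only lowering their log level: cleanUp runs after upload and resets the TTL on blobs the new checkpoint still references. A minimal sketch of that ordering, using a hypothetical commit-manager interface rather than Samza's actual TaskCommitManager signatures:

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical interface; the real Samza types, method names, and signatures differ. */
interface CommitManagerSketch {
  // Uploads state and returns per-store state checkpoint markers (SCMs).
  CompletableFuture<Map<String, Map<String, String>>> upload(String checkpointId);

  // Resets TTLs on retained blobs and deletes blobs no longer referenced.
  CompletableFuture<Void> cleanUp(String checkpointId, Map<String, Map<String, String>> uploadSCMs);
}

class CommitFlowSketch {
  static CompletableFuture<Void> commit(CommitManagerSketch manager, String checkpointId) {
    // cleanUp is chained after upload and is NOT optional with blob stores: it resets the TTL
    // on blobs that the new checkpoint still references. Skipping it could let the blob store
    // expire live data in the background, which is the data-loss risk the WARNING describes.
    return manager.upload(checkpointId)
        .thenCompose(uploadSCMs -> manager.cleanUp(checkpointId, uploadSCMs));
  }
}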