diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java index 327eb7f33c..bdacbd4476 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java @@ -182,7 +182,13 @@ public CompletableFuture getSnapshotIndex(String blobId, Metadata return FutureUtil.executeAsyncWithRetries(opName, () -> { ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream return blobStoreManager.get(blobId, indexBlobStream, metadata, getDeleted).toCompletableFuture() - .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + .thenApplyAsync(f -> { + try { + return snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()); + } catch (Exception e) { + throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), e); + } + }, executor); }, isCauseNonRetriable(), executor, retryPolicyConfig); }