Skip to content

Commit

Permalink
Fix get future exception message
Browse files Browse the repository at this point in the history
  • Loading branch information
ajothomas committed Oct 15, 2024
1 parent d6ffeeb commit ea7615d
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata
.thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
.handle((snapshotIndex, ex) -> {
if (ex != null) {
throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex);
throw new SamzaException(String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId), ex);
}
return snapshotIndex;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -50,6 +51,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.CRC32;
Expand Down Expand Up @@ -78,6 +80,7 @@
import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
import org.apache.samza.util.FileUtil;
import org.apache.samza.util.FutureUtil;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -920,6 +923,24 @@ public void testGetSSIThrowsExceptionOnSyncBlobStoreErrors() {
checkpoint, storesToBackupOrRestore, false);
}

@Test
public void testSerdeException() throws ExecutionException, InterruptedException {
final String blobId = "foo";

final BlobStoreManager testBlobStoreManager = new DeserTestBlobStoreManager();
final BlobStoreUtil util = new BlobStoreUtil(testBlobStoreManager, Executors.newSingleThreadExecutor(), blobStoreConfig, null, null);

final CompletableFuture<SnapshotIndex> future = util.getSnapshotIndex(blobId, mock(Metadata.class), true)
.handle((snapshotIndex, throwable) -> {
if (throwable != null) {
Assert.assertEquals(throwable.getMessage(), String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId));
Assert.assertEquals(throwable.getCause().getMessage(), "org.apache.samza.SamzaException: Exception in deserializing SnapshotIndex bytes foobar");
}
return snapshotIndex;
});
future.get();
}

@Test
public void testGetSSIThrowsExceptionIfAnyNonIgnoredAsyncBlobStoreErrors() {
String store = "storeName1";
Expand Down Expand Up @@ -1045,4 +1066,57 @@ private CheckpointV2 createCheckpointV2(String stateBackendFactory, Map<String,
factoryStoreSCMs.put(stateBackendFactory, storeSCMs);
return new CheckpointV2(checkpointId, ImmutableMap.of(), factoryStoreSCMs);
}

/**
* Test {@link BlobStoreManager} to be used to assert SnapshotIndex deserialization failure
* exception message.
* We write a dummy string's bytes to the OutputStream parameter of get method instead of a SnapshotIndex
* blob. The OutputStream is used by SnapshotIndexSerde which will fail during deserialization.
* */
private static class DeserTestBlobStoreManager extends TestBlobStoreManager {
@Override
public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
final String randBlob = "foobar";
final byte[] byteArray = randBlob.getBytes(StandardCharsets.UTF_8);
try {
outputStream.write(byteArray);
} catch (IOException e) {
throw new RuntimeException(e);
}
return CompletableFuture.completedFuture(null);
}
}

/**
* Test BlobStoreManager for unit tests.
* */
private static class TestBlobStoreManager implements BlobStoreManager {
@Override
public void init() {
}

@Override
public CompletionStage<String> put(InputStream inputStream, Metadata metadata) {
return null;
}

@Override
public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
return null;
}

@Override
public CompletionStage<Void> delete(String id, Metadata metadata) {
return null;
}

@Override
public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
return null;
}

@Override
public void close() {
}
}
}

0 comments on commit ea7615d

Please sign in to comment.