diff --git a/common/src/main/java/com/hedera/block/common/utils/FileUtilities.java b/common/src/main/java/com/hedera/block/common/utils/FileUtilities.java index dcbaf5662..1458af04b 100644 --- a/common/src/main/java/com/hedera/block/common/utils/FileUtilities.java +++ b/common/src/main/java/com/hedera/block/common/utils/FileUtilities.java @@ -186,5 +186,30 @@ public static byte[] readFileBytesUnsafe( } } + /** + * This method creates a new file at the given path. The method ensures that + * the full path to the target file will be created, including all missing + * intermediary directories. + * + * @param pathToCreate the path to create + * @throws IOException if the file cannot be created or if it already exists + */ + public static void createFile(@NonNull final Path pathToCreate) throws IOException { + Files.createDirectories(pathToCreate.getParent()); + Files.createFile(pathToCreate); + } + + /** + * This method appends an extension to a given path. + * + * @param path to append the extension to + * @param extension to append to the path + * @return a new path with the extension appended + */ + @NonNull + public static Path appendExtension(@NonNull final Path path, @NonNull final String extension) { + return path.resolveSibling(path.getFileName() + Objects.requireNonNull(extension)); + } + private FileUtilities() {} } diff --git a/common/src/main/java/com/hedera/block/common/utils/Preconditions.java b/common/src/main/java/com/hedera/block/common/utils/Preconditions.java index a5be96f62..64dda3968 100644 --- a/common/src/main/java/com/hedera/block/common/utils/Preconditions.java +++ b/common/src/main/java/com/hedera/block/common/utils/Preconditions.java @@ -185,6 +185,50 @@ public static long requireGreaterOrEqual(final long toTest, final long base, fin throw new IllegalArgumentException(message); } + /** + * This method asserts a given int is within a range (boundaries + * included). 
If the given int is within the range, then we return it, + * else, an {@link IllegalArgumentException} is thrown. + * + * @param toCheck the int value to test + * @param lowerBoundary the lower boundary + * @param upperBoundary the upper boundary + * @return the input {@code toCheck} if it is within the range (boundaries + * included) + * @throws IllegalArgumentException if the input int does not pass the test + */ + public static int requireInRange(final int toCheck, final int lowerBoundary, final int upperBoundary) { + return requireInRange(toCheck, lowerBoundary, upperBoundary, null); + } + + /** + * This method asserts a given int is within a range (boundaries + * included). If the given int is within the range, then we return it, + * else, an {@link IllegalArgumentException} is thrown. + * + * @param toCheck the int value to check + * @param lowerBoundary the lower boundary + * @param upperBoundary the upper boundary + * @param errorMessage the error message to be used in the exception if the + * input int to test is not within the range, if null, a default message + * will be used + * @return the input {@code toCheck} if it is within the range (boundaries + * included) + * @throws IllegalArgumentException if the input int does not pass the test + */ + public static int requireInRange( + final int toCheck, final int lowerBoundary, final int upperBoundary, final String errorMessage) { + if (toCheck >= lowerBoundary && toCheck <= upperBoundary) { + return toCheck; + } else { + final String message = Objects.isNull(errorMessage) + ? "The input int [%d] is required to be in the range [%d, %d] boundaries included." + .formatted(toCheck, lowerBoundary, upperBoundary) + : errorMessage; + throw new IllegalArgumentException(message); + } + } + /** * This method asserts a given long is a whole number. A long is whole * if it is greater or equal to zero. 
diff --git a/common/src/test/java/com/hedera/block/common/utils/FileUtilitiesTest.java b/common/src/test/java/com/hedera/block/common/utils/FileUtilitiesTest.java index e736bee86..24bbf02f9 100644 --- a/common/src/test/java/com/hedera/block/common/utils/FileUtilitiesTest.java +++ b/common/src/test/java/com/hedera/block/common/utils/FileUtilitiesTest.java @@ -36,6 +36,9 @@ * Tests for {@link FileUtilities} functionality. */ class FileUtilitiesTest { + @TempDir + private Path tempDir; + /** * This test aims to verify that a folder path will be created for a given * path if it does not exist. First we assert that the path we want to make @@ -192,6 +195,39 @@ void testReadFileBytesUnsafeThrows(final Path filePath) { assertThatIOException().isThrownBy(() -> FileUtilities.readFileBytesUnsafe(filePath, ".blk", ".gz")); } + /** + * This test aims to verify that {@link FileUtilities#createFile(Path)} + * correctly creates a file at the given path alongside with any missing + * intermediary directories. + * + * @param toCreate parameterized, target to create + * @param tempDir junit temp dir + */ + @ParameterizedTest + @MethodSource("filesToCreate") + void testCreateFile(final Path toCreate, @TempDir final Path tempDir) throws IOException { + final Path expected = tempDir.resolve(toCreate); + assertThat(expected).doesNotExist(); + FileUtilities.createFile(expected); + assertThat(expected).exists().isRegularFile(); + } + + /** + * This test aims to verify that the + * {@link FileUtilities#appendExtension(Path, String)} method correctly + * appends the given extension to the given path. 
+ * + * @param filePath parameterized, to append extensions to + * @param extension parameterized, extension to append + */ + @ParameterizedTest + @MethodSource("filesWithExtensions") + void testAppendFileExtension(final Path filePath, final String extension) { + final Path pathToTest = tempDir.resolve(filePath); + final Path actual = FileUtilities.appendExtension(pathToTest, extension); + assertThat(actual).hasFileName(filePath + extension); + } + private static Stream validGzipFiles() { return Stream.of( Arguments.of("src/test/resources/valid1.txt.gz", "valid1"), @@ -210,4 +246,16 @@ private static Stream invalidFiles() { Arguments.of("src/test/resources/nonexistent.gz"), Arguments.of("src/test/resources/nonexistent.blk")); } + + private static Stream filesToCreate() { + return Stream.of(Arguments.of("temp1.txt"), Arguments.of("some_folder/temp2.txt")); + } + + private static Stream filesWithExtensions() { + return Stream.of( + Arguments.of("valid1", ".blk"), + Arguments.of("valid1", ""), + Arguments.of("valid2", ".gz"), + Arguments.of("valid2", ".zstd")); + } } diff --git a/common/src/test/java/com/hedera/block/common/utils/PreconditionsTest.java b/common/src/test/java/com/hedera/block/common/utils/PreconditionsTest.java index c5eb94623..769ed95cc 100644 --- a/common/src/test/java/com/hedera/block/common/utils/PreconditionsTest.java +++ b/common/src/test/java/com/hedera/block/common/utils/PreconditionsTest.java @@ -20,7 +20,9 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import java.util.function.Consumer; +import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; /** @@ -263,4 +265,72 @@ void testRequirePowerOfTwoFail(final int toTest) { .isThrownBy(() -> Preconditions.requirePowerOfTwo(toTest, testErrorMessage)) .withMessage(testErrorMessage); } + + /** + * This test aims to verify that the 
+ * {@link Preconditions#requireInRange(int, int, int)} will return the + * input 'toTest' parameter if the range check passes. Test includes + * overloads. + * + * @param toTest parameterized, the number to test + * @param lowerBoundary parameterized, the lower boundary + * @param upperBoundary parameterized, the upper boundary + */ + @ParameterizedTest + @MethodSource("validRequireInRangeValues") + void testRequireInRangePass(final int toTest, final int lowerBoundary, final int upperBoundary) { + final Consumer asserts = actual -> + assertThat(actual).isBetween(lowerBoundary, upperBoundary).isEqualTo(toTest); + + final int actual = Preconditions.requireInRange(toTest, lowerBoundary, upperBoundary); + assertThat(actual).satisfies(asserts); + + final int actualOverload = + Preconditions.requireInRange(toTest, lowerBoundary, upperBoundary, "test error message"); + assertThat(actualOverload).satisfies(asserts); + } + + /** + * This test aims to verify that the + * {@link Preconditions#requireInRange(int, int, int)} will throw an + * {@link IllegalArgumentException} if the range check fails. Test includes + * overloads. 
+ * + * @param toTest parameterized, the number to test + * @param lowerBoundary parameterized, the lower boundary + * @param upperBoundary parameterized, the upper boundary + */ + @ParameterizedTest + @MethodSource("invalidRequireInRangeValues") + void testRequireInRangeFail(final int toTest, final int lowerBoundary, final int upperBoundary) { + assertThatIllegalArgumentException() + .isThrownBy(() -> Preconditions.requireInRange(toTest, lowerBoundary, upperBoundary)); + + final String testErrorMessage = "test error message"; + assertThatIllegalArgumentException() + .isThrownBy(() -> Preconditions.requireInRange(toTest, lowerBoundary, upperBoundary, testErrorMessage)) + .withMessage(testErrorMessage); + } + + private static Stream validRequireInRangeValues() { + return Stream.of( + Arguments.of(0, 0, 0), + Arguments.of(0, 0, 1), + Arguments.of(1, 0, 1), + Arguments.of(1, 0, 2), + Arguments.of(-1, -1, -1), + Arguments.of(-2, -2, -1), + Arguments.of(-1, -2, -1), + Arguments.of(-1, -2, 0)); + } + + private static Stream invalidRequireInRangeValues() { + return Stream.of( + Arguments.of(0, 1, 1), + Arguments.of(0, 1, 2), + Arguments.of(1, 2, 3), + Arguments.of(-1, 0, 1), + Arguments.of(-1, 0, 0), + Arguments.of(1, 0, 0)); + } } diff --git a/gradle/modules.properties b/gradle/modules.properties index 2dd01cb42..1aa3f5dd0 100644 --- a/gradle/modules.properties +++ b/gradle/modules.properties @@ -49,3 +49,5 @@ dagger.compiler=com.google.dagger:dagger-compiler com.squareup.javapoet=com.squareup:javapoet javax.inject=javax.inject:javax.inject org.checkerframework.checker.qual=org.checkerframework:checker-qual + +com.github.luben.zstd_jni=com.github.luben:zstd-jni diff --git a/server/docs/configuration.md b/server/docs/configuration.md index c68f65f4b..69f880d59 100644 --- a/server/docs/configuration.md +++ b/server/docs/configuration.md @@ -14,6 +14,8 @@ defaults and can be left unchanged. 
It is recommended to browse the properties b | PERSISTENCE_STORAGE_LIVE_ROOT_PATH | The root path for the live storage. | | | PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH | The root path for the archive storage. | | | PERSISTENCE_STORAGE_TYPE | Type of the persistence storage | BLOCK_AS_LOCAL_FILE | +| PERSISTENCE_STORAGE_COMPRESSION | Compression algorithm used during persistence (could be none as well) | ZSTD | +| PERSISTENCE_STORAGE_COMPRESSION_LEVEL | Compression level to be used by the compression algorithm | 3 | | CONSUMER_TIMEOUT_THRESHOLD_MILLIS | Time to wait for subscribers before disconnecting in milliseconds | 1500 | | SERVICE_DELAY_MILLIS | Service shutdown delay in milliseconds | 500 | | MEDIATOR_RING_BUFFER_SIZE | Size of the ring buffer used by the mediator (must be a power of 2) | 67108864 | diff --git a/server/src/main/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializer.java b/server/src/main/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializer.java index 7d23f2e31..f347b882e 100644 --- a/server/src/main/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializer.java +++ b/server/src/main/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializer.java @@ -31,6 +31,8 @@ public final class ServerMappedConfigSourceInitializer { new ConfigMapping("persistence.storage.liveRootPath", "PERSISTENCE_STORAGE_LIVE_ROOT_PATH"), new ConfigMapping("persistence.storage.archiveRootPath", "PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH"), new ConfigMapping("persistence.storage.type", "PERSISTENCE_STORAGE_TYPE"), + new ConfigMapping("persistence.storage.compression", "PERSISTENCE_STORAGE_COMPRESSION"), + new ConfigMapping("persistence.storage.compressionLevel", "PERSISTENCE_STORAGE_COMPRESSION_LEVEL"), new ConfigMapping("service.delayMillis", "SERVICE_DELAY_MILLIS"), new ConfigMapping("mediator.ringBufferSize", "MEDIATOR_RING_BUFFER_SIZE"), new ConfigMapping("mediator.type", "MEDIATOR_TYPE"), diff --git 
a/server/src/main/java/com/hedera/block/server/persistence/PersistenceInjectionModule.java b/server/src/main/java/com/hedera/block/server/persistence/PersistenceInjectionModule.java index 59443411e..a4f3ec550 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/PersistenceInjectionModule.java +++ b/server/src/main/java/com/hedera/block/server/persistence/PersistenceInjectionModule.java @@ -20,7 +20,11 @@ import com.hedera.block.server.events.BlockNodeEventHandler; import com.hedera.block.server.events.ObjectEvent; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.CompressionType; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.StorageType; +import com.hedera.block.server.persistence.storage.compression.Compression; +import com.hedera.block.server.persistence.storage.compression.NoOpCompression; +import com.hedera.block.server.persistence.storage.compression.ZstdCompression; import com.hedera.block.server.persistence.storage.path.BlockAsLocalDirPathResolver; import com.hedera.block.server.persistence.storage.path.BlockAsLocalFilePathResolver; import com.hedera.block.server.persistence.storage.path.BlockPathResolver; @@ -60,6 +64,7 @@ public interface PersistenceInjectionModule { * @param blockNodeContext the application context * @param blockRemover the block remover * @param blockPathResolver the block path resolver + * @param compression the compression used * @return a block writer singleton */ @Provides @@ -67,7 +72,8 @@ public interface PersistenceInjectionModule { static BlockWriter> providesBlockWriter( @NonNull final BlockNodeContext blockNodeContext, @NonNull final BlockRemover blockRemover, - @NonNull final BlockPathResolver blockPathResolver) { + @NonNull final BlockPathResolver blockPathResolver, + @NonNull final Compression compression) { Objects.requireNonNull(blockRemover); 
Objects.requireNonNull(blockPathResolver); final StorageType persistenceType = blockNodeContext @@ -76,7 +82,7 @@ static BlockWriter> providesBlockWriter( .type(); try { return switch (persistenceType) { - case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFileWriter.of(blockNodeContext, blockPathResolver); + case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFileWriter.of(blockNodeContext, blockPathResolver, compression); case BLOCK_AS_LOCAL_DIRECTORY -> BlockAsLocalDirWriter.of( blockNodeContext, blockRemover, blockPathResolver); case NO_OP -> NoOpBlockWriter.newInstance(); @@ -148,6 +154,23 @@ static BlockPathResolver providesPathResolver(@NonNull final PersistenceStorageC }; } + /** + * Provides a compression singleton using the persistence config. + * + * @param config the persistence storage configuration needed to build the + * compression + * @return a compression singleton + */ + @Provides + @Singleton + static Compression providesCompression(@NonNull final PersistenceStorageConfig config) { + final CompressionType compressionType = config.compression(); + return switch (compressionType) { + case ZSTD -> ZstdCompression.of(config); + case NONE -> NoOpCompression.newInstance(); + }; + } + /** * Binds the block node event handler to the stream persistence handler. 
* diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfig.java b/server/src/main/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfig.java index c385378b1..73311320d 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfig.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfig.java @@ -19,9 +19,12 @@ import static com.hedera.block.server.Constants.BLOCK_NODE_ARCHIVE_ROOT_DIRECTORY_SEMANTIC_NAME; import static com.hedera.block.server.Constants.BLOCK_NODE_LIVE_ROOT_DIRECTORY_SEMANTIC_NAME; +import com.hedera.block.common.utils.Preconditions; import com.hedera.block.common.utils.StringUtilities; import com.swirlds.config.api.ConfigData; import com.swirlds.config.api.ConfigProperty; +import com.swirlds.config.api.validation.annotation.Max; +import com.swirlds.config.api.validation.annotation.Min; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; import java.io.UncheckedIOException; @@ -34,7 +37,9 @@ * * @param liveRootPath provides the root path for saving blocks live * @param archiveRootPath provides the root path for archived blocks - * @param type use a predefined type string to replace the persistence component implementation. + * @param type storage type + * @param compression compression type to use for the storage + * @param compressionLevel compression level used by the compression algorithm * Non-PRODUCTION values should only be used for troubleshooting and development purposes. 
*/ @ConfigData("persistence.storage") @@ -43,7 +48,9 @@ public record PersistenceStorageConfig( @ConfigProperty(defaultValue = "") String liveRootPath, // @todo(#371) - the default life/archive root path must be absolute starting from /opt @ConfigProperty(defaultValue = "") String archiveRootPath, - @ConfigProperty(defaultValue = "BLOCK_AS_LOCAL_FILE") StorageType type) { + @ConfigProperty(defaultValue = "BLOCK_AS_LOCAL_FILE") StorageType type, + @ConfigProperty(defaultValue = "ZSTD") CompressionType compression, + @ConfigProperty(defaultValue = "3") @Min(0) @Max(20) int compressionLevel) { // @todo(#371) - the default life/archive root path must be absolute starting from /opt private static final String LIVE_ROOT_PATH = Path.of("hashgraph/blocknode/data/live/").toAbsolutePath().toString(); @@ -56,6 +63,7 @@ public record PersistenceStorageConfig( */ public PersistenceStorageConfig { Objects.requireNonNull(type); + compression.verifyCompressionLevel(compressionLevel); liveRootPath = resolvePath(liveRootPath, LIVE_ROOT_PATH, BLOCK_NODE_LIVE_ROOT_DIRECTORY_SEMANTIC_NAME); archiveRootPath = resolvePath(archiveRootPath, ARCHIVE_ROOT_PATH, BLOCK_NODE_ARCHIVE_ROOT_DIRECTORY_SEMANTIC_NAME); @@ -146,4 +154,32 @@ public enum StorageType { */ NO_OP } + + /** + * An enum that reflects the type of compression that is used to compress + * the blocks that are stored within the persistence storage. + */ + public enum CompressionType { + /** + * This type of compression is used to compress the blocks using the + * `Zstandard` algorithm. + */ + ZSTD(0, 20), + /** + * This type means no compression will be done. 
+ */ + NONE(Integer.MIN_VALUE, Integer.MAX_VALUE); + + private final int minCompressionLevel; + private final int maxCompressionLevel; + + CompressionType(final int minCompressionLevel, final int maxCompressionLevel) { + this.minCompressionLevel = minCompressionLevel; + this.maxCompressionLevel = maxCompressionLevel; + } + + public void verifyCompressionLevel(final int levelToCheck) { + Preconditions.requireInRange(levelToCheck, minCompressionLevel, maxCompressionLevel); + } + } } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/compression/Compression.java b/server/src/main/java/com/hedera/block/server/persistence/storage/compression/Compression.java new file mode 100644 index 000000000..dccdf19a8 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/compression/Compression.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage.compression; + +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; +import java.io.OutputStream; + +/** + * A compression abstractions that allows for the compression of bytes using + * different compression algorithms based on specific implementation. 
+ */ +public interface Compression { + /** + * This method takes a valid, {@code non-null} {@link OutputStream} instance + * and wraps it with the specific compression algorithm implementation. The + * resulting {@link OutputStream} is then returned. + * + * @param streamToWrap a valid {@code non-null} {@link OutputStream} to wrap + * @return a {@code non-null} {@link OutputStream} that wraps the provided + * {@link OutputStream} with the specific compression algorithm + * implementation + * @throws IOException if an I/O exception occurs + */ + @NonNull + OutputStream wrap(@NonNull final OutputStream streamToWrap) throws IOException; + + /** + * This method aims to return a valid, {@code non-blank} {@link String} that + * represents the file extension for the given specific implementation, + * based on the compression algorithm used. + * + * @return a valid, {@code non-blank} {@link String} that represents the + * file extension for the given specific implementation, based on the + * compression algorithm used + */ + @NonNull + String getCompressionFileExtension(); +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/compression/NoOpCompression.java b/server/src/main/java/com/hedera/block/server/persistence/storage/compression/NoOpCompression.java new file mode 100644 index 000000000..33f459227 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/compression/NoOpCompression.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage.compression; + +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.OutputStream; +import java.util.Objects; + +/** + * An implementation of {@link Compression} that does not compress the data. It + * does not use any algorithm, but simply generates a stream that writes the + * data to it`s destination, as it is received. + */ +public class NoOpCompression implements Compression { + /** + * Constructor. + */ + private NoOpCompression() {} + + /** + * Factory method. Returns a new, fully initialized instance of + * {@link NoOpCompression}. + * + * @return a new, fully initialized instance of {@link NoOpCompression} + */ + @NonNull + public static NoOpCompression newInstance() { + return new NoOpCompression(); + } + + /** + * This implementation does not compress the data. It uses no compression + * algorithm, but simply generates a stream that writes the data to it`s + * destination, as it is received. + * @see Compression#wrap(OutputStream) for API contract + */ + @NonNull + @Override + public OutputStream wrap(@NonNull final OutputStream streamToWrap) { + return Objects.requireNonNull(streamToWrap); + } + + @NonNull + @Override + public String getCompressionFileExtension() { + return ""; + } +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/compression/ZstdCompression.java b/server/src/main/java/com/hedera/block/server/persistence/storage/compression/ZstdCompression.java new file mode 100644 index 000000000..86d2f6e09 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/compression/ZstdCompression.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage.compression; + +import com.github.luben.zstd.ZstdOutputStream; +import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; + +/** + * An implementation of {@link Compression} that compresses the data using the + * Zstandard (Zstd) compression algorithm. + */ +public class ZstdCompression implements Compression { + private final int compressionLevel; + + /** + * Constructor. + * + * @param config the {@link PersistenceStorageConfig} instance that provides + * the configuration for the compression algorithm + */ + private ZstdCompression(@NonNull final PersistenceStorageConfig config) { + this.compressionLevel = config.compressionLevel(); + } + + /** + * Factory method. Returns a new, fully initialized instance of + * {@link ZstdCompression}. + * + * @param config the {@link PersistenceStorageConfig} instance that provides + * the configuration for the compression algorithm + * @return a new, fully initialized and valid instance of + * {@link ZstdCompression} + */ + @NonNull + public static ZstdCompression of(@NonNull final PersistenceStorageConfig config) { + return new ZstdCompression(config); + } + + /** + * This implementation wraps the given stream in a {@link ZstdOutputStream} + * that compresses the data using the Zstandard (Zstd) algorithm at the + * configured compression level before writing it to its destination. 
+ * @see Compression#wrap(OutputStream) for API contract + */ + @NonNull + @Override + public OutputStream wrap(@NonNull final OutputStream streamToWrap) throws IOException { + return new ZstdOutputStream(Objects.requireNonNull(streamToWrap), compressionLevel); + } + + @NonNull + @Override + public String getCompressionFileExtension() { + return ".zstd"; + } +} diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/path/BlockPathResolver.java b/server/src/main/java/com/hedera/block/server/persistence/storage/path/BlockPathResolver.java index 4a141b3db..d812ca3f7 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/path/BlockPathResolver.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/path/BlockPathResolver.java @@ -28,7 +28,7 @@ public interface BlockPathResolver { * This method resolves the fs {@link Path} to a Block by a given input * number. This method does not guarantee that the returned {@link Path} * exists! This method is guaranteed to return a {@code non-null} - * {@link Path}. + * {@link Path}. No compression extension is appended to the file name. 
* * @param blockNumber to be resolved the path for * @return the resolved path to the given Block by a number diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileWriter.java index fb9fed0a1..a65f6865c 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileWriter.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileWriter.java @@ -18,17 +18,19 @@ import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted; +import com.hedera.block.common.utils.FileUtilities; import com.hedera.block.common.utils.Preconditions; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.metrics.MetricsService; +import com.hedera.block.server.persistence.storage.compression.Compression; import com.hedera.block.server.persistence.storage.path.BlockPathResolver; import com.hedera.hapi.block.BlockItemUnparsed; import com.hedera.hapi.block.BlockUnparsed; import com.hedera.hapi.block.stream.output.BlockHeader; import com.hedera.pbj.runtime.ParseException; import edu.umd.cs.findbugs.annotations.NonNull; -import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.LinkedList; @@ -42,21 +44,27 @@ public final class BlockAsLocalFileWriter implements LocalBlockWriter> { private final MetricsService metricsService; private final BlockPathResolver blockPathResolver; + private final Compression compression; private List currentBlockItems; private long currentBlockNumber = -1; /** * Constructor. 
* - * @param blockNodeContext valid, {@code non-null} instance of {@link BlockNodeContext} used to - * get the {@link MetricsService} - * @param blockPathResolver valid, {@code non-null} instance of {@link BlockPathResolver} used - * to resolve paths to Blocks + * @param blockNodeContext valid, {@code non-null} instance of + * {@link BlockNodeContext} used to get the {@link MetricsService} + * @param blockPathResolver valid, {@code non-null} instance of + * {@link BlockPathResolver} used to resolve paths to Blocks + * @param compression valid, {@code non-null} instance of + * {@link Compression} used to compress the Block */ private BlockAsLocalFileWriter( - @NonNull final BlockNodeContext blockNodeContext, @NonNull final BlockPathResolver blockPathResolver) { + @NonNull final BlockNodeContext blockNodeContext, + @NonNull final BlockPathResolver blockPathResolver, + @NonNull final Compression compression) { this.metricsService = Objects.requireNonNull(blockNodeContext.metricsService()); this.blockPathResolver = Objects.requireNonNull(blockPathResolver); + this.compression = Objects.requireNonNull(compression); } /** @@ -66,11 +74,15 @@ private BlockAsLocalFileWriter( * {@link BlockNodeContext} used to get the {@link MetricsService} * @param blockPathResolver valid, {@code non-null} instance of * {@link BlockPathResolver} used to resolve paths to Blocks + * @param compression valid, {@code non-null} instance of + * {@link Compression} used to compress the Block * @return a new, fully initialized instance of {@link BlockAsLocalFileWriter} */ public static BlockAsLocalFileWriter of( - @NonNull final BlockNodeContext blockNodeContext, @NonNull final BlockPathResolver blockPathResolver) { - return new BlockAsLocalFileWriter(blockNodeContext, blockPathResolver); + @NonNull final BlockNodeContext blockNodeContext, + @NonNull final BlockPathResolver blockPathResolver, + @NonNull final Compression compression) { + return new BlockAsLocalFileWriter(blockNodeContext, 
blockPathResolver, compression); } @NonNull @@ -97,13 +109,14 @@ public Optional> write(@NonNull final List writeToFs() throws IOException { - final Path blockToWritePathResolved = blockPathResolver.resolvePathToBlock(currentBlockNumber); - Files.createDirectories(blockToWritePathResolved.getParent()); - Files.createFile(blockToWritePathResolved); - try (final FileOutputStream fos = new FileOutputStream(blockToWritePathResolved.toFile())) { + final Path rawBlockPath = blockPathResolver.resolvePathToBlock(currentBlockNumber); + final Path resolvedBlockPath = + FileUtilities.appendExtension(rawBlockPath, compression.getCompressionFileExtension()); + FileUtilities.createFile(resolvedBlockPath); + try (final OutputStream out = compression.wrap(Files.newOutputStream(resolvedBlockPath))) { final BlockUnparsed blockToWrite = BlockUnparsed.newBuilder().blockItems(currentBlockItems).build(); - BlockUnparsed.PROTOBUF.toBytes(blockToWrite).writeTo(fos); + BlockUnparsed.PROTOBUF.toBytes(blockToWrite).writeTo(out); } return currentBlockItems; } diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index d00c922e4..dda0c6d22 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -6,6 +6,7 @@ exports com.hedera.block.server.consumer; exports com.hedera.block.server.exception; exports com.hedera.block.server.persistence.storage; + exports com.hedera.block.server.persistence.storage.compression; exports com.hedera.block.server.persistence.storage.path; exports com.hedera.block.server.persistence.storage.write; exports com.hedera.block.server.persistence.storage.read; @@ -23,6 +24,7 @@ requires com.hedera.block.common; requires com.hedera.block.stream; + requires com.github.luben.zstd_jni; requires com.hedera.pbj.grpc.helidon.config; requires com.hedera.pbj.grpc.helidon; requires com.hedera.pbj.runtime; diff --git a/server/src/main/resources/app.properties b/server/src/main/resources/app.properties index 
fcdc53254..986eb6efb 100644 --- a/server/src/main/resources/app.properties +++ b/server/src/main/resources/app.properties @@ -14,3 +14,4 @@ prometheus.endpointPortNumber=9999 #persistence.storage.archiveRootPath= # @todo(#372) - default persistence type should be BLOCK_AS_LOCAL_FILE persistence.storage.type=BLOCK_AS_LOCAL_DIRECTORY +#persistence.storage.compression=NONE diff --git a/server/src/test/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializerTest.java b/server/src/test/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializerTest.java index 7c0bdfb72..ee853ee89 100644 --- a/server/src/test/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializerTest.java +++ b/server/src/test/java/com/hedera/block/server/config/ServerMappedConfigSourceInitializerTest.java @@ -16,23 +16,39 @@ package com.hedera.block.server.config; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import com.swirlds.common.metrics.config.MetricsConfig; +import com.swirlds.config.api.ConfigData; +import com.swirlds.config.api.ConfigProperty; import com.swirlds.config.extensions.sources.ConfigMapping; import com.swirlds.config.extensions.sources.MappedConfigSource; import java.lang.reflect.Field; +import java.lang.reflect.RecordComponent; +import java.util.Arrays; +import java.util.Optional; import java.util.Queue; +import java.util.Set; import java.util.function.Predicate; -import org.junit.jupiter.api.BeforeAll; +import java.util.stream.Stream; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +/** + * Tests for {@link ServerMappedConfigSourceInitializer}. 
+ */ class ServerMappedConfigSourceInitializerTest { private static final ConfigMapping[] SUPPORTED_MAPPINGS = { new ConfigMapping("consumer.timeoutThresholdMillis", "CONSUMER_TIMEOUT_THRESHOLD_MILLIS"), new ConfigMapping("persistence.storage.liveRootPath", "PERSISTENCE_STORAGE_LIVE_ROOT_PATH"), new ConfigMapping("persistence.storage.archiveRootPath", "PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH"), new ConfigMapping("persistence.storage.type", "PERSISTENCE_STORAGE_TYPE"), + new ConfigMapping("persistence.storage.compression", "PERSISTENCE_STORAGE_COMPRESSION"), + new ConfigMapping("persistence.storage.compressionLevel", "PERSISTENCE_STORAGE_COMPRESSION_LEVEL"), new ConfigMapping("service.delayMillis", "SERVICE_DELAY_MILLIS"), new ConfigMapping("mediator.ringBufferSize", "MEDIATOR_RING_BUFFER_SIZE"), new ConfigMapping("mediator.type", "MEDIATOR_TYPE"), @@ -43,11 +59,54 @@ class ServerMappedConfigSourceInitializerTest { new ConfigMapping("prometheus.endpointEnabled", "PROMETHEUS_ENDPOINT_ENABLED"), new ConfigMapping("prometheus.endpointPortNumber", "PROMETHEUS_ENDPOINT_PORT_NUMBER") }; - private static MappedConfigSource toTest; - @BeforeAll - static void setUp() { - toTest = ServerMappedConfigSourceInitializer.getMappedConfigSource(); + /** + * This test aims to verify the state of all config extensions we have + * added. These configs are the ones that are returned from + * {@link BlockNodeConfigExtension#getConfigDataTypes()}. This test will + * verify: + *
+     *     - all added config classes are annotated with the {@link ConfigData}
+     *       annotation.
+     *     - all fields in all config classes are annotated with the
+     *       {@link ConfigProperty} annotation.
+     *     - a mapping for all fields in all config classes is present in the
+     *       {@link ServerMappedConfigSourceInitializer#MAPPINGS()}.
+     * </pre>
+ * @param config parameterized, config class to test + */ + @Disabled("This test is disabled because it will start passing only after #285 gets implemented") + @ParameterizedTest + @MethodSource("allConfigDataTypes") + void testVerifyAllFieldsInRecordsAreMapped(final Class config) { + if (!config.isAnnotationPresent(ConfigData.class)) { + fail("Class %s is missing the ConfigData annotation! All config classes MUST have that annotation present!" + .formatted(config.getSimpleName())); + } else { + final ConfigData configDataAnnotation = config.getDeclaredAnnotation(ConfigData.class); + final String prefix = configDataAnnotation.value(); + for (final RecordComponent recordComponent : config.getRecordComponents()) { + if (!recordComponent.isAnnotationPresent(ConfigProperty.class)) { + fail( + "Field %s in %s is missing the ConfigProperty annotation! All fields in config classes MUST have that annotation present!" + .formatted(recordComponent.getName(), config.getSimpleName())); + } else { + final String expectedMappedName = "%s.%s".formatted(prefix, recordComponent.getName()); + final Optional matchingMapping = Arrays.stream(SUPPORTED_MAPPINGS) + .filter(mapping -> mapping.mappedName().equals(expectedMappedName)) + .findFirst(); + assertThat(matchingMapping) + .isNotNull() + .withFailMessage( + "Field [%s] in [%s] is not present in the environment variable mappings! Expected config key [%s] to be present and to be mapped to [%s]", + recordComponent.getName(), + config.getSimpleName(), + expectedMappedName, + transformToEnvVarConvention(expectedMappedName)) + .isPresent(); + } + } + } } /** @@ -60,29 +119,65 @@ static void setUp() { * ServerMappedConfigSourceInitializer#MAPPINGS} to make this pass. 
*/ @Test - void test_VerifyAllSupportedMappingsAreAddedToInstance() throws ReflectiveOperationException { + void testVerifyAllSupportedMappingsAreAddedToInstance() throws ReflectiveOperationException { final Queue actual = extractConfigMappings(); - assertEquals(SUPPORTED_MAPPINGS.length, actual.size()); + // fail if the actual and this test have a different number of mappings + assertThat(SUPPORTED_MAPPINGS.length) + .withFailMessage( + "The number of supported mappings has changed! Please update the test to reflect the change.\nRUNTIME_MAPPING: %s\nTEST_MAPPING: %s", + actual, Arrays.toString(SUPPORTED_MAPPINGS)) + .isEqualTo(actual.size()); + // test this test against actual for (final ConfigMapping current : SUPPORTED_MAPPINGS) { final Predicate predicate = cm -> current.mappedName().equals(cm.mappedName()) && current.originalName().equals(cm.originalName()); - assertTrue( - actual.stream().anyMatch(predicate), - () -> "when testing for: [%s] it is not contained in mappings of the actual initialized object %s" - .formatted(current, actual)); + assertThat(actual.stream().anyMatch(predicate)) + .withFailMessage( + "When testing for: [%s] it is not contained in mappings of the actual initialized object %s", + current, actual) + .isTrue(); } + + // test actual against this test + for (final ConfigMapping current : actual) { + final Predicate predicate = + cm -> current.mappedName().equals(cm.mappedName()) + && current.originalName().equals(cm.originalName()); + assertThat(Arrays.stream(SUPPORTED_MAPPINGS).anyMatch(predicate)) + .withFailMessage( + "When testing for: [%s] it is not contained in mappings of this test %s", current, actual) + .isTrue(); + } + } + + private static String transformToEnvVarConvention(final String input) { + String underscored = input.replace(".", "_"); + String resolved = underscored.replaceAll("(? 
extractConfigMappings() throws ReflectiveOperationException { final Field configMappings = MappedConfigSource.class.getDeclaredField("configMappings"); try { configMappings.setAccessible(true); - return (Queue) configMappings.get(toTest); + return (Queue) + configMappings.get(ServerMappedConfigSourceInitializer.getMappedConfigSource()); } finally { configMappings.setAccessible(false); } } + + private static Stream allConfigDataTypes() { + // Add any classes that should be excluded from the test for any reason in the set below + // MetricsConfig is not supported by us + final Set> excluded = Set.of(MetricsConfig.class); + return new BlockNodeConfigExtension() + .getConfigDataTypes().stream() + .filter(configType -> !excluded.contains(configType)) + .map(Arguments::of); + } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/PersistenceInjectionModuleTest.java b/server/src/test/java/com/hedera/block/server/persistence/PersistenceInjectionModuleTest.java index a9f4b7acb..73e73b055 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/PersistenceInjectionModuleTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/PersistenceInjectionModuleTest.java @@ -30,7 +30,11 @@ import com.hedera.block.server.metrics.MetricsService; import com.hedera.block.server.notifier.Notifier; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.CompressionType; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.StorageType; +import com.hedera.block.server.persistence.storage.compression.Compression; +import com.hedera.block.server.persistence.storage.compression.NoOpCompression; +import com.hedera.block.server.persistence.storage.compression.ZstdCompression; import com.hedera.block.server.persistence.storage.path.BlockAsLocalDirPathResolver; import 
com.hedera.block.server.persistence.storage.path.BlockAsLocalFilePathResolver; import com.hedera.block.server.persistence.storage.path.BlockPathResolver; @@ -82,6 +86,9 @@ class PersistenceInjectionModuleTest { @Mock private BlockPathResolver blockPathResolverMock; + @Mock + private Compression compressionMock; + @Mock private SubscriptionHandler subscriptionHandlerMock; @@ -121,7 +128,7 @@ void testProvidesBlockWriter(final StorageType storageType) { when(persistenceStorageConfigMock.type()).thenReturn(storageType); final BlockWriter> actual = PersistenceInjectionModule.providesBlockWriter( - blockNodeContextMock, blockRemoverMock, blockPathResolverMock); + blockNodeContextMock, blockRemoverMock, blockPathResolverMock, compressionMock); final Class targetInstanceType = switch (storageType) { @@ -156,7 +163,7 @@ void testProvidesBlockWriter_IOException() { // Expect an UncheckedIOException due to the IOException assertThatExceptionOfType(UncheckedIOException.class) .isThrownBy(() -> PersistenceInjectionModule.providesBlockWriter( - blockNodeContextMock, blockRemoverMock, blockPathResolverMock)) + blockNodeContextMock, blockRemoverMock, blockPathResolverMock, compressionMock)) .withCauseInstanceOf(IOException.class) .withMessage("Failed to create BlockWriter"); } @@ -248,6 +255,32 @@ void testProvidesBlockPathResolver(final StorageType storageType) { assertThat(actual).isNotNull().isExactlyInstanceOf(targetInstanceType); } + /** + * This test aims to verify that the + * {@link PersistenceInjectionModule#providesCompression(PersistenceStorageConfig)} + * method will return the correct {@link Compression} instance based on the + * {@link CompressionType} parameter. The test verifies only the result type + * and not what is inside the instance! For the purpose of this test, what + * is inside the instance is not important. We aim to test the branch that + * will be taken based on the {@link CompressionType} parameter in terms of + * the returned instance type. 
+ * + * @param compressionType parameterized, the {@link CompressionType} to test + */ + @ParameterizedTest + @MethodSource("compressionTypes") + void testProvidesCompression(final CompressionType compressionType) { + when(persistenceStorageConfigMock.compression()).thenReturn(compressionType); + final Compression actual = PersistenceInjectionModule.providesCompression(persistenceStorageConfigMock); + + final Class targetInstanceType = + switch (compressionType) { + case ZSTD -> ZstdCompression.class; + case NONE -> NoOpCompression.class; + }; + assertThat(actual).isNotNull().isExactlyInstanceOf(targetInstanceType); + } + @Test void testProvidesStreamValidatorBuilder() throws IOException { final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); @@ -265,4 +298,11 @@ void testProvidesStreamValidatorBuilder() throws IOException { private static Stream storageTypes() { return Arrays.stream(StorageType.values()).map(Arguments::of); } + + /** + * All {@link CompressionType} dynamically generated. 
+ */ + private static Stream compressionTypes() { + return Arrays.stream(CompressionType.values()).map(Arguments::of); + } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfigTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfigTest.java index 771bd4652..9818eae70 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfigTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/storage/PersistenceStorageConfigTest.java @@ -18,8 +18,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.from; +import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.CompressionType; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.StorageType; import java.io.IOException; import java.io.UncheckedIOException; @@ -42,6 +44,16 @@ class PersistenceStorageConfigTest { Path.of("hashgraph/").toAbsolutePath(); private static final Path PERSISTENCE_STORAGE_ROOT_ABSOLUTE_PATH = HASHGRAPH_ROOT_ABSOLUTE_PATH.resolve("blocknode/data/"); + // Default compression level (as set in the config annotation) + private static final int DEFAULT_COMPRESSION_LEVEL = 3; + // NoOp compression level boundaries + private static final int LOWER_BOUNDARY_FOR_NO_OP_COMPRESSION = Integer.MIN_VALUE; + private static final int DEFAULT_VALUE_FOR_NO_OP_COMPRESSION = DEFAULT_COMPRESSION_LEVEL; + private static final int UPPER_BOUNDARY_FOR_NO_OP_COMPRESSION = Integer.MAX_VALUE; + // Zstd compression level boundaries + private static final int LOWER_BOUNDARY_FOR_ZSTD_COMPRESSION = 0; + private static final int DEFAULT_VALUE_FOR_ZSTD_COMPRESSION = DEFAULT_COMPRESSION_LEVEL; + private static final int 
UPPER_BOUNDARY_FOR_ZSTD_COMPRESSION = 20; @AfterEach void tearDown() { @@ -70,7 +82,8 @@ void tearDown() { @ParameterizedTest @MethodSource("storageTypes") void testPersistenceStorageConfigStorageTypes(final StorageType storageType) { - final PersistenceStorageConfig actual = new PersistenceStorageConfig("", "", storageType); + final PersistenceStorageConfig actual = + new PersistenceStorageConfig("", "", storageType, CompressionType.NONE, DEFAULT_COMPRESSION_LEVEL); assertThat(actual).returns(storageType, from(PersistenceStorageConfig::type)); } @@ -92,7 +105,11 @@ void testPersistenceStorageConfigHappyPaths( final String archiveRootPathToTest, final String expectedArchiveRootPathToTest) { final PersistenceStorageConfig actual = new PersistenceStorageConfig( - liveRootPathToTest, archiveRootPathToTest, StorageType.BLOCK_AS_LOCAL_FILE); + liveRootPathToTest, + archiveRootPathToTest, + StorageType.BLOCK_AS_LOCAL_FILE, + CompressionType.NONE, + DEFAULT_COMPRESSION_LEVEL); assertThat(actual) .returns(expectedLiveRootPathToTest, from(PersistenceStorageConfig::liveRootPath)) .returns(expectedArchiveRootPathToTest, from(PersistenceStorageConfig::archiveRootPath)); @@ -114,7 +131,56 @@ void testPersistenceStorageConfigInvalidRootPaths( final String invalidLiveRootPathToTest, final String invalidArchiveRootPathToTest) { assertThatExceptionOfType(UncheckedIOException.class) .isThrownBy(() -> new PersistenceStorageConfig( - invalidLiveRootPathToTest, invalidArchiveRootPathToTest, StorageType.BLOCK_AS_LOCAL_FILE)); + invalidLiveRootPathToTest, + invalidArchiveRootPathToTest, + StorageType.BLOCK_AS_LOCAL_FILE, + CompressionType.NONE, + DEFAULT_COMPRESSION_LEVEL)); + } + + /** + * This test aims to verify that the {@link PersistenceStorageConfig} class + * correctly returns the compression level that was set in the constructor. 
+ * + * @param compressionLevel parameterized, the compression level to test + */ + @ParameterizedTest + @MethodSource("validCompressionLevels") + void testPersistenceStorageConfigValidCompressionLevel( + final CompressionType compressionType, final int compressionLevel) { + final PersistenceStorageConfig actual = new PersistenceStorageConfig( + "", "", StorageType.BLOCK_AS_LOCAL_FILE, compressionType, compressionLevel); + assertThat(actual).returns(compressionLevel, from(PersistenceStorageConfig::compressionLevel)); + } + + /** + * This test aims to verify that the {@link PersistenceStorageConfig} class + * correctly throws an {@link IllegalArgumentException} when the compression + * level is invalid. + * + * @param compressionLevel parameterized, the compression level to test + */ + @ParameterizedTest + @MethodSource("invalidCompressionLevels") + void testPersistenceStorageConfigInvalidCompressionLevel( + final CompressionType compressionType, final int compressionLevel) { + assertThatIllegalArgumentException() + .isThrownBy(() -> new PersistenceStorageConfig( + "", "", StorageType.BLOCK_AS_LOCAL_FILE, compressionType, compressionLevel)); + } + + /** + * This test aims to verify that the {@link PersistenceStorageConfig} class + * correctly returns the compression type that was set in the constructor. + * + * @param compressionType parameterized, the compression type to test + */ + @ParameterizedTest + @MethodSource("compressionTypes") + void testPersistenceStorageConfigCompressionTypes(final CompressionType compressionType) { + final PersistenceStorageConfig actual = + new PersistenceStorageConfig("", "", StorageType.NO_OP, compressionType, DEFAULT_COMPRESSION_LEVEL); + assertThat(actual).returns(compressionType, from(PersistenceStorageConfig::compression)); } /** @@ -124,6 +190,13 @@ private static Stream storageTypes() { return Arrays.stream(StorageType.values()).map(Arguments::of); } + /** + * All compression types dynamically provided. 
+ */ + private static Stream compressionTypes() { + return Arrays.stream(CompressionType.values()).map(Arguments::of); + } + /** * The default absolute paths. We expect these to allow the persistence * config to be instantiated. Providing a blank string is accepted, it will @@ -223,4 +296,32 @@ private static Stream invalidRootPaths() { return Stream.of( Arguments.of("", invalidPath), Arguments.of(invalidPath, ""), Arguments.of(invalidPath, invalidPath)); } + + private static Stream validCompressionLevels() { + return Stream.of( + Arguments.of( + CompressionType.NONE, + LOWER_BOUNDARY_FOR_NO_OP_COMPRESSION), // lower boundary for NO_OP compression + Arguments.of( + CompressionType.NONE, + DEFAULT_VALUE_FOR_NO_OP_COMPRESSION), // default value for NO_OP compression + Arguments.of( + CompressionType.NONE, + UPPER_BOUNDARY_FOR_NO_OP_COMPRESSION), // upper boundary for NO_OP compression + Arguments.of( + CompressionType.ZSTD, + LOWER_BOUNDARY_FOR_ZSTD_COMPRESSION), // lower boundary for ZSTD compression + Arguments.of( + CompressionType.ZSTD, DEFAULT_VALUE_FOR_ZSTD_COMPRESSION), // default value for ZSTD compression + Arguments.of( + CompressionType.ZSTD, + UPPER_BOUNDARY_FOR_ZSTD_COMPRESSION) // upper boundary for ZSTD compression + ); + } + + private static Stream invalidCompressionLevels() { + return Stream.of( + Arguments.of(CompressionType.ZSTD, LOWER_BOUNDARY_FOR_ZSTD_COMPRESSION - 1), + Arguments.of(CompressionType.ZSTD, UPPER_BOUNDARY_FOR_ZSTD_COMPRESSION + 1)); + } } diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/compression/NoOpCompressionTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/compression/NoOpCompressionTest.java new file mode 100644 index 000000000..036fb964c --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/persistence/storage/compression/NoOpCompressionTest.java @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage.compression; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests for {@link NoOpCompression}. + */ +class NoOpCompressionTest { + @TempDir + private Path testTempDir; + + private NoOpCompression toTest; + + @BeforeEach + void setUp() { + toTest = NoOpCompression.newInstance(); + } + + /** + * This test aims to verify that the + * {@link NoOpCompression#getCompressionFileExtension()} method returns a + * blank string. + */ + @Test + void testGetCompressionFileExtension() { + assertThat(toTest.getCompressionFileExtension()).isNotNull().isBlank(); + } + + /** + * This test aims to verify that the + * {@link NoOpCompression#wrap(OutputStream)} correctly wraps a valid + * provided {@link OutputStream} and writes the test data to it`s + * destination as it is provided, no compression is done. 
+ * + * @param testData parameterized, test data + * @throws IOException if an I/O exception occurs + */ + @ParameterizedTest + @MethodSource("testData") + void testSuccessfulCompression(final String testData) throws IOException { + final Path actual = testTempDir.resolve("successfulCompression.txt"); + Files.createFile(actual); + + // assert that the target file exists + assertThat(actual).exists(); + + final byte[] byteArrayTestData = testData.getBytes(StandardCharsets.UTF_8); + try (final OutputStream out = toTest.wrap(Files.newOutputStream(actual))) { + out.write(byteArrayTestData); + } + assertThat(actual) + .exists() + .isReadable() + .isRegularFile() + .hasSameBinaryContentAs(actualNoOpCompression(byteArrayTestData)); + } + + private Path actualNoOpCompression(final byte[] byteArrayTestData) throws IOException { + final Path tempFile = testTempDir.resolve("tempComparisonFile.txt"); + try (final OutputStream out = Files.newOutputStream(tempFile)) { + out.write(byteArrayTestData); + return tempFile; + } + } + + private static Stream testData() { + return Stream.of( + Arguments.of(""), + Arguments.of(" "), + Arguments.of("\t "), + Arguments.of("Some Random test data"), + Arguments.of("Other Random test data"), + Arguments.of("11110000"), + Arguments.of(" a "), + Arguments.of("\t a "), + Arguments.of("\n a ")); + } +} diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/compression/ZstdCompressionTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/compression/ZstdCompressionTest.java new file mode 100644 index 000000000..1a532ad17 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/persistence/storage/compression/ZstdCompressionTest.java @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.persistence.storage.compression; + +import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_COMPRESSION_LEVEL; +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.luben.zstd.ZstdOutputStream; +import com.hedera.block.common.utils.FileUtilities; +import com.hedera.block.server.config.BlockNodeContext; +import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import com.hedera.block.server.util.TestConfigUtil; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests for the {@link ZstdCompression} class. 
+ */ @SuppressWarnings("FieldCanBeLocal") class ZstdCompressionTest { + @TempDir + private Path testTempDir; + + private BlockNodeContext blockNodeContext; + private PersistenceStorageConfig testConfig; + private ZstdCompression toTest; + + @BeforeEach + void setUp() throws IOException { + blockNodeContext = TestConfigUtil.getTestBlockNodeContext( + Map.of(PERSISTENCE_STORAGE_COMPRESSION_LEVEL, String.valueOf(6))); + testConfig = blockNodeContext.configuration().getConfigData(PersistenceStorageConfig.class); + toTest = ZstdCompression.of(testConfig); + } + + /** + * This test aims to verify that the + * {@link ZstdCompression#getCompressionFileExtension()} method returns the + * Zstandard (Zstd) compression file extension ".zstd". + */ + @Test + void testGetCompressionFileExtension() { + assertThat(toTest.getCompressionFileExtension()) + .isNotNull() + .isNotBlank() + .isEqualTo(".zstd"); + } + + /** + * This test aims to verify that the + * {@link ZstdCompression#wrap(OutputStream)} correctly wraps a valid + * provided {@link OutputStream} and utilizes the Zstandard compression + * algorithm when writing the data to its destination. 
+ * + * @param testData parameterized, test data + * @throws IOException if an I/O exception occurs + */ + @ParameterizedTest + @MethodSource("testData") + void testSuccessfulCompression(final String testData) throws IOException { + final Path actual = testTempDir.resolve(FileUtilities.appendExtension( + Path.of("successfulCompression.txt"), toTest.getCompressionFileExtension())); + Files.createFile(actual); + + // assert that the raw target file exists + assertThat(actual).exists(); + + final byte[] byteArrayTestData = testData.getBytes(StandardCharsets.UTF_8); + try (final OutputStream out = toTest.wrap(Files.newOutputStream(actual))) { + out.write(byteArrayTestData); + } + assertThat(actual) + .exists() + .isReadable() + .isRegularFile() + .isNotEmptyFile() + .hasSameBinaryContentAs(actualZstdCompression(byteArrayTestData)); + } + + private Path actualZstdCompression(final byte[] byteArrayTestData) throws IOException { + final Path tempFile = testTempDir.resolve( + FileUtilities.appendExtension(Path.of("tempComparisonFile.txt"), toTest.getCompressionFileExtension())); + try (final ZstdOutputStream out = + new ZstdOutputStream(Files.newOutputStream(tempFile), testConfig.compressionLevel())) { + out.write(byteArrayTestData); + return tempFile; + } + } + + private static Stream testData() { + return Stream.of( + Arguments.of(""), + Arguments.of(" "), + Arguments.of("\t "), + Arguments.of("Some Random test data"), + Arguments.of("Other Random test data"), + Arguments.of("11110000"), + Arguments.of(" a "), + Arguments.of("\t a "), + Arguments.of("\n a ")); + } +} diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/read/BlockAsLocalFileReaderTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/read/BlockAsLocalFileReaderTest.java index 9b0d86d14..b94e3025e 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/storage/read/BlockAsLocalFileReaderTest.java +++ 
b/server/src/test/java/com/hedera/block/server/persistence/storage/read/BlockAsLocalFileReaderTest.java @@ -24,6 +24,8 @@ import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import com.hedera.block.server.persistence.storage.compression.Compression; +import com.hedera.block.server.persistence.storage.compression.NoOpCompression; import com.hedera.block.server.persistence.storage.path.BlockAsLocalFilePathResolver; import com.hedera.block.server.persistence.storage.write.BlockAsLocalFileWriter; import com.hedera.block.server.util.PersistTestUtils; @@ -49,8 +51,10 @@ /** * Tests for the {@link BlockAsLocalFileReader} class. */ +@SuppressWarnings("FieldCanBeLocal") class BlockAsLocalFileReaderTest { private BlockAsLocalFileWriter blockAsLocalFileWriterMock; + private Compression compressionMock; private BlockAsLocalFileReader toTest; @TempDir @@ -66,9 +70,11 @@ void setUp() throws IOException { final String testConfigLiveRootPath = testConfig.liveRootPath(); assertThat(testConfigLiveRootPath).isEqualTo(testLiveRootPath.toString()); + compressionMock = spy(NoOpCompression.newInstance()); final BlockAsLocalFilePathResolver blockAsLocalFileResolverMock = spy(BlockAsLocalFilePathResolver.of(testLiveRootPath)); - blockAsLocalFileWriterMock = spy(BlockAsLocalFileWriter.of(blockNodeContext, blockAsLocalFileResolverMock)); + blockAsLocalFileWriterMock = + spy(BlockAsLocalFileWriter.of(blockNodeContext, blockAsLocalFileResolverMock, compressionMock)); toTest = BlockAsLocalFileReader.of(blockAsLocalFileResolverMock); } diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileWriterTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileWriterTest.java index 042b0bde3..93f30aa6a 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileWriterTest.java +++ 
b/server/src/test/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileWriterTest.java @@ -24,6 +24,8 @@ import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import com.hedera.block.server.persistence.storage.compression.Compression; +import com.hedera.block.server.persistence.storage.compression.NoOpCompression; import com.hedera.block.server.persistence.storage.path.BlockAsLocalFilePathResolver; import com.hedera.block.server.util.TestConfigUtil; import com.hedera.hapi.block.BlockItemUnparsed; @@ -48,10 +50,12 @@ /** * Test class for {@link BlockAsLocalFileWriter}. */ +@SuppressWarnings("FieldCanBeLocal") class BlockAsLocalFileWriterTest { private BlockNodeContext blockNodeContext; private PersistenceStorageConfig testConfig; private BlockAsLocalFilePathResolver pathResolverMock; + private Compression compressionMock; @TempDir private Path testLiveRootPath; @@ -68,7 +72,9 @@ void setUp() throws IOException { assertThat(testConfigLiveRootPath).isEqualTo(testLiveRootPath.toString()); pathResolverMock = spy(BlockAsLocalFilePathResolver.of(testLiveRootPath)); - toTest = BlockAsLocalFileWriter.of(blockNodeContext, pathResolverMock); + compressionMock = spy(NoOpCompression.newInstance()); + + toTest = BlockAsLocalFileWriter.of(blockNodeContext, pathResolverMock, compressionMock); } /** diff --git a/server/src/test/java/com/hedera/block/server/util/PersistTestUtils.java b/server/src/test/java/com/hedera/block/server/util/PersistTestUtils.java index bc189808c..bde470922 100644 --- a/server/src/test/java/com/hedera/block/server/util/PersistTestUtils.java +++ b/server/src/test/java/com/hedera/block/server/util/PersistTestUtils.java @@ -35,6 +35,7 @@ public final class PersistTestUtils { private static final Logger LOGGER = System.getLogger(PersistTestUtils.class.getName()); public static final String PERSISTENCE_STORAGE_LIVE_ROOT_PATH_KEY = "persistence.storage.liveRootPath"; 
+ public static final String PERSISTENCE_STORAGE_COMPRESSION_LEVEL = "persistence.storage.compressionLevel"; private PersistTestUtils() {} diff --git a/settings.gradle.kts b/settings.gradle.kts index 15a18a521..37e1a23fb 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -103,6 +103,7 @@ dependencyResolutionManagement { version("org.apache.commons.lang3", "3.14.0") version("org.apache.commons.compress", "1.26.0") version("org.apache.logging.log4j.slf4j2.impl", "2.21.1") + version("com.github.luben.zstd_jni","1.5.6-8") // needed for dagger version("dagger", daggerVersion)