feat: add compression mode for BlockAsFileWriter #387

Merged: 27 commits, Dec 16, 2024
Changes from 22 commits
bbd85f9
adding core logic for the compression
ata-nas Dec 9, 2024
238a27c
adding some documentation
ata-nas Dec 10, 2024
537f494
adding some tests
ata-nas Dec 10, 2024
0235f32
adding some tests
ata-nas Dec 10, 2024
f49ab48
adding some tests
ata-nas Dec 10, 2024
ce682a3
suppressing some warnings
ata-nas Dec 10, 2024
7896617
correct dependency resolution for zstd lib
ata-nas Dec 10, 2024
fa18b82
compression level is now configurable, also addressing a pr comment a…
ata-nas Dec 11, 2024
d1f4a9b
some improvements
ata-nas Dec 11, 2024
aa0def1
adding some tests
ata-nas Dec 11, 2024
415ceb4
using the compression level
ata-nas Dec 11, 2024
cfe0522
noop compression static factory method
ata-nas Dec 11, 2024
babb2f0
adding some logic (partially ready) to support proposed changes
ata-nas Dec 11, 2024
4ac2f6e
cleanup
ata-nas Dec 12, 2024
d84c138
adding test to ensure environment variable mappings
ata-nas Dec 12, 2024
8c9d847
temporary disabling test for checking env var configs
ata-nas Dec 12, 2024
abfa545
some javadoc
ata-nas Dec 12, 2024
3ed6bd7
environment variable support for compression configuration
ata-nas Dec 12, 2024
80d8eb9
cleanup
ata-nas Dec 12, 2024
124e1eb
another test for compression types
ata-nas Dec 12, 2024
15f1968
another test for file utils
ata-nas Dec 12, 2024
3a44365
renaming tests
ata-nas Dec 12, 2024
1364a59
code compiles after rebase
ata-nas Dec 16, 2024
aa7774b
addressing pr comment for preconditions of compression level range
ata-nas Dec 16, 2024
7d2320b
adding newline character at the end of app.properties
ata-nas Dec 16, 2024
fb6e7c5
adding another edge case for preconditions required in range
ata-nas Dec 16, 2024
b8709c8
address pr comment
ata-nas Dec 16, 2024
@@ -186,5 +186,30 @@ public static byte[] readFileBytesUnsafe(
}
}

/**
* This method creates a new file at the given path. The method ensures that
* the full path to the target file will be created, including all missing
* intermediary directories.
*
* @param pathToCreate the path to create
* @throws IOException if the file cannot be created or if it already exists
*/
public static void createFile(@NonNull final Path pathToCreate) throws IOException {
Files.createDirectories(pathToCreate.getParent());
Files.createFile(pathToCreate);
}

/**
* This method appends an extension to a given path.
*
* @param path to append the extension to
* @param extension to append to the path
* @return a new path with the extension appended
*/
@NonNull
public static Path appendExtension(@NonNull final Path path, @NonNull final String extension) {
return path.resolveSibling(path.getFileName() + Objects.requireNonNull(extension));
}

private FileUtilities() {}
}
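The two helpers added above can be exercised in isolation. The following is a minimal sketch, not the project's class: `FileUtilitiesSketch` is a hypothetical name, and the null check on the parent is an addition of this sketch (`Path.getParent()` returns `null` for a bare relative file name such as `temp1.txt`, which `Files.createDirectories` would reject with a `NullPointerException`).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;

// Hypothetical stand-in for the FileUtilities methods shown in the diff.
final class FileUtilitiesSketch {
    /** Creates the file plus any missing parent directories; throws if the file already exists. */
    static void createFile(final Path pathToCreate) throws IOException {
        final Path parent = pathToCreate.getParent();
        // Assumption of this sketch: skip directory creation when there is no parent component.
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Files.createFile(pathToCreate);
    }

    /** Returns a sibling path whose file name is the original name followed by the extension. */
    static Path appendExtension(final Path path, final String extension) {
        return path.resolveSibling(path.getFileName() + Objects.requireNonNull(extension));
    }

    public static void main(final String[] args) throws IOException {
        final Path root = Files.createTempDirectory("file-utilities-sketch");
        final Path block = root.resolve("some_folder/0000000001.blk");
        createFile(block);
        System.out.println(Files.isRegularFile(block)); // true
        System.out.println(appendExtension(block, ".zstd").getFileName()); // 0000000001.blk.zstd
    }
}
```

Note that `appendExtension` only concatenates; the caller is expected to pass the leading dot, as the test data (`".blk"`, `".gz"`, `".zstd"`) does.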
@@ -185,6 +185,50 @@ public static long requireGreaterOrEqual(final long toTest, final long base, fin
throw new IllegalArgumentException(message);
}

/**
* This method asserts a given int is within a range (boundaries
* included). If the given int is within the range, then we return it,
* else, an {@link IllegalArgumentException} is thrown.
*
* @param toCheck the int value to test
* @param lowerBoundary the lower boundary
* @param upperBoundary the upper boundary
* @return the input {@code toCheck} if it is within the range (boundaries
* included)
* @throws IllegalArgumentException if the input int does not pass the test
*/
public static int requireInRange(final int toCheck, final int lowerBoundary, final int upperBoundary) {
return requireInRange(toCheck, lowerBoundary, upperBoundary, null);
}

/**
* This method asserts a given int is within a range (boundaries
* included). If the given int is within the range, then we return it,
* else, an {@link IllegalArgumentException} is thrown.
*
* @param toCheck the int value to check
* @param lowerBoundary the lower boundary
* @param upperBoundary the upper boundary
* @param errorMessage the error message to be used in the exception if the
* input int to test is not within the range, if null, a default message
* will be used
* @return the input {@code toCheck} if it is within the range (boundaries
* included)
* @throws IllegalArgumentException if the input int does not pass the test
*/
public static int requireInRange(
final int toCheck, final int lowerBoundary, final int upperBoundary, final String errorMessage) {
if (toCheck >= lowerBoundary && toCheck <= upperBoundary) {
return toCheck;
} else {
final String message = Objects.isNull(errorMessage)
? "The input int [%d] is required to be in the range [%d, %d] boundaries included."
.formatted(toCheck, lowerBoundary, upperBoundary)
: errorMessage;
throw new IllegalArgumentException(message);
}
}

/**
* This method asserts a given long is a whole number. A long is whole
* if it is greater or equal to zero.
@@ -36,6 +36,9 @@
* Tests for {@link FileUtilities} functionality.
*/
class FileUtilitiesTest {
@TempDir
private Path tempDir;

/**
* This test aims to verify that a folder path will be created for a given
* path if it does not exist. First we assert that the path we want to make
@@ -192,6 +195,39 @@ void testReadFileBytesUnsafeThrows(final Path filePath) {
assertThatIOException().isThrownBy(() -> FileUtilities.readFileBytesUnsafe(filePath, ".blk", ".gz"));
}

/**
* This test aims to verify that {@link FileUtilities#createFile(Path)}
* correctly creates a file at the given path along with any missing
* intermediary directories.
*
* @param toCreate parameterized, target to create
* @param tempDir junit temp dir
*/
@ParameterizedTest
@MethodSource("filesToCreate")
void testCreateFile(final Path toCreate, @TempDir final Path tempDir) throws IOException {
final Path expected = tempDir.resolve(toCreate);
assertThat(expected).doesNotExist();
FileUtilities.createFile(expected);
assertThat(expected).exists().isRegularFile();
}

/**
* This test aims to verify that the
* {@link FileUtilities#appendExtension(Path, String)} method correctly
* appends the given extension to the given path.
*
* @param filePath parameterized, to append extensions to
* @param extension parameterized, extension to append
*/
@ParameterizedTest
@MethodSource("filesWithExtensions")
void testAppendFileExtension(final Path filePath, final String extension) {
final Path pathToTest = tempDir.resolve(filePath);
final Path actual = FileUtilities.appendExtension(pathToTest, extension);
assertThat(actual).hasFileName(filePath + extension);
}

private static Stream<Arguments> validGzipFiles() {
return Stream.of(
Arguments.of("src/test/resources/valid1.txt.gz", "valid1"),
@@ -210,4 +246,16 @@ private static Stream<Arguments> invalidFiles() {
Arguments.of("src/test/resources/nonexistent.gz"),
Arguments.of("src/test/resources/nonexistent.blk"));
}

private static Stream<Arguments> filesToCreate() {
return Stream.of(Arguments.of("temp1.txt"), Arguments.of("some_folder/temp2.txt"));
}

private static Stream<Arguments> filesWithExtensions() {
return Stream.of(
Arguments.of("valid1", ".blk"),
Arguments.of("valid1", ""),
Arguments.of("valid2", ".gz"),
Arguments.of("valid2", ".zstd"));
}
}
@@ -20,7 +20,9 @@
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;

import java.util.function.Consumer;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
@@ -263,4 +265,71 @@ void testRequirePowerOfTwoFail(final int toTest) {
.isThrownBy(() -> Preconditions.requirePowerOfTwo(toTest, testErrorMessage))
.withMessage(testErrorMessage);
}

/**
* This test aims to verify that the
* {@link Preconditions#requireInRange(int, int, int)} will return the
* input 'toTest' parameter if the range check passes. Test includes
* overloads.
*
* @param toTest parameterized, the number to test
* @param lowerBoundary parameterized, the lower boundary
* @param upperBoundary parameterized, the upper boundary
*/
@ParameterizedTest
@MethodSource("validRequireInRangeValues")
void testRequireInRangePass(final int toTest, final int lowerBoundary, final int upperBoundary) {
final Consumer<Integer> asserts = actual ->
assertThat(actual).isBetween(lowerBoundary, upperBoundary).isEqualTo(toTest);

final int actual = Preconditions.requireInRange(toTest, lowerBoundary, upperBoundary);
assertThat(actual).satisfies(asserts);

final int actualOverload =
Preconditions.requireInRange(toTest, lowerBoundary, upperBoundary, "test error message");
assertThat(actualOverload).satisfies(asserts);
}

/**
* This test aims to verify that the
* {@link Preconditions#requireInRange(int, int, int)} will throw an
* {@link IllegalArgumentException} if the range check fails. Test includes
* overloads.
*
* @param toTest parameterized, the number to test
* @param lowerBoundary parameterized, the lower boundary
* @param upperBoundary parameterized, the upper boundary
*/
@ParameterizedTest
@MethodSource("invalidRequireInRangeValues")
void testRequireInRangeFail(final int toTest, final int lowerBoundary, final int upperBoundary) {
assertThatIllegalArgumentException()
.isThrownBy(() -> Preconditions.requireInRange(toTest, lowerBoundary, upperBoundary));

final String testErrorMessage = "test error message";
assertThatIllegalArgumentException()
.isThrownBy(() -> Preconditions.requireInRange(toTest, lowerBoundary, upperBoundary, testErrorMessage))
.withMessage(testErrorMessage);
}

private static Stream<Arguments> validRequireInRangeValues() {
return Stream.of(
Arguments.of(0, 0, 0),
Arguments.of(0, 0, 1),
Arguments.of(1, 0, 1),
Arguments.of(1, 0, 2),
Arguments.of(-1, -1, -1),
Arguments.of(-2, -2, -1),
Arguments.of(-1, -2, -1),
Arguments.of(-1, -2, 0));
}

private static Stream<Arguments> invalidRequireInRangeValues() {
return Stream.of(
Arguments.of(0, 1, 1),
Arguments.of(0, 1, 2),
Arguments.of(1, 2, 3),
Arguments.of(-1, 0, 1),
Arguments.of(-1, 0, 0));
}
}
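The inclusive range check these tests cover can be reproduced in a few lines. This is a minimal sketch under a hypothetical class name (`RequireInRangeSketch`); it mirrors the three-argument overload from the diff and reuses its default message, but it is not the project's `Preconditions` class.

```java
// Hypothetical stand-in for Preconditions.requireInRange(int, int, int).
final class RequireInRangeSketch {
    /** Returns toCheck when lower <= toCheck <= upper, otherwise throws IllegalArgumentException. */
    static int requireInRange(final int toCheck, final int lower, final int upper) {
        if (toCheck >= lower && toCheck <= upper) {
            return toCheck;
        }
        throw new IllegalArgumentException(
                "The input int [%d] is required to be in the range [%d, %d] boundaries included."
                        .formatted(toCheck, lower, upper));
    }

    public static void main(final String[] args) {
        System.out.println(requireInRange(3, 1, 22)); // 3
        System.out.println(requireInRange(22, 1, 22)); // 22, the upper boundary is included
        try {
            requireInRange(23, 1, 22);
        } catch (final IllegalArgumentException e) {
            // The input int [23] is required to be in the range [1, 22] boundaries included.
            System.out.println(e.getMessage());
        }
    }
}
```

One consequence of this shape is that an inverted range (lower greater than upper) rejects every input, since no value can satisfy both comparisons.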
2 changes: 2 additions & 0 deletions gradle/modules.properties
@@ -49,3 +49,5 @@ dagger.compiler=com.google.dagger:dagger-compiler
com.squareup.javapoet=com.squareup:javapoet
javax.inject=javax.inject:javax.inject
org.checkerframework.checker.qual=org.checkerframework:checker-qual

com.github.luben.zstd_jni=com.github.luben:zstd-jni
2 changes: 2 additions & 0 deletions server/docs/configuration.md
@@ -14,6 +14,8 @@ defaults and can be left unchanged. It is recommended to browse the properties b
| PERSISTENCE_STORAGE_LIVE_ROOT_PATH | The root path for the live storage. | |
| PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH | The root path for the archive storage. | |
| PERSISTENCE_STORAGE_TYPE | Type of the persistence storage | BLOCK_AS_LOCAL_FILE |
| PERSISTENCE_STORAGE_COMPRESSION | Compression algorithm used during persistence (could be none as well) | ZSTD |
| PERSISTENCE_STORAGE_COMPRESSION_LEVEL | Compression level to be used by the compression algorithm | 3 |
| CONSUMER_TIMEOUT_THRESHOLD_MILLIS | Time to wait for subscribers before disconnecting in milliseconds | 1500 |
| SERVICE_DELAY_MILLIS | Service shutdown delay in milliseconds | 500 |
| MEDIATOR_RING_BUFFER_SIZE | Size of the ring buffer used by the mediator (must be a power of 2) | 67108864 |
@@ -31,6 +31,8 @@ public final class ServerMappedConfigSourceInitializer {
new ConfigMapping("persistence.storage.liveRootPath", "PERSISTENCE_STORAGE_LIVE_ROOT_PATH"),
new ConfigMapping("persistence.storage.archiveRootPath", "PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH"),
new ConfigMapping("persistence.storage.type", "PERSISTENCE_STORAGE_TYPE"),
new ConfigMapping("persistence.storage.compression", "PERSISTENCE_STORAGE_COMPRESSION"),
new ConfigMapping("persistence.storage.compressionLevel", "PERSISTENCE_STORAGE_COMPRESSION_LEVEL"),
new ConfigMapping("service.delayMillis", "SERVICE_DELAY_MILLIS"),
new ConfigMapping("mediator.ringBufferSize", "MEDIATOR_RING_BUFFER_SIZE"),
new ConfigMapping("mediator.type", "MEDIATOR_TYPE"),
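The mappings above pair each configuration property with the environment variable that overrides it. The sketch below illustrates that idea only: `Mapping` and `resolve` are hypothetical names, the environment is passed in as a plain `Map` in place of `System.getenv()`, and the real `ConfigMapping` type and config source may behave differently.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EnvMappingSketch {
    /** Hypothetical stand-in for the ConfigMapping type used in the diff. */
    record Mapping(String property, String envVar) {}

    // The two mappings added by this change, copied from the diff.
    static final List<Mapping> MAPPINGS = List.of(
            new Mapping("persistence.storage.compression", "PERSISTENCE_STORAGE_COMPRESSION"),
            new Mapping("persistence.storage.compressionLevel", "PERSISTENCE_STORAGE_COMPRESSION_LEVEL"));

    /** Copies the value of every mapped environment variable that is set onto its property name. */
    static Map<String, String> resolve(final Map<String, String> env) {
        final Map<String, String> properties = new LinkedHashMap<>();
        for (final Mapping mapping : MAPPINGS) {
            final String value = env.get(mapping.envVar());
            if (value != null) {
                properties.put(mapping.property(), value);
            }
        }
        return properties;
    }

    public static void main(final String[] args) {
        final Map<String, String> env = Map.of("PERSISTENCE_STORAGE_COMPRESSION_LEVEL", "6", "HOME", "/root");
        System.out.println(resolve(env)); // {persistence.storage.compressionLevel=6}
    }
}
```

Unset variables are simply absent from the result, which is where the documented defaults (`ZSTD` and `3`) would apply.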
@@ -20,7 +20,11 @@
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.CompressionType;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.StorageType;
import com.hedera.block.server.persistence.storage.compression.Compression;
import com.hedera.block.server.persistence.storage.compression.NoOpCompression;
import com.hedera.block.server.persistence.storage.compression.ZstdCompression;
import com.hedera.block.server.persistence.storage.path.BlockAsLocalDirPathResolver;
import com.hedera.block.server.persistence.storage.path.BlockAsLocalFilePathResolver;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
@@ -60,14 +64,16 @@ public interface PersistenceInjectionModule {
* @param blockNodeContext the application context
* @param blockRemover the block remover
* @param blockPathResolver the block path resolver
* @param compression the compression used
* @return a block writer singleton
*/
@Provides
@Singleton
static BlockWriter<List<BlockItemUnparsed>> providesBlockWriter(
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final BlockRemover blockRemover,
- @NonNull final BlockPathResolver blockPathResolver) {
+ @NonNull final BlockPathResolver blockPathResolver,
+ @NonNull final Compression compression) {
Objects.requireNonNull(blockRemover);
Objects.requireNonNull(blockPathResolver);
final StorageType persistenceType = blockNodeContext
@@ -76,7 +82,7 @@ static BlockWriter<List<BlockItemUnparsed>> providesBlockWriter(
.type();
try {
return switch (persistenceType) {
- case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFileWriter.of(blockNodeContext, blockPathResolver);
+ case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFileWriter.of(blockNodeContext, blockPathResolver, compression);
case BLOCK_AS_LOCAL_DIRECTORY -> BlockAsLocalDirWriter.of(
blockNodeContext, blockRemover, blockPathResolver);
case NO_OP -> NoOpBlockWriter.newInstance();
@@ -148,6 +154,23 @@ static BlockPathResolver providesPathResolver(@NonNull final PersistenceStorageC
};
}

/**
* Provides a compression singleton using the persistence config.
*
* @param config the persistence storage configuration needed to build the
* compression
* @return a compression singleton
*/
@Provides
@Singleton
static Compression providesCompression(@NonNull final PersistenceStorageConfig config) {
final CompressionType compressionType = config.compression();
return switch (compressionType) {
case ZSTD -> ZstdCompression.of(config);
case NONE -> NoOpCompression.newInstance();
};
}

/**
* Binds the block node event handler to the stream persistence handler.
*
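`providesCompression` selects one implementation per enum constant, so the writer never needs to know which algorithm is active. The `Compression` interface itself is not part of this diff view, so the sketch below assumes a hypothetical shape for it (`wrap` and `fileExtension` are invented names). It also swaps in the JDK's GZIP for Zstandard, because zstd-jni is not part of the standard library; the point is the selection pattern, not the algorithm.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

final class CompressionSelectionSketch {
    /** GZIP stands in for the ZSTD constant of the diff; NONE means pass-through. */
    enum CompressionType { GZIP, NONE }

    /** Hypothetical interface shape; the project's Compression type may differ. */
    interface Compression {
        OutputStream wrap(OutputStream out) throws IOException;

        String fileExtension();
    }

    /** Exhaustive switch over the enum, as in providesCompression: no default branch is needed. */
    static Compression of(final CompressionType type) {
        return switch (type) {
            case GZIP -> new Compression() {
                public OutputStream wrap(final OutputStream out) throws IOException {
                    return new GZIPOutputStream(out);
                }

                public String fileExtension() {
                    return ".gz";
                }
            };
            case NONE -> new Compression() {
                public OutputStream wrap(final OutputStream out) {
                    return out; // no-op: bytes are written unchanged
                }

                public String fileExtension() {
                    return "";
                }
            };
        };
    }

    /** Writes the payload through the chosen compression and returns the stored bytes. */
    static byte[] write(final Compression compression, final byte[] payload) throws IOException {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = compression.wrap(sink)) {
            out.write(payload);
        }
        return sink.toByteArray();
    }

    public static void main(final String[] args) throws IOException {
        final byte[] payload = new byte[4096]; // highly compressible: all zeros
        System.out.println(write(of(CompressionType.NONE), payload).length); // 4096
        System.out.println(write(of(CompressionType.GZIP), payload).length < payload.length); // true
    }
}
```

Because the switch has no default branch, adding a new enum constant without a matching case becomes a compile error, which is one reason to prefer this form for a small closed set of algorithms.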
@@ -19,6 +19,8 @@
import static com.hedera.block.server.Constants.BLOCK_NODE_ARCHIVE_ROOT_DIRECTORY_SEMANTIC_NAME;
import static com.hedera.block.server.Constants.BLOCK_NODE_LIVE_ROOT_DIRECTORY_SEMANTIC_NAME;

import com.github.luben.zstd.Zstd;
import com.hedera.block.common.utils.Preconditions;
import com.hedera.block.common.utils.StringUtilities;
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
@@ -34,7 +36,9 @@
*
* @param liveRootPath provides the root path for saving blocks live
* @param archiveRootPath provides the root path for archived blocks
- * @param type use a predefined type string to replace the persistence component implementation.
+ * @param type storage type
+ * @param compression compression type to use for the storage
+ * @param compressionLevel compression level used by the compression algorithm
* Non-PRODUCTION values should only be used for troubleshooting and development purposes.
*/
@ConfigData("persistence.storage")
@@ -43,7 +47,9 @@ public record PersistenceStorageConfig(
@ConfigProperty(defaultValue = "") String liveRootPath,
// @todo(#371) - the default live/archive root path must be absolute starting from /opt
@ConfigProperty(defaultValue = "") String archiveRootPath,
- @ConfigProperty(defaultValue = "BLOCK_AS_LOCAL_FILE") StorageType type) {
+ @ConfigProperty(defaultValue = "BLOCK_AS_LOCAL_FILE") StorageType type,
+ @ConfigProperty(defaultValue = "ZSTD") CompressionType compression,
+ @ConfigProperty(defaultValue = "3") int compressionLevel) {
// @todo(#371) - the default live/archive root path must be absolute starting from /opt
private static final String LIVE_ROOT_PATH =
Path.of("hashgraph/blocknode/data/live/").toAbsolutePath().toString();
@@ -56,11 +62,28 @@ public record PersistenceStorageConfig(
*/
public PersistenceStorageConfig {
Objects.requireNonNull(type);
Objects.requireNonNull(compression);
verifyCompressionLevel(compression, compressionLevel);
liveRootPath = resolvePath(liveRootPath, LIVE_ROOT_PATH, BLOCK_NODE_LIVE_ROOT_DIRECTORY_SEMANTIC_NAME);
archiveRootPath =
resolvePath(archiveRootPath, ARCHIVE_ROOT_PATH, BLOCK_NODE_ARCHIVE_ROOT_DIRECTORY_SEMANTIC_NAME);
}

/**
* This method verifies that the compression level is within the bounds of
* the given compression type.
*
* @param compressionType the compression type
* @param compressionLevelToCheck the compression level to check
*/
private void verifyCompressionLevel(final CompressionType compressionType, final int compressionLevelToCheck) {
switch (compressionType) {
case ZSTD -> Preconditions.requireInRange(
compressionLevelToCheck, Zstd.minCompressionLevel(), Zstd.maxCompressionLevel());
case NONE -> Preconditions.requireInRange(compressionLevelToCheck, Integer.MIN_VALUE, Integer.MAX_VALUE);
}
}

/**
* This method attempts to resolve a given configured path. If the input
* path is blank, a default path is used. The resolved path must be
@@ -146,4 +169,20 @@ public enum StorageType {
*/
NO_OP
}

/**
* An enum that reflects the type of compression that is used to compress
* the blocks that are stored within the persistence storage.
*/
public enum CompressionType {
/**
* This type of compression is used to compress the blocks using the
* `Zstandard` algorithm.
*/
ZSTD,
/**
* This type means no compression will be done.
*/
NONE
}
}
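The record validates its compression settings in the compact constructor, so an invalid level fails at configuration load time instead of at the first write. Below is a minimal sketch of that pattern with hypothetical names (`StorageConfigSketch`, `StorageConfig`). The level bounds are placeholder constants chosen for illustration; the diff instead asks zstd-jni for them via `Zstd.minCompressionLevel()` and `Zstd.maxCompressionLevel()`.

```java
import java.util.Objects;

final class StorageConfigSketch {
    enum CompressionType { ZSTD, NONE }

    // Assumed bounds for this sketch only; the real values come from the zstd-jni library.
    static final int ASSUMED_MIN_LEVEL = 1;
    static final int ASSUMED_MAX_LEVEL = 22;

    /** Hypothetical, reduced version of the configuration record. */
    record StorageConfig(CompressionType compression, int compressionLevel) {
        StorageConfig {
            Objects.requireNonNull(compression);
            // Only ZSTD constrains the level; with NONE the level is ignored, so any int is accepted.
            if (compression == CompressionType.ZSTD
                    && (compressionLevel < ASSUMED_MIN_LEVEL || compressionLevel > ASSUMED_MAX_LEVEL)) {
                throw new IllegalArgumentException(
                        "Compression level [%d] is outside [%d, %d]"
                                .formatted(compressionLevel, ASSUMED_MIN_LEVEL, ASSUMED_MAX_LEVEL));
            }
        }
    }

    public static void main(final String[] args) {
        System.out.println(new StorageConfig(CompressionType.ZSTD, 3)); // the documented defaults
        System.out.println(new StorageConfig(CompressionType.NONE, 99)); // accepted, level unused
        try {
            new StorageConfig(CompressionType.ZSTD, 99);
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```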