diff --git a/pylib/cqlshlib/test/test_keyspace_init.cql b/pylib/cqlshlib/test/test_keyspace_init.cql index 5ff4108131b1..9eea478be3f4 100644 --- a/pylib/cqlshlib/test/test_keyspace_init.cql +++ b/pylib/cqlshlib/test/test_keyspace_init.cql @@ -34,7 +34,7 @@ CREATE TABLE has_all_types ( uuidcol uuid, varcharcol varchar, varintcol varint -) WITH compression = {'sstable_compression':'LZ4Compressor'}; +) WITH compression = {'class':'LZ4Compressor'}; INSERT INTO has_all_types (num, intcol, asciicol, bigintcol, blobcol, booleancol, decimalcol, doublecol, durationcol, floatcol, smallintcol, textcol, diff --git a/test/conf/cassandra-with-sstable-compressor.yaml b/test/conf/cassandra-with-sstable-compressor.yaml new file mode 100644 index 000000000000..e20c7c187927 --- /dev/null +++ b/test/conf/cassandra-with-sstable-compressor.yaml @@ -0,0 +1,110 @@ + +cluster_name: Test Cluster +# memtable_allocation_type: heap_buffers +memtable_allocation_type: offheap_objects +commitlog_sync: batch +commitlog_sync_batch_window_in_ms: 1.0 +commitlog_segment_size: 5MiB +commitlog_directory: build/test/cassandra/commitlog +# commitlog_compression: +# - class_name: LZ4Compressor +cdc_raw_directory: build/test/cassandra/cdc_raw +cdc_enabled: false +hints_directory: build/test/cassandra/hints +partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner +listen_address: 127.0.0.1 +storage_port: 7012 +ssl_storage_port: 17012 +start_native_transport: true +native_transport_port: 9042 +column_index_size: 4KiB +saved_caches_directory: build/test/cassandra/saved_caches +data_file_directories: + - build/test/cassandra/data +disk_access_mode: mmap +seed_provider: + - class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + - seeds: "127.0.0.1:7012" +endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch +dynamic_snitch: true +server_encryption_options: + internode_encryption: none + keystore: conf/.keystore + keystore_password: cassandra + truststore: conf/.truststore + 
truststore_password: cassandra +incremental_backups: true +concurrent_compactors: 4 +compaction_throughput: 0MiB/s +row_cache_class_name: org.apache.cassandra.cache.OHCProvider +row_cache_size: 16MiB +user_defined_functions_enabled: true +scripted_user_defined_functions_enabled: true +prepared_statements_cache_size: 1MiB +corrupted_tombstone_strategy: exception +stream_entire_sstables: true +stream_throughput_outbound: 23841858MiB/s +sasi_indexes_enabled: true +materialized_views_enabled: true +drop_compact_storage_enabled: true +file_cache_enabled: true +auto_hints_cleanup_enabled: true + +read_thresholds_enabled: true +coordinator_read_size_warn_threshold: 1024KiB +coordinator_read_size_fail_threshold: 4096KiB +local_read_size_warn_threshold: 4096KiB +local_read_size_fail_threshold: 8192KiB +row_index_read_size_warn_threshold: 4096KiB +row_index_read_size_fail_threshold: 8192KiB + +memtable: + configurations: + skiplist: + inherits: default + class_name: SkipListMemtable + skiplist_sharded: + class_name: ShardedSkipListMemtable + parameters: + serialize_writes: false + shards: 4 + skiplist_sharded_locking: + inherits: skiplist_sharded + parameters: + serialize_writes: true + skiplist_remapped: + inherits: skiplist + test_fullname: + inherits: default + class_name: org.apache.cassandra.db.memtable.TestMemtable + test_shortname: + class_name: TestMemtable + parameters: + skiplist: true # note: YAML must interpret this as string, not a boolean + test_empty_class: + class_name: "" + test_missing_class: + parameters: + test_unknown_class: + class_name: NotExisting + test_invalid_param: + class_name: SkipListMemtable + parameters: + invalid: throw + test_invalid_extra_param: + inherits: test_shortname + parameters: + invalid: throw + test_invalid_factory_method: + class_name: org.apache.cassandra.cql3.validation.operations.CreateTest$InvalidMemtableFactoryMethod + test_invalid_factory_field: + class_name: 
org.apache.cassandra.cql3.validation.operations.CreateTest$InvalidMemtableFactoryField + +sstable_compression: + class_name: lz4 + parameters: + lz4_compressor_type: high + lz4_high_compressor_level: "8" + chunk_length: 32MiB + min_compress_ratio: "1.5" diff --git a/test/distributed/org/apache/cassandra/distributed/util/PyDtest.java b/test/distributed/org/apache/cassandra/distributed/util/PyDtest.java index 3b2425f74d8c..8b71efcd70c1 100644 --- a/test/distributed/org/apache/cassandra/distributed/util/PyDtest.java +++ b/test/distributed/org/apache/cassandra/distributed/util/PyDtest.java @@ -160,7 +160,7 @@ public String build() query += String.format(" AND CLUSTERING ORDER BY (%s)", clustering); if (compression != null) - query += String.format(" AND compression = { \'sstable_compression\': \'%sCompressor\' }", compression); + query += String.format(" AND compression = { \'class\': \'%sCompressor\' }", compression); else query += " AND compression = {}"; diff --git a/test/microbench/org/apache/cassandra/test/microbench/CachingBenchTest.java b/test/microbench/org/apache/cassandra/test/microbench/CachingBenchTest.java index 2cc09ed330ad..3a0f09fc2a4d 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/CachingBenchTest.java +++ b/test/microbench/org/apache/cassandra/test/microbench/CachingBenchTest.java @@ -203,7 +203,7 @@ public void testSetup(String compactionClass, String compressorClass, DiskAccess DatabaseDescriptor.setFileCacheEnabled(cacheEnabled); DatabaseDescriptor.setDiskAccessMode(mode); alterTable("ALTER TABLE %s WITH compaction = { 'class' : '" + compactionClass + "' };"); - alterTable("ALTER TABLE %s WITH compression = { 'sstable_compression' : '" + compressorClass + "' };"); + alterTable("ALTER TABLE %s WITH compression = { 'class' : '" + compressorClass + "' };"); ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); cfs.disableAutoCompaction(); diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java 
b/test/unit/org/apache/cassandra/SchemaLoader.java index f341fb6eb803..bd1fe92ace2b 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -770,15 +770,15 @@ private static CompressionParams compressionParams(int chunkLength) switch (algo) { case "deflate": - return CompressionParams.deflate(chunkLength); + return TestCompressionParamsFactory.deflate(chunkLength); case "lz4": - return CompressionParams.lz4(chunkLength); + return TestCompressionParamsFactory.lz4(chunkLength); case "snappy": - return CompressionParams.snappy(chunkLength); + return TestCompressionParamsFactory.snappy(chunkLength); case "noop": - return CompressionParams.noop(); + return TestCompressionParamsFactory.noop(); case "zstd": - return CompressionParams.zstd(chunkLength); + return TestCompressionParamsFactory.zstd(chunkLength); case "none": return CompressionParams.noCompression(); default: diff --git a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java index efa47b05d760..dbbfdf53dd24 100644 --- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java +++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.distributed.shared.WithProperties; import org.apache.cassandra.io.util.File; +import org.apache.cassandra.schema.CompressionParams; import org.yaml.snakeyaml.error.YAMLException; import static org.apache.cassandra.config.CassandraRelevantProperties.CONFIG_ALLOW_SYSTEM_PROPERTIES; @@ -44,6 +45,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -170,6 +173,23 @@ public void readThresholdsFromConfig() 
assertThat(c.row_index_read_size_fail_threshold).isEqualTo(new DataStorageSpec.LongBytesBound(1 << 13, KIBIBYTES)); } + @Test + public void readSSTableCompressionFromConfig() + { + Config c = load("test/conf/cassandra.yaml"); + + assertNull(c.sstable_compression); + + c = load("test/conf/cassandra-with-sstable-compressor.yaml"); + + assertNotNull(c.sstable_compression); + assertThat(c.sstable_compression.class_name).isEqualTo("lz4"); + assertThat(c.sstable_compression.parameters.remove(CompressionParams.CHUNK_LENGTH)).isEqualTo("32MiB"); + assertThat(c.sstable_compression.parameters.remove(CompressionParams.MIN_COMPRESS_RATIO)).isEqualTo("1.5"); + assertThat(c.sstable_compression.parameters.remove(CompressionParams.ENABLED)).isNull(); + assertThat(c.sstable_compression.parameters.size()).isEqualTo(2); + } + @Test public void readThresholdsFromMap() { diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java index bac7e7917c06..a01c676d8c9c 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java @@ -36,31 +36,12 @@ import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.FBUtilities; - public class CrcCheckChanceTest extends CQLTester { - - - @Test - public void testChangingCrcCheckChanceNewFormat() throws Throwable - { - testChangingCrcCheckChance(true); - } - @Test - public void testChangingCrcCheckChanceOldFormat() throws Throwable - { - testChangingCrcCheckChance(false); - } - - - public void testChangingCrcCheckChance(boolean newFormat) throws Throwable + public void testChangingCrcCheckChance() { - //Start with crc_check_chance of 99% - if (newFormat) - createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = 
{'sstable_compression': 'LZ4Compressor'} AND crc_check_chance = 0.99;"); - else - createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}"); + createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'class': 'LZ4Compressor'} AND crc_check_chance = 0.99;"); execute("CREATE INDEX foo ON %s(v)"); @@ -79,10 +60,7 @@ public void testChangingCrcCheckChance(boolean newFormat) throws Throwable Assert.assertEquals(0.99, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance(), 0.0); //Test for stack overflow - if (newFormat) - alterTable("ALTER TABLE %s WITH crc_check_chance = 0.99"); - else - alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.99}"); + alterTable("ALTER TABLE %s WITH crc_check_chance = 0.99"); assertRows(execute("SELECT * FROM %s WHERE p=?", "p1"), row("p1", "k1", "sv1", "v1"), @@ -131,10 +109,7 @@ public void testChangingCrcCheckChance(boolean newFormat) throws Throwable ); //Alter again via schema - if (newFormat) - alterTable("ALTER TABLE %s WITH crc_check_chance = 0.5"); - else - alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.5}"); + alterTable("ALTER TABLE %s WITH crc_check_chance = 0.5"); //We should be able to get the new value by accessing directly the schema metadata Assert.assertEquals(0.5, cfs.metadata().params.crcCheckChance, 0.0); @@ -168,12 +143,12 @@ public void testChangingCrcCheckChance(boolean newFormat) throws Throwable } @Test - public void testDropDuringCompaction() throws Throwable + public void testDropDuringCompaction() { CompactionManager.instance.disableAutoCompaction(); //Start with crc_check_chance of 99% - createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = 
{'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}"); + createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'class': 'LZ4Compressor'} AND crc_check_chance = 0.99;"); ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(currentTable()); diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java index 188170762361..2baf3c596ac3 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java @@ -635,10 +635,10 @@ public void testAlterTableWithCompression() throws Throwable createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"); assertSchemaOption("compression", map("chunk_length_in_kb", "16", "class", "org.apache.cassandra.io.compress.LZ4Compressor")); - alterTable("ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32 };"); + alterTable("ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length' : '32KiB' };"); assertSchemaOption("compression", map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")); - alterTable("ALTER TABLE %s WITH compression = { 'class' : 'LZ4Compressor', 'chunk_length_in_kb' : 64 };"); + alterTable("ALTER TABLE %s WITH compression = { 'class' : 'LZ4Compressor', 'chunk_length' : '64KiB' };"); assertSchemaOption("compression", map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")); alterTable("ALTER TABLE %s WITH compression = { 'class' : 'LZ4Compressor', 'min_compress_ratio' : 2 };"); @@ -650,36 +650,20 @@ public void testAlterTableWithCompression() throws Throwable alterTable("ALTER TABLE %s WITH compression = { 'class' : 'LZ4Compressor', 'min_compress_ratio' : 0 };"); 
assertSchemaOption("compression", map("chunk_length_in_kb", "16", "class", "org.apache.cassandra.io.compress.LZ4Compressor")); - alterTable("ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32 };"); + alterTable("ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length' : '32KiB' };"); alterTable("ALTER TABLE %s WITH compression = { 'enabled' : 'false'};"); assertSchemaOption("compression", map("enabled", "false")); - assertAlterTableThrowsException(ConfigurationException.class, - "Missing sub-option 'class' for the 'compression' option.", - "ALTER TABLE %s WITH compression = {'chunk_length_in_kb' : 32};"); - assertAlterTableThrowsException(ConfigurationException.class, "The 'class' option must not be empty. To disable compression use 'enabled' : false", "ALTER TABLE %s WITH compression = { 'class' : ''};"); assertAlterTableThrowsException(ConfigurationException.class, - "If the 'enabled' option is set to false no other options must be specified", - "ALTER TABLE %s WITH compression = { 'enabled' : 'false', 'class' : 'SnappyCompressor'};"); - - assertAlterTableThrowsException(ConfigurationException.class, - "The 'sstable_compression' option must not be used if the compression algorithm is already specified by the 'class' option", - "ALTER TABLE %s WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'class' : 'SnappyCompressor'};"); - - assertAlterTableThrowsException(ConfigurationException.class, - "The 'chunk_length_kb' option must not be used if the chunk length is already specified by the 'chunk_length_in_kb' option", - "ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_kb' : 32 , 'chunk_length_in_kb' : 32 };"); - - assertAlterTableThrowsException(ConfigurationException.class, - "Invalid negative min_compress_ratio", + "Invalid 'min_compress_ratio' value for the 'compression' option. 
Can either be 0 or greater than or equal to 1: -1.0", "ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'min_compress_ratio' : -1 };"); assertAlterTableThrowsException(ConfigurationException.class, - "min_compress_ratio can either be 0 or greater than or equal to 1", + "Invalid 'min_compress_ratio' value for the 'compression' option. Can either be 0 or greater than or equal to 1: 0.5", "ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'min_compress_ratio' : 0.5 };"); } diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java index c2a86530193b..21166a3ac9f8 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java @@ -681,66 +681,50 @@ public void testCreateTableWithCompression() throws Throwable assertSchemaOption("compression", map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")); createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'chunk_length_kb' : 32 };"); + + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length' : '32KiB' };"); assertSchemaOption("compression", map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")); createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'min_compress_ratio' : 2 };"); + + " WITH compression = { 'class' : 'SnappyCompressor', 'min_compress_ratio' : 2 };"); assertSchemaOption("compression", map("chunk_length_in_kb", "16", "class", "org.apache.cassandra.io.compress.SnappyCompressor", "min_compress_ratio", "2.0")); createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 
'sstable_compression' : 'SnappyCompressor', 'min_compress_ratio' : 1 };"); + + " WITH compression = { 'class' : 'SnappyCompressor', 'min_compress_ratio' : 1 };"); assertSchemaOption("compression", map("chunk_length_in_kb", "16", "class", "org.apache.cassandra.io.compress.SnappyCompressor", "min_compress_ratio", "1.0")); createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'min_compress_ratio' : 0 };"); + + " WITH compression = { 'class' : 'SnappyCompressor', 'min_compress_ratio' : 0 };"); assertSchemaOption("compression", map("chunk_length_in_kb", "16", "class", "org.apache.cassandra.io.compress.SnappyCompressor")); - createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 'sstable_compression' : '', 'chunk_length_kb' : 32 };"); - assertSchemaOption("compression", map("enabled", "false")); - createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))" + " WITH compression = { 'enabled' : 'false'};"); assertSchemaOption("compression", map("enabled", "false")); - assertThrowsConfigurationException("Missing sub-option 'class' for the 'compression' option.", - "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = {'chunk_length_in_kb' : 32};"); - assertThrowsConfigurationException("The 'class' option must not be empty. 
To disable compression use 'enabled' : false", "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" + " WITH compression = { 'class' : ''};"); - assertThrowsConfigurationException("If the 'enabled' option is set to false no other options must be specified", - "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 'enabled' : 'false', 'class' : 'SnappyCompressor'};"); - - assertThrowsConfigurationException("If the 'enabled' option is set to false no other options must be specified", - "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 'enabled' : 'false', 'chunk_length_in_kb' : 32};"); - - assertThrowsConfigurationException("The 'sstable_compression' option must not be used if the compression algorithm is already specified by the 'class' option", + assertThrowsConfigurationException("Invalid 'chunk_length' value for the 'compression' option. Must be a power of 2: 31744", "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'class' : 'SnappyCompressor'};"); + + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length' : '31KiB' };"); - assertThrowsConfigurationException("The 'chunk_length_kb' option must not be used if the chunk length is already specified by the 'chunk_length_in_kb' option", + assertThrowsConfigurationException(format("Only one of '%s' or '%s' may be specified", CompressionParams.CHUNK_LENGTH, CompressionParams.CHUNK_LENGTH_IN_KB), "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" - + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_kb' : 32 , 'chunk_length_in_kb' : 32 };"); + + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length' : '32KiB' , 'chunk_length_in_kb' : 32 };"); - assertThrowsConfigurationException("chunk_length_in_kb must be a power of 2", + assertThrowsConfigurationException("Invalid 'chunk_length_in_kb' value for the 
'compression' option. Must be a power of 2: 31744", "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 31 };"); - assertThrowsConfigurationException("Invalid negative or null chunk_length_in_kb", + assertThrowsConfigurationException("Invalid 'chunk_length_in_kb' value for the 'compression' option. May not be <= 0: -1", "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : -1 };"); - assertThrowsConfigurationException("Invalid negative min_compress_ratio", + assertThrowsConfigurationException("Invalid 'min_compress_ratio' value for the 'compression' option. Can either be 0 or greater than or equal to 1: -1.0", "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" + " WITH compression = { 'class' : 'SnappyCompressor', 'min_compress_ratio' : -1 };"); - assertThrowsConfigurationException("Unknown compression options unknownOption", + assertThrowsConfigurationException("Unknown compression options: ([unknownOption])", "CREATE TABLE %s (a text, b int, c int, primary key (a, b))" + " WITH compression = { 'class' : 'SnappyCompressor', 'unknownOption' : 32 };"); } diff --git a/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java b/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java index 41244ca443a6..60a93b127665 100644 --- a/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java +++ b/test/unit/org/apache/cassandra/db/SchemaCQLHelperTest.java @@ -275,7 +275,7 @@ public void testCfmOptionsCQL() .bloomFilterFpChance(1.0) .comment("comment") .compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1"))) - .compression(CompressionParams.lz4(1 << 16, 1 << 15)) + .compression(TestCompressionParamsFactory.lz4(1 << 16, 1 << 15)) .crcCheckChance(0.3) .defaultTimeToLive(4) .gcGraceSeconds(5) diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailAllowUncompressedTablesTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailAllowUncompressedTablesTest.java index 27f13f1ef17e..acf48033da2a 100644 --- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailAllowUncompressedTablesTest.java +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailAllowUncompressedTablesTest.java @@ -39,7 +39,7 @@ public void createSuccess() { setGuardrail(true); String table = createTableName(); - schemaChange(String.format("CREATE TABLE %s.%s (k int primary key, v int) WITH compression={'sstable_compression':''}", KEYSPACE, table)); + schemaChange(String.format("CREATE TABLE %s.%s (k int primary key, v int) WITH compression={ 'enabled': false }", KEYSPACE, table)); TableMetadata tmd = Schema.instance.getTableMetadata(KEYSPACE, table); Assert.assertFalse(tmd.params.compression.isEnabled()); } @@ -52,7 +52,7 @@ public void createFailure() throws Throwable { setGuardrail(false); String table = createTableName(); - assertFails(String.format("CREATE TABLE %s.%s (k int primary key, v int) WITH compression={'sstable_compression':''}", KEYSPACE, table), "Uncompressed table is not allowed"); + assertFails(String.format("CREATE TABLE %s.%s (k int primary key, v int) WITH compression={ 'enabled': false }", KEYSPACE, table), "Uncompressed table is not allowed"); } @Test @@ -61,7 +61,7 @@ public void alterSuccess() setGuardrail(true); String table = createTableName(); schemaChange(String.format("CREATE TABLE %s.%s (k int primary key, v int)", KEYSPACE, table)); - schemaChange(String.format("ALTER TABLE %s.%s WITH compression = {'sstable_compression': ''}", KEYSPACE, table)); + schemaChange(String.format("ALTER TABLE %s.%s WITH compression = { 'enabled': false }", KEYSPACE, table)); TableMetadata tmd = Schema.instance.getTableMetadata(KEYSPACE, table); Assert.assertFalse(tmd.params.compression.isEnabled()); } @@ -72,6 +72,6 @@ public void alterFailure() throws Throwable 
setGuardrail(false); String table = createTableName(); schemaChange(String.format("CREATE TABLE %s.%s (k int primary key, v int)", KEYSPACE, table)); - assertFails(String.format("ALTER TABLE %s.%s WITH compression = {'sstable_compression': ''}", KEYSPACE, table), "Uncompressed table is not allowed"); + assertFails(String.format("ALTER TABLE %s.%s WITH compression = { 'enabled': false }", KEYSPACE, table), "Uncompressed table is not allowed"); } } diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java index de953734ca21..3bb595eceba5 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java @@ -66,7 +66,7 @@ public static void defineSchemaAndPrepareSSTable() SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KEYSPACE, CF_COMPRESSED).compression(CompressionParams.DEFAULT)); + SchemaLoader.standardCFMD(KEYSPACE, CF_COMPRESSED).compression(CompressionParams.defaultParams())); Keyspace keyspace = Keyspace.open(KEYSPACE); store = keyspace.getColumnFamilyStore(CF_COMPRESSED); diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java index 517edeb8a8b4..7e0d1775b6c0 100644 --- a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java @@ -106,7 +106,7 @@ public void zstdBadParamsTest() throws Throwable @Test public void lz4FlushTest() throws Throwable { - createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'LZ4Compressor'};"); + createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'class': 'LZ4Compressor'};"); ColumnFamilyStore store = 
flushTwice(); // Should flush as LZ4 "fast" @@ -131,7 +131,7 @@ public void lz4FlushTest() throws Throwable public void lz4hcFlushTest() throws Throwable { createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = " + - "{'sstable_compression': 'LZ4Compressor', 'lz4_compressor_type': 'high'};"); + "{'class': 'LZ4Compressor', 'lz4_compressor_type': 'high'};"); ColumnFamilyStore store = flushTwice(); // Should flush as LZ4 "fast" mode @@ -155,7 +155,7 @@ public void lz4hcFlushTest() throws Throwable @Test public void zstdFlushTest() throws Throwable { - createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'ZstdCompressor'};"); + createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'class': 'ZstdCompressor'};"); ColumnFamilyStore store = flushTwice(); // Should flush as LZ4 @@ -177,7 +177,7 @@ public void zstdFlushTest() throws Throwable @Test public void deflateFlushTest() throws Throwable { - createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'DeflateCompressor'};"); + createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'class': 'DeflateCompressor'};"); ColumnFamilyStore store = flushTwice(); // Should flush as LZ4 @@ -200,7 +200,7 @@ public void deflateFlushTest() throws Throwable public void useNoCompressorOnFlushTest() throws Throwable { DatabaseDescriptor.setFlushCompression(Config.FlushCompression.none); - createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'LZ4Compressor'};"); + createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'class': 'LZ4Compressor'};"); ColumnFamilyStore store = flushTwice(); // Should flush as Noop compressor @@ -224,7 +224,7 @@ public void useTableCompressorOnFlushTest() throws Throwable { DatabaseDescriptor.setFlushCompression(Config.FlushCompression.table); - createTable("CREATE TABLE %s (k text 
PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'ZstdCompressor'};"); + createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'class': 'ZstdCompressor'};"); ColumnFamilyStore store = flushTwice(); // Should flush as Zstd @@ -237,7 +237,7 @@ public void useTableCompressorOnFlushTest() throws Throwable @Test public void zstdTableFlushTest() throws Throwable { - createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'ZstdCompressor'};"); + createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'class': 'ZstdCompressor'};"); ColumnFamilyStore store = flushTwice(); // Should flush as LZ4 diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index 2bf127f0790d..4f9dbb9c6374 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@ -41,6 +41,7 @@ import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.io.util.SequentialWriterOption; import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.schema.TestCompressionParamsFactory; import org.apache.cassandra.utils.SyncUtil; import org.assertj.core.api.Assertions; @@ -106,7 +107,7 @@ public void test6791() throws IOException, ConfigurationException MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, new File(filename + ".metadata"), null, SequentialWriterOption.DEFAULT, - CompressionParams.snappy(32), + TestCompressionParamsFactory.snappy(32), sstableMetadataCollector)) { @@ -155,7 +156,7 @@ public void testChunkIndexOverflow() throws IOException try { - writeSSTable(file, 
CompressionParams.snappy(chunkLength), 10); + writeSSTable(file, TestCompressionParamsFactory.snappy(chunkLength), 10); CompressionMetadata metadata = CompressionMetadata.open(new File(filename + ".metadata"), file.length(), true); long chunks = 2761628520L; @@ -180,7 +181,7 @@ public void testChunkIndexOverflow() throws IOException private static void testResetAndTruncate(File f, boolean compressed, boolean usemmap, int junkSize, double minCompressRatio) throws IOException { final String filename = f.absolutePath(); - writeSSTable(f, compressed ? CompressionParams.snappy() : null, junkSize); + writeSSTable(f, compressed ? TestCompressionParamsFactory.snappy() : null, junkSize); try (CompressionMetadata compressionMetadata = compressed ? CompressionMetadata.open(new File(filename + ".metadata"), f.length(), true) : null; FileHandle fh = new FileHandle.Builder(f).mmapped(usemmap).withCompressionMetadata(compressionMetadata).complete(); @@ -249,7 +250,7 @@ public void testDataCorruptionDetection() throws IOException MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata, null, SequentialWriterOption.DEFAULT, - CompressionParams.snappy(), sstableMetadataCollector)) + TestCompressionParamsFactory.snappy(), sstableMetadataCollector)) { writer.write(CONTENT.getBytes()); writer.finish(); diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java index afa469c48772..8fd1c573416f 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java @@ -47,6 +47,7 @@ import org.apache.cassandra.io.util.SequentialWriterOption; import org.apache.cassandra.io.util.SequentialWriterTest; import 
org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.schema.TestCompressionParamsFactory; import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH; @@ -88,35 +89,35 @@ private void runTests(String testName) throws IOException @Test public void testLZ4Writer() throws IOException { - compressionParameters = CompressionParams.lz4(); + compressionParameters = TestCompressionParamsFactory.lz4(); runTests("LZ4"); } @Test public void testDeflateWriter() throws IOException { - compressionParameters = CompressionParams.deflate(); + compressionParameters = TestCompressionParamsFactory.deflate(); runTests("Deflate"); } @Test public void testSnappyWriter() throws IOException { - compressionParameters = CompressionParams.snappy(); + compressionParameters = TestCompressionParamsFactory.snappy(); runTests("Snappy"); } @Test public void testZSTDWriter() throws IOException { - compressionParameters = CompressionParams.zstd(); + compressionParameters = TestCompressionParamsFactory.zstd(); runTests("ZSTD"); } @Test public void testNoopWriter() throws IOException { - compressionParameters = CompressionParams.noop(); + compressionParameters = TestCompressionParamsFactory.noop(); runTests("Noop"); } @@ -191,7 +192,7 @@ private void testWrite(File f, int bytesToTest, boolean useMemmap) throws IOExce public void testShortUncompressedChunk() throws IOException { // Test uncompressed chunk below threshold (CASSANDRA-14892) - compressionParameters = CompressionParams.lz4(DEFAULT_CHUNK_LENGTH, DEFAULT_CHUNK_LENGTH); + compressionParameters = TestCompressionParamsFactory.lz4(DEFAULT_CHUNK_LENGTH, DEFAULT_CHUNK_LENGTH); testWrite(FileUtils.createTempFile("14892", "1"), compressionParameters.maxCompressedLength() - 1, false); } @@ -288,7 +289,7 @@ public void resetAndTruncateTest() byte[] toWrite = new byte[writeSize]; try (SequentialWriter writer = new CompressedSequentialWriter(tempFile, 
offsetsFile, null, SequentialWriterOption.DEFAULT, - CompressionParams.lz4(bufferSize), + TestCompressionParamsFactory.lz4(bufferSize), new MetadataCollector(new ClusteringComparator(UTF8Type.instance)))) { // write bytes greather than buffer @@ -342,7 +343,7 @@ private TestableCSW(File file, File offsetsFile) throws IOException { this(file, offsetsFile, new CompressedSequentialWriter(file, offsetsFile, null, SequentialWriterOption.DEFAULT, - CompressionParams.lz4(BUFFER_SIZE, MAX_COMPRESSED), + TestCompressionParamsFactory.lz4(BUFFER_SIZE, MAX_COMPRESSED), new MetadataCollector(new ClusteringComparator(UTF8Type.instance)))); } diff --git a/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java b/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java index 321fe5735606..ca2a9bf006f5 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressionMetadataTest.java @@ -24,6 +24,7 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.Memory; import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.schema.TestCompressionParamsFactory; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -31,7 +32,7 @@ public class CompressionMetadataTest { File chunksIndexFile = new File("/path/to/metadata"); - CompressionParams params = CompressionParams.zstd(); + CompressionParams params = TestCompressionParamsFactory.zstd(); long dataLength = 1000; long compressedFileLength = 100; diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 48ea2c6d20bd..26e86fa2dcba 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -33,6 +33,7 @@ import 
java.util.stream.Stream; import com.google.common.collect.Sets; + import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Rule; @@ -124,7 +125,7 @@ public static void defineSchema() throws Exception SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3), SchemaLoader.standardCFMD(KEYSPACE1, CF_MOVE_AND_OPEN), - SchemaLoader.standardCFMD(KEYSPACE1, CF_COMPRESSED).compression(CompressionParams.DEFAULT), + SchemaLoader.standardCFMD(KEYSPACE1, CF_COMPRESSED).compression(CompressionParams.defaultParams()), SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED, true), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD_LOW_INDEX_INTERVAL) .minIndexInterval(8) diff --git a/test/unit/org/apache/cassandra/io/sstable/VerifyTest.java b/test/unit/org/apache/cassandra/io/sstable/VerifyTest.java index 40bbe0887f71..90b0355f6a75 100644 --- a/test/unit/org/apache/cassandra/io/sstable/VerifyTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/VerifyTest.java @@ -71,6 +71,7 @@ import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TestCompressionParamsFactory; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.OutputHandler; @@ -117,7 +118,7 @@ public class VerifyTest @BeforeClass public static void defineSchema() throws ConfigurationException { - CompressionParams compressionParameters = CompressionParams.snappy(32768); + CompressionParams compressionParameters = TestCompressionParamsFactory.snappy(32768); DatabaseDescriptor.daemonInitialization(); DatabaseDescriptor.setColumnIndexSizeInKiB(0); diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java index e6b5dd0c0962..4cd47c58f177 100644 --- 
a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java +++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java @@ -37,7 +37,7 @@ import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.compress.CompressionMetadata.Chunk; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.schema.TestCompressionParamsFactory; import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.utils.Clock.Global.nanoTime; @@ -312,7 +312,7 @@ public void testMapForCompressionMetadata() throws Exception MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); try (SequentialWriter writer = new CompressedSequentialWriter(f, cf, null, SequentialWriterOption.DEFAULT, - CompressionParams.snappy(), sstableMetadataCollector)) + TestCompressionParamsFactory.snappy(), sstableMetadataCollector)) { writer.write(buffer); writer.finish(); @@ -393,7 +393,7 @@ public void testExtendForCompressionMetadata(int maxSegmentSize, int chunkSize, MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, cf, null, SequentialWriterOption.DEFAULT, - CompressionParams.deflate(chunkSize << 10), + TestCompressionParamsFactory.deflate(chunkSize << 10), sstableMetadataCollector)) { ByteBuffer slice = buffer.slice(); diff --git a/test/unit/org/apache/cassandra/schema/CompressionParamsTest.java b/test/unit/org/apache/cassandra/schema/CompressionParamsTest.java new file mode 100644 index 000000000000..c7e2abcf51de --- /dev/null +++ b/test/unit/org/apache/cassandra/schema/CompressionParamsTest.java @@ -0,0 +1,785 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.schema; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.Test; + +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.NoopCompressor; +import org.apache.cassandra.io.compress.SnappyCompressor; +import org.apache.cassandra.io.compress.ZstdCompressor; + +import static java.lang.String.format; +import static org.apache.cassandra.schema.CompressionParams.CHUNK_LENGTH_IN_KB; +import static org.apache.cassandra.schema.CompressionParams.CLASS; +import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH; +import static org.apache.cassandra.schema.CompressionParams.MAX_COMPRESSED_LENGTH; +import static 
org.apache.cassandra.schema.CompressionParams.MIN_COMPRESS_RATIO; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class CompressionParamsTest +{ + private static ParameterizedClass emptyParameterizedClass() + { + return new ParameterizedClass(null, new HashMap<>()); + } + + @Test + public void additionalParamsTest() + { + // no map + ParameterizedClass options = new ParameterizedClass(); + CompressionParams params = CompressionParams.fromParameterizedClass(options); + assertThat(params.getOtherOptions()).isNotNull(); + assertThat(params.getOtherOptions().isEmpty()).isTrue(); + + options = emptyParameterizedClass(); + params = CompressionParams.fromParameterizedClass(options); + assertThat(params.getOtherOptions()).isNotNull(); + assertThat(params.getOtherOptions().isEmpty()).isTrue(); + + options.class_name = TestCompressor.class.getName(); + options.parameters.put("foo", "bar"); + params = CompressionParams.fromParameterizedClass(options); + assertThat(params.getOtherOptions()).isNotNull(); + assertThat(params.getOtherOptions().get("foo")).isEqualTo("bar"); + } + + // Tests chunklength settings for both Options and Map. 
+ private static void chunkLengthTest(BiConsumer put, Consumer remove, Function func, T instance) + { + // CHUNK_LENGTH + + // test empty string + put.accept(CompressionParams.CHUNK_LENGTH, ""); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length' value"); + + // test zero string + put.accept(CompressionParams.CHUNK_LENGTH, "0MiB"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length' value"); + + + // test properly formatted value + put.accept(CompressionParams.CHUNK_LENGTH, "1MiB"); + CompressionParams params = func.apply(instance); + assertEquals(1024 * 1024, params.chunkLength()); + + // test bad string + put.accept(CompressionParams.CHUNK_LENGTH, "badvalue"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length' value"); + + // test not power of 2 + put.accept(CompressionParams.CHUNK_LENGTH, "3MiB"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length' value") + .withMessageContaining("Must be a power of 2"); + + remove.accept(CompressionParams.CHUNK_LENGTH); + + + // CHUNK_LENGTH_IN_KB + // same tests as above + + put.accept(CHUNK_LENGTH_IN_KB, ""); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length_in_kb' value"); + + put.accept(CHUNK_LENGTH_IN_KB, "0"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length_in_kb' value"); + + put.accept(CHUNK_LENGTH_IN_KB, "1"); + params = func.apply(instance); + assertEquals(1024, params.chunkLength()); + + put.accept(CHUNK_LENGTH_IN_KB, "badvalue"); + 
assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length_in_kb' value"); + + put.accept(CHUNK_LENGTH_IN_KB, "3"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length_in_kb' value") + .withMessageContaining("Must be a power of 2"); + + // test negative value + put.accept(CHUNK_LENGTH_IN_KB, "-1"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'chunk_length_in_kb' value") + .withMessageContaining("May not be <= 0"); + + remove.accept(CHUNK_LENGTH_IN_KB); + + + // TEST COMBINATIONS + put.accept(CompressionParams.CHUNK_LENGTH, "3MiB"); + put.accept(CHUNK_LENGTH_IN_KB, "2"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessage(CompressionParams.TOO_MANY_CHUNK_LENGTH); + } + + @Test + public void chunkLengthTest() + { + ParameterizedClass options = emptyParameterizedClass(); + chunkLengthTest(options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + Map map = new HashMap(); + map.put(CLASS, "lz4"); + Consumer remove = map::remove; + chunkLengthTest(map::put, remove, CompressionParams::fromMap, map); + } + + private static void minCompressRatioTest(BiConsumer put, Function func, T instance) + { + + CompressionParams params = func.apply(instance); + assertEquals(CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, params.minCompressRatio(), Double.MIN_VALUE); + assertEquals(Integer.MAX_VALUE, params.maxCompressedLength()); + + put.accept(MIN_COMPRESS_RATIO, "0.0"); //CompressionParams.DEFAULT_MIN_COMPRESS_RATIO + params = func.apply(instance); + assertEquals(CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, params.minCompressRatio(), Double.MIN_VALUE); + assertEquals(Integer.MAX_VALUE, params.maxCompressedLength()); 
+ + put.accept(MIN_COMPRESS_RATIO, "0.3"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'min_compress_ratio' value") + .withMessageContaining("Can either be 0 or greater than or equal to 1"); + + put.accept(MIN_COMPRESS_RATIO, "1.3"); + params = func.apply(instance); + assertEquals(1.3, params.minCompressRatio(), Double.MIN_VALUE); + assertEquals((int) Math.ceil(DEFAULT_CHUNK_LENGTH / 1.3), params.maxCompressedLength()); + + put.accept(MIN_COMPRESS_RATIO, "-1.0"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'min_compress_ratio' value") + .withMessageContaining("Can either be 0 or greater than or equal to 1"); + } + + @Test + public void minCompressRatioTest() + { + ParameterizedClass options = emptyParameterizedClass(); + minCompressRatioTest(options.parameters::put, CompressionParams::fromParameterizedClass, options); + + Map map = new HashMap<>(); + map.put(CLASS, "lz4"); + minCompressRatioTest(map::put, CompressionParams::fromMap, map); + } + + private static void maxCompressedLengthTest(BiConsumer put, Function func, T instance) + { + CompressionParams params = func.apply(instance); + assertEquals(Integer.MAX_VALUE, params.maxCompressedLength()); + assertEquals(CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, params.minCompressRatio(), Double.MIN_VALUE); + + put.accept(CompressionParams.MAX_COMPRESSED_LENGTH, ""); + params = func.apply(instance); + assertEquals(Integer.MAX_VALUE, params.maxCompressedLength()); + assertEquals(CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, params.minCompressRatio(), Double.MIN_VALUE); + + put.accept(CompressionParams.MAX_COMPRESSED_LENGTH, "4KiB"); + params = func.apply(instance); + assertEquals(4 * 1024, params.maxCompressedLength()); + assertEquals(DEFAULT_CHUNK_LENGTH / (4.0 * 1024), params.minCompressRatio(), Double.MIN_VALUE); + + 
put.accept(CompressionParams.MAX_COMPRESSED_LENGTH, "badvalue"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> func.apply(instance)) + .withMessageContaining("Invalid 'max_compressed_length' value") + .withMessageContaining("Invalid data storage"); + } + + @Test + public void maxCompressedLengthTest() + { + ParameterizedClass options = emptyParameterizedClass(); + maxCompressedLengthTest(options.parameters::put, CompressionParams::fromParameterizedClass, options); + + Map map = new HashMap<>(); + map.put(CLASS, "lz4"); + maxCompressedLengthTest(map::put, CompressionParams::fromMap, map); + } + + @Test + public void maxCompressionLengthAndMinCompressRatioTest() + { + ParameterizedClass options = emptyParameterizedClass(); + options.parameters.put(MIN_COMPRESS_RATIO, "1.0"); + options.parameters.put(CompressionParams.MAX_COMPRESSED_LENGTH, "4Gib"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromParameterizedClass(options)) + .withMessage("Can not specify both 'min_compress_ratio' and 'max_compressed_length' for the compressor parameters."); + + Map map = new HashMap<>(); + map.put(CLASS, "lz4"); + map.put(MIN_COMPRESS_RATIO, "1.0"); + map.put(CompressionParams.MAX_COMPRESSED_LENGTH, "4Gib"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromMap(map)) + .withMessage("Can not specify both 'min_compress_ratio' and 'max_compressed_length' for the compressor parameters."); + } + + private static void assertParams(CompressionParams params, boolean enabled, int chunkLength, int maxCompressedLength, double minCompressRatio, Class compressor) + { + assertThat(params.isEnabled()).isEqualTo(enabled); + assertThat(params.chunkLength()).isEqualTo(chunkLength); + assertThat(params.maxCompressedLength()).isEqualTo(maxCompressedLength); + assertThat(params.minCompressRatio()).isEqualTo(minCompressRatio); + if (compressor != null) + 
assertThat(params.getSstableCompressor()).isInstanceOf(compressor); + else + assertThat(params.getSstableCompressor()).isNull(); + } + + + @Test + public void defaultTest() + { + CompressionParams params = CompressionParams.fromParameterizedClass(emptyParameterizedClass()); + assertParams(params, true, DEFAULT_CHUNK_LENGTH, Integer.MAX_VALUE, CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, LZ4Compressor.class); + roundTripMapTest(params); + + params = CompressionParams.fromParameterizedClass(null); + assertParams(params, true, DEFAULT_CHUNK_LENGTH, Integer.MAX_VALUE, CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, LZ4Compressor.class); + roundTripMapTest(params); + + params = CompressionParams.fromMap(Collections.emptyMap()); + assertParams(params, true, DEFAULT_CHUNK_LENGTH, Integer.MAX_VALUE, CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, LZ4Compressor.class); + roundTripMapTest(params); + } + + private static void paramsTest(boolean enabled, Class clazz, BiConsumer put, Consumer remove, Function func, T instance) + { + int expectedChunkLength = 4 * 1024 * 1024; + int expectedCompressedLength = 2 * 1024 * 1024; + int expectedRatioCompressedLength = 2796203; + + CompressionParams params = func.apply(instance); + assertParams(params, enabled, DEFAULT_CHUNK_LENGTH, Integer.MAX_VALUE, CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, clazz); + roundTripMapTest(params); + + put.accept(CompressionParams.CHUNK_LENGTH, "4MiB"); + params = func.apply(instance); + assertParams(params, enabled, expectedChunkLength, Integer.MAX_VALUE, CompressionParams.DEFAULT_MIN_COMPRESS_RATIO, clazz); + roundTripMapTest(params); + + put.accept(CompressionParams.MAX_COMPRESSED_LENGTH, "2MiB"); + params = func.apply(instance); + assertParams(params, enabled, expectedChunkLength, expectedCompressedLength, 2.0, clazz); + roundTripMapTest(params); + + put.accept(CompressionParams.MAX_COMPRESSED_LENGTH, "2097151KiB"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> 
func.apply(instance)) + .withMessageContaining("Invalid 'max_compressed_length' value for the 'compression' option: Must be less than or equal to chunk length"); + assertParams(params, enabled, expectedChunkLength, expectedCompressedLength, 2.0, clazz); + roundTripMapTest(params); + + remove.accept(CompressionParams.MAX_COMPRESSED_LENGTH); + put.accept(MIN_COMPRESS_RATIO, "1.5"); + params = func.apply(instance); + assertParams(params, enabled, expectedChunkLength, expectedRatioCompressedLength, 1.5, clazz); + roundTripMapTest(params); + + put.accept(CompressionParams.ENABLED, "false"); + params = func.apply(instance); + assertParams(params, false, expectedChunkLength, expectedRatioCompressedLength, 1.5, null); + roundTripMapTest(params); + } + + @Test + public void fromMapTest() + { + // chunk length < 0 + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromMap(new HashMap() + {{ + put(CLASS, TestCompressor.class.getName()); + put(CHUNK_LENGTH_IN_KB, "-1"); + put(MIN_COMPRESS_RATIO, "0"); + }})) + .withMessage(format("Invalid '%s' value for the 'compression' option. May not be <= 0: -1", CHUNK_LENGTH_IN_KB)); + + // chunk length = 0 + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromMap(new HashMap() + {{ + put(CLASS, TestCompressor.class.getName()); + put(CHUNK_LENGTH_IN_KB, "0"); + put(MIN_COMPRESS_RATIO, "0"); + }})) + .withMessage(format("Invalid '%s' value for the 'compression' option. May not be <= 0: 0", CHUNK_LENGTH_IN_KB)); + + // min compress ratio < 0 + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromMap(new HashMap() + {{ + put(CLASS, TestCompressor.class.getName()); + put(CHUNK_LENGTH_IN_KB, Integer.toString(DEFAULT_CHUNK_LENGTH)); + put(MIN_COMPRESS_RATIO, "-1.0"); + }})) + .withMessageContaining(format("Invalid '%s' value for the 'compression' option. 
Can either be 0 or greater than or equal to 1", MIN_COMPRESS_RATIO)); + + // 0 < min compress ratio < 1 + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromMap(new HashMap() + {{ + put(CLASS, TestCompressor.class.getName()); + put(CHUNK_LENGTH_IN_KB, Integer.toString(DEFAULT_CHUNK_LENGTH)); + put(MIN_COMPRESS_RATIO, "0.5"); + }})) + .withMessageContaining(format("Invalid '%s' value for the 'compression' option. Can either be 0 or greater than or equal to 1", MIN_COMPRESS_RATIO)); + + // max compressed length > chunk length + int len = 1 << 5; + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromMap(new HashMap() + {{ + put(CLASS, TestCompressor.class.getName()); + put(CHUNK_LENGTH_IN_KB, Integer.toString(len)); + put(MAX_COMPRESSED_LENGTH, Integer.toString(len + 1) + "KiB"); + }})) + .withMessageContaining(format("Invalid '%s' value for the 'compression' option: Must be less than or equal to chunk length", MAX_COMPRESSED_LENGTH)); + } + + private static void roundTripMapTest(CompressionParams params) + { + Map map = params.asMap(); + + CompressionParams other = CompressionParams.fromMap(map); + if (params.isEnabled()) + { + assertTrue(other.isEnabled()); + assertThat(other.getOtherOptions()).isEqualTo(params.getOtherOptions()); + assertThat(other.maxCompressedLength()).isEqualTo(params.maxCompressedLength()); + assertThat(other.minCompressRatio()).isEqualTo(params.minCompressRatio()); + assertThat(other.chunkLength()).isEqualTo(params.chunkLength()); + assertThat(other.getCrcCheckChance()).isEqualTo(params.getCrcCheckChance()); + assertThat(other.klass()).isEqualTo(params.klass()); + } + else + { + assertThat(map.size()).isEqualTo(1); + assertThat(map.get(CompressionParams.ENABLED)).isEqualTo("false"); + assertFalse(other.isEnabled()); + assertTrue(other.getOtherOptions().isEmpty()); + assertThat(other.maxCompressedLength()).isEqualTo(Integer.MAX_VALUE); + 
assertThat(other.minCompressRatio()).isEqualTo(CompressionParams.DEFAULT_MIN_COMPRESS_RATIO); + assertThat(other.chunkLength()).isEqualTo(DEFAULT_CHUNK_LENGTH); + assertThat(other.getCrcCheckChance()).isEqualTo(1.0); + assertThat(other.getSstableCompressor()).isNull(); + } + } + + @Test + public void lz4Test() + { + ParameterizedClass options = emptyParameterizedClass(); + options.class_name = CompressionParams.CompressorType.lz4.name(); + paramsTest(true, LZ4Compressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = LZ4Compressor.class.getSimpleName(); + paramsTest(true, LZ4Compressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = LZ4Compressor.class.getName(); + paramsTest(true, LZ4Compressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = LZ4Compressor.class.getName(); + options.parameters.put(LZ4Compressor.LZ4_COMPRESSOR_TYPE, LZ4Compressor.LZ4_FAST_COMPRESSOR); + paramsTest(true, LZ4Compressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + Map map = new HashMap<>(); + map.put(CLASS, CompressionParams.CompressorType.lz4.name()); + paramsTest(true, LZ4Compressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, LZ4Compressor.class.getName()); + paramsTest(true, LZ4Compressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, LZ4Compressor.class.getName()); + paramsTest(true, LZ4Compressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, LZ4Compressor.class.getName()); + 
map.put(LZ4Compressor.LZ4_COMPRESSOR_TYPE, LZ4Compressor.LZ4_FAST_COMPRESSOR); + paramsTest(true, LZ4Compressor.class, map::put, map::remove, CompressionParams::fromMap, map); + } + + @Test + public void noneTest() + { + ParameterizedClass options = emptyParameterizedClass(); + options.class_name = CompressionParams.CompressorType.none.name(); + paramsTest(false, null, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = CompressionParams.CompressorType.none.name(); + options.parameters.put("foo", "bar"); + paramsTest(false, null, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = null; // constructs the default class + paramsTest(true, LZ4Compressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + Map map = new HashMap<>(); + map.put(CompressionParams.ENABLED, "false"); + paramsTest(false, null, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CompressionParams.ENABLED, "false"); + map.put(CLASS, "dummy"); + paramsTest(false, null, map::put, map::remove, CompressionParams::fromMap, map); + } + + @Test + public void noopTest() + { + ParameterizedClass options = emptyParameterizedClass(); + options.class_name = CompressionParams.CompressorType.noop.name(); + paramsTest(true, NoopCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = "NoopCompressor"; + paramsTest(true, NoopCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = NoopCompressor.class.getName(); + paramsTest(true, NoopCompressor.class, 
options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = NoopCompressor.class.getName(); + options.parameters.put("foo", "bar"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> paramsTest(true, NoopCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options)) + .withMessageContaining("Unknown compression options: ([foo])"); + + Map map = new HashMap<>(); + map.put(CLASS, CompressionParams.CompressorType.noop.name()); + paramsTest(true, NoopCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, "NoopCompressor"); + paramsTest(true, NoopCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, NoopCompressor.class.getName()); + paramsTest(true, NoopCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, NoopCompressor.class.getName()); + map.put("foo", "bor"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> paramsTest(true, NoopCompressor.class, map::put, map::remove, CompressionParams::fromMap, map)) + .withMessageContaining("Unknown compression options: ([foo])"); + } + + @Test + public void snappyTest() + { + ParameterizedClass options = emptyParameterizedClass(); + options.class_name = CompressionParams.CompressorType.snappy.name(); + paramsTest(true, SnappyCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = SnappyCompressor.class.getSimpleName(); + paramsTest(true, SnappyCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = 
SnappyCompressor.class.getName(); + paramsTest(true, SnappyCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = SnappyCompressor.class.getName(); + options.parameters.put("foo", "bar"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> paramsTest(true, SnappyCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options)) + .withMessageContaining("Unknown compression options: ([foo])"); + + Map map = new HashMap<>(); + map.put(CLASS, CompressionParams.CompressorType.snappy.name()); + paramsTest(true, SnappyCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, SnappyCompressor.class.getName()); + paramsTest(true, SnappyCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, SnappyCompressor.class.getName()); + paramsTest(true, SnappyCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, SnappyCompressor.class.getName()); + map.put("foo", "bor"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> paramsTest(true, SnappyCompressor.class, map::put, map::remove, CompressionParams::fromMap, map)) + .withMessageContaining("Unknown compression options: ([foo])"); + } + + @Test + public void deflateTest() + { + ParameterizedClass options = emptyParameterizedClass(); + options.class_name = CompressionParams.CompressorType.deflate.name(); + paramsTest(true, DeflateCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = DeflateCompressor.class.getSimpleName(); + paramsTest(true, DeflateCompressor.class, options.parameters::put, options.parameters::remove, 
CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = DeflateCompressor.class.getName(); + paramsTest(true, DeflateCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = DeflateCompressor.class.getName(); + options.parameters.put("foo", "bar"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> paramsTest(true, DeflateCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options)) + .withMessageContaining("Unknown compression options: ([foo])"); + + + Map map = new HashMap<>(); + map.put(CLASS, CompressionParams.CompressorType.deflate.name()); + paramsTest(true, DeflateCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, DeflateCompressor.class.getSimpleName()); + paramsTest(true, DeflateCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, DeflateCompressor.class.getName()); + paramsTest(true, DeflateCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, DeflateCompressor.class.getName()); + map.put("foo", "bor"); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> paramsTest(true, DeflateCompressor.class, map::put, map::remove, CompressionParams::fromMap, map)) + .withMessageContaining("Unknown compression options: ([foo])"); + } + + @Test + public void zstdTest() + { + ParameterizedClass options = emptyParameterizedClass(); + options.class_name = CompressionParams.CompressorType.zstd.name(); + paramsTest(true, ZstdCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = 
ZstdCompressor.class.getSimpleName(); + paramsTest(true, ZstdCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = ZstdCompressor.class.getName(); + paramsTest(true, ZstdCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = ZstdCompressor.class.getName(); + options.parameters.put(ZstdCompressor.COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(ZstdCompressor.FAST_COMPRESSION_LEVEL)); + paramsTest(true, ZstdCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + Map map = new HashMap<>(); + map.put(CLASS, CompressionParams.CompressorType.zstd.name()); + paramsTest(true, ZstdCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, ZstdCompressor.class.getSimpleName()); + paramsTest(true, ZstdCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, ZstdCompressor.class.getName()); + paramsTest(true, ZstdCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, ZstdCompressor.class.getName()); + map.put(ZstdCompressor.COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(ZstdCompressor.FAST_COMPRESSION_LEVEL)); + paramsTest(true, ZstdCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + } + + @Test + public void customTest() + { + // test with options + ParameterizedClass options = emptyParameterizedClass(); + options.class_name = TestCompressor.class.getName(); + paramsTest(true, TestCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + options.parameters.clear(); + options.class_name = 
TestCompressor.class.getName(); + options.parameters.put("foo", "bar"); + paramsTest(true, TestCompressor.class, options.parameters::put, options.parameters::remove, CompressionParams::fromParameterizedClass, options); + + // test with map + Map map = new HashMap<>(); + map.put(CLASS, TestCompressor.class.getName()); + paramsTest(true, TestCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + map.clear(); + map.put(CLASS, TestCompressor.class.getName()); + map.put("foo", "bar"); + paramsTest(true, TestCompressor.class, map::put, map::remove, CompressionParams::fromMap, map); + + // test invalid class name + options.parameters.clear(); + options.class_name = "foo"; + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromParameterizedClass(options)) + .withMessage("Could not create Compression for type org.apache.cassandra.io.compress.foo"); + map.clear(); + options.class_name = "foo"; + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromParameterizedClass(options)) + .withMessage("Could not create Compression for type org.apache.cassandra.io.compress.foo"); + } + + @Test + public void fromParameterizedClassTest() + { + Map params = new HashMap<>(); + params.put("enabled", "true"); + params.put("chunk_length", "16KiB"); + params.put("min_compress_ratio", "0.0"); + params.put("max_comrpessed_length", "16KiB"); + params.put("class_specific_parameter", "value"); + ParameterizedClass parameterizedClass = new ParameterizedClass("lz4", params); + assertThatExceptionOfType(ConfigurationException.class).isThrownBy(() -> CompressionParams.fromParameterizedClass(parameterizedClass)) + .withMessageContaining("Unknown compression options:") + .withMessageContaining("class_specific_parameter") + .withMessageContaining("max_comrpessed_length"); + + params.remove("class_specific_parameter"); + params.remove("max_comrpessed_length"); + CompressionParams cp = 
CompressionParams.fromParameterizedClass(parameterizedClass); + assertNotNull(cp); + } + + @Test + public void testMap() + { + Map params = new HashMap<>(); + params.put(CompressionParams.ENABLED, "true"); + params.put(CompressionParams.CHUNK_LENGTH, "16KiB"); + params.put(MIN_COMPRESS_RATIO, "0.0"); + ParameterizedClass parameterizedClass = new ParameterizedClass("lz4", params); + Map map = CompressionParams.fromParameterizedClass(parameterizedClass).asMap(); + assertEquals("16", map.remove(CHUNK_LENGTH_IN_KB)); + assertEquals(CompressionParams.CompressorType.lz4.className, map.remove(CLASS)); + assertTrue(map.isEmpty()); + + params.put(MIN_COMPRESS_RATIO, "1.5"); + parameterizedClass = new ParameterizedClass("lz4", params); + map = CompressionParams.fromParameterizedClass(parameterizedClass).asMap(); + assertEquals("16", map.remove(CHUNK_LENGTH_IN_KB)); + assertEquals(CompressionParams.CompressorType.lz4.className, map.remove(CLASS)); + assertEquals("1.5", map.remove(MIN_COMPRESS_RATIO)); + assertTrue(map.isEmpty()); + + params.put(LZ4Compressor.LZ4_COMPRESSOR_TYPE, LZ4Compressor.LZ4_FAST_COMPRESSOR); + parameterizedClass = new ParameterizedClass("lz4", params); + map = CompressionParams.fromParameterizedClass(parameterizedClass).asMap(); + assertEquals("16", map.remove(CHUNK_LENGTH_IN_KB)); + assertEquals(CompressionParams.CompressorType.lz4.className, map.remove(CLASS)); + assertEquals("1.5", map.remove(MIN_COMPRESS_RATIO)); + assertEquals(LZ4Compressor.LZ4_FAST_COMPRESSOR, map.remove(LZ4Compressor.LZ4_COMPRESSOR_TYPE)); + assertTrue(map.isEmpty()); + } + + @Test + public void testChunkSizeCalculation() + { + Map params = new HashMap<>(); + params.put(CompressionParams.CHUNK_LENGTH, "16KiB"); + ParameterizedClass parameterizedClass = new ParameterizedClass("lz4", params); + Map map = CompressionParams.fromParameterizedClass(parameterizedClass).asMap(); + + Map params2 = new HashMap<>(); + params2.put(CHUNK_LENGTH_IN_KB, "16"); + ParameterizedClass 
parameterizedClass2 = new ParameterizedClass("lz4", params2); + Map map2 = CompressionParams.fromParameterizedClass(parameterizedClass2).asMap(); + + assertEquals(map.get(CHUNK_LENGTH_IN_KB), map2.get(CHUNK_LENGTH_IN_KB)); + } + + public static class TestCompressor implements ICompressor + { + Map options; + + static public TestCompressor create(Map options) + { + return new TestCompressor(options); + } + + private TestCompressor(Map options) + { + this.options = options; + } + + @Override + public int initialCompressedBufferLength(int chunkLength) + { + return 0; + } + + @Override + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException + { + return 0; + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) throws IOException + { + } + + @Override + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + } + + @Override + public BufferType preferredBufferType() + { + return null; + } + + @Override + public boolean supports(BufferType bufferType) + { + return false; + } + + @Override + public Set supportedOptions() + { + return options.keySet(); + } + } +} diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java index 089077f81f22..906c96cd32d8 100644 --- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java @@ -148,7 +148,7 @@ public void testConversionsInverses() throws Exception checkInverses(cfs.metadata()); // Testing with compression to catch #3558 - TableMetadata withCompression = cfs.metadata().unbuild().compression(CompressionParams.snappy(32768)).build(); + TableMetadata withCompression = cfs.metadata().unbuild().compression(TestCompressionParamsFactory.snappy(32768)).build(); checkInverses(withCompression); } } diff --git 
a/test/unit/org/apache/cassandra/schema/TestCompressionParamsFactory.java b/test/unit/org/apache/cassandra/schema/TestCompressionParamsFactory.java new file mode 100644 index 000000000000..de5cbcc1cad8 --- /dev/null +++ b/test/unit/org/apache/cassandra/schema/TestCompressionParamsFactory.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.schema; + +import java.util.Map; + +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.NoopCompressor; +import org.apache.cassandra.io.compress.SnappyCompressor; +import org.apache.cassandra.io.compress.ZstdCompressor; + +import static java.util.Collections.emptyMap; +import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH; +import static org.apache.cassandra.schema.CompressionParams.DEFAULT_MIN_COMPRESS_RATIO; + +/** + * Contains simple constructors for various Compression implementations. + * They are a little inconsistent in their choice of parameters -- this is done on purpose to test out various compression parameter combinations. 
+ */ +public class TestCompressionParamsFactory +{ + /** + * Creates Snappy CompressionParams with default chunk length and compression ratio of 1.1 + * @return CompressionParams for a SnappyCompressor with default chunk length and compression ratio of 1.1 + */ + public static CompressionParams snappy() + { + return snappy(DEFAULT_CHUNK_LENGTH); + } + + /** + * Creates Snappy CompressionParams with specified chunk length and compression ratio of 1.1 + * @param chunkLength the chunk length. + * @return CompressionParams for a SnappyCompressor with specified chunk length and compression ratio of 1.1 + */ + public static CompressionParams snappy(int chunkLength) + { + return snappy(chunkLength, 1.1); + } + + /** + * Creates Snappy CompressionParams + * @param chunkLength the chunk length. + * @param minCompressRatio the minimum compress ratio + * @return CompressionParams for a SnappyCompressor with specified chunk length and compression ratio + */ + public static CompressionParams snappy(int chunkLength, double minCompressRatio) + { + return new CompressionParams(SnappyCompressor.class.getName(), emptyMap(), chunkLength, minCompressRatio); + } + + /** + * Creates Deflate CompressionParams + * @return CompressionParams for a DeflateCompressor with default chunk length + */ + public static CompressionParams deflate() + { + return deflate(DEFAULT_CHUNK_LENGTH); + } + + /** + * Creates Deflate CompressionParams with the specified chunk length + * @param chunkLength the chunk length + * @return CompressionParams for a DeflateCompressor with specified chunk length. + */ + public static CompressionParams deflate(int chunkLength) + { + return new CompressionParams(DeflateCompressor.class.getName(), emptyMap(), chunkLength, DEFAULT_MIN_COMPRESS_RATIO); + } + + /** + * Creates LZ4 CompressionParams with the default chunk length + * @return CompressionParams for a Lz4Compressor with default chunk length. 
+ */ + public static CompressionParams lz4() + { + return lz4(DEFAULT_CHUNK_LENGTH); + } + + /** + * Creates LZ4 CompressionParams with the specified chunk length + * and a max compressed length of the same size. + * @param chunkLength the chunk and max compressed length. + * @return CompressionParams for a Lz4Compressor with specified chunk length. + */ + public static CompressionParams lz4(int chunkLength) + { + return lz4(chunkLength, chunkLength); + } + + /** + * Creates LZ4 CompressionParams with the specified chunk length and max compressed length + * @param chunkLength the chunk length + * @param maxCompressedLength the max compressed size + * @return CompressionParams for a Lz4Compressor. + */ + public static CompressionParams lz4(int chunkLength, int maxCompressedLength) + { + return lz4(chunkLength, maxCompressedLength, emptyMap()); + } + + /** + * Creates LZ4 CompressionParams with the specified chunk length and max compressed length + * @param chunkLength the chunk length + * @param maxCompressedLength the max compressed size + * @param otherOptions additional options for this compressor + * @return CompressionParams for a Lz4Compressor. + */ + public static CompressionParams lz4(int chunkLength, int maxCompressedLength, Map otherOptions) + { + return new CompressionParams(LZ4Compressor.class.getName(), chunkLength, maxCompressedLength, otherOptions); + } + + /** + * Creates Zstd CompressionParams with default chunk length + * @return CompressionParams for a ZstdCompressor with default chunk length. + */ + public static CompressionParams zstd() + { + return zstd(DEFAULT_CHUNK_LENGTH); + } + + /** + * Creates Zstd CompressionParams with specified chunk length + * @param chunkLength the chunk length + * @return CompressionParams for a ZstdCompressor with specified chunk length. 
+ */ + public static CompressionParams zstd(Integer chunkLength) + { + return zstd(chunkLength, emptyMap()); + } + + /** + * Creates Zstd CompressionParams with specified chunk length + * @param chunkLength the chunk length + * @param otherOptions additional options for this compressor + * @return CompressionParams for a ZstdCompressor with specified chunk length. + */ + public static CompressionParams zstd(Integer chunkLength, Map otherOptions) + { + return new CompressionParams(ZstdCompressor.class.getName(), otherOptions, chunkLength, DEFAULT_MIN_COMPRESS_RATIO); + } + + /** + * Creates Noop CompressionParams with default chunk length + * @return CompressionParams for a NoopCompressor with default chunk length. + */ + public static CompressionParams noop() + { + return new CompressionParams(NoopCompressor.class.getName(), emptyMap(), DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO); + } + +} diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java index 391d58972106..4069b4bc19e9 100644 --- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java @@ -48,6 +48,7 @@ import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.io.util.SequentialWriterOption; import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.schema.TestCompressionParamsFactory; import org.apache.cassandra.utils.ChecksumType; import static org.junit.Assert.assertEquals; @@ -126,7 +127,7 @@ private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate, Descriptor desc = new Descriptor(parentDir, "ks", "cf", new SequenceBasedSSTableId(1)); File tmp = desc.fileFor(Components.DATA); MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); - CompressionParams 
param = CompressionParams.snappy(32, minCompressRatio); + CompressionParams param = TestCompressionParamsFactory.snappy(32, minCompressRatio); Map index = new HashMap(); try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.fileFor(Components.COMPRESSION_INFO),