diff --git a/.build/build-rat.xml b/.build/build-rat.xml index 755b76ea58ee..17c1d0ede27d 100644 --- a/.build/build-rat.xml +++ b/.build/build-rat.xml @@ -62,6 +62,7 @@ + diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index c41c3cb973a4..ee6c1dd9e255 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -604,6 +604,50 @@ commitlog_segment_size: 32MiB # # flush_compression: fast +# Defines the default compression used on tables when none is specified +# in the CQL command. +# +# The class_name is the compressor class name. It may be one of the aliases, +# the class name of a system ICompressor implementation, or fully qualified +# name of a class that implements ICompressor and has a public static 'create' method that accepts +# a Map argument and returns an instance of the class. +# +# class aliases are: +# Alias System compressor impl. +# deflate DeflateCompressor +# lz4 LZ4Compressor +# none (null) -- compresson disabled +# noop NoopCompressor +# snappy SnappyCompressor +# zstd ZstdCompressor +# +# The standard parameters are any required or optional parameter for the instantiation of the +# specified class, or one of the following standard parameters: +# Parameter Usage +# enabled Disables compression if set to false. Defaults to true. +# chunk_length The length of the compresson chunks, must include KiB, MiB or GiB suffix, defaults to 16KiB. +# chunk_length_in_kb Same as above but expects an integer. +# min_compress_ratio The minimal acceptable compression, must greater than or equal to 1.0. +# max_compressed_length The maximum size for a compressed block. Must be less than or equal to chunk_length. +# Must include KiB, MiB or GiB suffix. Defaults to Integer.MAX_VALUE +# +# Only one of the min_compress_ratio and max_compressed_length options can be specified. +# They are mathematically related in that 'chunk_length / max_compressed_length = min_compress_ratio'. +# If neither option is specified a min_compress_ratio of 0.0 and a max_compressed_length of +# Integer.MAX_VALUE KB is the default. +# +# Only one of chunk_length or chunk_length_in_kb may be specified. +# +# Additional class specific parameters may be added to the parameters section. The value of the class specific +# parameter must be a string. +# +#sstable_compression: +# - class_name: lz4 +# parameters: +# - enabled: "true" +# chunk_length: 16KiB +# max_compressed_length: 16KiB + # any class that implements the SeedProvider interface and has a # constructor that takes a Map of parameters will do. seed_provider: diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index 74c6fc10f526..16501e0346f2 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -64,7 +64,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet): ('class', 'max_threshold', 'tombstone_compaction_interval', 'tombstone_threshold', 'enabled', 'unchecked_tombstone_compaction', 'only_purge_repaired_tombstones', 'provide_overlapping_tombstones')), ('compression', 'compression_parameters', - ('sstable_compression', 'chunk_length_kb', 'crc_check_chance')), + ('class', 'chunk_length', 'chunk_length_in_kb', 'crc_check_chance', 'enabled', 'min_compress_ratio', 'max_compressed_length')), ('caching', None, ('rows_per_partition', 'keys')), ) @@ -498,7 +498,7 @@ def cf_prop_val_completer(ctxt, cass): exist_opts = ctxt.get_binding('propname') this_opt = exist_opts[-1] if this_opt == 'compression': - return ["{'sstable_compression': '"] + return ["{'class': '"] if this_opt == 'compaction': return ["{'class': '"] if this_opt == 'caching': @@ -563,7 +563,7 @@ def cf_prop_val_mapval_completer(ctxt, cass): return [Hint('')] return [Hint('')] elif opt == 'compression': - if key == 'sstable_compression': + if key == 'class': return list(map(escape_value, CqlRuleSet.available_compression_classes)) return [Hint('')] elif opt == 'caching': diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 2157b225ebdc..2c1a6f215514 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -1110,6 +1110,9 @@ public enum PaxosOnLinearizabilityViolation */ public ParameterizedClass default_compaction = null; + @Nullable + public ParameterizedClass sstable_compression; + public static Supplier getOverrideLoadConfig() { return overrideLoadConfig; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 298ed3f8f92c..3fe57c7af645 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -226,6 +226,8 @@ public class DatabaseDescriptor private static ImmutableMap> sstableFormats; private static volatile SSTableFormat selectedSSTableFormat; + private static ParameterizedClass sstableCompression; + private static Function commitLogSegmentMgrProvider = c -> DatabaseDescriptor.isCDCEnabled() ? new CommitLogSegmentManagerCDC(c, DatabaseDescriptor.getCommitLogLocation()) : new CommitLogSegmentManagerStandard(c, DatabaseDescriptor.getCommitLogLocation()); @@ -867,7 +869,7 @@ else if (conf.repair_session_space.toMebibytes() > (int) (Runtime.getRuntime().m if (conf.allow_extra_insecure_udfs) logger.warn("Allowing java.lang.System.* access in UDFs is dangerous and not recommended. Set allow_extra_insecure_udfs: false to disable."); - if(conf.scripted_user_defined_functions_enabled) + if (conf.scripted_user_defined_functions_enabled) throw new ConfigurationException("JavaScript user-defined functions were removed in CASSANDRA-18252. " + "Hooks are planned to be introduced as part of CASSANDRA-17280"); @@ -966,6 +968,8 @@ else if (conf.max_value_size.toMebibytes() >= 2048) if (conf.paxos_state_purging == null) conf.paxos_state_purging = PaxosStatePurging.legacy; + sstableCompression = conf.sstable_compression; + logInitializationOutcome(logger); if (conf.max_space_usable_for_compactions_in_percentage < 0 || conf.max_space_usable_for_compactions_in_percentage > 1) @@ -2569,6 +2573,16 @@ public static void setFlushCompression(Config.FlushCompression compression) conf.flush_compression = compression; } + public static ParameterizedClass getSSTableCompression() + { + return sstableCompression; + } + + public static void setSSTableCompression(ParameterizedClass compressor) + { + conf.sstable_compression = compressor; + } + /** * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use diff --git a/src/java/org/apache/cassandra/io/sstable/format/DataComponent.java b/src/java/org/apache/cassandra/io/sstable/format/DataComponent.java index 9367cb444d80..0a026dcea5e6 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/DataComponent.java +++ b/src/java/org/apache/cassandra/io/sstable/format/DataComponent.java @@ -89,7 +89,7 @@ private static CompressionParams buildCompressionParams(TableMetadata metadata, if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION)) { // The default compressor is generally fast (LZ4 with 16KiB block size) - compressionParams = CompressionParams.DEFAULT; + compressionParams = CompressionParams.defaultParams(); break; } // else fall through diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index d826acc28b49..90c580018a18 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -18,23 +18,24 @@ package org.apache.cassandra.schema; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; @@ -43,18 +44,16 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; import static java.lang.String.format; +import static java.util.Collections.emptyMap; +import static org.apache.cassandra.config.CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT; public final class CompressionParams { - private static final Logger logger = LoggerFactory.getLogger(CompressionParams.class); - - private static volatile boolean hasLoggedSsTableCompressionWarning; - private static volatile boolean hasLoggedChunkLengthWarning; - private static volatile boolean hasLoggedCrcCheckChanceWarning; - - public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; + public static final CompressorType DEFAULT_COMPRESSION_TYPE = CompressorType.lz4; + public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; // in KB public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0; // Since pre-4.0 versions do not understand the // new compression parameter we can't use a // different default value. @@ -62,169 +61,216 @@ public final class CompressionParams public static final String CLASS = "class"; public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb"; + /** + * Requires a DataStorageSpec suffix + */ + public static final String CHUNK_LENGTH = "chunk_length"; + /** + * Requires a DataStorageSpec suffix + */ + public static final String MAX_COMPRESSED_LENGTH = "max_compressed_length"; public static final String ENABLED = "enabled"; public static final String MIN_COMPRESS_RATIO = "min_compress_ratio"; - public static final CompressionParams DEFAULT = !CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean() - ? noCompression() - : new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), - DEFAULT_CHUNK_LENGTH, - calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), - DEFAULT_MIN_COMPRESS_RATIO, - Collections.emptyMap()); - - public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(Collections.emptyMap()), + public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(emptyMap()), // 4 KiB is often the underlying disk block size 1024 * 4, Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, - Collections.emptyMap()); + emptyMap()); - private static final String CRC_CHECK_CHANCE_WARNING = "The option crc_check_chance was deprecated as a compression option. " + - "You should specify it as a top-level table option instead"; + private static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), + DEFAULT_CHUNK_LENGTH, + calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), + DEFAULT_MIN_COMPRESS_RATIO, + emptyMap()); - @Deprecated public static final String SSTABLE_COMPRESSION = "sstable_compression"; - @Deprecated public static final String CHUNK_LENGTH_KB = "chunk_length_kb"; - @Deprecated public static final String CRC_CHECK_CHANCE = "crc_check_chance"; + @VisibleForTesting + static final String TOO_MANY_CHUNK_LENGTH = format("Only one of '%s' or '%s' may be specified", CHUNK_LENGTH, CHUNK_LENGTH_IN_KB); private final ICompressor sstableCompressor; + /** + * The chunk length in KB + */ private final int chunkLength; - private final int maxCompressedLength; // In content we store max length to avoid rounding errors causing compress/decompress mismatch. - private final double minCompressRatio; // In configuration we store min ratio, the input parameter. + /** + * The compressed length in KB. + * In content we store max length to avoid rounding errors causing compress/decompress mismatch. + */ + private final int maxCompressedLength; + /** + * The minimum compression ratio. + * In configuration we store min ratio, the input parameter. + * Ths is mathematically related to chunkLength and maxCompressedLength in that + * # chunk_length / max_compressed_length = min_compress_ratio + */ + private final double minCompressRatio; private final ImmutableMap otherOptions; // Unrecognized options, can be used by the compressor // TODO: deprecated, should now be carefully removed. Doesn't affect schema code as it isn't included in equals() and hashCode() private volatile double crcCheckChance = 1.0; - public static CompressionParams fromMap(Map opts) - { - Map options = copyOptions(opts); - String sstableCompressionClass; + public enum CompressorType + { + lz4(LZ4Compressor.class.getName(), LZ4Compressor::create), + noop(NoopCompressor.class.getName(), NoopCompressor::create), + snappy(SnappyCompressor.class.getName(), SnappyCompressor::create), + deflate(DeflateCompressor.class.getName(), DeflateCompressor::create), + zstd(ZstdCompressor.class.getName(), ZstdCompressor::create), + none(null, (opt) -> null); - if (!opts.isEmpty() && isEnabled(opts) && !containsSstableCompressionClass(opts)) - throw new ConfigurationException(format("Missing sub-option '%s' for the 'compression' option.", CLASS)); + final String className; + final Function,ICompressor> creator; - if (!removeEnabled(options)) - { - sstableCompressionClass = null; - - if (!options.isEmpty()) - throw new ConfigurationException(format("If the '%s' option is set to false no other options must be specified", ENABLED)); - } - else + CompressorType(String className, Function,ICompressor> creator) { - sstableCompressionClass = removeSstableCompressionClass(options); + this.className = className; + this.creator = creator; } - int chunkLength = removeChunkLength(options); - double minCompressRatio = removeMinCompressRatio(options); + static CompressorType forClass(String name) + { + if (name == null) + return none; - CompressionParams cp = new CompressionParams(sstableCompressionClass, options, chunkLength, minCompressRatio); - cp.validate(); + for (CompressorType type : CompressorType.values()) + { + if (Objects.equal(type.className, name)) + return type; + } + return null; + } - return cp; + public ICompressor create(Map options) { + return creator.apply(options); + } } - public Class klass() + public static CompressionParams defaultParams() { - return sstableCompressor.getClass(); + return fromParameterizedClass(DatabaseDescriptor.getSSTableCompression()); } - public static CompressionParams noCompression() + public static CompressionParams fromParameterizedClass(ParameterizedClass options) { - return new CompressionParams((ICompressor) null, DEFAULT_CHUNK_LENGTH, Integer.MAX_VALUE, 0.0, Collections.emptyMap()); - } - - // The shorthand methods below are only used for tests. They are a little inconsistent in their choice of - // parameters -- this is done on purpose to test out various compression parameter combinations. + if (options == null) + return DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean() ? DEFAULT : noCompression(); - @VisibleForTesting - public static CompressionParams snappy() - { - return snappy(DEFAULT_CHUNK_LENGTH); + return fromClassAndOptions(options.class_name, options.parameters == null ? emptyMap() : copyOptions(options.parameters)); } - @VisibleForTesting - public static CompressionParams snappy(int chunkLength) + public static CompressionParams fromMap(Map opts) { - return snappy(chunkLength, 1.1); - } + Map options = copyOptions(opts); - @VisibleForTesting - public static CompressionParams snappy(int chunkLength, double minCompressRatio) - { - return new CompressionParams(SnappyCompressor.instance, chunkLength, calcMaxCompressedLength(chunkLength, minCompressRatio), minCompressRatio, Collections.emptyMap()); - } + String sstableCompressionClass = removeSstableCompressionClass(options); - @VisibleForTesting - public static CompressionParams deflate() - { - return deflate(DEFAULT_CHUNK_LENGTH); + return fromClassAndOptions(sstableCompressionClass, options); } - @VisibleForTesting - public static CompressionParams deflate(int chunkLength) + public static CompressionParams noCompression() { - return new CompressionParams(DeflateCompressor.instance, chunkLength, Integer.MAX_VALUE, 0.0, Collections.emptyMap()); + return new CompressionParams(null, DEFAULT_CHUNK_LENGTH, Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, emptyMap()); } - @VisibleForTesting - public static CompressionParams lz4() + static int calcMaxCompressedLength(int chunkLength, double minCompressRatio) { - return lz4(DEFAULT_CHUNK_LENGTH); + return (int) Math.ceil(Math.min(chunkLength / minCompressRatio, Integer.MAX_VALUE)); } - @VisibleForTesting - public static CompressionParams lz4(int chunkLength) + static double calcMinCompressRatio(int chunkLength, int maxCompressedLength) { - return lz4(chunkLength, chunkLength); + if (maxCompressedLength == Integer.MAX_VALUE) + return 0; + return chunkLength * 1.0 / maxCompressedLength; } - @VisibleForTesting - public static CompressionParams lz4(int chunkLength, int maxCompressedLength) + private static CompressionParams fromClassAndOptions(String sstableCompressionClass, Map options) { - return new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), chunkLength, maxCompressedLength, calcMinCompressRatio(chunkLength, maxCompressedLength), Collections.emptyMap()); - } + final boolean enabled = removeEnabled(options); - public static CompressionParams zstd() - { - return zstd(DEFAULT_CHUNK_LENGTH); - } + if (options.containsKey(CHUNK_LENGTH_IN_KB) && options.containsKey(CHUNK_LENGTH)) + throw new ConfigurationException(format(TOO_MANY_CHUNK_LENGTH, + CHUNK_LENGTH_IN_KB, CHUNK_LENGTH)); - public static CompressionParams zstd(Integer chunkLength) - { - ZstdCompressor compressor = ZstdCompressor.create(Collections.emptyMap()); - return new CompressionParams(compressor, chunkLength, Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap()); - } + final int chunk_length_in_bytes = removeChunkLength(options); - @VisibleForTesting - public static CompressionParams noop() - { - NoopCompressor compressor = NoopCompressor.create(Collections.emptyMap()); - return new CompressionParams(compressor, DEFAULT_CHUNK_LENGTH, Integer.MAX_VALUE, DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap()); + // figure out how we calculate the max_compressed_length and min_compress_ratio + if (options.containsKey(MIN_COMPRESS_RATIO) && options.containsKey(MAX_COMPRESSED_LENGTH)) + throw new ConfigurationException(format("Can not specify both '%s' and '%s' for the compressor parameters.", + MIN_COMPRESS_RATIO, MAX_COMPRESSED_LENGTH)); + + // calculate the max_compressed_length and min_compress_ratio + int max_compressed_length_in_bytes; + double min_compress_ratio; + String max_compressed_length_str = options.remove(MAX_COMPRESSED_LENGTH); + if (!StringUtils.isBlank(max_compressed_length_str)) + { + try + { + max_compressed_length_in_bytes = new DataStorageSpec.IntKibibytesBound(max_compressed_length_str).toBytes(); + validateMaxCompressedLength(max_compressed_length_in_bytes, chunk_length_in_bytes); + min_compress_ratio = CompressionParams.calcMinCompressRatio(chunk_length_in_bytes, max_compressed_length_in_bytes); + } catch (IllegalArgumentException e) { + throw new ConfigurationException(invalidValue(MAX_COMPRESSED_LENGTH, e.getMessage(), max_compressed_length_str)); + } + } + else + { + min_compress_ratio = removeMinCompressRatio(options); + validateMinCompressRatio(min_compress_ratio); + max_compressed_length_in_bytes = CompressionParams.calcMaxCompressedLength(chunk_length_in_bytes, min_compress_ratio); + } + + // try to set compressor type + CompressorType compressorType = StringUtils.isEmpty(sstableCompressionClass) ? DEFAULT_COMPRESSION_TYPE : null; + if (compressorType == null) + { + try + { + compressorType = CompressorType.valueOf(sstableCompressionClass); + } + catch (IllegalArgumentException expected) + { + compressorType = CompressorType.forClass(sstableCompressionClass); + } + } + + Function, ICompressor> creator = compressorType != null ? compressorType.creator : (opt) -> FBUtilities.newCompressor(parseCompressorClass(sstableCompressionClass), opt); + CompressionParams cp = new CompressionParams(enabled ? creator.apply(options) : null, chunk_length_in_bytes, max_compressed_length_in_bytes, min_compress_ratio, options); + if (enabled && compressorType != CompressorType.none) + { + ICompressor compressor = cp.sstableCompressor; + if (compressor == null) + throw new ConfigurationException(format("'%s' is not a valid compressor class name for the 'class' option.", sstableCompressionClass)); + else + checkCompressorOptions(compressor, options.keySet()); + } + + cp.validate(); + + return cp; } - public CompressionParams(String sstableCompressorClass, Map otherOptions, int chunkLength, double minCompressRatio) throws ConfigurationException + private static String invalidValue(String param, Object value) { - this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, calcMaxCompressedLength(chunkLength, minCompressRatio), minCompressRatio, otherOptions); + return format("Invalid '%s' value for the 'compression' option: %s", param, value); } - static int calcMaxCompressedLength(int chunkLength, double minCompressRatio) + private static String invalidValue(String param, String extraText, Object value) { - return (int) Math.ceil(Math.min(chunkLength / minCompressRatio, Integer.MAX_VALUE)); + return format("Invalid '%s' value for the 'compression' option. %s: %s", param, extraText, value); } - public CompressionParams(String sstableCompressorClass, int chunkLength, int maxCompressedLength, Map otherOptions) throws ConfigurationException + public CompressionParams(String sstableCompressorClass, Map otherOptions, int chunkLength, double minCompressRatio) throws ConfigurationException { - this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, maxCompressedLength, calcMinCompressRatio(chunkLength, maxCompressedLength), otherOptions); + this(FBUtilities.newCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, calcMaxCompressedLength(chunkLength, minCompressRatio), minCompressRatio, otherOptions); } - static double calcMinCompressRatio(int chunkLength, int maxCompressedLength) + public CompressionParams(String sstableCompressorClass, int chunkLength, int maxCompressedLength, Map otherOptions) throws ConfigurationException { - if (maxCompressedLength == Integer.MAX_VALUE) - return 0; - return chunkLength * 1.0 / maxCompressedLength; + this(FBUtilities.newCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, maxCompressedLength, calcMinCompressRatio(chunkLength, maxCompressedLength), otherOptions); } private CompressionParams(ICompressor sstableCompressor, int chunkLength, int maxCompressedLength, double minCompressRatio, Map otherOptions) throws ConfigurationException @@ -241,6 +287,11 @@ public CompressionParams copy() return new CompressionParams(sstableCompressor, chunkLength, maxCompressedLength, minCompressRatio, otherOptions); } + public Class klass() + { + return sstableCompressor.getClass(); + } + /** * Checks if compression is enabled. * @return {@code true} if compression is enabled, {@code false} otherwise. @@ -269,6 +320,11 @@ public int chunkLength() return chunkLength; } + double minCompressRatio() + { + return minCompressRatio; + } + public int maxCompressedLength() { return maxCompressedLength; @@ -290,77 +346,25 @@ private static Class parseCompressorClass(String className) throws Configurat } } - private static ICompressor createCompressor(Class compressorClass, Map compressionOptions) throws ConfigurationException + public static void checkCompressorOptions(ICompressor compressor, Set options) { - if (compressorClass == null) - { - if (!compressionOptions.isEmpty()) - throw new ConfigurationException("Unknown compression options (" + compressionOptions.keySet() + ") since no compression class found"); - return null; - } - - if (compressionOptions.containsKey(CRC_CHECK_CHANCE)) - { - if (!hasLoggedCrcCheckChanceWarning) - { - logger.warn(CRC_CHECK_CHANCE_WARNING); - hasLoggedCrcCheckChanceWarning = true; - } - compressionOptions.remove(CRC_CHECK_CHANCE); - } - - try - { - Method method = compressorClass.getMethod("create", Map.class); - ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions); - // Check for unknown options - for (String provided : compressionOptions.keySet()) - if (!compressor.supportedOptions().contains(provided)) - throw new ConfigurationException("Unknown compression options " + provided); - return compressor; - } - catch (NoSuchMethodException e) - { - throw new ConfigurationException("create method not found", e); - } - catch (SecurityException e) - { - throw new ConfigurationException("Access forbiden", e); - } - catch (IllegalAccessException e) - { - throw new ConfigurationException("Cannot access method create in " + compressorClass.getName(), e); - } - catch (InvocationTargetException e) - { - if (e.getTargetException() instanceof ConfigurationException) - throw (ConfigurationException) e.getTargetException(); - - Throwable cause = e.getCause() == null - ? e - : e.getCause(); - - throw new ConfigurationException(format("%s.create() threw an error: %s %s", - compressorClass.getSimpleName(), - cause.getClass().getName(), - cause.getMessage()), - e); - } - catch (ExceptionInInitializerError e) - { - throw new ConfigurationException("Cannot initialize class " + compressorClass.getName()); - } + List notFound = new ArrayList<>(); + for (String provided : options) + if (!compressor.supportedOptions().contains(provided)) + notFound.add(provided); + if (!notFound.isEmpty()) + throw new ConfigurationException("Unknown compression options: (" + notFound + ")"); } public static ICompressor createCompressor(ParameterizedClass compression) throws ConfigurationException { - return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters)); + return FBUtilities.newCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters)); } private static Map copyOptions(Map co) { if (co == null || co.isEmpty()) - return Collections.emptyMap(); + return emptyMap(); Map compressionOptions = new HashMap<>(); for (Map.Entry entry : co.entrySet()) @@ -368,31 +372,6 @@ private static Map copyOptions(Map Integer.MAX_VALUE / 1024) - throw new ConfigurationException(format("Value of %s is too large (%s)", CHUNK_LENGTH_IN_KB,parsed)); - return 1024 * parsed; - } - catch (NumberFormatException e) - { - throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_IN_KB, e); - } - } - /** * Removes the chunk length option from the specified set of option. * @@ -401,29 +380,49 @@ private static Integer parseChunkLength(String chLengthKB) throws ConfigurationE */ private static int removeChunkLength(Map options) { - if (options.containsKey(CHUNK_LENGTH_IN_KB)) + Integer chunkLengthInBytes = null; + String key = null; + if (options.containsKey(CHUNK_LENGTH)) { - if (options.containsKey(CHUNK_LENGTH_KB)) + key = CHUNK_LENGTH; + String value = options.remove(CHUNK_LENGTH); + try { - throw new ConfigurationException(format("The '%s' option must not be used if the chunk length is already specified by the '%s' option", - CHUNK_LENGTH_KB, - CHUNK_LENGTH_IN_KB)); + chunkLengthInBytes = new DataStorageSpec.IntKibibytesBound(value).toBytes(); + } catch (IllegalArgumentException e) { + throw new ConfigurationException(invalidValue(CHUNK_LENGTH, e.getMessage(), value)); } - - return parseChunkLength(options.remove(CHUNK_LENGTH_IN_KB)); } - if (options.containsKey(CHUNK_LENGTH_KB)) + if (options.containsKey(CHUNK_LENGTH_IN_KB)) { - if (!hasLoggedChunkLengthWarning) + key = CHUNK_LENGTH_IN_KB; + if (chunkLengthInBytes != null) + throw new ConfigurationException(TOO_MANY_CHUNK_LENGTH); + else { - hasLoggedChunkLengthWarning = true; - logger.warn("The {} option has been deprecated. You should use {} instead", - CHUNK_LENGTH_KB, - CHUNK_LENGTH_IN_KB); + String chLengthKB = options.remove(CHUNK_LENGTH_IN_KB); + try + { + int parsed = Integer.parseInt(chLengthKB); + if (parsed * 1024L > Integer.MAX_VALUE) + throw new ConfigurationException(invalidValue(CHUNK_LENGTH_IN_KB, "Value is too large", parsed)); + if (parsed <= 0) + throw new ConfigurationException(invalidValue(CHUNK_LENGTH_IN_KB, "May not be <= 0", parsed)); + chunkLengthInBytes = 1024 * parsed; + } + catch (NumberFormatException e) + { + throw new ConfigurationException(invalidValue(CHUNK_LENGTH_IN_KB, e.getMessage(), chLengthKB)); + } } + } - return parseChunkLength(options.remove(CHUNK_LENGTH_KB)); + if (chunkLengthInBytes != null) + { + int chunkLength = chunkLengthInBytes; + validateChunkLength(key, chunkLength); + return chunkLength; } return DEFAULT_CHUNK_LENGTH; @@ -438,24 +437,7 @@ private static int removeChunkLength(Map options) private static double removeMinCompressRatio(Map options) { String ratio = options.remove(MIN_COMPRESS_RATIO); - if (ratio != null) - { - return Double.parseDouble(ratio); - } - return DEFAULT_MIN_COMPRESS_RATIO; - } - - /** - * Returns {@code true} if the specified options contains the name of the compression class to be used, - * {@code false} otherwise. - * - * @param options the options - * @return {@code true} if the specified options contains the name of the compression class to be used, - * {@code false} otherwise. - */ - public static boolean containsSstableCompressionClass(Map options) - { - return options.containsKey(CLASS) || options.containsKey(SSTABLE_COMPRESSION); + return ratio != null ? Double.parseDouble(ratio) : DEFAULT_MIN_COMPRESS_RATIO; } /** @@ -466,29 +448,14 @@ public static boolean containsSstableCompressionClass(Map option */ private static String removeSstableCompressionClass(Map options) { - if (options.containsKey(CLASS)) - { - if (options.containsKey(SSTABLE_COMPRESSION)) - throw new ConfigurationException(format("The '%s' option must not be used if the compression algorithm is already specified by the '%s' option", - SSTABLE_COMPRESSION, - CLASS)); - + if (options.containsKey(CLASS)) { String clazz = options.remove(CLASS); if (clazz.isEmpty()) throw new ConfigurationException(format("The '%s' option must not be empty. To disable compression use 'enabled' : false", CLASS)); return clazz; } - - if (options.containsKey(SSTABLE_COMPRESSION) && !hasLoggedSsTableCompressionWarning) - { - hasLoggedSsTableCompressionWarning = true; - logger.warn("The {} option has been deprecated. You should use {} instead", - SSTABLE_COMPRESSION, - CLASS); - } - - return options.remove(SSTABLE_COMPRESSION); + return null; } /** @@ -517,23 +484,35 @@ private static boolean removeEnabled(Map options) return enabled == null || Boolean.parseBoolean(enabled); } - // chunkLength must be a power of 2 because we assume so when - // computing the chunk number from an uncompressed file offset (see - // CompressedRandomAccessReader.decompresseChunk()) - public void validate() throws ConfigurationException - { - // if chunk length was not set (chunkLength == null), this is fine, default will be used + private static void validateChunkLength(String key, int chunkLength) throws ConfigurationException { if (chunkLength <= 0) - throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_IN_KB); + throw new ConfigurationException(invalidValue(key, "May not be <= 0", chunkLength)); + // chunkLength must be a power of 2 because we assume so when + // computing the chunk number from an uncompressed file offset (see + // CompressedRandomAccessReader.decompresseChunk()) if ((chunkLength & (chunkLength - 1)) != 0) - throw new ConfigurationException(CHUNK_LENGTH_IN_KB + " must be a power of 2"); + throw new ConfigurationException(invalidValue(key, "Must be a power of 2", chunkLength)); + } + + private static void validateMinCompressRatio(double ratio) throws ConfigurationException { + if (ratio != DEFAULT_MIN_COMPRESS_RATIO && ratio < 1.0) + throw new ConfigurationException(invalidValue(MIN_COMPRESS_RATIO , "Can either be 0 or greater than or equal to 1", ratio)); + + } + private static void validateMaxCompressedLength(int maxCompressedLength, int chunkLength) throws ConfigurationException { if (maxCompressedLength < 0) - throw new ConfigurationException("Invalid negative " + MIN_COMPRESS_RATIO); + throw new ConfigurationException(invalidValue(MAX_COMPRESSED_LENGTH, "May not be less than zero", maxCompressedLength)); if (maxCompressedLength > chunkLength && maxCompressedLength < Integer.MAX_VALUE) - throw new ConfigurationException(MIN_COMPRESS_RATIO + " can either be 0 or greater than or equal to 1"); + throw new ConfigurationException(invalidValue(MAX_COMPRESSED_LENGTH, "Must be less than or equal to chunk length")); + } + + public void validate() throws ConfigurationException + { + validateChunkLength(CHUNK_LENGTH, chunkLength); + validateMinCompressRatio(minCompressRatio); } public Map asMap() diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java index 8f883f8f4783..3431679bbf89 100644 --- a/src/java/org/apache/cassandra/schema/TableParams.java +++ b/src/java/org/apache/cassandra/schema/TableParams.java @@ -350,7 +350,7 @@ public static final class Builder private SpeculativeRetryPolicy additionalWritePolicy = PercentileSpeculativeRetryPolicy.NINETY_NINE_P; private CachingParams caching = CachingParams.DEFAULT; private CompactionParams compaction = CompactionParams.DEFAULT; - private CompressionParams compression = CompressionParams.DEFAULT; + private CompressionParams compression = CompressionParams.defaultParams(); private MemtableParams memtable = MemtableParams.DEFAULT; private ImmutableMap extensions = ImmutableMap.of(); private boolean cdc; diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index a39b1bb22eba..31eedbace31c 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -25,6 +25,8 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.math.BigInteger; import java.net.InetAddress; import java.net.NetworkInterface; @@ -79,6 +81,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.StatsComponent; import org.apache.cassandra.io.sstable.metadata.MetadataType; @@ -88,11 +91,13 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.security.AbstractCryptoProvider; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.security.ISslContextFactory; import org.apache.cassandra.utils.concurrent.FutureCombiner; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import org.objectweb.asm.Opcodes; +import static java.lang.String.format; import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_AVAILABLE_PROCESSORS; import static org.apache.cassandra.config.CassandraRelevantProperties.GIT_SHA; import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR; @@ -723,6 +728,56 @@ public static AbstractCryptoProvider newCryptoProvider(String className, Map className, Map parameters) throws ConfigurationException + { + if (className == null) + { + if (!parameters.isEmpty()) + throw new ConfigurationException("Unknown compression options (" + parameters.keySet() + ") since no compression class found"); + return null; + } + + try + { + Method method = className.getMethod("create", Map.class); + ICompressor compressor = (ICompressor) method.invoke(null, parameters); + // Check for unknown options + CompressionParams.checkCompressorOptions(compressor, parameters.keySet()); + return compressor; + } + catch (NoSuchMethodException e) + { + throw new ConfigurationException("create method not found", e); + } + catch (SecurityException e) + { + throw new ConfigurationException("Access forbiden", e); + } + catch (IllegalAccessException e) + { + throw new ConfigurationException("Cannot access method create in " + className.getName(), e); + } + catch (InvocationTargetException e) + { + if (e.getTargetException() instanceof ConfigurationException) + throw (ConfigurationException) e.getTargetException(); + + Throwable cause = e.getCause() == null + ? e + : e.getCause(); + + throw new ConfigurationException(format("%s.create() threw an error: %s %s", + className.getSimpleName(), + cause.getClass().getName(), + cause.getMessage()), + e); + } + catch (ExceptionInInitializerError e) + { + throw new ConfigurationException("Cannot initialize class " + className.getName()); + } + } + /** * @return The Class for the given name. * @param classname Fully qualified classname. diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml index cde345a575b6..0ddbd9475995 100644 --- a/tools/cqlstress-example.yaml +++ b/tools/cqlstress-example.yaml @@ -44,7 +44,7 @@ table_definition: | PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid) ) WITH compaction = { 'class':'LeveledCompactionStrategy' } -# AND compression = { 'sstable_compression' : '' } +# AND compression = { 'class' : 'LZ4Compressor' } # AND comment='A table of many types to test wide rows' # diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java index 4c67c4f55023..9313b0f6d0ff 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java @@ -140,7 +140,7 @@ String createStandard1StatementCQL3(StressSettings settings) //Compression b.append(") WITH compression = {"); if (compression != null) - b.append("'sstable_compression' : '").append(compression).append("'"); + b.append("'class' : '").append(compression).append("'"); b.append("}"); @@ -181,7 +181,7 @@ String createCounter1StatementCQL3(StressSettings settings) //Compression b.append(") WITH compression = {"); if (compression != null) - b.append("'sstable_compression' : '").append(compression).append("'"); + b.append("'class' : '").append(compression).append("'"); b.append("}");