diff --git a/CHANGELOG.md b/CHANGELOG.md index 67d82b4d..b124f440 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Changelog for Management API, new PRs should update the `main / unreleased` sect ``` ## unreleased +* [FEATURE] [#573](https://github.com/k8ssandra/management-api-for-apache-cassandra/issues/573) Add support for HCD 1.2 ## v0.1.90 (2024-11-22) * [FEATURE] [#566](https://github.com/k8ssandra/management-api-for-apache-cassandra/issues/566) Add listRoles and dropRole functionality to the REST interface diff --git a/README.md b/README.md index ecb77fd6..1841f79b 100644 --- a/README.md +++ b/README.md @@ -54,34 +54,34 @@ The following versions of Cassandra and DSE are published to Docker and supported: -| Cassandra 4.0.x | Cassandra 4.1.x | Cassandra 5.0.x | DSE 6.8.x | DSE 6.9.x | HCD 1.0.x | -|-----------------| --------------- |-----------------|-----------|-----------| ----------| -| 4.0.0 | 4.1.0 | 5.0.1 | 6.8.25 | 6.9.0 | 1.0.0 | -| 4.0.1 | 4.1.1 | 5.0.2 | 6.8.26 | 6.9.1 | | -| 4.0.3 | 4.1.2 | | 6.8.28 | 6.9.2 | | -| 4.0.4 | 4.1.3 | | 6.8.29 | 6.9.3 | | -| 4.0.5 | 4.1.4 | | 6.8.30 | 6.9.4 | | -| 4.0.6 | 4.1.5 | | 6.8.31 | | | -| 4.0.7 | 4.1.6 | | 6.8.32 | | | -| 4.0.8 | 4.1.7 | | 6.8.33 | | | -| 4.0.9 | | | 6.8.34 | | | -| 4.0.10 | | | 6.8.35 | | | -| 4.0.11 | | | 6.8.36 | | | -| 4.0.12 | | | 6.8.37 | | | -| 4.0.13 | | | 6.8.38 | | | -| 4.0.14 | | | 6.8.39 | | | -| 4.0.15 | | | 6.8.40 | | | -| | | | 6.8.41 | | | -| | | | 6.8.42 | | | -| | | | 6.8.43 | | | -| | | | 6.8.44 | | | -| | | | 6.8.46 | | | -| | | | 6.8.47 | | | -| | | | 6.8.48 | | | -| | | | 6.8.49 | | | -| | | | 6.8.50 | | | -| | | | 6.8.51 | | | -| | | | 6.8.52 | | | +| Cassandra 4.0.x | Cassandra 4.1.x | Cassandra 5.0.x | DSE 6.8.x | DSE 6.9.x | HCD 1.0.x | HCD 1.2.x | +|---------------- | --------------- |---------------- |---------- |---------- | --------- | --------- | +| 4.0.0 | 4.1.0 | 5.0.1 | 6.8.25 | 6.9.0 | 1.0.0 | 1.2.0 | +| 4.0.1 | 4.1.1 | 5.0.2 | 6.8.26 | 6.9.1 | | | +| 4.0.3 | 4.1.2 | | 6.8.28 | 6.9.2 | | | +| 4.0.4 | 4.1.3 | | 6.8.29 | 6.9.3 | | | +| 4.0.5 | 4.1.4 | | 6.8.30 | 6.9.4 | | | +| 4.0.6 | 4.1.5 | | 6.8.31 | | | | +| 4.0.7 | 4.1.6 | | 6.8.32 | | | | +| 4.0.8 | 4.1.7 | | 6.8.33 | | | | +| 4.0.9 | | | 6.8.34 | | | | +| 4.0.10 | | | 6.8.35 | | | | +| 4.0.11 | | | 6.8.36 | | | | +| 4.0.12 | | | 6.8.37 | | | | +| 4.0.13 | | | 6.8.38 | | | | +| 4.0.14 | | | 6.8.39 | | | | +| 4.0.15 | | | 6.8.40 | | | | +| | | | 6.8.41 | | | | +| | | | 6.8.42 | | | | +| | | | 6.8.43 | | | | +| | | | 6.8.44 | | | | +| | | | 6.8.46 | | | | +| | | | 6.8.47 | | | | +| | | | 6.8.48 | | | | +| | | | 6.8.49 | | | | +| | | | 6.8.50 | | | | +| | | | 6.8.51 | | | | +| | | | 6.8.52 | | | | - Apache Cassandra images are available in `linux/amd64` or `linux/arm64` formats. The DSE images are available only in the `linux/amd64` format. - All images (with the exception of Cassandra 5.0) are available as an Ubuntu based image or a RedHat UBI 8 based image. @@ -194,11 +194,11 @@ Example for DSE 6.9.0 ** NOTE: The docker repo is not a typo, it really is `datastax/dse-mgmtapi-6_8` for 6.9 images -### Docker coordinates for HCD 1.0.x images +### Docker coordinates for HCD 1.0.x/1.2.x images -#### Ubuntu based images (HCD 1.0) +#### Ubuntu based images (HCD 1.0/1.2) -For all JDK 11 Ubuntu based HCD 1.0.x images, the Docker coordinates are as follows: +For all JDK 11 Ubuntu based HCD 1.0.x/1.2.x images, the Docker coordinates are as follows: datastax/hcd: @@ -206,9 +206,13 @@ Example for HCD 1.0.0 datastax/hcd:1.0.0 -#### RedHat UBI images (HCD 1.0) +Example for HCD 1.2.0 -For all RedHat UBI based HCD 1.0.x images, the Docker coordinates are as follows: + datastax/hcd:1.2.0 + +#### RedHat UBI images (HCD 1.0/1.2) + +For all RedHat UBI based HCD 1.0.x/1.2.x images, the Docker coordinates are as follows: datastax/hcd:-ubi @@ -216,6 +220,10 @@ Example for HCD 1.0.0 datastax/hcd:1.0.0-ubi +Example for HCD 1.2.0 + + datastax/hcd:1.2.0-ubi + ## Building ### Minimum Java Version diff --git a/management-api-agent-5.0.x/src/main/java/org/apache/cassandra/transport/UnixSocketServer50x.java b/management-api-agent-5.0.x/src/main/java/org/apache/cassandra/transport/UnixSocketServer50x.java index c372d697..4ab36251 100644 --- a/management-api-agent-5.0.x/src/main/java/org/apache/cassandra/transport/UnixSocketServer50x.java +++ b/management-api-agent-5.0.x/src/main/java/org/apache/cassandra/transport/UnixSocketServer50x.java @@ -33,7 +33,6 @@ import org.apache.cassandra.transport.messages.StartupMessage; import org.apache.cassandra.transport.messages.SupportedMessage; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,7 +282,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List ou promise = new VoidChannelPromise(ctx.channel(), false); - long approxStartTimeNanos = MonotonicClock.Global.approxTime.now(); Message.Response response = Dispatcher.processRequest( ctx.channel(), diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java index d5e76a47..05f1f089 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java @@ -50,7 +50,7 @@ public class CassandraMetricRegistryListener implements MetricRegistryListener { private static final int MIN_DSE_PATCH_VERSION = 33; private static final Pattern VERSION_PATTERN = - Pattern.compile("([1-9]\\d*)\\.(\\d+)\\.(\\d+)(?:-([a-zA-Z0-9]+))?"); + Pattern.compile("([1-9]\\d*)\\.(\\d+)\\.(\\d+)(?:-([a-zA-Z0-9\\.]+))?"); private static final String SERVER_VERSION = FBUtilities.getReleaseVersionString(); private static final int SERVER_MAJOR_VERSION; diff --git a/management-api-agent-hcd-1.2.x/pom.xml b/management-api-agent-hcd-1.2.x/pom.xml new file mode 100644 index 00000000..73e70797 --- /dev/null +++ b/management-api-agent-hcd-1.2.x/pom.xml @@ -0,0 +1,176 @@ + + + + 4.0.0 + + io.k8ssandra + datastax-mgmtapi + ${revision} + + ${revision} + datastax-mgmtapi-agent-hcd-1.2.x + + + artifactory + https://repo.datastax.com/dse + + + + + io.k8ssandra + datastax-mgmtapi-common + ${project.version} + + + io.k8ssandra + datastax-mgmtapi-agent-common + ${project.version} + + + net.bytebuddy + byte-buddy + ${bytebuddy.version} + + + net.bytebuddy + byte-buddy-agent + ${bytebuddy.version} + + + junit + junit + ${junit.version} + test + + + org.assertj + assertj-core + ${assertj.version} + test + + + io.k8ssandra + datastax-mgmtapi-agent-common + ${project.version} + tests + test + + + + + dse-db-all + + true + + + + com.datastax.dse + dse-db-all + 5.0.0-beta2.82a4ce1a2756 + + + commons-codec + * + + + provided + + + + + + + + ${basedir}/src/main/resources + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.8 + + + initialize + parse-version + + parse-version + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.4.0 + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${basedir} + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-resources-plugin + + + UTF-8 + + + + org.apache.maven.plugins + maven-shade-plugin + + true + + + *:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + + com.datastax.mgmtapi.Agent + + + + + + + + + + diff --git a/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/CassandraAPIServiceProviderHcd.java b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/CassandraAPIServiceProviderHcd.java new file mode 100644 index 00000000..e359016e --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/CassandraAPIServiceProviderHcd.java @@ -0,0 +1,17 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi; + +import com.datastax.mgmtapi.shim.CassandraAPIHcd; +import com.datastax.mgmtapi.shims.CassandraAPI; + +public class CassandraAPIServiceProviderHcd implements CassandraAPIServiceProvider { + + @Override + public CassandraAPI getCassandraAPI() { + return new CassandraAPIHcd(); + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/GenericSerializerHcd.java b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/GenericSerializerHcd.java new file mode 100644 index 00000000..5519b032 --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/GenericSerializerHcd.java @@ -0,0 +1,143 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.rpc; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BooleanType; +import org.apache.cassandra.db.marshal.ByteType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.DateType; +import org.apache.cassandra.db.marshal.DoubleType; +import org.apache.cassandra.db.marshal.EmptyType; +import org.apache.cassandra.db.marshal.FloatType; +import org.apache.cassandra.db.marshal.InetAddressType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.serializers.TypeSerializer; + +/** + * Uses reflection to look up an appropriate TypeSerializer/AbstractType to serialize objects + * without writing annoying ByteBufferUtil.bytes(X/Y/Z) boilerplate. + */ +class GenericSerializerHcd { + /** + * I considered using the drivers code (CodecRegistry, TypeCodec, etc) but decided that it made + * more sense to use the server side stuff from Cassandra. + * + *

Extending this (at least for the purpose of RPC calls) is relatively straightforward: write + * a class that extends C*'s TypeSerializer and add to the map. For actually getting data into + * C*'s UDTs it might be a bit trickier. Unfortunately, there is not always a direct 1:1 mapping + * between Java and Cassandra types. A simple example is millisecond timestamps, which could be + * serialized as 'long' and 'timestamp'. The driver code actually has some bounds for this, but I + * think for us it will be simpler to just write more serializers and add them to the map. + */ + private static final ConcurrentHashMap typeMap = + new ConcurrentHashMap() { + { + put("void", EmptyType.instance); + put("boolean", BooleanType.instance); + put("java.lang.Boolean", BooleanType.instance); + put("byte", ByteType.instance); + put("java.lang.Byte", ByteType.instance); + put("int", Int32Type.instance); + put("java.lang.Integer", Int32Type.instance); + put("long", LongType.instance); + put("java.lang.Long", LongType.instance); + put("float", FloatType.instance); + put("java.lang.Float", FloatType.instance); + put("double", DoubleType.instance); + put("java.lang.Double", DoubleType.instance); + put("java.lang.String", UTF8Type.instance); + put("java.net.InetAddress", InetAddressType.instance); + put("java.util.Date", DateType.instance); + put("java.nio.ByteBuffer", BytesType.instance); + put("java.util.UUID", UUIDType.instance); + } + }; + + static void registerType(String className, AbstractType type) { + if (typeMap.putIfAbsent(className, type) != null) { + throw new IllegalStateException("The type " + className + " is already registered."); + } + } + + static TypeSerializer getSerializer(Type type) { + return getTypeOrException(type).getSerializer(); + } + + static AbstractType getTypeOrException(Type type) { + AbstractType ctype = getType(type); + + if (ctype == null) { + throw new AssertionError( + String.format("Add type '%s' to GenericSerializer", type.getTypeName())); + } + + return ctype; + } + + static boolean simpleType(Type type) { + return getType(type) != null; + } + + /** + * Most of the actual work is done here. Note that Generic type information is mostly destroyed at + * runtime (a list is just a list). For the Parameterized types to work correctly you have to call + * Method.getGenericParameterTypes() or something similar. Also, we currently punt on the frozen + * keyword. + * + * @return The C* abstract type corresponding to the Java type, or null if not found/impossible. + */ + static AbstractType getType(Type type) { + assert type != null; + String strType = type.getTypeName(); + + /** + * Rather than hard coding List List List etc we create them as needed. + * Also there is no need for a lock as the actual serializers do that for us. + */ + if (!typeMap.containsKey(strType)) { + if (type instanceof ParameterizedType) { + ParameterizedType ptype = (ParameterizedType) type; + + if (ptype.getRawType().getTypeName().equals("java.util.List")) { + assert ptype.getActualTypeArguments().length == 1; + typeMap.putIfAbsent( + strType, ListType.getInstance(getType(ptype.getActualTypeArguments()[0]), false)); + } else if (ptype.getRawType().getTypeName().equals("java.util.Set")) { + assert ptype.getActualTypeArguments().length == 1; + typeMap.putIfAbsent( + strType, SetType.getInstance(getType(ptype.getActualTypeArguments()[0]), false)); + } else if (ptype.getRawType().getTypeName().equals("java.util.Map")) { + assert ptype.getActualTypeArguments().length == 2; + typeMap.putIfAbsent( + strType, + MapType.getInstance( + getType(ptype.getActualTypeArguments()[0]), + getType(ptype.getActualTypeArguments()[1]), + false)); + } else { + throw new AssertionError( + "Don't know how to serialize generic type '" + + ptype.getRawType().getTypeName() + + "'"); + } + } else { + return null; + } + } + + return typeMap.get(strType); + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/ObjectSerializerHcd.java b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/ObjectSerializerHcd.java new file mode 100644 index 00000000..ddf70cde --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/ObjectSerializerHcd.java @@ -0,0 +1,138 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.rpc; + +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Lists; +import java.lang.reflect.Field; +import java.lang.reflect.Type; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.cql3.ResultSet; +import org.apache.cassandra.cql3.ResultSet.ResultMetadata; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.TupleType; + +public class ObjectSerializerHcd implements ObjectSerializer { + public final ImmutableSortedMap serializers; + + public class FieldSerializer { + public final AbstractType type; + public final Function accessor; + + FieldSerializer(AbstractType type, Function accessor) { + this.type = type; + this.accessor = accessor; + } + + FieldSerializer(AbstractType type, final Field field) { + field.setAccessible(true); + this.type = type; + this.accessor = + (obj) -> { + try { + return field.get(obj); + } catch (IllegalAccessException e) { + throw new AssertionError("Should not happen as we set the field to accessible."); + } + }; + } + + ByteBuffer serializeField(T obj) { + Object value = accessor.apply(obj); + if (value == null) { + return null; + } + return type.getSerializer().serialize(accessor.apply(obj)); + } + } + + /** + * Due to the magic of java generics, the class doesn't have the full generic information, hence + * the double types. Also, this will only serialize **PUBLIC** fields (perhaps this should be + * changed; it's not totally clear). Tag accordingly. + */ + public ObjectSerializerHcd(Class clazz, Type genericType) { + serializers = + GenericSerializerHcd.simpleType(genericType) + ? ImmutableSortedMap.of( + "result", new FieldSerializer(GenericSerializerHcd.getType(genericType), x -> x)) + : ImmutableSortedMap.copyOf( + Arrays.stream(clazz.getFields()) + .collect( + Collectors.toMap( + field -> field.getName(), + field -> + new FieldSerializer( + GenericSerializerHcd.getType(field.getGenericType()), field)))); + // currently not recursive; multiple ways to do it + } + + public ObjectSerializerHcd(Class clazz) { + this(clazz, clazz); + } + + /** + * Serialize an object into a C* ResultSet, with each field as a named value. + * + * @param obj The object to serialize + * @param ksName Pretend we are coming from this keyspace + * @param cfName Pretend we are coming from this columnfamily + */ + public ResultSet toResultSet(T obj, String ksName, String cfName) { + return new ResultSet( + new ResultMetadata( + serializers.entrySet().stream() + .map( + e -> + new ColumnSpecification( + ksName, + cfName, + new ColumnIdentifier(e.getKey(), true), + e.getValue().type)) + .collect(Collectors.toList())), + Lists.>newArrayList(toByteBufferList(obj))); + } + + /** + * Serialize an object into a C* multi-row ResultSet, with each field as a named value. + * + * @param obj The object to serialize + * @param ksName Pretend we are coming from this keyspace + * @param cfName Pretend we are coming from this columnfamily + */ + public ResultSet toMultiRowResultSet(Collection obj, String ksName, String cfName) { + return new ResultSet( + new ResultMetadata( + serializers.entrySet().stream() + .map( + e -> + new ColumnSpecification( + ksName, + cfName, + new ColumnIdentifier(e.getKey(), true), + e.getValue().type)) + .collect(Collectors.toList())), + obj.stream().map(this::toByteBufferList).collect(Collectors.toList())); + } + + public List toByteBufferList(T obj) { + return serializers.values().stream() + .map(fs -> fs.serializeField(obj)) + .collect(Collectors.toList()); + } + + public ByteBuffer toByteBuffer(T obj) { + return TupleType.buildValue( + serializers.values().stream().map(fs -> fs.serializeField(obj)).toArray(ByteBuffer[]::new)); + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/RpcMethodHcd.java b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/RpcMethodHcd.java new file mode 100644 index 00000000..f2b54ee1 --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/RpcMethodHcd.java @@ -0,0 +1,179 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.rpc; + +import com.google.common.base.Preconditions; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.OptionalInt; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.cql3.ResultSet; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RpcMethodHcd implements RpcMethod { + private static final Logger logger = LoggerFactory.getLogger(RpcMethodHcd.class); + private final Method method; + private final RpcObject rpcObject; + private final String name; + private final List argSerializers; + private final List argTypes; + private final List argNames; + private final ObjectSerializerHcd retSerializer; + private final OptionalInt clientStateArgIdx; + private final List> params; + + RpcMethodHcd(Method method, RpcObject rpcObject) { + this.method = method; + this.rpcObject = rpcObject; + this.name = method.getAnnotation(Rpc.class).name(); + + Annotation[][] allAnnotations = method.getParameterAnnotations(); + params = + IntStream.range(0, method.getParameterCount()) + .boxed() + .flatMap( + argIdx -> + Arrays.stream(allAnnotations[argIdx]) + .filter(a -> a instanceof RpcParam) + .findFirst() + .map(RpcParam.class::cast) + .map(rpcParam -> Stream.of(Pair.of(argIdx, rpcParam))) + .orElseGet(Stream::empty)) + .collect(Collectors.toList()); + + Class[] paramTypes = method.getParameterTypes(); + clientStateArgIdx = + IntStream.range(0, method.getParameterCount()) + .filter(argIdx -> paramTypes[argIdx] == RpcClientState.class) + .findFirst(); + + int expectedParamsCount = params.size() + (clientStateArgIdx.isPresent() ? 1 : 0); + if (method.getParameterCount() != expectedParamsCount) { + throw new AssertionError( + String.format( + "All arguments for %s.%s must be annotated with either RpcParam or RpcClientState", + rpcObject.getName(), name)); + } + + Type[] genericParamTypes = method.getGenericParameterTypes(); + this.argSerializers = + params.stream() + .map(p -> GenericSerializerHcd.getSerializer(genericParamTypes[p.getKey()])) + .collect(Collectors.toList()); + + this.argTypes = + params.stream() + .map(p -> GenericSerializerHcd.getTypeOrException(genericParamTypes[p.getKey()])) + .collect(Collectors.toList()); + + this.argNames = params.stream().map(p -> p.getValue().name()).collect(Collectors.toList()); + + if (method.getAnnotation(Rpc.class).multiRow()) { + Preconditions.checkArgument( + Collection.class.isAssignableFrom(method.getReturnType()), + "If mutli-row result set is requested, the method return type must be an implementation of java.util.Collection"); + Type elemType = + ((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0]; + Preconditions.checkArgument( + elemType instanceof Class, + "If multi-row result set is request, the element type must be a Class"); + this.retSerializer = new ObjectSerializerHcd<>((Class) elemType); + } else { + this.retSerializer = + new ObjectSerializerHcd<>(method.getReturnType(), method.getGenericReturnType()); + } + } + + public String getName() { + return name; + } + + public int getArgumentCount() { + return argTypes.size(); + } + + public ColumnSpecification getArgumentSpecification(int position) { + return new ColumnSpecification( + "system", + rpcObject.getName() + "." + name, + new ColumnIdentifier(argNames.get(position), false), + argTypes.get(position)); + } + + public ResultMessage execute(ClientState clientState, List parameters) + throws RequestExecutionException { + try { + RpcClientState rpcClientState = RpcClientState.fromClientState(clientState); + LazyRef rpcArgs = LazyRef.of(() -> getMethodArgs(rpcClientState, parameters)); + + // endpoint is not explicitly provided or points to this node -> execute locally + return toResultMessage(method.invoke(rpcObject.raw, rpcArgs.get())); + } catch (Exception e) { + throw createRpcExecutionException(e); + } + } + + private RpcExecutionException createRpcExecutionException(Throwable e) { + String msg = String.format("Failed to execute method %s.%s", rpcObject.getName(), name); + logger.info(msg, e); + return RpcExecutionException.create(msg, e); + } + + private Object[] getMethodArgs(RpcClientState rpcClientState, Collection parameters) { + Object[] args = new Object[method.getParameterCount()]; + clientStateArgIdx.ifPresent(idx -> args[idx] = rpcClientState); + Object[] rpcParams = deserializeParameters(parameters); + for (int i = 0; i < rpcParams.length; i++) { + args[params.get(i).getKey()] = rpcParams[i]; + } + return args; + } + + public ResultSet toResultSet(Object object) { + if (method.getAnnotation(Rpc.class).multiRow()) { + return retSerializer.toMultiRowResultSet((Collection) object, rpcObject.getName(), name); + } else { + return retSerializer.toResultSet(object, rpcObject.getName(), name); + } + } + + public ResultMessage toResultMessage(Object object) { + if (object == null) { + return new ResultMessage.Void(); + } else { + return new ResultMessage.Rows(toResultSet(object)); + } + } + + private Object[] deserializeParameters(Collection args) { + Object[] deserialized = new Object[args.size()]; + + int i = 0; + for (ByteBuffer arg : args) { + deserialized[i] = arg != null ? argSerializers.get(i).deserialize(arg) : null; + i++; + } + + return deserialized; + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/RpcMethodServiceProviderHcd.java b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/RpcMethodServiceProviderHcd.java new file mode 100644 index 00000000..b227b461 --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/rpc/RpcMethodServiceProviderHcd.java @@ -0,0 +1,16 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.rpc; + +import java.lang.reflect.Method; + +public class RpcMethodServiceProviderHcd implements RpcMethodServiceProvider { + + @Override + public RpcMethod getRpcMethod(Method method, RpcObject rpcObject) { + return new RpcMethodHcd(method, rpcObject); + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/shim/CassandraAPIHcd.java b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/shim/CassandraAPIHcd.java new file mode 100644 index 00000000..c085d08d --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/shim/CassandraAPIHcd.java @@ -0,0 +1,352 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.shim; + +import com.datastax.mgmtapi.shims.CassandraAPI; +import com.datastax.mgmtapi.shims.RpcStatementShim; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.reflect.Field; +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.cassandra.auth.IRoleManager; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.fql.FullQueryLoggerOptions; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.TokenSerializer; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.hints.HintsService; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.K8SeedProviderHcd; +import org.apache.cassandra.locator.ReplicaPlans; +import org.apache.cassandra.locator.SeedProvider; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.SessionInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.streaming.management.StreamStateCompositeData; +import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.UnixSocketServerHcd; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CassandraAPIHcd implements CassandraAPI { + private static final Logger logger = LoggerFactory.getLogger(CassandraAPIHcd.class); + + private static final Supplier seedProvider = + Suppliers.memoize(() -> new K8SeedProviderHcd()); + + @Override + public void enableFullQuerylog() { + logger.debug("Getting FQL options and calling enableFullQueryLogger."); + FullQueryLoggerOptions fqlOpts = DatabaseDescriptor.getFullQueryLogOptions(); + StorageService.instance.enableFullQueryLogger( + fqlOpts.log_dir, + fqlOpts.roll_cycle, + fqlOpts.block, + fqlOpts.max_queue_weight, + fqlOpts.max_log_size, + fqlOpts.archive_command, + fqlOpts.max_archive_retries); + } + + @Override + public void disableFullQuerylog() { + logger.debug("Stopping FullQueryLogger."); + StorageService.instance.stopFullQueryLogger(); + } + + @Override + public boolean isFullQueryLogEnabled() { + boolean isEnabled = StorageService.instance.isFullQueryLogEnabled(); + logger.debug("Querying whether full query logging is enabled. Result is {}", isEnabled); + return isEnabled; + } + + @Override + public void decommission(boolean force) throws InterruptedException { + StorageService.instance.decommission(force); + } + + @Override + public Map, List> checkConsistencyLevel( + String consistencyLevelName, Integer rfPerDc) { + try { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + IEndpointSnitch endpointSnitch = DatabaseDescriptor.getEndpointSnitch(); + TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap(); + + ConsistencyLevel cl = ConsistencyLevel.valueOf(consistencyLevelName); + + Map dcNames = new HashMap<>(); + + for (InetAddressAndPort endpoint : + tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().values()) { + String dc = endpointSnitch.getDatacenter(endpoint); + assert dc != null; + + dcNames.put(dc, String.valueOf(rfPerDc)); + } + + Keyspace mockKs = + Keyspace.mockKS( + KeyspaceMetadata.create( + "none", + KeyspaceParams.create( + true, + ImmutableMap.builder() + .put("class", "NetworkTopologyStrategy") + .putAll(dcNames) + .build()))); + + AbstractReplicationStrategy mockStrategy = mockKs.getReplicationStrategy(); + mockStrategy.validateOptions(); + + Collection> tokenRanges = + tokenMetadata.getPrimaryRangesFor(tokenMetadata.sortedTokens()); + + Map, List> results = new HashMap<>(); + + // For each range check the endpoints can achieve cl using the midpoint + for (Range range : tokenRanges) { + Token midpoint = partitioner.midpoint(range.left, range.right); + EndpointsForRange endpoints = + mockStrategy.calculateNaturalReplicas(midpoint, tokenMetadata); + + if (!ReplicaPlans.isSufficientLiveReplicasForRead( + mockKs.getReplicationStrategy(), cl, endpoints)) { + List downEndpoints = new ArrayList<>(); + for (InetAddressAndPort endpoint : endpoints.endpoints()) { + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + + if (!epState.isAlive()) downEndpoints.add(endpoint.toString()); + } + + int blockFor = cl.blockFor(mockKs.getReplicationStrategy()); + + if (downEndpoints.isEmpty() && endpoints.size() < blockFor) + downEndpoints.add( + String.format( + "%d replicas required, but only %d nodes in the ring", + blockFor, endpoints.size())); + else if (downEndpoints.isEmpty()) downEndpoints.add("Nodes Flapping"); + + results.put( + ImmutableList.of( + (long) range.left.getTokenValue(), (long) range.right.getTokenValue()), + downEndpoints); + } + } + return results; + } catch (Throwable e) { + logger.error("Exception encountered", e); + throw e; + } + } + + @Override + public SeedProvider getK8SeedProvider() { + return seedProvider.get(); + } + + public Set reloadSeeds() { + Field seedField = FBUtilities.getProtectedField(Gossiper.class, "seeds"); + + Set seeds = null; + try { + seeds = (Set) seedField.get(Gossiper.instance); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + // Get the new set in the same that buildSeedsList does + Set tmp = new HashSet<>(); + try { + for (InetAddressAndPort seed : getK8SeedProvider().getSeeds()) { + if (seed.equals(FBUtilities.getBroadcastAddressAndPort())) continue; + tmp.add(seed); + } + } + /** + * If using the SimpleSeedProvider invalid yaml added to the config since startup could cause + * this to throw. Additionally, third party seed providers may throw exceptions. Handle the + * error and return a null to indicate that there was a problem. + */ + catch (Throwable e) { + JVMStabilityInspector.inspectThrowable(e); + return null; + } + + if (tmp.size() == 0) { + return seeds.stream().map(s -> s.getAddress()).collect(Collectors.toSet()); + } + + if (tmp.equals(seeds)) { + return seeds.stream().map(s -> s.getAddress()).collect(Collectors.toSet()); + } + + // Add the new entries + seeds.addAll(tmp); + // Remove the old entries + seeds.retainAll(tmp); + logger.debug("New seed node list after reload {}", seeds); + + return seeds.stream().map(s -> s.getAddress()).collect(Collectors.toSet()); + } + + @Override + public ChannelInitializer makeSocketInitializer( + Server.ConnectionTracker connectionTracker) { + return UnixSocketServerHcd.makeSocketInitializer(connectionTracker); + } + + @Override + public List> getEndpointStates() { + List> result = new ArrayList<>(); + + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + + for (InetAddressAndPort endpoint : Gossiper.instance.getEndpoints()) { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + Map states = new HashMap<>(); + for (Map.Entry s : state.states()) { + String value = + (s.getKey() == ApplicationState.TOKENS) + ? formatTokens(partitioner, s) + : s.getValue().value; + states.put(s.getKey().name(), value); + } + + states.put("ENDPOINT_IP", endpoint.getAddress().getHostAddress()); + states.put("IS_ALIVE", Boolean.toString(state.isAlive())); + states.put("PARTITIONER", partitioner.getClass().getName()); + states.put("CLUSTER_NAME", getStorageService().getClusterName()); + states.put( + "IS_LOCAL", Boolean.toString(endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))); + result.add(states); + } + + return result; + } + + private String formatTokens( + IPartitioner partitioner, Map.Entry s) { + try { + byte[] bytes = s.getValue().value.getBytes(StandardCharsets.ISO_8859_1); + Collection tokens = + TokenSerializer.deserialize( + partitioner, new DataInputStream(new ByteArrayInputStream(bytes))); + return tokens.stream().map(Token::toString).collect(Collectors.joining(",")); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public List>>> getStreamInfo() { + Set streams = + StreamManager.instance.getCurrentStreams().stream() + .map(StreamStateCompositeData::fromCompositeData) + .collect(Collectors.toSet()); + + List>>> result = new ArrayList<>(); + + for (StreamState status : streams) { + Map>> streamInfo = new HashMap<>(); + List> sessionResults = new ArrayList<>(); + + for (SessionInfo info : status.sessions) { + Map sessionInfo = new HashMap<>(); + sessionInfo.put("STREAM_OPERATION", status.streamOperation.getDescription()); + sessionInfo.put("PEER", info.peer.toString()); + sessionInfo.put("USING_CONNECTION", info.connecting.toString()); + sessionInfo.put("TOTAL_FILES_TO_RECEIVE", String.valueOf(info.getTotalFilesToReceive())); + sessionInfo.put("TOTAL_FILES_RECEIVED", String.valueOf(info.getTotalFilesReceived())); + sessionInfo.put("TOTAL_SIZE_TO_RECEIVE", String.valueOf(info.getTotalSizeToReceive())); + sessionInfo.put("TOTAL_SIZE_RECEIVED", String.valueOf(info.getTotalSizeReceived())); + + sessionInfo.put("TOTAL_FILES_TO_SEND", String.valueOf(info.getTotalFilesToSend())); + sessionInfo.put("TOTAL_FILES_SENT", String.valueOf(info.getTotalFilesSent())); + sessionInfo.put("TOTAL_SIZE_TO_SEND", String.valueOf(info.getTotalSizeToSend())); + sessionInfo.put("TOTAL_SIZE_SENT", String.valueOf(info.getTotalSizeSent())); + sessionResults.add(sessionInfo); + } + + streamInfo.put(status.planId.toString(), sessionResults); + + result.add(streamInfo); + } + + return result; + } + + @Override + public StorageService getStorageService() { + return StorageService.instance; + } + + @Override + public IRoleManager getRoleManager() { + return DatabaseDescriptor.getRoleManager(); + } + + @Override + public CompactionManager getCompactionManager() { + return CompactionManager.instance; + } + + @Override + public Gossiper getGossiper() { + return Gossiper.instance; + } + + @Override + public String getLocalDataCenter() { + return DatabaseDescriptor.getLocalDataCenter(); + } + + @Override + public RpcStatementShim makeRpcStatement(String method, String[] params) { + return new RpcStatement(method, params); + } + + @Override + public HintsService getHintsService() { + return HintsService.instance; + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/shim/RpcStatement.java b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/shim/RpcStatement.java new file mode 100644 index 00000000..33229de8 --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/com/datastax/mgmtapi/shim/RpcStatement.java @@ -0,0 +1,60 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.shim; + +import com.datastax.mgmtapi.shims.RpcStatementShim; +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.messages.ResultMessage; + +public class RpcStatement implements RpcStatementShim { + private final String method; + private final String[] params; + + public RpcStatement(String method, String[] params) { + this.method = method; + this.params = params; + } + + @Override + public void authorize(ClientState clientState) {} + + @Override + public ResultMessage execute(QueryState queryState, QueryOptions queryOptions, long l) { + return new ResultMessage.Void(); + } + + @Override + public ResultMessage executeLocally(QueryState queryState, QueryOptions queryOptions) { + return new ResultMessage.Void(); + } + + @Override + public AuditLogContext getAuditLogContext() { + return null; + } + + @Override + public String getMethod() { + return method; + } + + @Override + public String[] getParams() { + return params; + } + + @Override + public void validate(ClientState cs) {} + + @Override + public String getRawCQLStatement() { + throw new UnsupportedOperationException("Not supported yet."); // Generated from + // nbfs://nbhost/SystemFileSystem/Templates/Classes/Code/GeneratedMethodBody + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/locator/K8SeedProviderHcd.java b/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/locator/K8SeedProviderHcd.java new file mode 100644 index 00000000..14b6bb9d --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/locator/K8SeedProviderHcd.java @@ -0,0 +1,52 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package org.apache.cassandra.locator; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class K8SeedProviderHcd implements SeedProvider { + private static final Logger logger = LoggerFactory.getLogger(K8SeedProviderHcd.class); + + public K8SeedProviderHcd() {} + + public List getSeeds() { + Config conf; + try { + conf = DatabaseDescriptor.loadConfig(); + } catch (Exception e) { + throw new AssertionError(e); + } + String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1); + List seeds = new ArrayList<>(hosts.length); + for (String host : hosts) { + try { + /** + * A name may resolve to multiple seed node IPs, as would be the case in Kubernetes when a + * headless service is used to represent the seed nodes in a cluster, which is why we use + * `getAllByName` here instead of `getByName`. + */ + seeds.addAll( + Arrays.asList(InetAddress.getAllByName(host.trim())).stream() + .map(n -> InetAddressAndPort.getByAddress(n)) + .collect(Collectors.toList())); + } catch (UnknownHostException ex) { + // not fatal... DD will bark if there end up being zero seeds. + logger.warn("Seed provider couldn't lookup host {}", host); + } + } + return Collections.unmodifiableList(seeds); + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/transport/NettyChannelWrapper.java b/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/transport/NettyChannelWrapper.java new file mode 100644 index 00000000..4eda90cc --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/transport/NettyChannelWrapper.java @@ -0,0 +1,268 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package org.apache.cassandra.transport; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +/** + * A simple wrapper class around Netty's Channel class. The only job of this class is to override + * the remoteAddress() method to return null as it causes a ClasCastException in Cassandra for 5.0 + * and newer. + */ +public class NettyChannelWrapper implements Channel { + + private final Channel delegate; + + NettyChannelWrapper(Channel delegate) { + this.delegate = delegate; + } + + @Override + public ChannelId id() { + return delegate.id(); + } + + @Override + public EventLoop eventLoop() { + return delegate.eventLoop(); + } + + @Override + public Channel parent() { + return delegate.parent(); + } + + @Override + public ChannelConfig config() { + return delegate.config(); + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public boolean isRegistered() { + return delegate.isRegistered(); + } + + @Override + public boolean isActive() { + return delegate.isActive(); + } + + @Override + public ChannelMetadata metadata() { + return delegate.metadata(); + } + + @Override + public SocketAddress localAddress() { + return delegate.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + /** + * For Cassandra 5.0, and/or possibly Netty 4.1.96.Final, the remote address for the Channel + * seems to be initialized to a Netty io.netty.channel.unix.DomainSocketAddress instance. For + * Cassandra 4.1 and lower, it seems to be null. Strangely enough, an initialized instance of + * io.netty.channel.unix.DomainSocketAddress causes a ClassCastException in Cassandra, since it + * does not extend java.io.InetSocketAddress. Apparently, null can be cast as an instance of + * InetSocketAddress and this works fine for how the Management API Agent works. So, for + * Cassandra 5.0+, we just return null here. + */ + SocketAddress realAddress = delegate.remoteAddress(); + if (realAddress != null) { + if (InetSocketAddress.class.isAssignableFrom(realAddress.getClass())) { + // remoteAddress can be cast as InetSocketAddress so we can return the delegate's address + return delegate.remoteAddress(); + } + } + // either the remoteAddress was null, or it cannot be cast as an InetSocketAddress. + return null; + } + + @Override + public ChannelFuture closeFuture() { + return delegate.closeFuture(); + } + + @Override + public boolean isWritable() { + return delegate.isWritable(); + } + + @Override + public long bytesBeforeUnwritable() { + return delegate.bytesBeforeUnwritable(); + } + + @Override + public long bytesBeforeWritable() { + return delegate.bytesBeforeWritable(); + } + + @Override + public Unsafe unsafe() { + return delegate.unsafe(); + } + + @Override + public ChannelPipeline pipeline() { + return delegate.pipeline(); + } + + @Override + public ByteBufAllocator alloc() { + return delegate.alloc(); + } + + @Override + public Channel read() { + return delegate.read(); + } + + @Override + public Channel flush() { + return delegate.flush(); + } + + @Override + public Attribute attr(AttributeKey ak) { + return delegate.attr(ak); + } + + @Override + public boolean hasAttr(AttributeKey ak) { + return delegate.hasAttr(ak); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return delegate.bind(localAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return delegate.connect(remoteAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return delegate.connect(remoteAddress, localAddress); + } + + @Override + public ChannelFuture disconnect() { + return delegate.disconnect(); + } + + @Override + public ChannelFuture close() { + return delegate.close(); + } + + @Override + public ChannelFuture deregister() { + return delegate.deregister(); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + return delegate.bind(localAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + return delegate.connect(remoteAddress, promise); + } + + @Override + public ChannelFuture connect( + SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + return delegate.connect(remoteAddress, localAddress, promise); + } + + @Override + public ChannelFuture disconnect(ChannelPromise promise) { + return delegate.disconnect(promise); + } + + @Override + public ChannelFuture close(ChannelPromise promise) { + return delegate.close(promise); + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return delegate.deregister(promise); + } + + @Override + public ChannelFuture write(Object msg) { + return delegate.write(msg); + } + + @Override + public ChannelFuture write(Object msg, ChannelPromise promise) { + return delegate.write(msg, promise); + } + + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return delegate.writeAndFlush(msg, promise); + } + + @Override + public ChannelFuture writeAndFlush(Object msg) { + return delegate.writeAndFlush(msg); + } + + @Override + public ChannelPromise newPromise() { + return delegate.newPromise(); + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return delegate.newProgressivePromise(); + } + + @Override + public ChannelFuture newSucceededFuture() { + return delegate.newSucceededFuture(); + } + + @Override + public ChannelFuture newFailedFuture(Throwable cause) { + return delegate.newFailedFuture(cause); + } + + @Override + public ChannelPromise voidPromise() { + return delegate.voidPromise(); + } + + @Override + public int compareTo(Channel o) { + return delegate.compareTo(o); + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/transport/UnixSocketServerHcd.java b/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/transport/UnixSocketServerHcd.java new file mode 100644 index 00000000..4aab7a36 --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/java/org/apache/cassandra/transport/UnixSocketServerHcd.java @@ -0,0 +1,409 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package org.apache.cassandra.transport; + +import com.datastax.mgmtapi.ipc.IPCController; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.VoidChannelPromise; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.Attribute; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.cassandra.auth.IAuthenticator; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.messages.AuthenticateMessage; +import org.apache.cassandra.transport.messages.ErrorMessage; +import org.apache.cassandra.transport.messages.ReadyMessage; +import org.apache.cassandra.transport.messages.StartupMessage; +import org.apache.cassandra.transport.messages.SupportedMessage; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.MonotonicClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UnixSocketServerHcd { + private static final Logger logger = LoggerFactory.getLogger(IPCController.class); + + // Names of handlers used in pre-V5 pipelines + private static final String ENVELOPE_DECODER = "envelopeDecoder"; + private static final String ENVELOPE_ENCODER = "envelopeEncoder"; + private static final String MESSAGE_DECOMPRESSOR = "decompressor"; + private static final String MESSAGE_COMPRESSOR = "compressor"; + private static final String MESSAGE_DECODER = "messageDecoder"; + private static final String MESSAGE_ENCODER = "messageEncoder"; + private static final String LEGACY_MESSAGE_PROCESSOR = "legacyCqlProcessor"; + private static final String INITIAL_HANDLER = "initialHandler"; + private static final String EXCEPTION_HANDLER = "exceptionHandler"; + + public static ChannelInitializer makeSocketInitializer( + final Server.ConnectionTracker connectionTracker) { + logger.debug("Creating Channel Initializer"); + return new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + + pipeline.addLast(ENVELOPE_ENCODER, Envelope.Encoder.instance); + final _ConnectionFactory factory = new _ConnectionFactory(connectionTracker); + pipeline.addLast( + INITIAL_HANDLER, new PipelineChannelInitializer(new Envelope.Decoder(), factory)); + /** + * The exceptionHandler will take care of handling exceptionCaught(...) events while still + * running on the same EventLoop as all previous added handlers in the pipeline. This is + * important as the used eventExecutorGroup may not enforce strict ordering for channel + * events. As the exceptionHandler runs in the EventLoop as the previous handlers we are + * sure all exceptions are correctly handled before the handler itself is removed. See + * https://issues.apache.org/jira/browse/CASSANDRA-13649 + */ + pipeline.addLast(EXCEPTION_HANDLER, PreV5Handlers.ExceptionHandler.instance); + } + }; + } + + @ChannelHandler.Sharable + static class UnixSockMessage extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Message.Request request) + throws Exception { + final Message.Response response; + final UnixSocketConnection connection; + long queryStartNanoTime = System.nanoTime(); + + try { + assert request.connection() instanceof UnixSocketConnection; + connection = (UnixSocketConnection) request.connection(); + if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4)) + ClientWarn.instance.captureWarnings(); + + QueryState qstate = + connection.validateNewMessage( + request.type, connection.getVersion(), request.getStreamId()); + // logger.info("Executing {} {} {}", request, connection.getVersion(), + // request.getStreamId()); + + Message.Response r = request.execute(qstate, queryStartNanoTime); + + // UnixSocket has no auth + response = r instanceof AuthenticateMessage ? new ReadyMessage() : r; + + response.setStreamId(request.getStreamId()); + response.setWarnings(ClientWarn.instance.getWarnings()); + response.attach(connection); + connection.applyStateTransition(request.type, response.type); + } catch (Throwable t) { + // logger.warn("Exception encountered", t); + JVMStabilityInspector.inspectThrowable(t); + ExceptionHandlers.UnexpectedChannelExceptionHandler handler = + new ExceptionHandlers.UnexpectedChannelExceptionHandler(ctx.channel(), true); + ctx.writeAndFlush( + ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId())); + request.getSource().release(); + return; + } finally { + ClientWarn.instance.resetWarnings(); + } + + ctx.writeAndFlush(response); + request.getSource().release(); + } + } + + static class UnixSocketConnection extends ServerConnection { + private enum State { + UNINITIALIZED, + AUTHENTICATION, + READY + } + + private final ClientState clientState; + private volatile State state; + // private final ConcurrentMap queryStates = new ConcurrentHashMap<>(); + + public UnixSocketConnection( + Channel channel, ProtocolVersion version, Connection.Tracker tracker) { + super(channel, version, tracker); + this.clientState = ClientState.forInternalCalls(); + this.state = State.UNINITIALIZED; + } + + @Override + public QueryState validateNewMessage(Message.Type type, ProtocolVersion version) { + return validateNewMessage(type, version, -1); + } + + public QueryState validateNewMessage(Message.Type type, ProtocolVersion version, int streamId) { + switch (state) { + case UNINITIALIZED: + if (type != Message.Type.STARTUP && type != Message.Type.OPTIONS) + throw new ProtocolException( + String.format("Unexpected message %s, expecting STARTUP or OPTIONS", type)); + break; + case AUTHENTICATION: + // Support both SASL auth from protocol v2 and the older style Credentials auth from v1 + if (type != Message.Type.AUTH_RESPONSE && type != Message.Type.CREDENTIALS) + throw new ProtocolException( + String.format( + "Unexpected message %s, expecting %s", + type, version == ProtocolVersion.V1 ? "CREDENTIALS" : "SASL_RESPONSE")); + break; + case READY: + if (type == Message.Type.STARTUP) + throw new ProtocolException( + "Unexpected message STARTUP, the connection is already initialized"); + break; + default: + throw new AssertionError(); + } + return new QueryState(clientState); + } + + @Override + public void applyStateTransition(Message.Type requestType, Message.Type responseType) { + switch (state) { + case UNINITIALIZED: + if (requestType == Message.Type.STARTUP) { + // Just set the state to READY as the Unix socket needs to bypass authentication + state = State.READY; + } + break; + case AUTHENTICATION: + // Support both SASL auth from protocol v2 and the older style Credentials auth from v1 + assert requestType == Message.Type.AUTH_RESPONSE + || requestType == Message.Type.CREDENTIALS; + + if (responseType == Message.Type.READY || responseType == Message.Type.AUTH_SUCCESS) { + state = State.READY; + // we won't use the authenticator again, null it so that it can be GC'd + } + break; + case READY: + break; + default: + throw new AssertionError(); + } + } + + @Override + public IAuthenticator.SaslNegotiator getSaslNegotiator(QueryState queryState) { + return null; + } + } + + static class PipelineChannelInitializer extends ByteToMessageDecoder { + Envelope.Decoder decoder; + Connection.Factory factory; + + PipelineChannelInitializer(Envelope.Decoder decoder, Connection.Factory factory) { + this.decoder = decoder; + this.factory = factory; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) + throws Exception { + Envelope inbound = decoder.decode(buffer); + if (inbound == null) return; + + try { + Envelope outbound; + switch (inbound.header.type) { + case OPTIONS: + logger.debug("OPTIONS received {}", inbound.header.version); + List cqlVersions = new ArrayList<>(); + cqlVersions.add(QueryProcessor.CQL_VERSION.toString()); + + List compressions = new ArrayList<>(); + if (Compressor.SnappyCompressor.instance != null) compressions.add("snappy"); + // LZ4 is always available since worst case scenario it default to a pure JAVA implem. + compressions.add("lz4"); + + Map> supportedOptions = new HashMap<>(); + supportedOptions.put(StartupMessage.CQL_VERSION, cqlVersions); + supportedOptions.put(StartupMessage.COMPRESSION, compressions); + supportedOptions.put( + StartupMessage.PROTOCOL_VERSIONS, ProtocolVersion.supportedVersions()); + SupportedMessage supported = new SupportedMessage(supportedOptions); + outbound = supported.encode(inbound.header.version); + ctx.writeAndFlush(outbound); + break; + + case STARTUP: + Attribute attrConn = ctx.channel().attr(Connection.attributeKey); + Connection connection = attrConn.get(); + if (connection == null) { + connection = factory.newConnection(ctx.channel(), inbound.header.version); + attrConn.set(connection); + } + assert connection instanceof ServerConnection; + + StartupMessage startup = + (StartupMessage) Message.Decoder.decodeMessage(ctx.channel(), inbound); + // InetAddress remoteAddress = ((InetSocketAddress) + // ctx.channel().remoteAddress()).getAddress(); + // final ClientResourceLimits.Allocator allocator = + // ClientResourceLimits.getAllocatorForEndpoint(remoteAddress); + + ChannelPromise promise; + if (inbound.header.version.isGreaterOrEqualTo(ProtocolVersion.V5)) { + // v5 not yet supported + logger.warn("PROTOCOL v5 not yet supported."); + } + // no need to configure the pipeline asynchronously in this case + // the capacity obtained from allocator for the STARTUP message + // is released when flushed by the legacy dispatcher/flusher so + // there's no need to explicitly release that here either. + + ChannelPipeline pipeline = ctx.channel().pipeline(); + pipeline.addBefore(ENVELOPE_ENCODER, ENVELOPE_DECODER, new Envelope.Decoder()); + pipeline.addBefore( + INITIAL_HANDLER, MESSAGE_DECOMPRESSOR, Envelope.Decompressor.instance); + pipeline.addBefore(INITIAL_HANDLER, MESSAGE_COMPRESSOR, Envelope.Compressor.instance); + pipeline.addBefore( + INITIAL_HANDLER, MESSAGE_DECODER, PreV5Handlers.ProtocolDecoder.instance); + pipeline.addBefore( + INITIAL_HANDLER, MESSAGE_ENCODER, PreV5Handlers.ProtocolEncoder.instance); + pipeline.addBefore(INITIAL_HANDLER, LEGACY_MESSAGE_PROCESSOR, new UnixSockMessage()); + pipeline.remove(INITIAL_HANDLER); + + promise = new VoidChannelPromise(ctx.channel(), false); + + // HCD 1.2 uses Converged Core 5, which will advance over time to pull in Cassandra 5.0 + // code. For now, this bit will act a lot like the Cassandra 4.1 code changes mentioned + // below. + + // In Cassandra 4.1.3, Dispatcher.processRequest method signatures changed in order to + // add CASSANDRA-15241 (https://issues.apache.org/jira/browse/CASSANDRA-15241). To + // avoid splitting the 4.1 agent based on which version of Cassandra it runs with, + // we'll use reflection here to determine the correct method to invoke. + + // In Cassandra 4.1.6, Dispatcher.processRequest method signatures changed again for + // CASSANDRA-19534. The long value for start time was replaced by RequestTime + Message.Response response = null; + try { + // try to see if the Converged Core 5 object RequestTime exists + Class dispatcherRequestTime = + Class.forName("org.apache.cassandra.transport.Dispatcher$RequestTime"); + // this version of CC has it, get Dispatcher.RequestTime.forImmediateExecution() + Method forImmediateExecution = + dispatcherRequestTime.getDeclaredMethod("forImmediateExecution"); + Method processRequestMethod = + Dispatcher.class.getDeclaredMethod( + "processRequest", + Channel.class, + Message.Request.class, + ClientResourceLimits.Overload.class, + dispatcherRequestTime); + response = + (Message.Response) + processRequestMethod.invoke( + null, + ctx.channel(), + startup, + ClientResourceLimits.Overload.NONE, + forImmediateExecution.invoke(null)); + + } catch (ClassNotFoundException cnfe) { + logger.debug( + "Dispatcher$RequestTime in Converged Core 5 not found, trying Dispatcher.processRequest with primitive long from older versions"); + try { + // try to get the CC5 version with a primitive long instead of RequestTime + Method processRequestMethod = + Dispatcher.class.getDeclaredMethod( + "processRequest", + Channel.class, + Message.Request.class, + ClientResourceLimits.Overload.class, + long.class); + // CC5 method found so we'll need to invoke it with a start time + response = + (Message.Response) + processRequestMethod.invoke( + null, + ctx.channel(), + startup, + ClientResourceLimits.Overload.NONE, + MonotonicClock.Global.approxTime.now()); + } catch (NoSuchMethodException ex) { + // CC5 version has an older signature still + logger.debug( + "Cassandra Dispatcher.processRequest() with primitve long not found, trying yet an older signature"); + try { + Method processRequestMethod = + Dispatcher.class.getDeclaredMethod( + "processRequest", + Channel.class, + Message.Request.class, + ClientResourceLimits.Overload.class); + response = + (Message.Response) + processRequestMethod.invoke( + null, ctx.channel(), startup, ClientResourceLimits.Overload.NONE); + } catch (NoSuchMethodException ex2) { + // something is broken, need to figure out what method/signature should be used + logger.debug( + "Expected Cassandra Dispatcher.processRequest() method signature not found. Management API agent will not be able to start Cassandra.", + ex2); + throw ex2; + } + } + } + if (response.type.equals(Message.Type.AUTHENTICATE)) + // bypass authentication + response = new ReadyMessage(); + + outbound = response.encode(inbound.header.version); + ctx.writeAndFlush(outbound, promise); + logger.debug("Configured pipeline: {}", ctx.pipeline()); + break; + + default: + ErrorMessage error = + ErrorMessage.fromException( + new ProtocolException( + String.format( + "Unexpected message %s, expecting STARTUP or OPTIONS", + inbound.header.type))); + outbound = error.encode(inbound.header.version); + ctx.writeAndFlush(outbound); + } + } finally { + inbound.release(); + } + } + } + + public static class _ConnectionFactory implements Connection.Factory { + + private final Server.ConnectionTracker connectionTracker; + + public _ConnectionFactory(Server.ConnectionTracker connectionTracker) { + this.connectionTracker = connectionTracker; + } + + @Override + public Connection newConnection(Channel chnl, ProtocolVersion pv) { + if (chnl.remoteAddress() != null) { + // need to wrap the channel + Channel channelWraper = new NettyChannelWrapper(chnl); + return new UnixSocketConnection(channelWraper, pv, connectionTracker); + } + return new UnixSocketConnection(chnl, pv, connectionTracker); + } + } +} diff --git a/management-api-agent-hcd-1.2.x/src/main/resources/META-INF/services/com.datastax.mgmtapi.CassandraAPIServiceProvider b/management-api-agent-hcd-1.2.x/src/main/resources/META-INF/services/com.datastax.mgmtapi.CassandraAPIServiceProvider new file mode 100644 index 00000000..51de0bba --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/resources/META-INF/services/com.datastax.mgmtapi.CassandraAPIServiceProvider @@ -0,0 +1 @@ +com.datastax.mgmtapi.CassandraAPIServiceProviderHcd diff --git a/management-api-agent-hcd-1.2.x/src/main/resources/META-INF/services/com.datastax.mgmtapi.rpc.RpcMethodServiceProvider b/management-api-agent-hcd-1.2.x/src/main/resources/META-INF/services/com.datastax.mgmtapi.rpc.RpcMethodServiceProvider new file mode 100644 index 00000000..b9115ff6 --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/main/resources/META-INF/services/com.datastax.mgmtapi.rpc.RpcMethodServiceProvider @@ -0,0 +1 @@ +com.datastax.mgmtapi.rpc.RpcMethodServiceProviderHcd diff --git a/management-api-agent-hcd-1.2.x/src/test/java/com/datastax/mgmtapi/rpc/ObjectSerializerHcdTest.java b/management-api-agent-hcd-1.2.x/src/test/java/com/datastax/mgmtapi/rpc/ObjectSerializerHcdTest.java new file mode 100644 index 00000000..4ac07d12 --- /dev/null +++ b/management-api-agent-hcd-1.2.x/src/test/java/com/datastax/mgmtapi/rpc/ObjectSerializerHcdTest.java @@ -0,0 +1,23 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package com.datastax.mgmtapi.rpc; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ObjectSerializerHcdTest + extends ObjectSerializerTestBase> { + + @Override + protected ObjectSerializerHcd createExampleSerializer() { + return new ObjectSerializerHcd<>(Example.class); + } + + @Override + protected String getCqlType(ObjectSerializerHcd serializer, String fieldName) { + assertThat(serializer.serializers).containsKey(fieldName); + return serializer.serializers.get(fieldName).type.toString(); + } +} diff --git a/management-api-agent-hcd/README.md b/management-api-agent-hcd/README.md index d3a41ba4..f5ab6da7 100644 --- a/management-api-agent-hcd/README.md +++ b/management-api-agent-hcd/README.md @@ -1,8 +1,16 @@ # Management API with HCD (Hyper-Converged Database) -It is important to note that all HCD dependencies should only be specified in the HCD agent module. No HCD dependencies +It is important to note that all HCD dependencies should only be specified in the HCD agent modules. No HCD dependencies can be added to any other projects/modules, as users without access to HCD artifacts won't be able to build the OSS Management API. +## HCD versions + +As of this document edit, there are 2 versions of HCD in development. Version 1.0.x is currently maintained on the `hcd-1.0` branch +of the HCD repository. Version 1.2.x is maintained on the `main` branch of the repository. The major difference between the two +versions is the Converged Cassandra Core that is used. HCD 1.0.x uses Converged Core 4, while HCD 1.2.x uses Converged Core 5. As +with Cassandra versions, the HCD agent has to be broken into 2 sub-modules for compiling compatibility. The version in this +sub-module is for HCD 1.0.x. For HCD 1.2.x, use the agent in sub-module `management-api-agent-hcd-1.2.x`. + ## Maven Settings In order to build Management API artifacts for HCD (jarfiles and/or Docker images), you will need to have access to the DSE Maven @@ -29,21 +37,23 @@ which is an ongoing effort. ## Docker image builds -OUT OF SCOPE: At the moment, no HCD images are being built as part of this project. They are built from the BDP repo currently. +OUT OF SCOPE: At the moment, no HCD images are being built as part of this project. They are built from the HCD repo currently. ### Building HCD images locally -OUT OF SCOPE: At the moment, no HCD images are being built as part of this project. They are built from the BDP repo currently. +OUT OF SCOPE: At the moment, no HCD images are being built as part of this project. They are built from the HCD repo currently. -If you have access to the BDP repository, you can build an image from the `7.0-dev` branch. Use the following from the BDP repository root: +If you have access to the HCD repository, you can build an image from the `hcd-1.0` branch. Use the following from the HCD repository root: ```sh -./mvnw clean verify -P binary-distribution +./mvnw clean verify ``` -### Building a specific version of DSE +### Building a specific version of HCD -NOT APPLICABLE +HCD versions are maintained in branch names with the format `hcd-.` (for example `hcd-1.1`). The latest/current version +pf HCD will be in the `main` branch (version 1.2.x as of this edit). Building a specific versions of HCD simply requires you to checkout +the version bracnh (or `main` if you wanto build the latest version) and build as above. ## Running a locally built image diff --git a/management-api-agent-hcd/pom.xml b/management-api-agent-hcd/pom.xml index 3a7d0fe3..40cfd441 100644 --- a/management-api-agent-hcd/pom.xml +++ b/management-api-agent-hcd/pom.xml @@ -72,7 +72,7 @@ com.datastax.dse dse-db-all - 4.0.11-88b47593868d + 4.0.11-3b5d38811943 commons-codec diff --git a/pom.xml b/pom.xml index b8ac3c3e..6c22cf5f 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,7 @@ management-api-agent-4.1.x management-api-agent-5.0.x management-api-agent-hcd + management-api-agent-hcd-1.2.x management-api-server