diff --git a/CHANGELOG.md b/CHANGELOG.md
index bbb30d78aa5d0..bba62e97a49e0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
+- Ensure support of the transport-nio by security plugin ([#16474](https://github.com/opensearch-project/OpenSearch/pull/16474))
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransport.java
index 978c92870bd75..e3a6dbb4c57b5 100644
--- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransport.java
+++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransport.java
@@ -67,8 +67,8 @@
* @see SecuritySSLNettyHttpServerTransport
*/
public class SecureNetty4HttpServerTransport extends Netty4HttpServerTransport {
- public static final String REQUEST_HEADER_VERIFIER = "HeaderVerifier";
- public static final String REQUEST_DECOMPRESSOR = "RequestDecompressor";
+ public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider.REQUEST_HEADER_VERIFIER;
+ public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider.REQUEST_DECOMPRESSOR;
private static final Logger logger = LogManager.getLogger(SecureNetty4HttpServerTransport.class);
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java
index ef6b67ea44299..cf841f2e24b1e 100644
--- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java
+++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java
@@ -315,13 +315,11 @@ private static class CountDownLatchHandlerHttp2 extends AwaitableChannelInitiali
private final CountDownLatch latch;
private final Collection content;
- private final boolean secure;
private Http2SettingsHandler settingsHandler;
CountDownLatchHandlerHttp2(final CountDownLatch latch, final Collection content, final boolean secure) {
this.latch = latch;
this.content = content;
- this.secure = secure;
}
@Override
diff --git a/plugins/transport-nio/build.gradle b/plugins/transport-nio/build.gradle
index ee557aa0efc79..c0f0150378434 100644
--- a/plugins/transport-nio/build.gradle
+++ b/plugins/transport-nio/build.gradle
@@ -50,6 +50,7 @@ dependencies {
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
+ api "io.netty:netty-transport-native-unix-common:${versions.netty}"
}
tasks.named("dependencyLicenses").configure {
@@ -151,10 +152,6 @@ thirdPartyAudit {
'io.netty.internal.tcnative.SessionTicketKey',
'io.netty.internal.tcnative.SniHostNameMatcher',
- // from io.netty.channel.unix (netty)
- 'io.netty.channel.unix.FileDescriptor',
- 'io.netty.channel.unix.UnixChannel',
-
'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration'
)
diff --git a/plugins/transport-nio/licenses/netty-transport-native-unix-common-4.1.114.Final.jar.sha1 b/plugins/transport-nio/licenses/netty-transport-native-unix-common-4.1.114.Final.jar.sha1
new file mode 100644
index 0000000000000..a80b9e51be74b
--- /dev/null
+++ b/plugins/transport-nio/licenses/netty-transport-native-unix-common-4.1.114.Final.jar.sha1
@@ -0,0 +1 @@
+d1171bb99411f282068f49d780cedf8c9adeabfd
\ No newline at end of file
diff --git a/plugins/transport-nio/src/internalClusterTest/java/org/opensearch/http/nio/NioPipeliningIT.java b/plugins/transport-nio/src/internalClusterTest/java/org/opensearch/http/nio/NioPipeliningIT.java
index 4f26e8ae65259..c4541e3b1c7d3 100644
--- a/plugins/transport-nio/src/internalClusterTest/java/org/opensearch/http/nio/NioPipeliningIT.java
+++ b/plugins/transport-nio/src/internalClusterTest/java/org/opensearch/http/nio/NioPipeliningIT.java
@@ -61,8 +61,8 @@ public void testThatNioHttpServerSupportsPipelining() throws Exception {
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);
- try (NioHttpClient nettyHttpClient = new NioHttpClient()) {
- Collection responses = nettyHttpClient.get(transportAddress.address(), requests);
+ try (NioHttpClient client = NioHttpClient.http()) {
+ Collection responses = client.get(transportAddress.address(), requests);
assertThat(responses, hasSize(5));
Collection opaqueIds = NioHttpClient.returnOpaqueIds(responses);
diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java
index d44515f3dc727..6438cca9cc33d 100644
--- a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java
+++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java
@@ -32,6 +32,7 @@
package org.opensearch.http.nio;
+import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpPipelinedRequest;
@@ -44,6 +45,8 @@
import org.opensearch.nio.TaskScheduler;
import org.opensearch.nio.WriteOperation;
+import javax.net.ssl.SSLEngine;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -58,6 +61,7 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.ssl.SslHandler;
public class HttpReadWriteHandler implements NioChannelHandler {
@@ -77,6 +81,28 @@ public HttpReadWriteHandler(
HttpHandlingSettings settings,
TaskScheduler taskScheduler,
LongSupplier nanoClock
+ ) {
+ this(
+ nioHttpChannel,
+ transport,
+ settings,
+ taskScheduler,
+ nanoClock,
+ null, /* no header verifier */
+ new HttpContentDecompressor(),
+ null /* no SSL/TLS */
+ );
+ }
+
+ HttpReadWriteHandler(
+ NioHttpChannel nioHttpChannel,
+ NioHttpServerTransport transport,
+ HttpHandlingSettings settings,
+ TaskScheduler taskScheduler,
+ LongSupplier nanoClock,
+ @Nullable ChannelHandler headerVerifier,
+ ChannelHandler decompressor,
+ @Nullable SSLEngine sslEngine
) {
this.nioHttpChannel = nioHttpChannel;
this.transport = transport;
@@ -85,6 +111,12 @@ public HttpReadWriteHandler(
this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis());
List handlers = new ArrayList<>(8);
+
+ SslHandler sslHandler = null;
+ if (sslEngine != null) {
+ sslHandler = new SslHandler(sslEngine);
+ }
+
HttpRequestDecoder decoder = new HttpRequestDecoder(
settings.getMaxInitialLineLength(),
settings.getMaxHeaderSize(),
@@ -92,7 +124,10 @@ public HttpReadWriteHandler(
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
handlers.add(decoder);
- handlers.add(new HttpContentDecompressor());
+ if (headerVerifier != null) {
+ handlers.add(headerVerifier);
+ }
+ handlers.add(decompressor);
handlers.add(new HttpResponseEncoder());
handlers.add(new HttpObjectAggregator(settings.getMaxContentLength()));
if (settings.isCompression()) {
@@ -102,7 +137,7 @@ public HttpReadWriteHandler(
handlers.add(new NioHttpResponseCreator());
handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));
- adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
+ adaptor = new NettyAdaptor(sslHandler, handlers.toArray(new ChannelHandler[0]));
adaptor.addCloseListener((v, e) -> nioHttpChannel.close());
}
diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NettyAdaptor.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NettyAdaptor.java
index 0b7f4ee7646d1..426690b4b696d 100644
--- a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NettyAdaptor.java
+++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NettyAdaptor.java
@@ -33,6 +33,7 @@
package org.opensearch.http.nio;
import org.opensearch.ExceptionsHelper;
+import org.opensearch.common.Nullable;
import org.opensearch.nio.FlushOperation;
import org.opensearch.nio.Page;
import org.opensearch.nio.WriteOperation;
@@ -49,6 +50,7 @@
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.ssl.SslHandler;
class NettyAdaptor {
@@ -56,9 +58,13 @@ class NettyAdaptor {
private final LinkedList flushOperations = new LinkedList<>();
NettyAdaptor(ChannelHandler... handlers) {
- nettyChannel = new EmbeddedChannel();
- nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() {
+ this(null, handlers);
+ }
+ NettyAdaptor(@Nullable SslHandler sslHandler, ChannelHandler... handlers) {
+ this.nettyChannel = new EmbeddedChannel();
+
+ nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// This is a little tricky. The embedded channel will complete the promise once it writes the message
@@ -75,12 +81,22 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
});
+ if (sslHandler != null) {
+ nettyChannel.pipeline().addAfter("write_captor", "ssl_handler", sslHandler);
+ }
nettyChannel.pipeline().addLast(handlers);
}
public void close() throws Exception {
assert flushOperations.isEmpty() : "Should close outbound operations before calling close";
+ final SslHandler sslHandler = (SslHandler) nettyChannel.pipeline().get("ssl_handler");
+ if (sslHandler != null) {
+ // The nettyChannel.close() or sslHandler.closeOutbound() futures will block indefinitely,
+ // removing the handler instead from the channel.
+ nettyChannel.pipeline().remove(sslHandler);
+ }
+
ChannelFuture closeFuture = nettyChannel.close();
// This should be safe as we are not a real network channel
closeFuture.await();
diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NioHttpServerTransport.java
index ecf9ad9f17f87..9eca5fc87120d 100644
--- a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NioHttpServerTransport.java
+++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/NioHttpServerTransport.java
@@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.support.PlainActionFuture;
+import org.opensearch.common.Nullable;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
@@ -47,6 +48,8 @@
import org.opensearch.http.AbstractHttpServerTransport;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpServerChannel;
+import org.opensearch.http.HttpServerTransport;
+import org.opensearch.http.nio.ssl.SslUtils;
import org.opensearch.nio.BytesChannelContext;
import org.opensearch.nio.ChannelFactory;
import org.opensearch.nio.Config;
@@ -56,16 +59,28 @@
import org.opensearch.nio.NioSocketChannel;
import org.opensearch.nio.ServerChannelContext;
import org.opensearch.nio.SocketChannelContext;
+import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportAdapterProvider;
import org.opensearch.transport.nio.NioGroupFactory;
import org.opensearch.transport.nio.PageAllocator;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.HttpContentDecompressor;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
@@ -83,6 +98,9 @@
public class NioHttpServerTransport extends AbstractHttpServerTransport {
private static final Logger logger = LogManager.getLogger(NioHttpServerTransport.class);
+ public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider.REQUEST_HEADER_VERIFIER;
+ public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider.REQUEST_DECOMPRESSOR;
+
protected final PageAllocator pageAllocator;
private final NioGroupFactory nioGroupFactory;
@@ -97,6 +115,34 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
private volatile NioGroup nioGroup;
private ChannelFactory channelFactory;
+ private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
+
+ public NioHttpServerTransport(
+ Settings settings,
+ NetworkService networkService,
+ BigArrays bigArrays,
+ PageCacheRecycler pageCacheRecycler,
+ ThreadPool threadPool,
+ NamedXContentRegistry xContentRegistry,
+ Dispatcher dispatcher,
+ NioGroupFactory nioGroupFactory,
+ ClusterSettings clusterSettings,
+ Tracer tracer
+ ) {
+ this(
+ settings,
+ networkService,
+ bigArrays,
+ pageCacheRecycler,
+ threadPool,
+ xContentRegistry,
+ dispatcher,
+ nioGroupFactory,
+ clusterSettings,
+ null,
+ tracer
+ );
+ }
public NioHttpServerTransport(
Settings settings,
@@ -108,6 +154,7 @@ public NioHttpServerTransport(
Dispatcher dispatcher,
NioGroupFactory nioGroupFactory,
ClusterSettings clusterSettings,
+ @Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
Tracer tracer
) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer);
@@ -127,6 +174,7 @@ public NioHttpServerTransport(
this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
this.tcpSendBufferSize = Math.toIntExact(SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings).getBytes());
this.tcpReceiveBufferSize = Math.toIntExact(SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes());
+ this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;
logger.debug(
"using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}],"
@@ -178,8 +226,8 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOExcep
return httpServerChannel;
}
- protected ChannelFactory channelFactory() {
- return new HttpChannelFactory();
+ protected ChannelFactory channelFactory() throws SSLException {
+ return new HttpChannelFactory(secureHttpTransportSettingsProvider);
}
protected void acceptChannel(NioSocketChannel socketChannel) {
@@ -187,8 +235,11 @@ protected void acceptChannel(NioSocketChannel socketChannel) {
}
private class HttpChannelFactory extends ChannelFactory {
+ private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
+ private final ChannelInboundHandlerAdapter headerVerifier;
+ private final TransportAdapterProvider decompressorProvider;
- private HttpChannelFactory() {
+ private HttpChannelFactory(@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider) {
super(
tcpNoDelay,
tcpKeepAlive,
@@ -199,17 +250,85 @@ private HttpChannelFactory() {
tcpSendBufferSize,
tcpReceiveBufferSize
);
+ this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;
+
+ final List headerVerifiers = getHeaderVerifiers(secureHttpTransportSettingsProvider);
+ final Optional> decompressorProviderOpt = getDecompressorProvider(
+ secureHttpTransportSettingsProvider
+ );
+
+ // There could be multiple request decompressor providers configured, using the first one
+ decompressorProviderOpt.ifPresent(p -> logger.debug("Using request decompressor provider: {}", p));
+
+ if (headerVerifiers.size() > 1) {
+ throw new IllegalArgumentException(
+ "Cannot have more than one header verifier configured, supplied " + headerVerifiers.size()
+ );
+ }
+
+ this.headerVerifier = headerVerifiers.isEmpty() ? null : headerVerifiers.get(0);
+ this.decompressorProvider = decompressorProviderOpt.orElseGet(() -> new TransportAdapterProvider() {
+ @Override
+ public String name() {
+ return REQUEST_DECOMPRESSOR;
+ }
+
+ @Override
+ public Optional create(Settings settings, HttpServerTransport transport, Class adapterClass) {
+ return Optional.empty();
+ }
+ });
+
+ }
+
+ private List getHeaderVerifiers(
+ @Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider
+ ) {
+ if (secureHttpTransportSettingsProvider == null) {
+ return Collections.emptyList();
+ }
+
+ return secureHttpTransportSettingsProvider.getHttpTransportAdapterProviders(settings)
+ .stream()
+ .filter(p -> REQUEST_HEADER_VERIFIER.equalsIgnoreCase(p.name()))
+ .map(p -> p.create(settings, NioHttpServerTransport.this, ChannelInboundHandlerAdapter.class))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ }
+
+ private Optional> getDecompressorProvider(
+ @Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider
+ ) {
+ if (secureHttpTransportSettingsProvider == null) {
+ return Optional.empty();
+ }
+
+ return secureHttpTransportSettingsProvider.getHttpTransportAdapterProviders(settings)
+ .stream()
+ .filter(p -> REQUEST_DECOMPRESSOR.equalsIgnoreCase(p.name()))
+ .findFirst();
}
@Override
- public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) {
+ public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) throws IOException {
+ SSLEngine engine = null;
+ if (secureHttpTransportSettingsProvider != null) {
+ engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, NioHttpServerTransport.this)
+ .orElseGet(SslUtils::createDefaultServerSSLEngine);
+ }
+
NioHttpChannel httpChannel = new NioHttpChannel(channel);
HttpReadWriteHandler handler = new HttpReadWriteHandler(
httpChannel,
NioHttpServerTransport.this,
handlingSettings,
selector.getTaskScheduler(),
- threadPool::relativeTimeInMillis
+ threadPool::relativeTimeInMillis,
+ headerVerifier,
+ decompressorProvider.create(settings, NioHttpServerTransport.this, ChannelInboundHandlerAdapter.class)
+ .orElseGet(HttpContentDecompressor::new),
+ engine
);
Consumer exceptionHandler = (e) -> onException(httpChannel, e);
SocketChannelContext context = new BytesChannelContext(
@@ -244,6 +363,5 @@ public NioHttpServerChannel createServerChannel(
httpServerChannel.setContext(context);
return httpServerChannel;
}
-
}
}
diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/SslUtils.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/SslUtils.java
new file mode 100644
index 0000000000000..afd67f9799273
--- /dev/null
+++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/SslUtils.java
@@ -0,0 +1,48 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+package org.opensearch.http.nio.ssl;
+
+import org.opensearch.OpenSearchSecurityException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import java.security.NoSuchAlgorithmException;
+
+public class SslUtils {
+ private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };
+
+ private SslUtils() {
+
+ }
+
+ public static SSLEngine createDefaultServerSSLEngine() {
+ try {
+ final SSLEngine engine = SSLContext.getDefault().createSSLEngine();
+ engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS);
+ engine.setUseClientMode(false);
+ return engine;
+ } catch (final NoSuchAlgorithmException ex) {
+ throw new OpenSearchSecurityException("Unable to initialize default server SSL engine", ex);
+ }
+ }
+
+ public static SSLEngine createDefaultClientSSLEngine() {
+ try {
+ final SSLEngine engine = SSLContext.getDefault().createSSLEngine();
+ engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS);
+ engine.setUseClientMode(true);
+ return engine;
+ } catch (final NoSuchAlgorithmException ex) {
+ throw new OpenSearchSecurityException("Unable to initialize default client SSL engine", ex);
+ }
+ }
+}
diff --git a/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/package-info.java b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/package-info.java
new file mode 100644
index 0000000000000..a67f8247ebd4d
--- /dev/null
+++ b/plugins/transport-nio/src/main/java/org/opensearch/http/nio/ssl/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * SSL supporting utility classes
+ */
+package org.opensearch.http.nio.ssl;
diff --git a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java
index d4be876867651..7707369b59120 100644
--- a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java
+++ b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java
@@ -47,9 +47,11 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.http.HttpServerTransport;
+import org.opensearch.http.HttpServerTransport.Dispatcher;
import org.opensearch.http.nio.NioHttpServerTransport;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
+import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transport;
@@ -66,6 +68,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin {
public static final String NIO_TRANSPORT_NAME = "nio-transport";
public static final String NIO_HTTP_TRANSPORT_NAME = "nio-http-transport";
+ public static final String NIO_SECURE_HTTP_TRANSPORT_NAME = "nio-http-transport-secure";
private static final Logger logger = LogManager.getLogger(NioTransportPlugin.class);
@@ -140,6 +143,38 @@ public Map> getHttpTransports(
);
}
+ @Override
+ public Map> getSecureHttpTransports(
+ Settings settings,
+ ThreadPool threadPool,
+ BigArrays bigArrays,
+ PageCacheRecycler pageCacheRecycler,
+ CircuitBreakerService circuitBreakerService,
+ NamedXContentRegistry xContentRegistry,
+ NetworkService networkService,
+ Dispatcher dispatcher,
+ ClusterSettings clusterSettings,
+ SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
+ Tracer tracer
+ ) {
+ return Collections.singletonMap(
+ NIO_SECURE_HTTP_TRANSPORT_NAME,
+ () -> new NioHttpServerTransport(
+ settings,
+ networkService,
+ bigArrays,
+ pageCacheRecycler,
+ threadPool,
+ xContentRegistry,
+ dispatcher,
+ getNioGroupFactory(settings),
+ clusterSettings,
+ secureHttpTransportSettingsProvider,
+ tracer
+ )
+ );
+ }
+
private synchronized NioGroupFactory getNioGroupFactory(Settings settings) {
NioGroupFactory nioGroupFactory = groupFactory.get();
if (nioGroupFactory != null) {
diff --git a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpClient.java b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpClient.java
index 45e51c6855f79..ff878eb55e411 100644
--- a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpClient.java
+++ b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpClient.java
@@ -71,6 +71,7 @@
import java.util.function.Consumer;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -83,6 +84,10 @@
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseDecoder;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
@@ -92,7 +97,7 @@
/**
* Tiny helper to send http requests over nio.
*/
-class NioHttpClient implements Closeable {
+public class NioHttpClient implements Closeable {
static Collection returnOpaqueIds(Collection responses) {
List list = new ArrayList<>(responses.size());
@@ -105,9 +110,11 @@ static Collection returnOpaqueIds(Collection responses
private static final Logger logger = LogManager.getLogger(NioHttpClient.class);
private final NioSelectorGroup nioGroup;
+ private final boolean secure;
- NioHttpClient() {
+ private NioHttpClient(final boolean secure) {
try {
+ this.secure = secure;
nioGroup = new NioSelectorGroup(
daemonThreadFactory(Settings.EMPTY, "nio-http-client"),
1,
@@ -118,6 +125,14 @@ static Collection returnOpaqueIds(Collection responses
}
}
+ public static NioHttpClient http() {
+ return new NioHttpClient(false);
+ }
+
+ public static NioHttpClient https() {
+ return new NioHttpClient(true);
+ }
+
public Collection get(InetSocketAddress remoteAddress, String... uris) throws InterruptedException {
Collection requests = new ArrayList<>(uris.length);
for (int i = 0; i < uris.length; i++) {
@@ -138,7 +153,8 @@ public final FullHttpResponse send(InetSocketAddress remoteAddress, FullHttpRequ
public final NioSocketChannel connect(InetSocketAddress remoteAddress) {
ChannelFactory factory = new ClientChannelFactory(
new CountDownLatch(0),
- new ArrayList<>()
+ new ArrayList<>(),
+ secure
);
try {
NioSocketChannel nioSocketChannel = nioGroup.openChannel(remoteAddress, factory);
@@ -160,7 +176,7 @@ private synchronized Collection sendRequests(InetSocketAddress
final CountDownLatch latch = new CountDownLatch(requests.size());
final Collection content = Collections.synchronizedList(new ArrayList<>(requests.size()));
- ChannelFactory factory = new ClientChannelFactory(latch, content);
+ ChannelFactory factory = new ClientChannelFactory(latch, content, secure);
NioSocketChannel nioSocketChannel = null;
try {
@@ -196,8 +212,9 @@ private class ClientChannelFactory extends ChannelFactory content;
+ private final boolean secure;
- private ClientChannelFactory(CountDownLatch latch, Collection content) {
+ private ClientChannelFactory(CountDownLatch latch, Collection content, final boolean secure) {
super(
NetworkService.TCP_NO_DELAY.get(Settings.EMPTY),
NetworkService.TCP_KEEP_ALIVE.get(Settings.EMPTY),
@@ -210,12 +227,14 @@ private ClientChannelFactory(CountDownLatch latch, Collection
);
this.latch = latch;
this.content = content;
+ this.secure = secure;
}
@Override
- public NioSocketChannel createChannel(NioSelector selector, java.nio.channels.SocketChannel channel, Config.Socket socketConfig) {
+ public NioSocketChannel createChannel(NioSelector selector, java.nio.channels.SocketChannel channel, Config.Socket socketConfig)
+ throws IOException {
NioSocketChannel nioSocketChannel = new NioSocketChannel(channel);
- HttpClientHandler handler = new HttpClientHandler(nioSocketChannel, latch, content);
+ HttpClientHandler handler = new HttpClientHandler(nioSocketChannel, latch, content, secure);
Consumer exceptionHandler = (e) -> {
latch.countDown();
onException(e);
@@ -249,17 +268,34 @@ private static class HttpClientHandler implements NioChannelHandler {
private final CountDownLatch latch;
private final Collection content;
- private HttpClientHandler(NioSocketChannel channel, CountDownLatch latch, Collection content) {
+ private HttpClientHandler(
+ NioSocketChannel channel,
+ CountDownLatch latch,
+ Collection content,
+ final boolean secure
+ ) throws IOException {
this.latch = latch;
this.content = content;
final int maxContentLength = Math.toIntExact(new ByteSizeValue(100, ByteSizeUnit.MB).getBytes());
List handlers = new ArrayList<>(5);
+
+ SslHandler sslHandler = null;
+ if (secure) {
+ sslHandler = new SslHandler(
+ SslContextBuilder.forClient()
+ .clientAuth(ClientAuth.NONE)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .build()
+ .newEngine(UnpooledByteBufAllocator.DEFAULT)
+ );
+ }
+
handlers.add(new HttpResponseDecoder());
handlers.add(new HttpRequestEncoder());
handlers.add(new HttpContentDecompressor());
handlers.add(new HttpObjectAggregator(maxContentLength));
- adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
+ adaptor = new NettyAdaptor(sslHandler, handlers.toArray(new ChannelHandler[0]));
adaptor.addCloseListener((v, e) -> channel.close());
}
diff --git a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpServerTransportTests.java
index 09594673de5b2..61b42f2a6b77a 100644
--- a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpServerTransportTests.java
+++ b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/NioHttpServerTransportTests.java
@@ -193,7 +193,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
- try (NioHttpClient client = new NioHttpClient()) {
+ try (NioHttpClient client = NioHttpClient.http()) {
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
request.headers().set(HttpHeaderNames.EXPECT, expectation);
HttpUtil.setContentLength(request, contentLength);
@@ -310,7 +310,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
// Test pre-flight request
- try (NioHttpClient client = new NioHttpClient()) {
+ try (NioHttpClient client = NioHttpClient.http()) {
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/");
request.headers().add(CorsHandler.ORIGIN, "test-cors.org");
request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST");
@@ -327,7 +327,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
}
// Test short-circuited request
- try (NioHttpClient client = new NioHttpClient()) {
+ try (NioHttpClient client = NioHttpClient.http()) {
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
request.headers().add(CorsHandler.ORIGIN, "google.com");
@@ -384,7 +384,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
- try (NioHttpClient client = new NioHttpClient()) {
+ try (NioHttpClient client = NioHttpClient.http()) {
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip"));
final FullHttpResponse response = client.send(remoteAddress.address(), request);
@@ -451,7 +451,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
- try (NioHttpClient client = new NioHttpClient()) {
+ try (NioHttpClient client = NioHttpClient.http()) {
final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8"));
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
@@ -514,7 +514,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
- try (NioHttpClient client = new NioHttpClient()) {
+ try (NioHttpClient client = NioHttpClient.http()) {
NioSocketChannel channel = null;
try {
CountDownLatch channelClosedLatch = new CountDownLatch(1);
diff --git a/plugins/transport-nio/src/test/java/org/opensearch/http/nio/ssl/SecureNioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/ssl/SecureNioHttpServerTransportTests.java
new file mode 100644
index 0000000000000..1adfe0370344c
--- /dev/null
+++ b/plugins/transport-nio/src/test/java/org/opensearch/http/nio/ssl/SecureNioHttpServerTransportTests.java
@@ -0,0 +1,558 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.http.nio.ssl;
+
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.OpenSearchException;
+import org.opensearch.common.network.NetworkAddress;
+import org.opensearch.common.network.NetworkService;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.MockBigArrays;
+import org.opensearch.common.util.MockPageCacheRecycler;
+import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.core.common.bytes.BytesArray;
+import org.opensearch.core.common.transport.TransportAddress;
+import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
+import org.opensearch.http.BindHttpException;
+import org.opensearch.http.CorsHandler;
+import org.opensearch.http.HttpServerTransport;
+import org.opensearch.http.HttpTransportSettings;
+import org.opensearch.http.NullDispatcher;
+import org.opensearch.http.nio.NioHttpClient;
+import org.opensearch.http.nio.NioHttpServerTransport;
+import org.opensearch.nio.NioSocketChannel;
+import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
+import org.opensearch.plugins.TransportExceptionHandler;
+import org.opensearch.rest.BytesRestResponse;
+import org.opensearch.rest.RestChannel;
+import org.opensearch.rest.RestRequest;
+import org.opensearch.telemetry.tracing.noop.NoopTracer;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.test.rest.FakeRestRequest;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.nio.NioGroupFactory;
+import org.junit.After;
+import org.junit.Before;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+
+import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
+import static org.opensearch.core.rest.RestStatus.OK;
+import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN;
+import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for the secure {@link NioHttpServerTransport} class.
+ */
+public class SecureNioHttpServerTransportTests extends OpenSearchTestCase {
+
+ private NetworkService networkService;
+ private ThreadPool threadPool;
+ private MockBigArrays bigArrays;
+ private MockPageCacheRecycler pageRecycler;
+ private ClusterSettings clusterSettings;
+ private SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
+
+ @Before
+ public void setup() throws Exception {
+ networkService = new NetworkService(Collections.emptyList());
+ threadPool = new TestThreadPool("test");
+ pageRecycler = new MockPageCacheRecycler(Settings.EMPTY);
+ bigArrays = new MockBigArrays(pageRecycler, new NoneCircuitBreakerService());
+ clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+
+ secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() {
+ @Override
+ public Optional buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException {
+ try {
+ SSLEngine engine = SslContextBuilder.forServer(
+ SecureNioHttpServerTransportTests.class.getResourceAsStream("/certificate.crt"),
+ SecureNioHttpServerTransportTests.class.getResourceAsStream("/certificate.key")
+ ).trustManager(InsecureTrustManagerFactory.INSTANCE).build().newEngine(UnpooledByteBufAllocator.DEFAULT);
+ return Optional.of(engine);
+ } catch (final IOException ex) {
+ throw new SSLException(ex);
+ }
+ }
+ };
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ if (threadPool != null) {
+ threadPool.shutdownNow();
+ }
+ threadPool = null;
+ networkService = null;
+ bigArrays = null;
+ clusterSettings = null;
+ }
+
+ /**
+ * Test that {@link NioHttpServerTransport} supports the "Expect: 100-continue" HTTP header
+ * @throws InterruptedException if the client communication with the server is interrupted
+ */
+ public void testExpectContinueHeader() throws InterruptedException {
+ final Settings settings = createSettings();
+ final int contentLength = randomIntBetween(1, HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).bytesAsInt());
+ runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.CONTINUE);
+ }
+
+ /**
+ * Test that {@link NioHttpServerTransport} responds to a
+ * 100-continue expectation with too large a content-length
+ * with a 413 status.
+ * @throws InterruptedException if the client communication with the server is interrupted
+ */
+ public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException {
+ final String key = HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey();
+ final int maxContentLength = randomIntBetween(1, 104857600);
+ final Settings settings = createBuilderWithPort().put(key, maxContentLength + "b").build();
+ final int contentLength = randomIntBetween(maxContentLength + 1, Integer.MAX_VALUE);
+ runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
+ }
+
+ /**
+ * Test that {@link NioHttpServerTransport} responds to an unsupported expectation with a 417 status.
+ * @throws InterruptedException if the client communication with the server is interrupted
+ */
+ public void testExpectUnsupportedExpectation() throws InterruptedException {
+ Settings settings = createSettings();
+ runExpectHeaderTest(settings, "chocolate=yummy", 0, HttpResponseStatus.EXPECTATION_FAILED);
+ }
+
+ private void runExpectHeaderTest(
+ final Settings settings,
+ final String expectation,
+ final int contentLength,
+ final HttpResponseStatus expectedStatus
+ ) throws InterruptedException {
+ final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+ @Override
+ public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
+ channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")));
+ }
+
+ @Override
+ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
+ logger.error(
+ new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
+ cause
+ );
+ throw new AssertionError();
+ }
+ };
+ try (
+ NioHttpServerTransport transport = new NioHttpServerTransport(
+ settings,
+ networkService,
+ bigArrays,
+ pageRecycler,
+ threadPool,
+ xContentRegistry(),
+ dispatcher,
+ new NioGroupFactory(settings, logger),
+ clusterSettings,
+ secureHttpTransportSettingsProvider,
+ NoopTracer.INSTANCE
+ )
+ ) {
+ transport.start();
+ final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+ try (NioHttpClient client = NioHttpClient.https()) {
+ final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
+ request.headers().set(HttpHeaderNames.EXPECT, expectation);
+ HttpUtil.setContentLength(request, contentLength);
+
+ final FullHttpResponse response = client.send(remoteAddress.address(), request);
+ try {
+ assertThat(response.status(), equalTo(expectedStatus));
+ if (expectedStatus.equals(HttpResponseStatus.CONTINUE)) {
+ final FullHttpRequest continuationRequest = new DefaultFullHttpRequest(
+ HttpVersion.HTTP_1_1,
+ HttpMethod.POST,
+ "/",
+ Unpooled.EMPTY_BUFFER
+ );
+ final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), continuationRequest);
+ try {
+ assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
+ assertThat(
+ new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8),
+ is("done")
+ );
+ } finally {
+ continuationResponse.release();
+ }
+ }
+ } finally {
+ response.release();
+ }
+ }
+ }
+ }
+
+ public void testBindUnavailableAddress() {
+ Settings initialSettings = createSettings();
+ try (
+ NioHttpServerTransport transport = new NioHttpServerTransport(
+ initialSettings,
+ networkService,
+ bigArrays,
+ pageRecycler,
+ threadPool,
+ xContentRegistry(),
+ new NullDispatcher(),
+ new NioGroupFactory(Settings.EMPTY, logger),
+ clusterSettings,
+ secureHttpTransportSettingsProvider,
+ NoopTracer.INSTANCE
+ )
+ ) {
+ transport.start();
+ TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+ Settings settings = Settings.builder()
+ .put("http.port", remoteAddress.getPort())
+ .put("network.host", remoteAddress.getAddress())
+ .build();
+ try (
+ NioHttpServerTransport otherTransport = new NioHttpServerTransport(
+ settings,
+ networkService,
+ bigArrays,
+ pageRecycler,
+ threadPool,
+ xContentRegistry(),
+ new NullDispatcher(),
+ new NioGroupFactory(Settings.EMPTY, logger),
+ clusterSettings,
+ secureHttpTransportSettingsProvider,
+ NoopTracer.INSTANCE
+ )
+ ) {
+ BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start);
+ assertEquals("Failed to bind to " + NetworkAddress.format(remoteAddress.address()), bindHttpException.getMessage());
+ }
+ }
+ }
+
+ public void testCorsRequest() throws InterruptedException {
+ final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+ @Override
+ public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+ logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
+ throw new AssertionError();
+ }
+
+ @Override
+ public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+ logger.error(
+ new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
+ cause
+ );
+ throw new AssertionError();
+ }
+
+ };
+
+ final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true)
+ .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org")
+ .build();
+
+ try (
+ NioHttpServerTransport transport = new NioHttpServerTransport(
+ settings,
+ networkService,
+ bigArrays,
+ pageRecycler,
+ threadPool,
+ xContentRegistry(),
+ dispatcher,
+ new NioGroupFactory(settings, logger),
+ new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+ secureHttpTransportSettingsProvider,
+ NoopTracer.INSTANCE
+ )
+ ) {
+ transport.start();
+ final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+ // Test pre-flight request
+ try (NioHttpClient client = NioHttpClient.https()) {
+ final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/");
+ request.headers().add(CorsHandler.ORIGIN, "test-cors.org");
+ request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST");
+
+ final FullHttpResponse response = client.send(remoteAddress.address(), request);
+ try {
+ assertThat(response.status(), equalTo(HttpResponseStatus.OK));
+ assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("test-cors.org"));
+ assertThat(response.headers().get(CorsHandler.VARY), equalTo(CorsHandler.ORIGIN));
+ assertTrue(response.headers().contains(CorsHandler.DATE));
+ } finally {
+ response.release();
+ }
+ }
+
+ // Test short-circuited request
+ try (NioHttpClient client = NioHttpClient.https()) {
+ final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
+ request.headers().add(CorsHandler.ORIGIN, "google.com");
+
+ final FullHttpResponse response = client.send(remoteAddress.address(), request);
+ try {
+ assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN));
+ } finally {
+ response.release();
+ }
+ }
+ }
+ }
+
+ public void testLargeCompressedResponse() throws InterruptedException {
+ final String responseString = randomAlphaOfLength(4 * 1024 * 1024);
+ final String url = "/thing";
+ final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+ @Override
+ public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+ if (url.equals(request.uri())) {
+ channel.sendResponse(new BytesRestResponse(OK, responseString));
+ } else {
+ logger.error("--> Unexpected successful uri [{}]", request.uri());
+ throw new AssertionError();
+ }
+ }
+
+ @Override
+ public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+ logger.error(
+ new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
+ cause
+ );
+ throw new AssertionError();
+ }
+
+ };
+
+ try (
+ NioHttpServerTransport transport = new NioHttpServerTransport(
+ Settings.EMPTY,
+ networkService,
+ bigArrays,
+ pageRecycler,
+ threadPool,
+ xContentRegistry(),
+ dispatcher,
+ new NioGroupFactory(Settings.EMPTY, logger),
+ clusterSettings,
+ secureHttpTransportSettingsProvider,
+ NoopTracer.INSTANCE
+ )
+ ) {
+ transport.start();
+ final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+ try (NioHttpClient client = NioHttpClient.https()) {
+ DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
+ request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip"));
+ final FullHttpResponse response = client.send(remoteAddress.address(), request);
+ try {
+ assertThat(response.status(), equalTo(HttpResponseStatus.OK));
+ byte[] bytes = new byte[response.content().readableBytes()];
+ response.content().readBytes(bytes);
+ assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString));
+ } finally {
+ response.release();
+ }
+ }
+ }
+ }
+
+ public void testBadRequest() throws InterruptedException {
+ final AtomicReference causeReference = new AtomicReference<>();
+ final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+ @Override
+ public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+ logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
+ throw new AssertionError();
+ }
+
+ @Override
+ public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+ causeReference.set(cause);
+ try {
+ final OpenSearchException e = new OpenSearchException("you sent a bad request and you should feel bad");
+ channel.sendResponse(new BytesRestResponse(channel, BAD_REQUEST, e));
+ } catch (final IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ };
+
+ final Settings settings;
+ final int maxInitialLineLength;
+ final Setting httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
+ if (randomBoolean()) {
+ maxInitialLineLength = httpMaxInitialLineLengthSetting.getDefault(Settings.EMPTY).bytesAsInt();
+ settings = createSettings();
+ } else {
+ maxInitialLineLength = randomIntBetween(1, 8192);
+ settings = createBuilderWithPort().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build();
+ }
+
+ try (
+ NioHttpServerTransport transport = new NioHttpServerTransport(
+ settings,
+ networkService,
+ bigArrays,
+ pageRecycler,
+ threadPool,
+ xContentRegistry(),
+ dispatcher,
+ new NioGroupFactory(settings, logger),
+ clusterSettings,
+ secureHttpTransportSettingsProvider,
+ NoopTracer.INSTANCE
+ )
+ ) {
+ transport.start();
+ final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+ try (NioHttpClient client = NioHttpClient.https()) {
+ final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8"));
+ final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
+
+ final FullHttpResponse response = client.send(remoteAddress.address(), request);
+ try {
+ assertThat(response.status(), equalTo(HttpResponseStatus.BAD_REQUEST));
+ assertThat(
+ new String(response.content().array(), Charset.forName("UTF-8")),
+ containsString("you sent a bad request and you should feel bad")
+ );
+ } finally {
+ response.release();
+ }
+ }
+ }
+
+ assertNotNull(causeReference.get());
+ assertThat(causeReference.get(), instanceOf(TooLongFrameException.class));
+ }
+
+ public void testReadTimeout() throws Exception {
+ final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+ @Override
+ public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+ logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
+ throw new AssertionError("Should not have received a dispatched request");
+ }
+
+ @Override
+ public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+ logger.error(
+ new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
+ cause
+ );
+ throw new AssertionError("Should not have received a dispatched request");
+ }
+
+ };
+
+ Settings settings = createBuilderWithPort().put(
+ HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(),
+ new TimeValue(randomIntBetween(100, 300))
+ ).build();
+
+ try (
+ NioHttpServerTransport transport = new NioHttpServerTransport(
+ settings,
+ networkService,
+ bigArrays,
+ pageRecycler,
+ threadPool,
+ xContentRegistry(),
+ dispatcher,
+ new NioGroupFactory(settings, logger),
+ new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+ secureHttpTransportSettingsProvider,
+ NoopTracer.INSTANCE
+ )
+ ) {
+ transport.start();
+ final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+ try (NioHttpClient client = NioHttpClient.https()) {
+ NioSocketChannel channel = null;
+ try {
+ CountDownLatch channelClosedLatch = new CountDownLatch(1);
+ channel = client.connect(remoteAddress.address());
+ channel.addCloseListener((r, t) -> channelClosedLatch.countDown());
+ assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES));
+ } finally {
+ if (channel != null) {
+ channel.close();
+ }
+ }
+ }
+ }
+ }
+
+ private Settings createSettings() {
+ return createBuilderWithPort().build();
+ }
+
+ private Settings.Builder createBuilderWithPort() {
+ return Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange());
+ }
+}
diff --git a/plugins/transport-nio/src/test/resources/README.txt b/plugins/transport-nio/src/test/resources/README.txt
new file mode 100644
index 0000000000000..a4353cee45a97
--- /dev/null
+++ b/plugins/transport-nio/src/test/resources/README.txt
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+#
+# This is README describes how the certificates in this directory were created.
+# This file can also be executed as a script
+#
+
+# 1. Create certificate key
+
+openssl req -x509 -sha256 -newkey rsa:2048 -keyout certificate.key -out certificate.crt -days 1024 -nodes
+
+# 2. Export the certificate in pkcs12 format
+
+openssl pkcs12 -export -in certificate.crt -inkey certificate.key -out server.p12 -name netty4-secure -password pass:password
+
diff --git a/plugins/transport-nio/src/test/resources/certificate.crt b/plugins/transport-nio/src/test/resources/certificate.crt
new file mode 100644
index 0000000000000..54c78fdbcf6de
--- /dev/null
+++ b/plugins/transport-nio/src/test/resources/certificate.crt
@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDkzCCAnugAwIBAgIUddAawr5zygcd+Dcn9WVDpO4BJ7YwDQYJKoZIhvcNAQEL
+BQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MB4X
+DTI0MDMxNDE5NDQzOVoXDTI3MDEwMjE5NDQzOVowWTELMAkGA1UEBhMCQVUxEzAR
+BgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5
+IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
+MIIBCgKCAQEAzjOKkg6Iba5zfZ8b/RYw+PGmGEfbdGuuF10Wz4Jmx/Nk4VfDLxdh
+TW8VllUL2JD7uPkjABj7pW3awAbvIJ+VGbKqfBr1Nsz0mPPzhT8cfuMH/FDZgQs3
+4HuqDKr0LfC1Kw5E3WF0GVMBDNu0U+nKoeqySeYjGdxDnd3W4cqK5AnUxL0RnIny
+Bw7ZuhcU55XndH/Xauro/2EpvJduDsWMdqt7ZfIf1TOmaiQHK+82yb/drVaJbczK
+uTpn1Kv2bnzkQEckgq+z1dLNOOyvP2xf+nsziw5ilJe92e5GJOUJYFAlEgUAGpfD
+dv6j/gTRYvdJCJItOQEQtektNCAZsoc0wwIDAQABo1MwUTAdBgNVHQ4EFgQUzHts
+wIt+zhB/R4U4Do2P6rr0YhkwHwYDVR0jBBgwFoAUzHtswIt+zhB/R4U4Do2P6rr0
+YhkwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAveh870jJX7vt
+oLCrdugsyo79pR4f7Nr1kUy3jJrfoaoUmrjiiiHWgT22fGwp7j1GZF2mVfo8YVaK
+63YNn5gB2NNZhguPOFC4AdvHRYOKRBOaOvWK8oq7BcJ//18JYI/pPnpgkYvJjqv4
+gFKaZX9qWtujHpAmKiVGs7pwYGNXfixPHRNV4owcfHMIH5dhbbqT49j94xVpjbXs
+OymKtFl4kpCE/0LzKFrFcuu55Am1VLBHx2cPpHLOipgUcF5BHFlQ8AXiCMOwfPAw
+d22mLB6Gt1oVEpyvQHYd3e04FetEXQ9E8T+NKWZx/8Ucf+IWBYmZBRxch6O83xgk
+bAbGzqkbzQ==
+-----END CERTIFICATE-----
diff --git a/plugins/transport-nio/src/test/resources/certificate.key b/plugins/transport-nio/src/test/resources/certificate.key
new file mode 100644
index 0000000000000..228350180935d
--- /dev/null
+++ b/plugins/transport-nio/src/test/resources/certificate.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDOM4qSDohtrnN9
+nxv9FjD48aYYR9t0a64XXRbPgmbH82ThV8MvF2FNbxWWVQvYkPu4+SMAGPulbdrA
+Bu8gn5UZsqp8GvU2zPSY8/OFPxx+4wf8UNmBCzfge6oMqvQt8LUrDkTdYXQZUwEM
+27RT6cqh6rJJ5iMZ3EOd3dbhyorkCdTEvRGcifIHDtm6FxTnled0f9dq6uj/YSm8
+l24OxYx2q3tl8h/VM6ZqJAcr7zbJv92tVoltzMq5OmfUq/ZufORARySCr7PV0s04
+7K8/bF/6ezOLDmKUl73Z7kYk5QlgUCUSBQAal8N2/qP+BNFi90kIki05ARC16S00
+IBmyhzTDAgMBAAECggEAVOdiElvLjyX6xeoC00YU6hxOIMdNtHU2HMamwtDV01UD
+38mMQ9KjrQelYt4n34drLrHe2IZw75/5J4JzagJrmUY47psHBwaDXItuZRokeJaw
+zhLYTEs7OcKRtV+a5WOspUrdzi33aQoFb67zZG3qkpsZyFXrdBV+/fy/Iv+MCvLH
+xR0jQ5mzE3cw20R7S4nddChBA/y8oKGOo6QRf2SznC1jL/+yolHvJPEn1v8AUxYm
+BMPHxj1O0c4M4IxnJQ3Y5Jy9OaFMyMsFlF1hVhc/3LDDxDyOuBsVsFDicojyrRea
+GKngIke0yezy7Wo4NUcp8YQhafonpWVsSJJdOUotcQKBgQD0rihFBXVtcG1d/Vy7
+FvLHrmccD56JNV744LSn2CDM7W1IulNbDUZINdCFqL91u5LpxozeE1FPY1nhwncJ
+N7V7XYCaSLCuV1YJzRmUCjnzk2RyopGpzWog3f9uUFGgrk1HGbNAv99k/REya6Iu
+IRSkuQhaJOj3bRXzonh0K4GjewKBgQDXvamtCioOUMSP8vq919YMkBw7F+z/fr0p
+pamO8HL9eewAUg6N92JQ9kobSo/GptdmdHIjs8LqnS5C3H13GX5Qlf5GskOlCpla
+V55ElaSp0gvKwWE168U7gQH4etPQAXXJrOGFaGbPj9W81hTUud7HVE88KYdfWTBo
+I7TuE25tWQKBgBRjcr2Vn9xXsvVTCGgamG5lLPhcoNREGz7X0pXt34XT/vhBdnKu
+331i5pZMom+YCrzqK5DRwUPBPpseTjb5amj2OKIijn5ojqXQbmI0m/GdBZC71TF2
+CXLlrMQvcy3VeGEFVjd+BYpvwAAYkfIQFZ1IQdbpHnSHpX2guzLK8UmDAoGBANUy
+PIcf0EetUVHfkCIjNQfdMcjD8BTcLhsF9vWmcDxFTA9VB8ULf0D64mjt2f85yQsa
+b+EQN8KZ6alxMxuLOeRxFYLPj0F9o+Y/R8wHBV48kCKhz2r1v0b6SfQ/jSm1B61x
+BrxLW64qOdIOzS8bLyhUDKkrcPesr8V548aRtUKhAoGBAKlNJFd8BCGKD9Td+3dE
+oP1iHTX5XZ+cQIqL0e+GMQlK4HnQP566DFZU5/GHNNAfmyxd5iSRwhTqPMHRAmOb
+pqQwsyufx0dFeIBxeSO3Z6jW5h2sl4nBipZpw9bzv6EBL1xRr0SfMNZzdnf4JFzc
+0htGo/VO93Z2pv8w7uGUz1nN
+-----END PRIVATE KEY-----
diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java
index 0953e51484bd3..8d20650d76583 100644
--- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java
+++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java
@@ -181,7 +181,7 @@ private List processRequestsWithBody(
private List sendRequests(
final InetSocketAddress remoteAddress,
final Collection requests,
- boolean orderer
+ boolean ordered
) {
final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
try {
@@ -209,7 +209,7 @@ private List sendRequests(
)
.toArray(Mono[]::new);
- if (orderer == false) {
+ if (ordered == false) {
return ParallelFlux.from(monos).sequential().collectList().block();
} else {
return Flux.concat(monos).flatMapSequential(r -> Mono.just(r)).collectList().block();
diff --git a/server/src/main/java/org/opensearch/plugins/SecureHttpTransportSettingsProvider.java b/server/src/main/java/org/opensearch/plugins/SecureHttpTransportSettingsProvider.java
index ff86cbc04e240..b7a47b0f4c742 100644
--- a/server/src/main/java/org/opensearch/plugins/SecureHttpTransportSettingsProvider.java
+++ b/server/src/main/java/org/opensearch/plugins/SecureHttpTransportSettingsProvider.java
@@ -27,6 +27,16 @@
*/
@ExperimentalApi
public interface SecureHttpTransportSettingsProvider {
+ /**
+ * The well-known name of header verifier {@link TransportAdapterProvider} provider instance
+ */
+ final String REQUEST_HEADER_VERIFIER = "HeaderVerifier";
+
+ /**
+ * The well-known name of request decompressor {@link TransportAdapterProvider} provider instance
+ */
+ final String REQUEST_DECOMPRESSOR = "RequestDecompressor";
+
/**
* Collection of additional {@link TransportAdapterProvider}s that are specific to particular HTTP transport
* @param settings settings