Skip to content

Commit

Permalink
Ensure support of the transport-nio by security plugin (HTTP) (#16474)
Browse files Browse the repository at this point in the history
* Ensure support of the transport-nio by security plugin (HTTP)

Signed-off-by: Andriy Redko <[email protected]>

* Add header verifier and decompressor support of secure NIO transport variant

Signed-off-by: Andriy Redko <[email protected]>

---------

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Nov 5, 2024
1 parent 4c35a2b commit b25e10a
Show file tree
Hide file tree
Showing 20 changed files with 966 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Ensure support of the transport-nio by security plugin ([#16474](https://github.com/opensearch-project/OpenSearch/pull/16474))
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
* @see <a href="https://github.com/opensearch-project/security/blob/d526c9f6c2a438c14db8b413148204510b9fe2e2/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java">SecuritySSLNettyHttpServerTransport</a>
*/
public class SecureNetty4HttpServerTransport extends Netty4HttpServerTransport {
public static final String REQUEST_HEADER_VERIFIER = "HeaderVerifier";
public static final String REQUEST_DECOMPRESSOR = "RequestDecompressor";
public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider.REQUEST_HEADER_VERIFIER;
public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider.REQUEST_DECOMPRESSOR;

private static final Logger logger = LogManager.getLogger(SecureNetty4HttpServerTransport.class);
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,11 @@ private static class CountDownLatchHandlerHttp2 extends AwaitableChannelInitiali

private final CountDownLatch latch;
private final Collection<FullHttpResponse> content;
private final boolean secure;
private Http2SettingsHandler settingsHandler;

CountDownLatchHandlerHttp2(final CountDownLatch latch, final Collection<FullHttpResponse> content, final boolean secure) {
this.latch = latch;
this.content = content;
this.secure = secure;
}

@Override
Expand Down
5 changes: 1 addition & 4 deletions plugins/transport-nio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
}

tasks.named("dependencyLicenses").configure {
Expand Down Expand Up @@ -151,10 +152,6 @@ thirdPartyAudit {
'io.netty.internal.tcnative.SessionTicketKey',
'io.netty.internal.tcnative.SniHostNameMatcher',

// from io.netty.channel.unix (netty)
'io.netty.channel.unix.FileDescriptor',
'io.netty.channel.unix.UnixChannel',

'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration'
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
d1171bb99411f282068f49d780cedf8c9adeabfd
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public void testThatNioHttpServerSupportsPipelining() throws Exception {
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

try (NioHttpClient nettyHttpClient = new NioHttpClient()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
try (NioHttpClient client = NioHttpClient.http()) {
Collection<FullHttpResponse> responses = client.get(transportAddress.address(), requests);
assertThat(responses, hasSize(5));

Collection<String> opaqueIds = NioHttpClient.returnOpaqueIds(responses);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.http.nio;

import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpPipelinedRequest;
Expand All @@ -44,6 +45,8 @@
import org.opensearch.nio.TaskScheduler;
import org.opensearch.nio.WriteOperation;

import javax.net.ssl.SSLEngine;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -58,6 +61,7 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslHandler;

public class HttpReadWriteHandler implements NioChannelHandler {

Expand All @@ -77,6 +81,28 @@ public HttpReadWriteHandler(
HttpHandlingSettings settings,
TaskScheduler taskScheduler,
LongSupplier nanoClock
) {
this(
nioHttpChannel,
transport,
settings,
taskScheduler,
nanoClock,
null, /* no header verifier */
new HttpContentDecompressor(),
null /* no SSL/TLS */
);
}

HttpReadWriteHandler(
NioHttpChannel nioHttpChannel,
NioHttpServerTransport transport,
HttpHandlingSettings settings,
TaskScheduler taskScheduler,
LongSupplier nanoClock,
@Nullable ChannelHandler headerVerifier,
ChannelHandler decompressor,
@Nullable SSLEngine sslEngine
) {
this.nioHttpChannel = nioHttpChannel;
this.transport = transport;
Expand All @@ -85,14 +111,23 @@ public HttpReadWriteHandler(
this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis());

List<ChannelHandler> handlers = new ArrayList<>(8);

SslHandler sslHandler = null;
if (sslEngine != null) {
sslHandler = new SslHandler(sslEngine);
}

HttpRequestDecoder decoder = new HttpRequestDecoder(
settings.getMaxInitialLineLength(),
settings.getMaxHeaderSize(),
settings.getMaxChunkSize()
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
handlers.add(decoder);
handlers.add(new HttpContentDecompressor());
if (headerVerifier != null) {
handlers.add(headerVerifier);
}
handlers.add(decompressor);
handlers.add(new HttpResponseEncoder());
handlers.add(new HttpObjectAggregator(settings.getMaxContentLength()));
if (settings.isCompression()) {
Expand All @@ -102,7 +137,7 @@ public HttpReadWriteHandler(
handlers.add(new NioHttpResponseCreator());
handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));

adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
adaptor = new NettyAdaptor(sslHandler, handlers.toArray(new ChannelHandler[0]));
adaptor.addCloseListener((v, e) -> nioHttpChannel.close());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.http.nio;

import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.nio.FlushOperation;
import org.opensearch.nio.Page;
import org.opensearch.nio.WriteOperation;
Expand All @@ -49,16 +50,21 @@
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.ssl.SslHandler;

class NettyAdaptor {

private final EmbeddedChannel nettyChannel;
private final LinkedList<FlushOperation> flushOperations = new LinkedList<>();

NettyAdaptor(ChannelHandler... handlers) {
nettyChannel = new EmbeddedChannel();
nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() {
this(null, handlers);
}

NettyAdaptor(@Nullable SslHandler sslHandler, ChannelHandler... handlers) {
this.nettyChannel = new EmbeddedChannel();

nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// This is a little tricky. The embedded channel will complete the promise once it writes the message
Expand All @@ -75,12 +81,22 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
});
if (sslHandler != null) {
nettyChannel.pipeline().addAfter("write_captor", "ssl_handler", sslHandler);
}
nettyChannel.pipeline().addLast(handlers);
}

public void close() throws Exception {
assert flushOperations.isEmpty() : "Should close outbound operations before calling close";

final SslHandler sslHandler = (SslHandler) nettyChannel.pipeline().get("ssl_handler");
if (sslHandler != null) {
// The nettyChannel.close() or sslHandler.closeOutbound() futures will block indefinitely,
// removing the handler instead from the channel.
nettyChannel.pipeline().remove(sslHandler);
}

ChannelFuture closeFuture = nettyChannel.close();
// This should be safe as we are not a real network channel
closeFuture.await();
Expand Down
Loading

0 comments on commit b25e10a

Please sign in to comment.