Skip to content

Commit

Permalink
Initial support for GELF HTTP transport
Browse files Browse the repository at this point in the history
Closes #9
  • Loading branch information
Jochen Schalanda committed Feb 5, 2018
1 parent a55396e commit dcd71d0
Show file tree
Hide file tree
Showing 9 changed files with 649 additions and 3 deletions.
17 changes: 15 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,25 @@
<gpg.keyname>B1606F22</gpg.keyname>

<slf4j.version>1.7.25</slf4j.version>
<netty.version>4.1.20.Final</netty.version>
<jackson.version>2.8.9</jackson.version>
</properties>

<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.20.Final</version>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.9</version>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand All @@ -129,6 +136,12 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/graylog2/gelfclient/GelfConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@

import java.io.File;
import java.net.InetSocketAddress;
import java.net.URI;

/**
* The configuration used by a {@link org.graylog2.gelfclient.transport.GelfTransport}.
*/
public class GelfConfiguration {
private static final int DEFAULT_PORT = 12201;
private static final String DEFAULT_HOSTNAME = "127.0.0.1";

private final String hostname;
private final int port;
private URI uri = URI.create("http://127.0.0.1:12201/gelf");
private GelfTransports transport = GelfTransports.TCP;
private Compression compression = Compression.GZIP;
private int queueSize = 512;
Expand Down Expand Up @@ -114,6 +117,15 @@ public InetSocketAddress getRemoteAddress() {
return new InetSocketAddress(hostname, port);
}

public URI getUri() {
return uri;
}

public GelfConfiguration uri(URI uri) {
this.uri = uri;
return this;
}

/**
* Get the transport protocol used with the GELF server.
*
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/org/graylog2/gelfclient/GelfTransports.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.graylog2.gelfclient;

import org.graylog2.gelfclient.transport.GelfHttpTransport;
import org.graylog2.gelfclient.transport.GelfTcpTransport;
import org.graylog2.gelfclient.transport.GelfTransport;
import org.graylog2.gelfclient.transport.GelfUdpTransport;
Expand All @@ -25,7 +26,8 @@
*/
public enum GelfTransports {
TCP,
UDP;
UDP,
HTTP;

/**
* Creates a {@link GelfTransport} from the given protocol and configuration.
Expand All @@ -44,6 +46,9 @@ public static GelfTransport create(final GelfTransports transport, final GelfCon
case UDP:
gelfTransport = new GelfUdpTransport(config);
break;
case HTTP:
gelfTransport = new GelfHttpTransport(config);
break;
default:
throw new IllegalArgumentException("Unsupported GELF transport: " + transport);
}
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/org/graylog2/gelfclient/encoder/GelfHttpEncoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.graylog2.gelfclient.encoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.List;

public class GelfHttpEncoder extends MessageToMessageEncoder<ByteBuf> {
private static final Logger LOG = LoggerFactory.getLogger(GelfHttpEncoder.class);

private final URI uri;

public GelfHttpEncoder(URI uri) {
this.uri = uri;
}

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List<Object> list) throws Exception {
final FullHttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath(), msg.retain());
request.headers().set(HttpHeaderNames.HOST, uri.getHost());
request.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, msg.readableBytes());
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

list.add(request);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error("Error while encoding HTTP request", cause);
ctx.close();
}
}
134 changes: 134 additions & 0 deletions src/main/java/org/graylog2/gelfclient/transport/GelfHttpTransport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2018 Graylog, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.graylog2.gelfclient.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.graylog2.gelfclient.GelfConfiguration;
import org.graylog2.gelfclient.encoder.GelfHttpEncoder;
import org.graylog2.gelfclient.encoder.GelfMessageJsonEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link GelfTransport} implementation that uses HTTP(S) to send GELF messages.
* <p>This class is thread-safe.</p>
*/
public class GelfHttpTransport extends AbstractGelfTransport {
private static final Logger LOG = LoggerFactory.getLogger(GelfHttpTransport.class);

/**
* Creates a new TCP GELF transport.
*
* @param config the GELF client configuration
*/
public GelfHttpTransport(GelfConfiguration config) {
super(config);
}

@Override
protected void createBootstrap(final EventLoopGroup workerGroup) {
final Bootstrap bootstrap = new Bootstrap();
final GelfSenderThread senderThread = new GelfSenderThread(queue, config.getMaxInflightSends());

bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout())
.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, config.isTcpKeepAlive())
.remoteAddress(config.getRemoteAddress())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (config.isTlsEnabled()) {
LOG.debug("TLS enabled.");
final SslContext sslContext;

if (!config.isTlsCertVerificationEnabled()) {
// If the cert should not be verified just use an insecure trust manager.
LOG.debug("TLS certificate verification disabled!");
sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
} else if (config.getTlsTrustCertChainFile() != null) {
// If a cert chain file is set, use it.
LOG.debug("TLS certificate chain file: {}", config.getTlsTrustCertChainFile());
sslContext = SslContextBuilder.forClient()
.trustManager(config.getTlsTrustCertChainFile())
.build();
} else {
// Otherwise use the JVM default cert chain.
sslContext = SslContextBuilder.forClient().build();
}

ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
}

ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpContentDecompressor());
ch.pipeline().addLast(new GelfHttpEncoder(config.getUri()));
ch.pipeline().addLast(new GelfMessageJsonEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
senderThread.start(ctx.channel());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Channel disconnected!");
senderThread.stop();
scheduleReconnect(ctx.channel().eventLoop());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("Exception caught", cause);
}
});
}
});

if (config.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, config.getSendBufferSize());
}

bootstrap.connect().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
LOG.debug("Connected!");
} else {
LOG.error("Connection failed: {}", future.cause().getMessage());
scheduleReconnect(future.channel().eventLoop());
}
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2018 Graylog, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.graylog2.gelfclient.encoder;

import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import org.testng.annotations.Test;

import java.net.URI;
import java.nio.charset.StandardCharsets;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

public class GelfHttpEncoderTest {

@Test(expectedExceptions = EncoderException.class)
public void testExceptionIsPassedThrough() throws Exception {
final EmbeddedChannel channel = new EmbeddedChannel(new GelfHttpEncoder(null));
channel.writeOutbound(Unpooled.EMPTY_BUFFER);
}

@Test
public void testEncode() throws Exception {
final URI uri = URI.create("http://example.org:8080/gelf");
final EmbeddedChannel channel = new EmbeddedChannel(new GelfHttpEncoder(uri));
assertTrue(channel.writeOutbound(Unpooled.copiedBuffer("{}", StandardCharsets.UTF_8)));
assertTrue(channel.finish());

final FullHttpRequest request = channel.readOutbound();
assertEquals(HttpMethod.POST, request.method());
assertEquals("/gelf", request.uri());
assertEquals("application/json", request.headers().get(HttpHeaderNames.CONTENT_TYPE));
assertEquals("2", request.headers().get(HttpHeaderNames.CONTENT_LENGTH));

final byte[] bytes = ByteBufUtil.getBytes(request.content());
assertEquals(new byte[]{'{', '}'}, bytes);
}
}
Loading

0 comments on commit dcd71d0

Please sign in to comment.