diff --git a/pom.xml b/pom.xml index 10c84c8..1d649ab 100644 --- a/pom.xml +++ b/pom.xml @@ -93,18 +93,25 @@ B1606F22 1.7.25 + 4.1.20.Final + 2.8.9 io.netty netty-handler - 4.1.20.Final + ${netty.version} + + + io.netty + netty-codec-http + ${netty.version} com.fasterxml.jackson.core jackson-core - 2.8.9 + ${jackson.version} org.slf4j @@ -129,6 +136,12 @@ ${slf4j.version} test + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + test + diff --git a/src/main/java/org/graylog2/gelfclient/GelfConfiguration.java b/src/main/java/org/graylog2/gelfclient/GelfConfiguration.java index 4663a0a..cae574e 100644 --- a/src/main/java/org/graylog2/gelfclient/GelfConfiguration.java +++ b/src/main/java/org/graylog2/gelfclient/GelfConfiguration.java @@ -18,6 +18,7 @@ import java.io.File; import java.net.InetSocketAddress; +import java.net.URI; /** * The configuration used by a {@link org.graylog2.gelfclient.transport.GelfTransport}. @@ -25,8 +26,10 @@ public class GelfConfiguration { private static final int DEFAULT_PORT = 12201; private static final String DEFAULT_HOSTNAME = "127.0.0.1"; + private final String hostname; private final int port; + private URI uri = URI.create("http://127.0.0.1:12201/gelf"); private GelfTransports transport = GelfTransports.TCP; private Compression compression = Compression.GZIP; private int queueSize = 512; @@ -114,6 +117,15 @@ public InetSocketAddress getRemoteAddress() { return new InetSocketAddress(hostname, port); } + public URI getUri() { + return uri; + } + + public GelfConfiguration uri(URI uri) { + this.uri = uri; + return this; + } + /** * Get the transport protocol used with the GELF server. * diff --git a/src/main/java/org/graylog2/gelfclient/GelfTransports.java b/src/main/java/org/graylog2/gelfclient/GelfTransports.java index 8b2799c..ee7ce58 100644 --- a/src/main/java/org/graylog2/gelfclient/GelfTransports.java +++ b/src/main/java/org/graylog2/gelfclient/GelfTransports.java @@ -16,6 +16,7 @@ package org.graylog2.gelfclient; +import org.graylog2.gelfclient.transport.GelfHttpTransport; import org.graylog2.gelfclient.transport.GelfTcpTransport; import org.graylog2.gelfclient.transport.GelfTransport; import org.graylog2.gelfclient.transport.GelfUdpTransport; @@ -25,7 +26,8 @@ */ public enum GelfTransports { TCP, - UDP; + UDP, + HTTP; /** * Creates a {@link GelfTransport} from the given protocol and configuration. @@ -44,6 +46,9 @@ public static GelfTransport create(final GelfTransports transport, final GelfCon case UDP: gelfTransport = new GelfUdpTransport(config); break; + case HTTP: + gelfTransport = new GelfHttpTransport(config); + break; default: throw new IllegalArgumentException("Unsupported GELF transport: " + transport); } diff --git a/src/main/java/org/graylog2/gelfclient/encoder/GelfHttpEncoder.java b/src/main/java/org/graylog2/gelfclient/encoder/GelfHttpEncoder.java new file mode 100644 index 0000000..e459c7d --- /dev/null +++ b/src/main/java/org/graylog2/gelfclient/encoder/GelfHttpEncoder.java @@ -0,0 +1,44 @@ +package org.graylog2.gelfclient.encoder; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.List; + +public class GelfHttpEncoder extends MessageToMessageEncoder { + private static final Logger LOG = LoggerFactory.getLogger(GelfHttpEncoder.class); + + private final URI uri; + + public GelfHttpEncoder(URI uri) { + this.uri = uri; + } + + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List list) throws Exception { + final FullHttpRequest request = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath(), msg.retain()); + request.headers().set(HttpHeaderNames.HOST, uri.getHost()); + request.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON); + request.headers().set(HttpHeaderNames.CONTENT_LENGTH, msg.readableBytes()); + request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + + list.add(request); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.error("Error while encoding HTTP request", cause); + ctx.close(); + } +} diff --git a/src/main/java/org/graylog2/gelfclient/transport/GelfHttpTransport.java b/src/main/java/org/graylog2/gelfclient/transport/GelfHttpTransport.java new file mode 100644 index 0000000..d3b8dd3 --- /dev/null +++ b/src/main/java/org/graylog2/gelfclient/transport/GelfHttpTransport.java @@ -0,0 +1,134 @@ +/* + * Copyright 2018 Graylog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.graylog2.gelfclient.transport; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.graylog2.gelfclient.GelfConfiguration; +import org.graylog2.gelfclient.encoder.GelfHttpEncoder; +import org.graylog2.gelfclient.encoder.GelfMessageJsonEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link GelfTransport} implementation that uses HTTP(S) to send GELF messages. + *

This class is thread-safe.

+ */ +public class GelfHttpTransport extends AbstractGelfTransport { + private static final Logger LOG = LoggerFactory.getLogger(GelfHttpTransport.class); + + /** + * Creates a new TCP GELF transport. + * + * @param config the GELF client configuration + */ + public GelfHttpTransport(GelfConfiguration config) { + super(config); + } + + @Override + protected void createBootstrap(final EventLoopGroup workerGroup) { + final Bootstrap bootstrap = new Bootstrap(); + final GelfSenderThread senderThread = new GelfSenderThread(queue, config.getMaxInflightSends()); + + bootstrap.group(workerGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout()) + .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()) + .option(ChannelOption.SO_KEEPALIVE, config.isTcpKeepAlive()) + .remoteAddress(config.getRemoteAddress()) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + if (config.isTlsEnabled()) { + LOG.debug("TLS enabled."); + final SslContext sslContext; + + if (!config.isTlsCertVerificationEnabled()) { + // If the cert should not be verified just use an insecure trust manager. + LOG.debug("TLS certificate verification disabled!"); + sslContext = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + } else if (config.getTlsTrustCertChainFile() != null) { + // If a cert chain file is set, use it. + LOG.debug("TLS certificate chain file: {}", config.getTlsTrustCertChainFile()); + sslContext = SslContextBuilder.forClient() + .trustManager(config.getTlsTrustCertChainFile()) + .build(); + } else { + // Otherwise use the JVM default cert chain. + sslContext = SslContextBuilder.forClient().build(); + } + + ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); + } + + ch.pipeline().addLast(new HttpClientCodec()); + ch.pipeline().addLast(new HttpContentDecompressor()); + ch.pipeline().addLast(new GelfHttpEncoder(config.getUri())); + ch.pipeline().addLast(new GelfMessageJsonEncoder()); + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + senderThread.start(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + LOG.info("Channel disconnected!"); + senderThread.stop(); + scheduleReconnect(ctx.channel().eventLoop()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.error("Exception caught", cause); + } + }); + } + }); + + if (config.getSendBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_SNDBUF, config.getSendBufferSize()); + } + + bootstrap.connect().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + LOG.debug("Connected!"); + } else { + LOG.error("Connection failed: {}", future.cause().getMessage()); + scheduleReconnect(future.channel().eventLoop()); + } + } + }); + } +} diff --git a/src/test/java/org/graylog2/gelfclient/encoder/GelfHttpEncoderTest.java b/src/test/java/org/graylog2/gelfclient/encoder/GelfHttpEncoderTest.java new file mode 100644 index 0000000..e3255fa --- /dev/null +++ b/src/test/java/org/graylog2/gelfclient/encoder/GelfHttpEncoderTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2018 Graylog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.graylog2.gelfclient.encoder; + +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.EncoderException; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import org.testng.annotations.Test; + +import java.net.URI; +import java.nio.charset.StandardCharsets; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + +public class GelfHttpEncoderTest { + + @Test(expectedExceptions = EncoderException.class) + public void testExceptionIsPassedThrough() throws Exception { + final EmbeddedChannel channel = new EmbeddedChannel(new GelfHttpEncoder(null)); + channel.writeOutbound(Unpooled.EMPTY_BUFFER); + } + + @Test + public void testEncode() throws Exception { + final URI uri = URI.create("http://example.org:8080/gelf"); + final EmbeddedChannel channel = new EmbeddedChannel(new GelfHttpEncoder(uri)); + assertTrue(channel.writeOutbound(Unpooled.copiedBuffer("{}", StandardCharsets.UTF_8))); + assertTrue(channel.finish()); + + final FullHttpRequest request = channel.readOutbound(); + assertEquals(HttpMethod.POST, request.method()); + assertEquals("/gelf", request.uri()); + assertEquals("application/json", request.headers().get(HttpHeaderNames.CONTENT_TYPE)); + assertEquals("2", request.headers().get(HttpHeaderNames.CONTENT_LENGTH)); + + final byte[] bytes = ByteBufUtil.getBytes(request.content()); + assertEquals(new byte[]{'{', '}'}, bytes); + } +} \ No newline at end of file diff --git a/src/test/java/org/graylog2/gelfclient/transport/GelfHttpTransportTest.java b/src/test/java/org/graylog2/gelfclient/transport/GelfHttpTransportTest.java new file mode 100644 index 0000000..0d7aed0 --- /dev/null +++ b/src/test/java/org/graylog2/gelfclient/transport/GelfHttpTransportTest.java @@ -0,0 +1,141 @@ +/* + * Copyright 2018 Graylog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.graylog2.gelfclient.transport; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.logging.LoggingHandler; +import org.graylog2.gelfclient.GelfConfiguration; +import org.graylog2.gelfclient.GelfMessage; +import org.graylog2.gelfclient.GelfTransports; +import org.graylog2.gelfclient.util.Uninterruptibles; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +public class GelfHttpTransportTest { + + private EventLoopGroup eventLoopGroup; + private Channel channel; + private HttpRequestHandler requestHandler; + + @BeforeMethod + public void setUp() throws Exception { + eventLoopGroup = new NioEventLoopGroup(); + requestHandler = new HttpRequestHandler(); + final ServerBootstrap bootstrap = new ServerBootstrap() + .group(eventLoopGroup) + .channel(NioServerSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(ServerSocketChannel ch) throws Exception { + ch.pipeline().addLast("logging", new LoggingHandler()); + } + }) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline().addLast(new HttpObjectAggregator(102400)); + ch.pipeline().addLast(requestHandler); + } + }); + channel = bootstrap.bind(InetAddress.getLoopbackAddress(), 0).sync().channel(); + + } + + @AfterMethod + public void tearDown() throws Exception { + channel.close(); + channel.closeFuture().syncUninterruptibly(); + eventLoopGroup.shutdownGracefully(); + } + + @Test + public void gelfHttp() throws Exception { + final GelfMessage message = new GelfMessage("Test", "example.org"); + final InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); + + final URI uri = new URI("http://" + localAddress.getHostString() + ":" + localAddress.getPort()); + final GelfConfiguration configuration = new GelfConfiguration(localAddress) + .transport(GelfTransports.HTTP) + .uri(uri); + final GelfTransport transport = GelfTransports.create(configuration); + transport.send(message); + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + assertFalse(requestHandler.getRequests().isEmpty()); + + final FullHttpRequest request = requestHandler.getRequests().get(0); + assertEquals(request.method(), HttpMethod.POST); + assertEquals(request.headers().get(HttpHeaderNames.CONTENT_TYPE), HttpHeaderValues.APPLICATION_JSON.toString()); + + final byte[] payload = ByteBufUtil.getBytes(request.content()); + final ObjectMapper objectMapper = new ObjectMapper(); + final JsonNode tree = objectMapper.readTree(payload); + assertEquals(tree.path("version").asText(), "1.1"); + assertEquals(tree.path("host").asText(), "example.org"); + assertEquals(tree.path("short_message").asText(), "Test"); + + transport.stop(); + } + + public static class HttpRequestHandler extends SimpleChannelInboundHandler { + private final List requests = new ArrayList<>(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { + requests.add(msg.retain()); + } + + public List getRequests() { + return requests; + } + } +} \ No newline at end of file diff --git a/src/test/java/org/graylog2/gelfclient/transport/GelfTcpTransportTest.java b/src/test/java/org/graylog2/gelfclient/transport/GelfTcpTransportTest.java new file mode 100644 index 0000000..de3b549 --- /dev/null +++ b/src/test/java/org/graylog2/gelfclient/transport/GelfTcpTransportTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2018 Graylog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.graylog2.gelfclient.transport; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LoggingHandler; +import org.graylog2.gelfclient.GelfConfiguration; +import org.graylog2.gelfclient.GelfMessage; +import org.graylog2.gelfclient.GelfTransports; +import org.graylog2.gelfclient.util.Uninterruptibles; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +public class GelfTcpTransportTest { + + private EventLoopGroup eventLoopGroup; + private Channel channel; + private TcpRequestHandler requestHandler; + + @BeforeMethod + public void setUp() throws Exception { + eventLoopGroup = new NioEventLoopGroup(); + requestHandler = new TcpRequestHandler(); + final ServerBootstrap bootstrap = new ServerBootstrap() + .group(eventLoopGroup) + .channel(NioServerSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(ServerSocketChannel ch) throws Exception { + ch.pipeline().addLast("logging", new LoggingHandler()); + } + }) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(requestHandler); + } + }); + channel = bootstrap.bind(InetAddress.getLoopbackAddress(), 0).sync().channel(); + + } + + @AfterMethod + public void tearDown() throws Exception { + channel.close(); + channel.closeFuture().syncUninterruptibly(); + eventLoopGroup.shutdownGracefully(); + } + + @Test + public void gelfTcp() throws Exception { + final GelfMessage message = new GelfMessage("Test", "example.org"); + final InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); + final GelfConfiguration configuration = new GelfConfiguration(localAddress) + .transport(GelfTransports.TCP); + final GelfTransport transport = GelfTransports.create(configuration); + transport.send(message); + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + assertFalse(requestHandler.getRequests().isEmpty()); + + final byte[] payload = requestHandler.getRequests().get(0); + final ObjectMapper objectMapper = new ObjectMapper(); + final JsonNode tree = objectMapper.readTree(payload); + assertEquals(tree.path("version").asText(), "1.1"); + assertEquals(tree.path("host").asText(), "example.org"); + assertEquals(tree.path("short_message").asText(), "Test"); + + transport.stop(); + } + + public static class TcpRequestHandler extends SimpleChannelInboundHandler { + private final List requests = new ArrayList<>(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + requests.add(ByteBufUtil.getBytes(msg)); + } + + public List getRequests() { + return requests; + } + } +} \ No newline at end of file diff --git a/src/test/java/org/graylog2/gelfclient/transport/GelfUdpTransportTest.java b/src/test/java/org/graylog2/gelfclient/transport/GelfUdpTransportTest.java new file mode 100644 index 0000000..6ca416e --- /dev/null +++ b/src/test/java/org/graylog2/gelfclient/transport/GelfUdpTransportTest.java @@ -0,0 +1,118 @@ +/* + * Copyright 2018 Graylog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.graylog2.gelfclient.transport; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.handler.codec.DatagramPacketDecoder; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.logging.LoggingHandler; +import org.graylog2.gelfclient.Compression; +import org.graylog2.gelfclient.GelfConfiguration; +import org.graylog2.gelfclient.GelfMessage; +import org.graylog2.gelfclient.GelfTransports; +import org.graylog2.gelfclient.util.Uninterruptibles; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +public class GelfUdpTransportTest { + + private EventLoopGroup eventLoopGroup; + private Channel channel; + private UdpRequestHandler requestHandler; + + @BeforeMethod + public void setUp() throws Exception { + eventLoopGroup = new NioEventLoopGroup(); + requestHandler = new UdpRequestHandler(); + final Bootstrap bootstrap = new Bootstrap() + .group(eventLoopGroup) + .channel(NioDatagramChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(NioDatagramChannel ch) throws Exception { + ch.pipeline().addLast("logging", new LoggingHandler()); + ch.pipeline().addLast(new DatagramPacketDecoder(requestHandler)); + } + }); + channel = bootstrap.bind(InetAddress.getLoopbackAddress(), 0).sync().channel(); + + } + + @AfterMethod + public void tearDown() throws Exception { + channel.close(); + channel.closeFuture().syncUninterruptibly(); + eventLoopGroup.shutdownGracefully(); + } + + @Test + public void gelfUdpUncompressed() throws Exception { + final GelfMessage message = new GelfMessage("Test", "example.org"); + final InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); + final GelfConfiguration configuration = new GelfConfiguration(localAddress) + .transport(GelfTransports.UDP) + .compression(Compression.NONE); + final GelfTransport transport = GelfTransports.create(configuration); + transport.send(message); + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + assertFalse(requestHandler.getRequests().isEmpty()); + + final byte[] payload = requestHandler.getRequests().get(0); + final ObjectMapper objectMapper = new ObjectMapper(); + final JsonNode tree = objectMapper.readTree(payload); + assertEquals(tree.path("version").asText(), "1.1"); + assertEquals(tree.path("host").asText(), "example.org"); + assertEquals(tree.path("short_message").asText(), "Test"); + + transport.stop(); + } + + public static class UdpRequestHandler extends MessageToMessageDecoder { + private final List requests = new ArrayList<>(); + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + requests.add(ByteBufUtil.getBytes(msg)); + } + + public List getRequests() { + return requests; + } + } +} \ No newline at end of file