From 574732257868eb0554dcf3b75e1e675966fa5491 Mon Sep 17 00:00:00 2001 From: Mikael Knutsson Date: Thu, 21 Dec 2017 14:18:55 +0800 Subject: [PATCH] Add SSL support for Elasticsearch output For some reason I cannot identify, I get an error on the setoption passthrough saying that the first parameter isn't of type tcp{any}. I can't really figure out why and I haven't spent a lot of time on it, but if anyone has any ideas, that'd be great. For now, I commented them out, but I understand if you do not want that merged. --- .../heka/output/elasticsearch_bulk_api.lua | 68 ++++++++++++++++++- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/elasticsearch/sandboxes/heka/output/elasticsearch_bulk_api.lua b/elasticsearch/sandboxes/heka/output/elasticsearch_bulk_api.lua index fc0d21599..8eb872042 100644 --- a/elasticsearch/sandboxes/heka/output/elasticsearch_bulk_api.lua +++ b/elasticsearch/sandboxes/heka/output/elasticsearch_bulk_api.lua @@ -13,6 +13,16 @@ ticker_interval = 10 -- flush every 10 seconds or flush_count (50000) messages memory_limit = 200e6 address = "127.0.0.1" +ssl_params = { + mode = "client", + protocol = "tlsv1_2", + key = "/etc/hindsight/certs/client-key.pem", + certificate = "/etc/hindsight/certs/client.pem", + cafile = "/etc/hindsight/certs/ca.pem", + verify = {"peer", "fail_if_no_peer_cert"}, + options = {"all", "no_sslv3"} +} +-- set tls_params if you want to enable tls connection port = 9200 timeout = 10 -- socket timeout flush_count = 50000 @@ -46,6 +56,7 @@ local timeout = read_config("timeout") or 10 local discard = read_config("discard_on_error") local abort = read_config("abort_on_error") local max_retry = read_config("max_retry") or 0 +local ssl_params = read_config("ssl_params") or false assert(not (abort and discard), "abort_on_error and discard_on_error are mutually exclusive") local encoder_module = read_config("encoder_module") or "encoders.elasticsearch.payload" @@ -60,11 +71,62 @@ local ticker_interval = read_config("ticker_interval") local flush_count = read_config("flush_count") or 50000 assert(flush_count > 0, "flush_count must be greater than zero") +local ssl, ssl_ctx +if ssl_params then + ssl = require "ssl" + -- Force client mode + ssl_params.mode = "client" + ssl_ctx = assert(ssl.newcontext(ssl_params)) +end + +-- lifted straight from luasec https module +-- Forward calls to the real connection object. +local function reg(conn) + local mt = getmetatable(conn.sock).__index + for name, method in pairs(mt) do + if type(method) == "function" then + conn[name] = function (self, ...) + return method(self.sock, ...) + end + end + end +end +local function ssl_socket(params) + -- 'create' function for LuaSocket + return function () + local conn = {} + conn.sock = socket.try(socket.tcp()) + local so = getmetatable(conn.sock).__index.setoption + function conn:setoption(...) + return so(self.sock, ...) + end + local st = getmetatable(conn.sock).__index.settimeout + function conn:settimeout(...) + return st(self.sock, ...) + end + -- Replace TCP's connection function + function conn:connect(host, port) + socket.try(self.sock:connect(host, port)) + self.sock = socket.try(ssl.wrap(self.sock, params)) + self.sock:sni(host) + socket.try(self.sock:dohandshake()) + reg(self, getmetatable(self.sock)) + return 1 + end + return conn + end +end + local client local function create_client() - local client = http.open(address, port) - client.c:setoption("tcp-nodelay", true) - client.c:setoption("keepalive", true) + local client + if ssl_params then + client = http.open(address, port, ssl_socket(ssl_ctx)) + else + client = http.open(address, port) + end + --client.c:setoption("tcp-nodelay", true) + --client.c:setoption("keepalive", true) client.c:settimeout(timeout) return client end