Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SSL support for Elasticsearch output #201

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions elasticsearch/sandboxes/heka/output/elasticsearch_bulk_api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ ticker_interval = 10 -- flush every 10 seconds or flush_count (50000) messages
memory_limit = 200e6

address = "127.0.0.1"
ssl_params = {
mode = "client",
protocol = "tlsv1_2",
key = "/etc/hindsight/certs/client-key.pem",
certificate = "/etc/hindsight/certs/client.pem",
cafile = "/etc/hindsight/certs/ca.pem",
verify = {"peer", "fail_if_no_peer_cert"},
options = {"all", "no_sslv3"}
}
-- set tls_params if you want to enable tls connection
port = 9200
timeout = 10 -- socket timeout
flush_count = 50000
Expand Down Expand Up @@ -46,6 +56,7 @@ local timeout = read_config("timeout") or 10
local discard = read_config("discard_on_error")
local abort = read_config("abort_on_error")
local max_retry = read_config("max_retry") or 0
local ssl_params = read_config("ssl_params") or false
assert(not (abort and discard), "abort_on_error and discard_on_error are mutually exclusive")

local encoder_module = read_config("encoder_module") or "encoders.elasticsearch.payload"
Expand All @@ -60,11 +71,62 @@ local ticker_interval = read_config("ticker_interval")
local flush_count = read_config("flush_count") or 50000
assert(flush_count > 0, "flush_count must be greater than zero")

local ssl, ssl_ctx
if ssl_params then
ssl = require "ssl"
-- Force client mode
ssl_params.mode = "client"
ssl_ctx = assert(ssl.newcontext(ssl_params))
end

-- lifted straight from luasec https module
-- Forward calls to the real connection object.
local function reg(conn)
local mt = getmetatable(conn.sock).__index
for name, method in pairs(mt) do
if type(method) == "function" then
conn[name] = function (self, ...)
return method(self.sock, ...)
end
end
end
end
local function ssl_socket(params)
-- 'create' function for LuaSocket
return function ()
local conn = {}
conn.sock = socket.try(socket.tcp())
local so = getmetatable(conn.sock).__index.setoption
function conn:setoption(...)
return so(self.sock, ...)
end
local st = getmetatable(conn.sock).__index.settimeout
function conn:settimeout(...)
return st(self.sock, ...)
end
-- Replace TCP's connection function
function conn:connect(host, port)
socket.try(self.sock:connect(host, port))
self.sock = socket.try(ssl.wrap(self.sock, params))
self.sock:sni(host)
socket.try(self.sock:dohandshake())
reg(self, getmetatable(self.sock))
return 1
end
return conn
end
end

local client
local function create_client()
local client = http.open(address, port)
client.c:setoption("tcp-nodelay", true)
client.c:setoption("keepalive", true)
local client
if ssl_params then
client = http.open(address, port, ssl_socket(ssl_ctx))
else
client = http.open(address, port)
end
--client.c:setoption("tcp-nodelay", true)
--client.c:setoption("keepalive", true)
client.c:settimeout(timeout)
return client
end
Expand Down