Skip to content

Commit

Permalink
in_tcp: Add a capability to inject source IP
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Jul 10, 2023
1 parent fb7d4c8 commit b93950f
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 3 deletions.
5 changes: 5 additions & 0 deletions plugins/in_tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_tcp_config, buffer_size_str),
"Set the buffer size"
},
{
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_in_tcp_config, source_address_key),
"Key where the source address will be injected"
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/in_tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct flb_in_tcp_config {
char *tcp_port; /* TCP Port */
flb_sds_t raw_separator; /* Unescaped string delimiterr */
flb_sds_t separator; /* String delimiter */
flb_sds_t source_address_key; /* Source IP address */
int collector_id; /* Listener collector id */
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* List of active connections */
Expand Down
130 changes: 127 additions & 3 deletions plugins/in_tcp/tcp_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,84 @@ static inline void consume_bytes(char *buf, int bytes, int length)
memmove(buf, buf + bytes, length - bytes);
}

static int append_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type)
{
int result;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

if (message_key_name != NULL) {
new_map_entries[0] = &message_entry;

message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

if (message_type == MSGPACK_OBJECT_BIN) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.bin.size = message_size;
message_entry.val.via.bin.ptr = message_buffer;
}
else if (message_type == MSGPACK_OBJECT_STR) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.str.size = message_size;
message_entry.val.via.str.ptr = message_buffer;
}
else {
result = FLB_MAP_NOT_MODIFIED;

return result;
}

result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
}
}

if (result != FLB_MAP_EXPAND_SUCCESS) {
result = FLB_MAP_EXPANSION_ERROR;

return result;
}

*result_buffer = modified_data_buffer;
*result_size = modified_data_size;

return result;
}

static inline int process_pack(struct tcp_conn *conn,
char *pack, size_t size)
{
int ret;
size_t off = 0;
msgpack_unpacked result;
msgpack_sbuffer sbuf;
msgpack_packer pck;
msgpack_object entry;
struct flb_in_tcp_config *ctx;
char *appended_address_buffer;
size_t appended_address_size;
char *source_address;
int i;
int len;

ctx = conn->ctx;

Expand All @@ -50,22 +120,72 @@ static inline int process_pack(struct tcp_conn *conn,
while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) {
entry = result.data;

appended_address_buffer = NULL;
source_address = NULL;

ret = flb_log_event_encoder_begin_record(ctx->log_encoder);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
}

if (ctx->source_address_key != NULL) {
source_address = flb_connection_get_remote_address(conn->connection);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if (entry.type == MSGPACK_OBJECT_MAP) {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
ctx->log_encoder, &entry);
if (source_address != NULL) {
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);

len = entry.via.map.size;
msgpack_pack_map(&pck, len);

for (i=0; i<len; i++) {
msgpack_pack_object(&pck, entry.via.map.ptr[i].key);
msgpack_pack_object(&pck, entry.via.map.ptr[i].val);
}

ret = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
sbuf.data,
sbuf.size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
msgpack_sbuffer_destroy(&sbuf);
}

if (ret == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding source_address : %d", ret);
}

if (appended_address_buffer != NULL) {
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, appended_address_buffer, appended_address_size);
}
else {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
ctx->log_encoder, &entry);
}
}
else if (entry.type == MSGPACK_OBJECT_ARRAY) {
ret = flb_log_event_encoder_append_body_values(
if (source_address != NULL) {
ret = flb_log_event_encoder_append_body_values(
ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key),
FLB_LOG_EVENT_CSTRING_VALUE(source_address));
}
else {
ret = flb_log_event_encoder_append_body_values(
ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry));
}
}
else {
ret = FLB_EVENT_ENCODER_ERROR_INVALID_VALUE_TYPE;
Expand All @@ -75,6 +195,10 @@ static inline int process_pack(struct tcp_conn *conn,
ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
}

if (appended_address_buffer != NULL) {
flb_free(appended_address_buffer);
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
break;
}
Expand Down
4 changes: 4 additions & 0 deletions plugins/in_tcp/tcp_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@

#define FLB_IN_TCP_CHUNK "32768"

#define FLB_MAP_EXPAND_SUCCESS 0
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2

enum {
TCP_NEW = 1, /* it's a new connection */
TCP_CONNECTED = 2, /* MQTT connection per protocol spec OK */
Expand Down

0 comments on commit b93950f

Please sign in to comment.