From 2a39afd6391098c116a708a97c12a767fa7fe0dd Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 28 Sep 2023 14:25:55 +0900 Subject: [PATCH 1/4] bin: windows: Restore Ctrl-C behavior on windows After supporting fleet management on Windows, Ctrl-C events won't be caught up. This commit re-enables for the behavior. Windows' ctrl event handlers stolen signals which are overlapped ones. So, we need to handle Ctrl-C events on the newly added event handler for Windows. In Windows, SIGINT and SIGBREAK are valid ctrl events on their terminal. This is why we only need to handle CTRL_C_EVENT and CTRL_BREAK_EVENT events on console handler. Signed-off-by: Hiroshi Hatake --- src/fluent-bit.c | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/fluent-bit.c b/src/fluent-bit.c index 51b814cfd02..141839dbbe9 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -550,7 +550,7 @@ static void flb_signal_exit(int signal) }; } -static void flb_signal_handler(int signal) +static void flb_signal_handler_status_line() { int len; char ts[32]; @@ -573,6 +573,12 @@ static void flb_signal_handler(int signal) /* write signal number */ write(STDERR_FILENO, ts, len); write(STDERR_FILENO, s, sizeof(s) - 1); +} + +static void flb_signal_handler(int signal) +{ + flb_signal_handler_status_line(); + switch (signal) { flb_print_signal(SIGINT); #ifndef FLB_SYSTEM_WINDOWS @@ -632,6 +638,15 @@ void flb_console_handler_set_ctx(flb_ctx_t *ctx, struct flb_cf *cf_opts) static BOOL WINAPI flb_console_handler(DWORD evType) { switch(evType) { + case 0 /* CTRL_C_EVENT_0 */: + flb_signal_handler_status_line(); + write (STDERR_FILENO, "SIGINT)\n", sizeof("SIGINT)\n")-1); + /* signal the main loop to execute reload even if CTRL_C event. + * This is necessary because all signal handlers in win32 + * are executed on their own thread. + */ + handler_signal = 2; + break; case 1 /* CTRL_BREAK_EVENT_1 */: if (flb_bin_restarting == FLB_RELOAD_IDLE) { flb_bin_restarting = FLB_RELOAD_IN_PROGRESS; @@ -1366,6 +1381,10 @@ int flb_main(int argc, char **argv) handler_signal = 0; flb_reload(ctx, cf_opts); } + else if (handler_signal == 2){ + handler_signal = 0; + break; + } #endif /* set the context again before checking the status again */ From e1cc2241f4e412221ca49d3c58c58906f90248ec Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 28 Sep 2023 16:40:31 +0900 Subject: [PATCH 2/4] bin: Fix build on non-Windows Signed-off-by: Hiroshi Hatake --- src/fluent-bit.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/fluent-bit.c b/src/fluent-bit.c index 141839dbbe9..e4ee6c5ca64 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -550,7 +550,7 @@ static void flb_signal_exit(int signal) }; } -static void flb_signal_handler_status_line() +static void flb_signal_handler_status_line(struct flb_cf *cf_opts) { int len; char ts[32]; @@ -558,7 +558,6 @@ static void flb_signal_handler_status_line() time_t now; struct tm *cur; flb_ctx_t *ctx = flb_context_get(); - struct flb_cf *cf_opts = flb_cf_context_get(); now = time(NULL); cur = localtime(&now); @@ -577,7 +576,8 @@ static void flb_signal_handler_status_line() static void flb_signal_handler(int signal) { - flb_signal_handler_status_line(); + struct flb_cf *cf_opts = flb_cf_context_get(); + flb_signal_handler_status_line(cf_opts); switch (signal) { flb_print_signal(SIGINT); @@ -637,9 +637,12 @@ void flb_console_handler_set_ctx(flb_ctx_t *ctx, struct flb_cf *cf_opts) static BOOL WINAPI flb_console_handler(DWORD evType) { + struct flb_cf *cf_opts; + switch(evType) { case 0 /* CTRL_C_EVENT_0 */: - flb_signal_handler_status_line(); + cf_opts = flb_cf_context_get(); + flb_signal_handler_status_line(cf_opts); write (STDERR_FILENO, "SIGINT)\n", sizeof("SIGINT)\n")-1); /* signal the main loop to execute reload even if CTRL_C event. * This is necessary because all signal handlers in win32 From 7833942a7ed537e6e52a47e3c979f773de022c59 Mon Sep 17 00:00:00 2001 From: Masoud Koleini <7603254+koleini@users.noreply.github.com> Date: Tue, 17 Oct 2023 09:20:01 -0500 Subject: [PATCH 3/4] sp: fix missing key names when query contains GROUP BY (#8028) Signed-off-by: Masoud Koleini --- src/stream_processor/flb_sp.c | 39 ++++++++++++--------- src/stream_processor/parser/flb_sp_parser.c | 2 +- tests/internal/include/sp_cb_functions.h | 16 ++++++++- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/src/stream_processor/flb_sp.c b/src/stream_processor/flb_sp.c index 00eb2f18b78..41a7ac8f432 100644 --- a/src/stream_processor/flb_sp.c +++ b/src/stream_processor/flb_sp.c @@ -266,8 +266,8 @@ static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd) } /* - * if some aggregated function is required, not aggregated keys are - * not allowed so we return an error (-1). + * If aggregated functions are included in the query, non-aggregated keys are + * not allowed (except for the ones inside GROUP BY statement). */ if (aggr > 0 && not_aggr == 0) { return aggr; @@ -490,7 +490,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name, /* Check and validate aggregated keys */ ret = sp_cmd_aggregated_keys(task->cmd); if (ret == -1) { - flb_error("[sp] aggregated query cannot mix not aggregated keys: %s", + flb_error("[sp] aggregated query cannot include the aggregated keys: %s", query); flb_sp_task_destroy(task); return NULL; @@ -506,10 +506,10 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name, event = &task->window.event; MK_EVENT_ZERO(event); - /* Run every 'size' seconds */ + /* Run every 'window size' seconds */ fd = mk_event_timeout_create(sp->config->evl, cmd->window.size, (long) 0, - &task->window.event); + event); if (fd == -1) { flb_error("[sp] registration for task %s failed", task->name); flb_free(task); @@ -525,7 +525,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name, /* Run every 'size' seconds */ fd = mk_event_timeout_create(sp->config->evl, cmd->window.advance_by, (long) 0, - &task->window.event_hop); + event); if (fd == -1) { flb_error("[sp] registration for task %s failed", task->name); flb_free(task); @@ -624,8 +624,7 @@ void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd, flb_free(aggr_node); } -void flb_sp_window_destroy(struct flb_sp_cmd *cmd, - struct flb_sp_task_window *window) +void flb_sp_window_destroy(struct flb_sp_task *task) { struct flb_sp_window_data *data; struct aggregate_node *aggr_node; @@ -635,39 +634,45 @@ void flb_sp_window_destroy(struct flb_sp_cmd *cmd, struct mk_list *head_hs; struct mk_list *tmp_hs; - mk_list_foreach_safe(head, tmp, &window->data) { + mk_list_foreach_safe(head, tmp, &task->window.data) { data = mk_list_entry(head, struct flb_sp_window_data, _head); flb_free(data->buf_data); mk_list_del(&data->_head); flb_free(data); } - mk_list_foreach_safe(head, tmp, &window->aggregate_list) { + mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) { aggr_node = mk_list_entry(head, struct aggregate_node, _head); mk_list_del(&aggr_node->_head); - flb_sp_aggregate_node_destroy(cmd, aggr_node); + flb_sp_aggregate_node_destroy(task->cmd, aggr_node); } - mk_list_foreach_safe(head, tmp, &window->hopping_slot) { + mk_list_foreach_safe(head, tmp, &task->window.hopping_slot) { hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head); mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) { aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head); mk_list_del(&aggr_node->_head); - flb_sp_aggregate_node_destroy(cmd, aggr_node); + flb_sp_aggregate_node_destroy(task->cmd, aggr_node); } rb_tree_destroy(&hs->aggregate_tree); flb_free(hs); } - rb_tree_destroy(&window->aggregate_tree); + if (task->window.fd > 0) { + mk_event_timeout_destroy(task->sp->config->evl, &task->window.event); + mk_event_closesocket(task->window.fd); + } + + rb_tree_destroy(&task->window.aggregate_tree); } void flb_sp_task_destroy(struct flb_sp_task *task) { flb_sds_destroy(task->name); flb_sds_destroy(task->query); - flb_sp_window_destroy(task->cmd, &task->window); + flb_sp_window_destroy(task); flb_sp_snapshot_destroy(task->snapshot); + mk_list_del(&task->_head); if (task->stream) { @@ -1114,6 +1119,7 @@ void package_results(const char *tag, int tag_len, char **out_buf, size_t *out_size, struct flb_sp_task *task) { + char *c_name; int i; int len; int map_entries; @@ -1165,14 +1171,13 @@ void package_results(const char *tag, int tag_len, flb_sds_len(ckey->alias)); } else { - len = 0; - char *c_name; if (!ckey->name) { c_name = "*"; } else { c_name = ckey->name; } + len = strlen(c_name); msgpack_pack_str(&mp_pck, len); msgpack_pack_str_body(&mp_pck, c_name, len); diff --git a/src/stream_processor/parser/flb_sp_parser.c b/src/stream_processor/parser/flb_sp_parser.c index 429d0b4c105..a72cf04efb0 100644 --- a/src/stream_processor/parser/flb_sp_parser.c +++ b/src/stream_processor/parser/flb_sp_parser.c @@ -139,8 +139,8 @@ struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func, struct flb_sp_cmd_key *key; struct flb_slist_entry *entry; - /* aggregation function ? */ if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) { + /* Aggregation function */ aggr_func = func; } else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) { diff --git a/tests/internal/include/sp_cb_functions.h b/tests/internal/include/sp_cb_functions.h index aee3fb393e6..346a4bb7e12 100644 --- a/tests/internal/include/sp_cb_functions.h +++ b/tests/internal/include/sp_cb_functions.h @@ -535,6 +535,20 @@ static void cb_select_groupby(int id, struct task_check *check, ret = mp_count_rows(buf, size); TEST_CHECK(ret == 2); + /* bool is 1 for record 0 (bool=true) */ + ret = mp_record_key_cmp(buf, size, + 0, "bool", + MSGPACK_OBJECT_POSITIVE_INTEGER, + NULL, 1, 0); + TEST_CHECK(ret == FLB_TRUE); + + /* bool is 0 for record 1 (bool=false) */ + ret = mp_record_key_cmp(buf, size, + 1, "bool", + MSGPACK_OBJECT_POSITIVE_INTEGER, + NULL, 0, 0); + TEST_CHECK(ret == FLB_TRUE); + /* MIN(id) is 0 for record 0 (bool=true) */ ret = mp_record_key_cmp(buf, size, 0, "MIN(id)", @@ -556,7 +570,7 @@ static void cb_select_groupby(int id, struct task_check *check, NULL, 8, 0); TEST_CHECK(ret == FLB_TRUE); - /* MAX(id) is i9 for record 1 (bool=false) */ + /* MAX(id) is 9 for record 1 (bool=false) */ ret = mp_record_key_cmp(buf, size, 1, "MAX(id)", MSGPACK_OBJECT_POSITIVE_INTEGER, From 79e9730336cfa5438e4b6b4d59c32fb1fa912e4b Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Tue, 17 Oct 2023 23:22:17 +0900 Subject: [PATCH 4/4] in_forward: fix checking return value of cmt_decode_msgpack_create(#8000) (#8015) Signed-off-by: Takahiro Yamashita --- plugins/in_forward/fw_prot.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index 2a23b625441..b656a100321 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -778,7 +778,8 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) } else if (event_type == FLB_EVENT_TYPE_METRICS) { ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off); - if (ret == -1) { + if (ret != CMT_DECODE_MSGPACK_SUCCESS) { + flb_error("cmt_decode_msgpack_create failed. ret=%d", ret); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); flb_sds_destroy(out_tag);