Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: recent fixes that landed in master v2.2 #8053

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,8 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &off);
if (ret == -1) {
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
flb_error("cmt_decode_msgpack_create failed. ret=%d", ret);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
Expand Down
26 changes: 24 additions & 2 deletions src/fluent-bit.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,15 +550,14 @@ static void flb_signal_exit(int signal)
};
}

static void flb_signal_handler(int signal)
static void flb_signal_handler_status_line(struct flb_cf *cf_opts)
{
int len;
char ts[32];
char s[] = "[engine] caught signal (";
time_t now;
struct tm *cur;
flb_ctx_t *ctx = flb_context_get();
struct flb_cf *cf_opts = flb_cf_context_get();

now = time(NULL);
cur = localtime(&now);
Expand All @@ -573,6 +572,13 @@ static void flb_signal_handler(int signal)
/* write signal number */
write(STDERR_FILENO, ts, len);
write(STDERR_FILENO, s, sizeof(s) - 1);
}

static void flb_signal_handler(int signal)
{
struct flb_cf *cf_opts = flb_cf_context_get();
flb_signal_handler_status_line(cf_opts);

switch (signal) {
flb_print_signal(SIGINT);
#ifndef FLB_SYSTEM_WINDOWS
Expand Down Expand Up @@ -631,7 +637,19 @@ void flb_console_handler_set_ctx(flb_ctx_t *ctx, struct flb_cf *cf_opts)

static BOOL WINAPI flb_console_handler(DWORD evType)
{
struct flb_cf *cf_opts;

switch(evType) {
case 0 /* CTRL_C_EVENT_0 */:
cf_opts = flb_cf_context_get();
flb_signal_handler_status_line(cf_opts);
write (STDERR_FILENO, "SIGINT)\n", sizeof("SIGINT)\n")-1);
/* signal the main loop to execute reload even if CTRL_C event.
* This is necessary because all signal handlers in win32
* are executed on their own thread.
*/
handler_signal = 2;
break;
case 1 /* CTRL_BREAK_EVENT_1 */:
if (flb_bin_restarting == FLB_RELOAD_IDLE) {
flb_bin_restarting = FLB_RELOAD_IN_PROGRESS;
Expand Down Expand Up @@ -1366,6 +1384,10 @@ int flb_main(int argc, char **argv)
handler_signal = 0;
flb_reload(ctx, cf_opts);
}
else if (handler_signal == 2){
handler_signal = 0;
break;
}
#endif

/* set the context again before checking the status again */
Expand Down
39 changes: 22 additions & 17 deletions src/stream_processor/flb_sp.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd)
}

/*
* if some aggregated function is required, not aggregated keys are
* not allowed so we return an error (-1).
* If aggregated functions are included in the query, non-aggregated keys are
* not allowed (except for the ones inside GROUP BY statement).
*/
if (aggr > 0 && not_aggr == 0) {
return aggr;
Expand Down Expand Up @@ -490,7 +490,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
/* Check and validate aggregated keys */
ret = sp_cmd_aggregated_keys(task->cmd);
if (ret == -1) {
flb_error("[sp] aggregated query cannot mix not aggregated keys: %s",
flb_error("[sp] aggregated query cannot include the aggregated keys: %s",
query);
flb_sp_task_destroy(task);
return NULL;
Expand All @@ -506,10 +506,10 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
event = &task->window.event;
MK_EVENT_ZERO(event);

/* Run every 'size' seconds */
/* Run every 'window size' seconds */
fd = mk_event_timeout_create(sp->config->evl,
cmd->window.size, (long) 0,
&task->window.event);
event);
if (fd == -1) {
flb_error("[sp] registration for task %s failed", task->name);
flb_free(task);
Expand All @@ -525,7 +525,7 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
/* Run every 'size' seconds */
fd = mk_event_timeout_create(sp->config->evl,
cmd->window.advance_by, (long) 0,
&task->window.event_hop);
event);
if (fd == -1) {
flb_error("[sp] registration for task %s failed", task->name);
flb_free(task);
Expand Down Expand Up @@ -624,8 +624,7 @@ void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd,
flb_free(aggr_node);
}

void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
struct flb_sp_task_window *window)
void flb_sp_window_destroy(struct flb_sp_task *task)
{
struct flb_sp_window_data *data;
struct aggregate_node *aggr_node;
Expand All @@ -635,39 +634,45 @@ void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
struct mk_list *head_hs;
struct mk_list *tmp_hs;

mk_list_foreach_safe(head, tmp, &window->data) {
mk_list_foreach_safe(head, tmp, &task->window.data) {
data = mk_list_entry(head, struct flb_sp_window_data, _head);
flb_free(data->buf_data);
mk_list_del(&data->_head);
flb_free(data);
}

mk_list_foreach_safe(head, tmp, &window->aggregate_list) {
mk_list_foreach_safe(head, tmp, &task->window.aggregate_list) {
aggr_node = mk_list_entry(head, struct aggregate_node, _head);
mk_list_del(&aggr_node->_head);
flb_sp_aggregate_node_destroy(cmd, aggr_node);
flb_sp_aggregate_node_destroy(task->cmd, aggr_node);
}

mk_list_foreach_safe(head, tmp, &window->hopping_slot) {
mk_list_foreach_safe(head, tmp, &task->window.hopping_slot) {
hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head);
mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) {
aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head);
mk_list_del(&aggr_node->_head);
flb_sp_aggregate_node_destroy(cmd, aggr_node);
flb_sp_aggregate_node_destroy(task->cmd, aggr_node);
}
rb_tree_destroy(&hs->aggregate_tree);
flb_free(hs);
}

rb_tree_destroy(&window->aggregate_tree);
if (task->window.fd > 0) {
mk_event_timeout_destroy(task->sp->config->evl, &task->window.event);
mk_event_closesocket(task->window.fd);
}

rb_tree_destroy(&task->window.aggregate_tree);
}

void flb_sp_task_destroy(struct flb_sp_task *task)
{
flb_sds_destroy(task->name);
flb_sds_destroy(task->query);
flb_sp_window_destroy(task->cmd, &task->window);
flb_sp_window_destroy(task);
flb_sp_snapshot_destroy(task->snapshot);

mk_list_del(&task->_head);

if (task->stream) {
Expand Down Expand Up @@ -1114,6 +1119,7 @@ void package_results(const char *tag, int tag_len,
char **out_buf, size_t *out_size,
struct flb_sp_task *task)
{
char *c_name;
int i;
int len;
int map_entries;
Expand Down Expand Up @@ -1165,14 +1171,13 @@ void package_results(const char *tag, int tag_len,
flb_sds_len(ckey->alias));
}
else {
len = 0;
char *c_name;
if (!ckey->name) {
c_name = "*";
}
else {
c_name = ckey->name;
}
len = strlen(c_name);

msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, c_name, len);
Expand Down
2 changes: 1 addition & 1 deletion src/stream_processor/parser/flb_sp_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func,
struct flb_sp_cmd_key *key;
struct flb_slist_entry *entry;

/* aggregation function ? */
if (func >= FLB_SP_AVG && func <= FLB_SP_FORECAST) {
/* Aggregation function */
aggr_func = func;
}
else if (func >= FLB_SP_NOW && func <= FLB_SP_UNIX_TIMESTAMP) {
Expand Down
16 changes: 15 additions & 1 deletion tests/internal/include/sp_cb_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,20 @@ static void cb_select_groupby(int id, struct task_check *check,
ret = mp_count_rows(buf, size);
TEST_CHECK(ret == 2);

/* bool is 1 for record 0 (bool=true) */
ret = mp_record_key_cmp(buf, size,
0, "bool",
MSGPACK_OBJECT_POSITIVE_INTEGER,
NULL, 1, 0);
TEST_CHECK(ret == FLB_TRUE);

/* bool is 0 for record 1 (bool=false) */
ret = mp_record_key_cmp(buf, size,
1, "bool",
MSGPACK_OBJECT_POSITIVE_INTEGER,
NULL, 0, 0);
TEST_CHECK(ret == FLB_TRUE);

/* MIN(id) is 0 for record 0 (bool=true) */
ret = mp_record_key_cmp(buf, size,
0, "MIN(id)",
Expand All @@ -556,7 +570,7 @@ static void cb_select_groupby(int id, struct task_check *check,
NULL, 8, 0);
TEST_CHECK(ret == FLB_TRUE);

/* MAX(id) is i9 for record 1 (bool=false) */
/* MAX(id) is 9 for record 1 (bool=false) */
ret = mp_record_key_cmp(buf, size,
1, "MAX(id)",
MSGPACK_OBJECT_POSITIVE_INTEGER,
Expand Down
Loading