diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index e692f674cd1..1e4d83089af 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -351,6 +351,8 @@ static int in_fw_init(struct flb_input_instance *ins, ctx->coll_fd = ret; + pthread_mutex_init(&ctx->conn_mutex, NULL); + return 0; } @@ -365,8 +367,11 @@ static void in_fw_pause(void *data, struct flb_config *config) * and wait for the ingestion to resume. */ flb_input_collector_pause(ctx->coll_fd, ctx->ins); - fw_conn_del_all(ctx); - ctx->is_paused = FLB_TRUE; + if (pthread_mutex_lock(&ctx->conn_mutex)) { + fw_conn_del_all(ctx); + ctx->is_paused = FLB_TRUE; + } + pthread_mutex_unlock(&ctx->conn_mutex); } /* @@ -385,8 +390,11 @@ static void in_fw_pause(void *data, struct flb_config *config) static void in_fw_resume(void *data, struct flb_config *config) { struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { - ctx->is_paused = FLB_FALSE; flb_input_collector_resume(ctx->coll_fd, ctx->ins); + if (pthread_mutex_lock(&ctx->conn_mutex)) { + ctx->is_paused = FLB_FALSE; + } + pthread_mutex_unlock(&ctx->conn_mutex); } } diff --git a/plugins/in_forward/fw.h b/plugins/in_forward/fw.h index 032371b7d3a..d51e1c47925 100644 --- a/plugins/in_forward/fw.h +++ b/plugins/in_forward/fw.h @@ -73,6 +73,8 @@ struct flb_in_fw_config { struct flb_log_event_decoder *log_decoder; struct flb_log_event_encoder *log_encoder; + pthread_mutex_t conn_mutex; + /* Plugin is paused */ int is_paused; };