From e68a1cb8d88b59f4350df77c8617ab4c42775324 Mon Sep 17 00:00:00 2001 From: Grigorii Zagorodnev Date: Wed, 6 Dec 2023 00:27:24 +0200 Subject: [PATCH] Extended error hanling for the raw message pack api --- src/raw_msgpack_api/raw_msgpack_api.c | 161 +++++++++++++++++++------- 1 file changed, 117 insertions(+), 44 deletions(-) diff --git a/src/raw_msgpack_api/raw_msgpack_api.c b/src/raw_msgpack_api/raw_msgpack_api.c index c7eb49b144b..5f765933616 100644 --- a/src/raw_msgpack_api/raw_msgpack_api.c +++ b/src/raw_msgpack_api/raw_msgpack_api.c @@ -33,6 +33,7 @@ #define _OPEN_SYS_ITOA_EXT #include +#include #include #include @@ -81,16 +82,39 @@ typedef struct raw_msgpack_api_context_t { } raw_msgpack_api_context_t; +static raw_msgpack_api_context_t* raw_msgpack_api_context_create() { + raw_msgpack_api_context_t* ctx = (raw_msgpack_api_context_t*) calloc(1, sizeof(raw_msgpack_api_context_t)); + if (ctx == NULL) { + return NULL; + } + + ctx->doorbell_cli = -1; + ctx->in_ffd = -1; + ctx->out_ffd = -1; + + return ctx; +} + +static void raw_msgpack_api_context_delete(raw_msgpack_api_context_t* raw_ctx) { + if (raw_ctx->doorbell_cli != -1) { + close(raw_ctx->doorbell_cli); + unlink(raw_ctx->client_addr); + } + + if (raw_ctx->ctx) { + flb_destroy(raw_ctx->ctx); + } + + free(raw_ctx); +} + typedef struct doorbell_msg_t { int data_len; char* buffer; } doorbell_msg_t; - -#if 1 -#include - -void DumpHex(const void* data, size_t size) { +#ifdef VERBOSE_DATA +static void dump_hex_data(const void* data, size_t size) { char ascii[17]; size_t i, j; ascii[16] = '\0'; @@ -118,10 +142,9 @@ void DumpHex(const void* data, size_t size) { } } } -#endif +#endif // VERBOSE_DATA - -int ipc_unix_sock_cli_create(char *sock_path) { +static int ipc_unix_sock_cli_create(const char* sock_path) { int socket_fd; struct sockaddr_un client_address; @@ -185,8 +208,8 @@ bool ring_doorbell(raw_msgpack_api_context_t* raw_ctx, int client_fd, int data_l } -void prepare_socket_names(raw_msgpack_api_context_t* raw_ctx, const char* output_plugin_name, - const char * host, const char * port, const char * socket_prefix) { +static void prepare_socket_names(raw_msgpack_api_context_t* raw_ctx, const char* output_plugin_name, + const char* host, const char* port, const char* socket_prefix) { char postfix[128] = ""; bool is_prefix = strlen(socket_prefix) > 0; bool is_plugin = strlen(output_plugin_name) > 0; @@ -195,12 +218,12 @@ void prepare_socket_names(raw_msgpack_api_context_t* raw_ctx, const char* output sprintf(postfix, "%s_%s_%s_%s_%p", (is_prefix ? socket_prefix : "-"), - (is_plugin ? output_plugin_name : "defPluguin"), + (is_plugin ? output_plugin_name : "defPlugin"), (is_host ? host : "defHost"), (is_port ? port : "defPort"), (void*) raw_ctx); - get_socket_path(CLIENT_SOCK_PATH, postfix, raw_ctx->client_addr ); + get_socket_path(CLIENT_SOCK_PATH, postfix, raw_ctx->client_addr); get_socket_path(SERVER_SOCK_PATH, postfix, raw_ctx->server_addr); @@ -211,74 +234,119 @@ void prepare_socket_names(raw_msgpack_api_context_t* raw_ctx, const char* output } -void* init(const char* output_plugin_name, const char * host, const char * port, - void* plugin_params, const char * socket_prefix) { +void* init(const char* output_plugin_name, const char* host, const char* port, + void* plugin_params, const char* socket_prefix) { #ifdef VERBOSE printf("[Raw Msgpack API] Initialization started.\n"); #endif plugin_params_t* params = (plugin_params_t *) plugin_params; - raw_msgpack_api_context_t* raw_ctx = (raw_msgpack_api_context_t*) calloc(1, sizeof(raw_msgpack_api_context_t)); + raw_msgpack_api_context_t* raw_ctx = raw_msgpack_api_context_create(); + if (raw_ctx == NULL) { + printf("[Raw Msgpack API] critical: failed to create raw context.\n"); + return NULL; + } prepare_socket_names(raw_ctx, output_plugin_name, host, port, socket_prefix); /* Initialize library */ raw_ctx->ctx = flb_create(); if (!raw_ctx->ctx) { - printf("[Raw Msgpack API] could not create flb context. Returning Null.\n"); - return NULL; + printf("[Raw Msgpack API] critical: could not create flb context.\n"); + goto error; + } + + if (flb_service_set(raw_ctx->ctx, "Flush", "0.1", NULL) != 0) { + printf("[Raw Msgpack API] warning: failed to set service property Flush to 0.1\n"); + } + + if (flb_service_set(raw_ctx->ctx, "Grace", "1", NULL) != 0) { + printf("[Raw Msgpack API] warning: failed to set service property Grace to 1\n"); } - flb_service_set(raw_ctx->ctx, "Flush", "0.1", NULL); - flb_service_set(raw_ctx->ctx, "Grace", "1", NULL); // create a client socket here to be ready to ring to "doorbell" raw_ctx->doorbell_cli = ipc_unix_sock_cli_create(raw_ctx->client_addr); + if (raw_ctx->doorbell_cli < 0) { + printf("[Raw Msgpack API] critical: failed to cerate doorbell socket\n"); + goto error; + } + #ifdef VERBOSE printf("[Raw Msgpack API] created client sock %d\n", raw_ctx->doorbell_cli); #endif in_plugin_data_t* in_data = &(raw_ctx->in_data); in_data->server_addr = raw_ctx->server_addr; - // raw_ctx->i_ins = flb_input_new(raw_ctx->ctx->config, "raw_msgpack", (void *) in_data, FLB_TRUE); - // if (!raw_ctx->i_ins) { - // return NULL; - // } + raw_ctx->in_ffd = flb_input(raw_ctx->ctx, "raw_msgpack", (void *) in_data); + if (raw_ctx->in_ffd < 0) { + printf("[Raw Msgpack API] critical: failed to cerate input instance 'raw_msgpack'\n"); + goto error; + } - raw_ctx->out_ffd = -1; + const char* default_output_plugin_name = "forward"; if (strlen(output_plugin_name) > 0) { // simple check for plugin name raw_ctx->out_ffd = flb_output(raw_ctx->ctx, output_plugin_name, NULL); + if (raw_ctx->out_ffd < 0) { + printf("[Raw Msgpack API] warning: failed to cerate output instance '%s', will use the default '%s'\n", + output_plugin_name, default_output_plugin_name); + } } + if (raw_ctx->out_ffd == -1) { // if cannot find 'output_plugin_name' plugin, use the default 'forward' - raw_ctx->out_ffd = flb_output(raw_ctx->ctx, "forward", NULL); + raw_ctx->out_ffd = flb_output(raw_ctx->ctx, default_output_plugin_name, NULL); + if (raw_ctx->out_ffd == -1) { + printf("[Raw Msgpack API] critical: failed to cerate output instance '%s'\n", default_output_plugin_name); + goto error; + } } - flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "Host", host, NULL); - flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "Port", port, NULL); + if (flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "Host", host, NULL) < 0) { + printf("[Raw Msgpack API] warning: failed to set output parameter Host to '%s'\n", host); + } + + if (flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "Port", port, NULL) < 0) { + printf("[Raw Msgpack API] warning: failed to set output parameter Port to '%s'\n", port); + } if (params != NULL) { - int i; if (params->num_params > 0) { printf("\n[Raw Msgpack API] Setting '%s' output plugin parameters:\n", output_plugin_name); } - for (i = 0; i < params->num_params; i++) { - printf("\t\t\t\t'%s' to '%s'\n", params->params[i].name, params->params[i].val); - if (strcmp(params->params[i].name, "tag_match_pair") != 0) { - flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, params->params[i].name, params->params[i].val, NULL); + for (int i = 0; i < params->num_params; i++) { + const char* name = params->params[i].name; + const char* val = params->params[i].val; + printf("\t\t\t\t'%s' to '%s'\n", name, val); + if (strcmp(name, "tag_match_pair") != 0) { + if (flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, name, val, NULL) < 0) { + printf("[Raw Msgpack API] warning: failed to set output parameter '%s' to '%s'\n", name, val); + } } else { - flb_input_set(raw_ctx->ctx, raw_ctx->in_ffd, "tag", params->params[i].val, NULL); - flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "match", params->params[i].val, NULL); + if (flb_input_set(raw_ctx->ctx, raw_ctx->in_ffd, "tag", val, NULL) < 0) { + printf("[Raw Msgpack API] warning: failed to set input parameter '%s' to '%s'\n", "tag", val); + } + if (flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "match", val, NULL) < 0) { + printf("[Raw Msgpack API] warning: failed to set output parameter '%s' to '%s'\n", "match", val); + } } } } // Start the background worker - flb_start(raw_ctx->ctx); + if (flb_start(raw_ctx->ctx) < 0) { + printf("[Raw Msgpack API] critical: failed to start background worker\n"); + goto error; + } + #ifdef VERBOSE printf("[Raw Msgpack API] init finished\n\n"); #endif - return (void*) raw_ctx; + return raw_ctx; + +error: + raw_msgpack_api_context_delete(raw_ctx); + return NULL; } @@ -292,8 +360,10 @@ int add_data(void* api_ctx, void* data, int len) { return 0; } #ifdef VERBOSE - //printf("Append raw data of len %d\n", len); - // DumpHex(data, len); + printf("Append raw data of len %d\n", len); +#endif +#ifdef VERBOSE_DATA + dump_hex_data(data, len); #endif ring_doorbell(raw_ctx, raw_ctx->doorbell_cli, len, (char*) data); return 0; @@ -311,11 +381,14 @@ int finalize(void* api_ctx) { // printf("\t\t\t\t\t\tserver_addr '%s'\n", raw_ctx->server_addr); // printf("\t\t\t\t\t\tbuffer_addr '%p'\n", raw_ctx->buffer); #endif - // clean up socket - close(raw_ctx->doorbell_cli); - unlink(raw_ctx->client_addr); - // finilize fluent bit - flb_stop(raw_ctx->ctx); - flb_destroy(raw_ctx->ctx); + + // finalize fluent bit + int ret = flb_stop(raw_ctx->ctx); + if (ret != 0) { + printf("[Raw Msgpack API] warning: background worker stopped with the error code %d\n", ret); + } + + raw_msgpack_api_context_delete(raw_ctx); + return 0; }