Skip to content

Commit

Permalink
Extended error hanling for the raw message pack api
Browse files Browse the repository at this point in the history
  • Loading branch information
gznv committed Dec 5, 2023
1 parent 854fb3f commit e68a1cb
Showing 1 changed file with 117 additions and 44 deletions.
161 changes: 117 additions & 44 deletions src/raw_msgpack_api/raw_msgpack_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#define _OPEN_SYS_ITOA_EXT
#include <stdio.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/un.h>

Expand Down Expand Up @@ -81,16 +82,39 @@ typedef struct raw_msgpack_api_context_t {
} raw_msgpack_api_context_t;


static raw_msgpack_api_context_t* raw_msgpack_api_context_create() {
raw_msgpack_api_context_t* ctx = (raw_msgpack_api_context_t*) calloc(1, sizeof(raw_msgpack_api_context_t));
if (ctx == NULL) {
return NULL;
}

ctx->doorbell_cli = -1;
ctx->in_ffd = -1;
ctx->out_ffd = -1;

return ctx;
}

static void raw_msgpack_api_context_delete(raw_msgpack_api_context_t* raw_ctx) {
if (raw_ctx->doorbell_cli != -1) {
close(raw_ctx->doorbell_cli);
unlink(raw_ctx->client_addr);
}

if (raw_ctx->ctx) {
flb_destroy(raw_ctx->ctx);
}

free(raw_ctx);
}

typedef struct doorbell_msg_t {
int data_len;
char* buffer;
} doorbell_msg_t;


#if 1
#include <stdio.h>

void DumpHex(const void* data, size_t size) {
#ifdef VERBOSE_DATA
static void dump_hex_data(const void* data, size_t size) {
char ascii[17];
size_t i, j;
ascii[16] = '\0';
Expand Down Expand Up @@ -118,10 +142,9 @@ void DumpHex(const void* data, size_t size) {
}
}
}
#endif
#endif // VERBOSE_DATA


int ipc_unix_sock_cli_create(char *sock_path) {
static int ipc_unix_sock_cli_create(const char* sock_path) {
int socket_fd;
struct sockaddr_un client_address;

Expand Down Expand Up @@ -185,8 +208,8 @@ bool ring_doorbell(raw_msgpack_api_context_t* raw_ctx, int client_fd, int data_l
}


void prepare_socket_names(raw_msgpack_api_context_t* raw_ctx, const char* output_plugin_name,
const char * host, const char * port, const char * socket_prefix) {
static void prepare_socket_names(raw_msgpack_api_context_t* raw_ctx, const char* output_plugin_name,
const char* host, const char* port, const char* socket_prefix) {
char postfix[128] = "";
bool is_prefix = strlen(socket_prefix) > 0;
bool is_plugin = strlen(output_plugin_name) > 0;
Expand All @@ -195,12 +218,12 @@ void prepare_socket_names(raw_msgpack_api_context_t* raw_ctx, const char* output

sprintf(postfix, "%s_%s_%s_%s_%p",
(is_prefix ? socket_prefix : "-"),
(is_plugin ? output_plugin_name : "defPluguin"),
(is_plugin ? output_plugin_name : "defPlugin"),
(is_host ? host : "defHost"),
(is_port ? port : "defPort"),
(void*) raw_ctx);

get_socket_path(CLIENT_SOCK_PATH, postfix, raw_ctx->client_addr );
get_socket_path(CLIENT_SOCK_PATH, postfix, raw_ctx->client_addr);
get_socket_path(SERVER_SOCK_PATH, postfix, raw_ctx->server_addr);


Expand All @@ -211,74 +234,119 @@ void prepare_socket_names(raw_msgpack_api_context_t* raw_ctx, const char* output
}


void* init(const char* output_plugin_name, const char * host, const char * port,
void* plugin_params, const char * socket_prefix) {
void* init(const char* output_plugin_name, const char* host, const char* port,
void* plugin_params, const char* socket_prefix) {
#ifdef VERBOSE
printf("[Raw Msgpack API] Initialization started.\n");
#endif

plugin_params_t* params = (plugin_params_t *) plugin_params;
raw_msgpack_api_context_t* raw_ctx = (raw_msgpack_api_context_t*) calloc(1, sizeof(raw_msgpack_api_context_t));
raw_msgpack_api_context_t* raw_ctx = raw_msgpack_api_context_create();
if (raw_ctx == NULL) {
printf("[Raw Msgpack API] critical: failed to create raw context.\n");
return NULL;
}

prepare_socket_names(raw_ctx, output_plugin_name, host, port, socket_prefix);

/* Initialize library */
raw_ctx->ctx = flb_create();
if (!raw_ctx->ctx) {
printf("[Raw Msgpack API] could not create flb context. Returning Null.\n");
return NULL;
printf("[Raw Msgpack API] critical: could not create flb context.\n");
goto error;
}

if (flb_service_set(raw_ctx->ctx, "Flush", "0.1", NULL) != 0) {
printf("[Raw Msgpack API] warning: failed to set service property Flush to 0.1\n");
}

if (flb_service_set(raw_ctx->ctx, "Grace", "1", NULL) != 0) {
printf("[Raw Msgpack API] warning: failed to set service property Grace to 1\n");
}
flb_service_set(raw_ctx->ctx, "Flush", "0.1", NULL);
flb_service_set(raw_ctx->ctx, "Grace", "1", NULL);

// create a client socket here to be ready to ring to "doorbell"
raw_ctx->doorbell_cli = ipc_unix_sock_cli_create(raw_ctx->client_addr);
if (raw_ctx->doorbell_cli < 0) {
printf("[Raw Msgpack API] critical: failed to cerate doorbell socket\n");
goto error;
}

#ifdef VERBOSE
printf("[Raw Msgpack API] created client sock %d\n", raw_ctx->doorbell_cli);
#endif
in_plugin_data_t* in_data = &(raw_ctx->in_data);

in_data->server_addr = raw_ctx->server_addr;
// raw_ctx->i_ins = flb_input_new(raw_ctx->ctx->config, "raw_msgpack", (void *) in_data, FLB_TRUE);
// if (!raw_ctx->i_ins) {
// return NULL;
// }

raw_ctx->in_ffd = flb_input(raw_ctx->ctx, "raw_msgpack", (void *) in_data);
if (raw_ctx->in_ffd < 0) {
printf("[Raw Msgpack API] critical: failed to cerate input instance 'raw_msgpack'\n");
goto error;
}

raw_ctx->out_ffd = -1;
const char* default_output_plugin_name = "forward";
if (strlen(output_plugin_name) > 0) { // simple check for plugin name
raw_ctx->out_ffd = flb_output(raw_ctx->ctx, output_plugin_name, NULL);
if (raw_ctx->out_ffd < 0) {
printf("[Raw Msgpack API] warning: failed to cerate output instance '%s', will use the default '%s'\n",
output_plugin_name, default_output_plugin_name);
}
}

if (raw_ctx->out_ffd == -1) {
// if cannot find 'output_plugin_name' plugin, use the default 'forward'
raw_ctx->out_ffd = flb_output(raw_ctx->ctx, "forward", NULL);
raw_ctx->out_ffd = flb_output(raw_ctx->ctx, default_output_plugin_name, NULL);
if (raw_ctx->out_ffd == -1) {
printf("[Raw Msgpack API] critical: failed to cerate output instance '%s'\n", default_output_plugin_name);
goto error;
}
}

flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "Host", host, NULL);
flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "Port", port, NULL);
if (flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "Host", host, NULL) < 0) {
printf("[Raw Msgpack API] warning: failed to set output parameter Host to '%s'\n", host);
}

if (flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "Port", port, NULL) < 0) {
printf("[Raw Msgpack API] warning: failed to set output parameter Port to '%s'\n", port);
}

if (params != NULL) {
int i;
if (params->num_params > 0) {
printf("\n[Raw Msgpack API] Setting '%s' output plugin parameters:\n", output_plugin_name);
}
for (i = 0; i < params->num_params; i++) {
printf("\t\t\t\t'%s' to '%s'\n", params->params[i].name, params->params[i].val);
if (strcmp(params->params[i].name, "tag_match_pair") != 0) {
flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, params->params[i].name, params->params[i].val, NULL);
for (int i = 0; i < params->num_params; i++) {
const char* name = params->params[i].name;
const char* val = params->params[i].val;
printf("\t\t\t\t'%s' to '%s'\n", name, val);
if (strcmp(name, "tag_match_pair") != 0) {
if (flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, name, val, NULL) < 0) {
printf("[Raw Msgpack API] warning: failed to set output parameter '%s' to '%s'\n", name, val);
}
} else {
flb_input_set(raw_ctx->ctx, raw_ctx->in_ffd, "tag", params->params[i].val, NULL);
flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "match", params->params[i].val, NULL);
if (flb_input_set(raw_ctx->ctx, raw_ctx->in_ffd, "tag", val, NULL) < 0) {
printf("[Raw Msgpack API] warning: failed to set input parameter '%s' to '%s'\n", "tag", val);
}
if (flb_output_set(raw_ctx->ctx, raw_ctx->out_ffd, "match", val, NULL) < 0) {
printf("[Raw Msgpack API] warning: failed to set output parameter '%s' to '%s'\n", "match", val);
}
}
}
}

// Start the background worker
flb_start(raw_ctx->ctx);
if (flb_start(raw_ctx->ctx) < 0) {
printf("[Raw Msgpack API] critical: failed to start background worker\n");
goto error;
}

#ifdef VERBOSE
printf("[Raw Msgpack API] init finished\n\n");
#endif
return (void*) raw_ctx;
return raw_ctx;

error:
raw_msgpack_api_context_delete(raw_ctx);
return NULL;
}


Expand All @@ -292,8 +360,10 @@ int add_data(void* api_ctx, void* data, int len) {
return 0;
}
#ifdef VERBOSE
//printf("Append raw data of len %d\n", len);
// DumpHex(data, len);
printf("Append raw data of len %d\n", len);
#endif
#ifdef VERBOSE_DATA
dump_hex_data(data, len);
#endif
ring_doorbell(raw_ctx, raw_ctx->doorbell_cli, len, (char*) data);
return 0;
Expand All @@ -311,11 +381,14 @@ int finalize(void* api_ctx) {
// printf("\t\t\t\t\t\tserver_addr '%s'\n", raw_ctx->server_addr);
// printf("\t\t\t\t\t\tbuffer_addr '%p'\n", raw_ctx->buffer);
#endif
// clean up socket
close(raw_ctx->doorbell_cli);
unlink(raw_ctx->client_addr);
// finilize fluent bit
flb_stop(raw_ctx->ctx);
flb_destroy(raw_ctx->ctx);

// finalize fluent bit
int ret = flb_stop(raw_ctx->ctx);
if (ret != 0) {
printf("[Raw Msgpack API] warning: background worker stopped with the error code %d\n", ret);
}

raw_msgpack_api_context_delete(raw_ctx);

return 0;
}

0 comments on commit e68a1cb

Please sign in to comment.