Skip to content

Commit

Permalink
feat: add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: composer <[email protected]>
  • Loading branch information
joker-star-l committed Oct 23, 2024
1 parent 93639c3 commit 29e7258
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 0 deletions.
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ if(FLB_IN_LIB)
FLB_RT_TEST(FLB_OUT_S3 "out_s3.c")
FLB_RT_TEST(FLB_OUT_TD "out_td.c")
FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c")
FLB_RT_TEST(FLB_OUT_DORIS "out_doris.c")

endif()

Expand Down
268 changes: 268 additions & 0 deletions tests/runtime/out_doris.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2019-2022 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fluent-bit.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_time.h>
#include <float.h>
#include <math.h>
#include <msgpack.h>
#include "flb_tests_runtime.h"

struct test_ctx {
flb_ctx_t *flb; /* Fluent Bit library context */
int i_ffd; /* Input fd */
int f_ffd; /* Filter fd (unused) */
int o_ffd; /* Output fd */
};

pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
int num_output = 0;
static int get_output_num()
{
int ret;
pthread_mutex_lock(&result_mutex);
ret = num_output;
pthread_mutex_unlock(&result_mutex);

return ret;
}

static void set_output_num(int num)
{
pthread_mutex_lock(&result_mutex);
num_output = num;
pthread_mutex_unlock(&result_mutex);
}

static void clear_output_num()
{
set_output_num(0);
}

struct str_list {
size_t size;
char **lists;
};

/* Callback to check expected results */
static void cb_check_str_list(void *ctx, int ffd, int res_ret,
void *res_data, size_t res_size, void *data)
{
char *p;
flb_sds_t out_line = res_data;
int num = get_output_num();
size_t i;
struct str_list *l = (struct str_list *)data;

if (!TEST_CHECK(res_data != NULL)) {
TEST_MSG("res_data is NULL");
return;
}

if (!TEST_CHECK(l != NULL)) {
TEST_MSG("l is NULL");
flb_sds_destroy(out_line);
return;
}

if(!TEST_CHECK(res_ret == 0)) {
TEST_MSG("callback ret=%d", res_ret);
}
if (!TEST_CHECK(res_data != NULL)) {
TEST_MSG("res_data is NULL");
flb_sds_destroy(out_line);
return;
}

for (i=0; i<l->size; i++) {
p = strstr(out_line, l->lists[i]);
if (!TEST_CHECK(p != NULL)) {
TEST_MSG(" Got :%s\n expect:%s", out_line, l->lists[i]);
}
}
set_output_num(num+1);

flb_sds_destroy(out_line);
}

static struct test_ctx *test_ctx_create()
{
int i_ffd;
int o_ffd;
struct test_ctx *ctx = NULL;

ctx = flb_malloc(sizeof(struct test_ctx));
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("malloc failed");
flb_errno();
return NULL;
}

/* Service config */
ctx->flb = flb_create();
flb_service_set(ctx->flb,
"Flush", "0.200000000",
"Grace", "1",
"Log_Level", "error",
NULL);

/* Input */
i_ffd = flb_input(ctx->flb, (char *) "lib", NULL);
TEST_CHECK(i_ffd >= 0);
ctx->i_ffd = i_ffd;

/* Output */
o_ffd = flb_output(ctx->flb, (char *) "doris", NULL);
ctx->o_ffd = o_ffd;

return ctx;
}

static void test_ctx_destroy(struct test_ctx *ctx)
{
TEST_CHECK(ctx != NULL);

sleep(1);
flb_stop(ctx->flb);
flb_destroy(ctx->flb);
flb_free(ctx);
}

void flb_test_json()
{
struct test_ctx *ctx;
int ret;
int num;

char *buf1 = "[1, {\"msg\":\"hello world\"}]";
size_t size1 = strlen(buf1);
char *buf2 = "[2, {\"msg\":\"hello world\"}]";
size_t size2 = strlen(buf2);

char *expected_strs[] = {"[{\"date\":1.0,\"msg\":\"hello world\"},{\"date\":2.0,\"msg\":\"hello world\"}]"};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
};

clear_output_num();

ctx = test_ctx_create();
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"user", "admin",
"database", "d_fb",
"table", "t_fb",
NULL);
TEST_CHECK(ret == 0);

ret = flb_output_set_test(ctx->flb, ctx->o_ffd,
"formatter", cb_check_str_list,
&expected, NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

/* Ingest data sample */
ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf1, size1);
TEST_CHECK(ret >= 0);
ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf2, size2);
TEST_CHECK(ret >= 0);

/* waiting to flush */
flb_time_msleep(500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}

test_ctx_destroy(ctx);
}

void flb_test_time_key()
{
struct test_ctx *ctx;
int ret;
int num;

char *buf1 = "[1, {\"msg\":\"hello world\"}]";
size_t size1 = strlen(buf1);

char *expected_strs[] = {"{\"timestamp\":1.0,\"msg\":\"hello world\"}"};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
};

clear_output_num();

ctx = test_ctx_create();
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"user", "admin",
"database", "d_fb",
"table", "t_fb",
"time_key", "timestamp",
NULL);
TEST_CHECK(ret == 0);

ret = flb_output_set_test(ctx->flb, ctx->o_ffd,
"formatter", cb_check_str_list,
&expected, NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

/* Ingest data sample */
ret = flb_lib_push(ctx->flb, ctx->i_ffd, (char *) buf1, size1);
TEST_CHECK(ret >= 0);

/* waiting to flush */
flb_time_msleep(500);

num = get_output_num();
if (!TEST_CHECK(num > 0)) {
TEST_MSG("no outputs");
}

test_ctx_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{"json" , flb_test_json},
{"time_key" , flb_test_time_key},
{NULL, NULL}
};

0 comments on commit 29e7258

Please sign in to comment.