-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: composer <[email protected]>
- Loading branch information
1 parent
29e7258
commit f0dcdf2
Showing
1 changed file
with
48 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,8 +66,9 @@ static int cb_doris_init(struct flb_output_instance *ins, | |
} | ||
|
||
static int http_put(struct flb_out_doris *ctx, | ||
const void *body, size_t body_len, | ||
const char *tag, int tag_len) | ||
const char *host, int port, | ||
const void *body, size_t body_len, | ||
const char *tag, int tag_len) | ||
{ | ||
int ret; | ||
int out_ret = FLB_OK; | ||
|
@@ -79,7 +80,16 @@ static int http_put(struct flb_out_doris *ctx, | |
struct flb_http_client *c; | ||
|
||
/* Get upstream context and connection */ | ||
u = ctx->u; | ||
if (strcmp(host, ctx->host) == 0 && port == ctx->port) { | ||
u = ctx->u; | ||
} | ||
else { | ||
u = flb_upstream_create(ctx->u->base.config, | ||
host, | ||
port, | ||
ctx->u->base.flags, | ||
ctx->u->base.tls_context); | ||
} | ||
u_conn = flb_upstream_conn_get(u); | ||
if (!u_conn) { | ||
flb_plg_error(ctx->ins, "no upstream connections available to %s:%i", | ||
|
@@ -94,7 +104,7 @@ static int http_put(struct flb_out_doris *ctx, | |
/* Create HTTP client context */ | ||
c = flb_http_client(u_conn, FLB_HTTP_PUT, ctx->uri, | ||
payload_buf, payload_size, | ||
ctx->host, ctx->port, | ||
host, port, | ||
NULL, 0); | ||
|
||
/* | ||
|
@@ -120,13 +130,32 @@ static int http_put(struct flb_out_doris *ctx, | |
|
||
ret = flb_http_do(c, &b_sent); | ||
if (ret == 0) { | ||
flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", | ||
ctx->host, ctx->port, | ||
c->resp.status, c->resp.payload); | ||
if (c->resp.payload_size > 0 && | ||
(strstr(c->resp.payload, "\"Status\": \"Success\"") != NULL || | ||
strstr(c->resp.payload, "\"Status\": \"Publish Timeout\"") != NULL)) { | ||
flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", | ||
host, port, | ||
c->resp.status, c->resp.payload); | ||
if (c->resp.status == 307) { // redict | ||
// example: Location: http://admin:[email protected]:8040/api/d_fb/t_fb/_stream_load? | ||
char* location = strstr(c->resp.data, "Location:"); | ||
char* start = strstr(location, "@") + 1; | ||
char* mid = strstr(start, ":"); | ||
char* end = strstr(mid, "/api"); | ||
char redict_host[50] = {0}; | ||
memcpy(redict_host, start, mid - start); | ||
char redict_port[10] = {0}; | ||
memcpy(redict_port, mid + 1, end - (mid + 1)); | ||
|
||
out_ret = http_put(ctx, redict_host, atoi(redict_port), | ||
body, body_len, tag, tag_len); | ||
} | ||
else if (c->resp.status == 200) { | ||
if (c->resp.payload_size > 0 && | ||
(strstr(c->resp.payload, "\"Status\": \"Success\"") != NULL || | ||
strstr(c->resp.payload, "\"Status\": \"Publish Timeout\"") != NULL)) { | ||
// continue | ||
} | ||
else { | ||
out_ret = FLB_RETRY; | ||
} | ||
} | ||
else { | ||
out_ret = FLB_RETRY; | ||
|
@@ -154,6 +183,11 @@ static int http_put(struct flb_out_doris *ctx, | |
/* Release the TCP connection */ | ||
flb_upstream_conn_release(u_conn); | ||
|
||
/* Release flb_upstream */ | ||
if (u != ctx->u) { | ||
flb_upstream_destroy(u); | ||
} | ||
|
||
return out_ret; | ||
} | ||
|
||
|
@@ -169,7 +203,7 @@ static int compose_payload(struct flb_out_doris *ctx, | |
encoded = flb_pack_msgpack_to_json_format(in_body, | ||
in_size, | ||
FLB_PACK_JSON_FORMAT_JSON, | ||
FLB_PACK_JSON_DATE_DOUBLE, | ||
FLB_PACK_JSON_DATE_EPOCH, | ||
ctx->time_key); | ||
if (encoded == NULL) { | ||
flb_plg_error(ctx->ins, "failed to convert json"); | ||
|
@@ -178,7 +212,7 @@ static int compose_payload(struct flb_out_doris *ctx, | |
*out_body = (void*)encoded; | ||
*out_size = flb_sds_len(encoded); | ||
|
||
flb_plg_info(ctx->ins, "%s", (char*) *out_body); | ||
flb_plg_debug(ctx->ins, "http body: %s", (char*) *out_body); | ||
|
||
return FLB_OK; | ||
} | ||
|
@@ -202,8 +236,8 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk, | |
FLB_OUTPUT_RETURN(ret); | ||
} | ||
|
||
ret = http_put(ctx, out_body, out_size, | ||
event_chunk->tag, flb_sds_len(event_chunk->tag)); | ||
ret = http_put(ctx, ctx->host, ctx->port, out_body, out_size, | ||
event_chunk->tag, flb_sds_len(event_chunk->tag)); | ||
flb_sds_destroy(out_body); | ||
|
||
FLB_OUTPUT_RETURN(ret); | ||
|
@@ -219,8 +253,6 @@ static int cb_doris_exit(void *data, struct flb_config *config) | |
|
||
/* Configuration properties map */ | ||
static struct flb_config_map config_map[] = { | ||
// host | ||
// port | ||
// user | ||
{ | ||
FLB_CONFIG_MAP_STR, "user", NULL, | ||
|