diff --git a/plugins/filter_kubernetes/kube_conf.c b/plugins/filter_kubernetes/kube_conf.c index f562292409d..73ba2801518 100644 --- a/plugins/filter_kubernetes/kube_conf.c +++ b/plugins/filter_kubernetes/kube_conf.c @@ -89,17 +89,6 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins, if (ctx->use_tag_for_meta) { ctx->api_https = FLB_FALSE; } - else if (ctx->use_kubelet) { - ctx->api_host = flb_strdup(ctx->kubelet_host); - ctx->api_port = ctx->kubelet_port; - ctx->api_https = FLB_TRUE; - - /* This is for unit test diagnostic purposes */ - if (ctx->meta_preload_cache_dir) { - ctx->api_https = FLB_FALSE; - } - - } else if (!url) { ctx->api_host = flb_strdup(FLB_API_HOST); ctx->api_port = FLB_API_PORT; @@ -136,11 +125,6 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins, } } - snprintf(ctx->kube_url, sizeof(ctx->kube_url) - 1, - "%s://%s:%i", - ctx->api_https ? "https" : "http", - ctx->api_host, ctx->api_port); - if (ctx->kube_meta_cache_ttl > 0) { ctx->hash_table = flb_hash_table_create_with_ttl(ctx->kube_meta_cache_ttl, FLB_HASH_TABLE_EVICT_OLDER, @@ -153,7 +137,22 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins, FLB_HASH_TABLE_SIZE); } - if (!ctx->hash_table) { + if (ctx->kube_meta_namespace_cache_ttl > 0) { + ctx->namespace_hash_table = flb_hash_table_create_with_ttl( + ctx->kube_meta_namespace_cache_ttl, + FLB_HASH_TABLE_EVICT_OLDER, + FLB_HASH_TABLE_SIZE, + FLB_HASH_TABLE_SIZE); + } + else { + ctx->namespace_hash_table = flb_hash_table_create( + FLB_HASH_TABLE_EVICT_RANDOM, + FLB_HASH_TABLE_SIZE, + FLB_HASH_TABLE_SIZE); + } + + + if (!ctx->hash_table || !ctx->namespace_hash_table) { flb_kube_conf_destroy(ctx); return NULL; } @@ -203,6 +202,10 @@ void flb_kube_conf_destroy(struct flb_kube *ctx) flb_hash_table_destroy(ctx->hash_table); } + if (ctx->namespace_hash_table) { + flb_hash_table_destroy(ctx->namespace_hash_table); + } + if (ctx->merge_log == FLB_TRUE) { flb_free(ctx->unesc_buf); } @@ -218,14 +221,20 @@ void flb_kube_conf_destroy(struct flb_kube *ctx) flb_free(ctx->podname); flb_free(ctx->auth); - if (ctx->upstream) { - flb_upstream_destroy(ctx->upstream); + if (ctx->kubelet_upstream) { + flb_upstream_destroy(ctx->kubelet_upstream); + } + if (ctx->kube_api_upstream) { + flb_upstream_destroy(ctx->kube_api_upstream); } #ifdef FLB_HAVE_TLS if (ctx->tls) { flb_tls_destroy(ctx->tls); } + if (ctx->kubelet_tls) { + flb_tls_destroy(ctx->kubelet_tls); + } #endif flb_free(ctx); diff --git a/plugins/filter_kubernetes/kube_conf.h b/plugins/filter_kubernetes/kube_conf.h index 687d484f9c0..2aa18da5973 100644 --- a/plugins/filter_kubernetes/kube_conf.h +++ b/plugins/filter_kubernetes/kube_conf.h @@ -70,13 +70,12 @@ struct kube_meta; /* Filter context */ struct flb_kube { /* Configuration parameters */ - char *api_host; - int api_port; - int api_https; int use_journal; int cache_use_docker_id; int labels; int annotations; + int namespace_labels; + int namespace_annotations; int dummy_meta; int tls_debug; int tls_verify; @@ -113,7 +112,9 @@ struct flb_kube { int keep_log; /* API Server end point */ - char kube_url[1024]; + char *api_host; + int api_port; + int api_https; /* Kubernetes tag prefix */ flb_sds_t kube_tag_prefix; @@ -158,12 +159,16 @@ struct flb_kube { int kubelet_port; int kube_meta_cache_ttl; + int kube_meta_namespace_cache_ttl; struct flb_tls *tls; + struct flb_tls *kubelet_tls; struct flb_config *config; struct flb_hash_table *hash_table; - struct flb_upstream *upstream; + struct flb_hash_table *namespace_hash_table; + struct flb_upstream *kubelet_upstream; + struct flb_upstream *kube_api_upstream; struct flb_filter_instance *ins; }; diff --git a/plugins/filter_kubernetes/kube_meta.c b/plugins/filter_kubernetes/kube_meta.c index 4850ad9dd2c..45d9c445b34 100644 --- a/plugins/filter_kubernetes/kube_meta.c +++ b/plugins/filter_kubernetes/kube_meta.c @@ -306,10 +306,16 @@ static int get_meta_file_info(struct flb_kube *ctx, const char *namespace, int ret; char uri[1024]; - if (ctx->meta_preload_cache_dir && namespace && podname) { + if (ctx->meta_preload_cache_dir && namespace) { - ret = snprintf(uri, sizeof(uri) - 1, "%s/%s_%s.meta", - ctx->meta_preload_cache_dir, namespace, podname); + if (podname && strlen(podname) > 0) { + ret = snprintf(uri, sizeof(uri) - 1, "%s/%s_%s.meta", + ctx->meta_preload_cache_dir, namespace, podname); + } + else { + ret = snprintf(uri, sizeof(uri) - 1, "%s/%s.namespace_meta", + ctx->meta_preload_cache_dir, namespace); + } if (ret > 0) { fd = open(uri, O_RDONLY, 0); if (fd != -1) { @@ -351,22 +357,37 @@ static int get_meta_info_from_request(struct flb_kube *ctx, const char *podname, char **buffer, size_t *size, int *root_type, - char* uri) + char* uri, + int use_kubelet_connection) { struct flb_http_client *c; struct flb_connection *u_conn; int ret; size_t b_sent; int packed; + + if(use_kubelet_connection == FLB_TRUE) { + if (!ctx->kubelet_upstream) { + return -1; + } - if (!ctx->upstream) { - return -1; - } + u_conn = flb_upstream_conn_get(ctx->kubelet_upstream); + } + else { + if (!ctx->kube_api_upstream) { + return -1; + } - u_conn = flb_upstream_conn_get(ctx->upstream); + u_conn = flb_upstream_conn_get(ctx->kube_api_upstream); + } if (!u_conn) { - flb_plg_error(ctx->ins, "kubelet upstream connection error"); + if(use_kubelet_connection == FLB_TRUE) { + flb_plg_error(ctx->ins, "kubelet upstream connection error"); + } + else { + flb_plg_error(ctx->ins, "kube api upstream connection error"); + } return -1; } @@ -443,7 +464,50 @@ static int get_pods_from_kubelet(struct flb_kube *ctx, flb_plg_debug(ctx->ins, "Send out request to Kubelet for pods information."); packed = get_meta_info_from_request(ctx, namespace, podname, - &buf, &size, &root_type, uri); + &buf, &size, &root_type, uri, + ctx->use_kubelet); + } + + /* validate pack */ + if (packed == -1) { + return -1; + } + + *out_buf = buf; + *out_size = size; + + return 0; +} + +/* Gather namespace metadata from API Server */ +static int get_namespace_api_server_info(struct flb_kube *ctx, const char *namespace, + char **out_buf, size_t *out_size) +{ + int ret; + int packed = -1; + int root_type; + char uri[1024]; + char *buf; + size_t size; + + *out_buf = NULL; + *out_size = 0; + + /* used for unit test purposes*/ + packed = get_meta_file_info(ctx, namespace, "", + &buf, &size, &root_type); + + if (packed == -1) { + ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBE_API_NAMESPACE_FMT, namespace); + + if (ret == -1) { + return -1; + } + flb_plg_debug(ctx->ins, + "Send out request to API Server for namespace information: %s", uri); + // Namespace data is only available from kuberenetes api, not kubelet + packed = get_meta_info_from_request(ctx, namespace, "", + &buf, &size, &root_type, uri, FLB_FALSE); } /* validate pack */ @@ -457,8 +521,8 @@ static int get_pods_from_kubelet(struct flb_kube *ctx, return 0; } -/* Gather metadata from API Server */ -static int get_api_server_info(struct flb_kube *ctx, +/* Gather pod metadata from API Server */ +static int get_pod_api_server_info(struct flb_kube *ctx, const char *namespace, const char *podname, char **out_buf, size_t *out_size) { @@ -478,7 +542,7 @@ static int get_api_server_info(struct flb_kube *ctx, if (packed == -1) { - ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBE_API_FMT, namespace, + ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBE_API_POD_FMT, namespace, podname); if (ret == -1) { @@ -487,7 +551,8 @@ static int get_api_server_info(struct flb_kube *ctx, flb_plg_debug(ctx->ins, "Send out request to API Server for pods information"); packed = get_meta_info_from_request(ctx, namespace, podname, - &buf, &size, &root_type, uri); + &buf, &size, &root_type, uri, + ctx->use_kubelet); } /* validate pack */ @@ -904,7 +969,146 @@ static int merge_meta_from_tag(struct flb_kube *ctx, struct flb_kube_meta *meta, return 0; } -static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, +static int merge_namespace_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, + const char *api_buf, size_t api_size, + char **out_buf, size_t *out_size) +{ + int i; + int ret; + int map_size = 0; + int meta_found = FLB_FALSE; + int have_labels = -1; + int have_annotations = -1; + size_t off = 0; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + + msgpack_unpacked api_result; + msgpack_unpacked meta_result; + msgpack_object k; + msgpack_object v; + msgpack_object meta_val; + msgpack_object api_map; + + /* + * + * - api_buf: metadata associated to namespace coming from the API server. + * + * When merging data we aim to add the following keys from the API server: + * + * - labels + * - annotations + */ + + /* Initialize output msgpack buffer */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Iterate API server msgpack and lookup specific fields */ + if (api_buf != NULL) { + msgpack_unpacked_init(&api_result); + ret = msgpack_unpack_next(&api_result, api_buf, api_size, &off); + if (ret == MSGPACK_UNPACK_SUCCESS) { + api_map = api_result.data; + + /* At this point map points to the ROOT map, eg: + * + * { + * "kind": "Namespace", + * "apiVersion": "v1", + * "metadata": { + * "name": "fluent-bit", + * "uid": "6d1e2042-8013-449c-aa93-e7238391c45f", + * .... + * } + * + * We are interested in the 'metadata' map value. + */ + for (i = 0; !meta_found && i < api_map.via.map.size; i++) { + k = api_map.via.map.ptr[i].key; + if (k.via.str.size == 8 && !strncmp(k.via.str.ptr, "metadata", 8)) { + meta_val = api_map.via.map.ptr[i].val; + if (meta_val.type == MSGPACK_OBJECT_MAP) { + meta_found = FLB_TRUE; + } + } + } + + if (meta_found == FLB_TRUE) { + /* Process metadata map value */ + msgpack_unpacked_init(&meta_result); + for (i = 0; i < meta_val.via.map.size; i++) { + k = meta_val.via.map.ptr[i].key; + + char *ptr = (char *) k.via.str.ptr; + size_t size = k.via.str.size; + + + if (size == 6 && strncmp(ptr, "labels", 6) == 0) { + have_labels = i; + if (ctx->namespace_labels == FLB_TRUE) { + map_size++; + } + } + else if (size == 11 && strncmp(ptr, "annotations", 11) == 0) { + have_annotations = i; + if (ctx->namespace_annotations == FLB_TRUE) { + map_size++; + } + } + + if (have_labels >= 0 && have_annotations >= 0) { + break; + } + } + } + + } + } + + + /* Set Map Size */ + map_size += 1; // +1 for the namespace name + msgpack_pack_map(&mp_pck, map_size); + if (meta->namespace != NULL) { + msgpack_pack_str(&mp_pck, 4); + msgpack_pack_str_body(&mp_pck, "name", 4); + msgpack_pack_str(&mp_pck, meta->namespace_len); + msgpack_pack_str_body(&mp_pck, meta->namespace, meta->namespace_len); + } + + /* Append API Server content */ + if (have_labels >= 0 && ctx->namespace_labels == FLB_TRUE) { + k = meta_val.via.map.ptr[have_labels].key; + v = meta_val.via.map.ptr[have_labels].val; + + msgpack_pack_object(&mp_pck, k); + msgpack_pack_object(&mp_pck, v); + } + + if (have_annotations >= 0 && ctx->namespace_annotations == FLB_TRUE) { + k = meta_val.via.map.ptr[have_annotations].key; + v = meta_val.via.map.ptr[have_annotations].val; + + msgpack_pack_object(&mp_pck, k); + msgpack_pack_object(&mp_pck, v); + } + + if (api_buf != NULL) { + msgpack_unpacked_destroy(&api_result); + if (meta_found == FLB_TRUE) { + msgpack_unpacked_destroy(&meta_result); + } + } + + /* Set outgoing msgpack buffer */ + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; + + return 0; +} + +static int merge_pod_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, const char *api_buf, size_t api_size, char **out_buf, size_t *out_size) { @@ -1183,10 +1387,10 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, return 0; } -static inline int extract_meta(struct flb_kube *ctx, - const char *tag, int tag_len, - const char *data, size_t data_size, - struct flb_kube_meta *meta) +static inline int parse_regex_tag_data(struct flb_kube *ctx, + const char *tag, int tag_len, + const char *data, size_t data_size, + struct flb_kube_meta *meta) { int i; size_t off = 0; @@ -1203,9 +1407,6 @@ static inline int extract_meta(struct flb_kube *ctx, msgpack_object key; msgpack_object val; - /* Reset meta context */ - memset(meta, '\0', sizeof(struct flb_kube_meta)); - /* Journald */ if (ctx->use_journal == FLB_TRUE) { off = 0; @@ -1274,6 +1475,67 @@ static inline int extract_meta(struct flb_kube *ctx, /* Parse the regex results */ flb_regex_parse(ctx->regex, &result, cb_results, meta); + return 0; +} + +static inline int extract_namespace_meta(struct flb_kube *ctx, + const char *tag, int tag_len, + const char *data, size_t data_size, + struct flb_kube_meta *meta) +{ + ssize_t n; + size_t off = 0; + int ret; + + /* Reset meta context */ + memset(meta, '\0', sizeof(struct flb_kube_meta)); + + ret = parse_regex_tag_data(ctx, tag, tag_len, data, data_size, meta); + if( ret != 0 ) { + return ret; + } + + /* Compose API server cache key */ + if (meta->namespace) { + n = meta->namespace_len + 1; + meta->cache_key = flb_malloc(n); + if (!meta->cache_key) { + flb_errno(); + return -1; + } + + /* Copy namespace */ + memcpy(meta->cache_key, meta->namespace, meta->namespace_len); + off = meta->namespace_len; + + meta->cache_key[off] = '\0'; + meta->cache_key_len = off; + } + else { + meta->cache_key = NULL; + meta->cache_key_len = 0; + } + + return 0; +} + +static inline int extract_pod_meta(struct flb_kube *ctx, + const char *tag, int tag_len, + const char *data, size_t data_size, + struct flb_kube_meta *meta) +{ + size_t off = 0; + ssize_t n; + int ret; + + /* Reset meta context */ + memset(meta, '\0', sizeof(struct flb_kube_meta)); + + ret = parse_regex_tag_data(ctx, tag, tag_len, data, data_size, meta); + if( ret != 0 ) { + return ret; + } + /* Compose API server cache key */ if (meta->podname && meta->namespace) { /* calculate estimated buffer size */ @@ -1326,11 +1588,38 @@ static inline int extract_meta(struct flb_kube *ctx, return 0; } +/* + * Given a fixed meta data (namespace), get API server information + * and merge buffers. + */ +static int get_and_merge_namespace_meta(struct flb_kube *ctx, struct flb_kube_meta *meta, + char **out_buf, size_t *out_size) +{ + int ret; + char *api_buf; + size_t api_size; + + ret = get_namespace_api_server_info(ctx, meta->namespace, + &api_buf, &api_size); + if (ret == -1) { + return -1; + } + + ret = merge_namespace_meta(meta, ctx, api_buf, api_size, + out_buf, out_size); + + if (api_buf != NULL) { + flb_free(api_buf); + } + + return ret; +} + /* * Given a fixed meta data (namespace and podname), get API server information * and merge buffers. */ -static int get_and_merge_meta(struct flb_kube *ctx, struct flb_kube_meta *meta, +static int get_and_merge_pod_meta(struct flb_kube *ctx, struct flb_kube_meta *meta, char **out_buf, size_t *out_size) { int ret; @@ -1346,14 +1635,14 @@ static int get_and_merge_meta(struct flb_kube *ctx, struct flb_kube_meta *meta, &api_buf, &api_size); } else { - ret = get_api_server_info(ctx, meta->namespace, meta->podname, + ret = get_pod_api_server_info(ctx, meta->namespace, meta->podname, &api_buf, &api_size); } if (ret == -1) { return -1; } - ret = merge_meta(meta, ctx, + ret = merge_pod_meta(meta, ctx, api_buf, api_size, out_buf, out_size); @@ -1390,12 +1679,65 @@ static int wait_for_dns(struct flb_kube *ctx) return -1; } +static int flb_kubelet_network_init(struct flb_kube *ctx, struct flb_config *config) +{ + int io_type = FLB_IO_TCP; + int api_https = FLB_TRUE; + ctx->kubelet_upstream = NULL; + + if(ctx->use_kubelet == FLB_FALSE) { + return 0; + } + + // This is for unit test diagnostic purposes + if (ctx->meta_preload_cache_dir) { + api_https = FLB_FALSE; + } + + if (api_https == FLB_TRUE) { + if (!ctx->tls_ca_path && !ctx->tls_ca_file) { + ctx->tls_ca_file = flb_strdup(FLB_KUBE_CA); + } + ctx->kubelet_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + ctx->tls_verify, + ctx->tls_debug, + ctx->tls_vhost, + ctx->tls_ca_path, + ctx->tls_ca_file, + NULL, NULL, NULL); + if (!ctx->kubelet_tls) { + return -1; + } + + io_type = FLB_IO_TLS; + } + + /* Create an Upstream context */ + ctx->kubelet_upstream = flb_upstream_create(config, + ctx->kubelet_host, + ctx->kubelet_port, + io_type, + ctx->kubelet_tls); + if (!ctx->kubelet_upstream) { + /* note: if ctx->tls.context is set, it's destroyed upon context exit */ + flb_plg_debug(ctx->ins, "kubelet network init create upstream failed"); + return -1; + } + + /* Remove async flag from upstream */ + flb_stream_disable_async_mode(&ctx->kubelet_upstream->base); + + return 0; +} + static int flb_kube_network_init(struct flb_kube *ctx, struct flb_config *config) { int io_type = FLB_IO_TCP; + int kubelet_network_init_ret = 0; - ctx->upstream = NULL; + ctx->kube_api_upstream = NULL; + /* Initialize Kube API Connection */ if (ctx->api_https == FLB_TRUE) { if (!ctx->tls_ca_path && !ctx->tls_ca_file) { ctx->tls_ca_file = flb_strdup(FLB_KUBE_CA); @@ -1415,21 +1757,22 @@ static int flb_kube_network_init(struct flb_kube *ctx, struct flb_config *config } /* Create an Upstream context */ - ctx->upstream = flb_upstream_create(config, + ctx->kube_api_upstream = flb_upstream_create(config, ctx->api_host, ctx->api_port, io_type, ctx->tls); - if (!ctx->upstream) { + if (!ctx->kube_api_upstream) { /* note: if ctx->tls.context is set, it's destroyed upon context exit */ flb_plg_debug(ctx->ins, "kube network init create upstream failed"); return -1; } /* Remove async flag from upstream */ - flb_stream_disable_async_mode(&ctx->upstream->base); - - return 0; + flb_stream_disable_async_mode(&ctx->kube_api_upstream->base); + + kubelet_network_init_ret = flb_kubelet_network_init(ctx, config); + return kubelet_network_init_ret; } /* Initialize local context */ @@ -1467,14 +1810,15 @@ int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config) /* Gather info from Kubelet */ flb_plg_info(ctx->ins, "testing connectivity with Kubelet..."); ret = get_pods_from_kubelet(ctx, ctx->namespace, ctx->podname, - &meta_buf, &meta_size); + &meta_buf, &meta_size); } else { /* Gather info from API server */ flb_plg_info(ctx->ins, "testing connectivity with API server..."); - ret = get_api_server_info(ctx, ctx->namespace, ctx->podname, + ret = get_pod_api_server_info(ctx, ctx->namespace, ctx->podname, &meta_buf, &meta_size); } + if (ret == -1) { if (!ctx->podname) { flb_plg_warn(ctx->ins, "could not get meta for local POD"); @@ -1485,6 +1829,19 @@ int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config) } return -1; } + + // Using namespace labels/annotations requires a kube api connection (even if Use_Kubelet On) + if(ctx->namespace_labels == FLB_TRUE || ctx->namespace_annotations == FLB_TRUE) { + // Ensure we have read access to the namespace the local pod is running under + flb_plg_info(ctx->ins, "testing connectivity with API server for namespaces..."); + ret = get_namespace_api_server_info(ctx, ctx->namespace, &meta_buf, &meta_size); + } + if (ret == -1) { + flb_plg_warn(ctx->ins, "could not get meta for namespace %s", + ctx->namespace); + return -1; + } + flb_plg_info(ctx->ins, "connectivity OK"); flb_free(meta_buf); } @@ -1528,7 +1885,7 @@ int flb_kube_dummy_meta_get(char **out_buf, size_t *out_size) return 0; } -int flb_kube_meta_get(struct flb_kube *ctx, +static inline int flb_kube_pod_meta_get(struct flb_kube *ctx, const char *tag, int tag_len, const char *data, size_t data_size, const char **out_buf, size_t *out_size, @@ -1544,7 +1901,7 @@ int flb_kube_meta_get(struct flb_kube *ctx, msgpack_unpacked result; /* Get metadata from tag or record (cache key is the important one) */ - ret = extract_meta(ctx, tag, tag_len, data, data_size, meta); + ret = extract_pod_meta(ctx, tag, tag_len, data, data_size, meta); if (ret != 0) { return -1; } @@ -1555,7 +1912,7 @@ int flb_kube_meta_get(struct flb_kube *ctx, (void *) &hash_meta_buf, &hash_meta_size); if (ret == -1) { /* Retrieve API server meta and merge with local meta */ - ret = get_and_merge_meta(ctx, meta, + ret = get_and_merge_pod_meta(ctx, meta, &tmp_hash_meta_buf, &hash_meta_size); if (ret == -1) { *out_buf = NULL; @@ -1568,8 +1925,8 @@ int flb_kube_meta_get(struct flb_kube *ctx, tmp_hash_meta_buf, hash_meta_size); if (id >= 0) { /* - * Release the original buffer created on extract_meta() as a new - * copy have been generated into the hash table, then re-set + * Release the original buffer created on extract_pod_meta() as a new + * copy has been generated into the hash table, then re-set * the outgoing buffer and size. */ flb_free(tmp_hash_meta_buf); @@ -1608,6 +1965,107 @@ int flb_kube_meta_get(struct flb_kube *ctx, return 0; } +static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx, + const char *tag, int tag_len, + const char *data, size_t data_size, + const char **out_buf, size_t *out_size, + struct flb_kube_meta *meta) +{ + int id; + int ret; + const char *hash_meta_buf; + char *tmp_hash_meta_buf; + size_t off = 0; + size_t hash_meta_size; + msgpack_unpacked result; + + /* Get metadata from tag or record (cache key is the important one) */ + ret = extract_namespace_meta(ctx, tag, tag_len, data, data_size, meta); + if (ret != 0) { + return -1; + } + + /* Check if we have some data associated to the cache key */ + ret = flb_hash_table_get(ctx->namespace_hash_table, + meta->cache_key, meta->cache_key_len, + (void *) &hash_meta_buf, &hash_meta_size); + if (ret == -1) { + /* Retrieve API server meta and merge with local meta */ + ret = get_and_merge_namespace_meta(ctx, meta, + &tmp_hash_meta_buf, &hash_meta_size); + if (ret == -1) { + *out_buf = NULL; + *out_size = 0; + return 0; + } + + id = flb_hash_table_add(ctx->namespace_hash_table, + meta->cache_key, meta->cache_key_len, + tmp_hash_meta_buf, hash_meta_size); + if (id >= 0) { + /* + * Release the original buffer created on extract_namespace_meta() + * as a new copy has been generated into the hash table, then reset + * the outgoing buffer and size. + */ + flb_free(tmp_hash_meta_buf); + flb_hash_table_get_by_id(ctx->namespace_hash_table, id, meta->cache_key, + &hash_meta_buf, &hash_meta_size); + } + } + + /* + * The retrieved buffer may have serialized items: + * + * [0] = kubernetes metadata (annotations, labels) + * + */ + msgpack_unpacked_init(&result); + + /* Unpack to get the offset/bytes of the first item */ + msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off); + + /* Set the pointer and proper size for the caller */ + *out_buf = hash_meta_buf; + *out_size = off; + + msgpack_unpacked_destroy(&result); + + return 0; +} + +int flb_kube_meta_get(struct flb_kube *ctx, + const char *tag, int tag_len, + const char *data, size_t data_size, + const char **out_buf, size_t *out_size, + const char **namespace_out_buf, + size_t *namespace_out_size, + struct flb_kube_meta *meta, + struct flb_kube_props *props, + struct flb_kube_meta *namespace_meta + ) +{ + int ret_namespace_meta = -1; + int ret_pod_meta = -1; + + if(ctx->namespace_labels == FLB_TRUE || ctx->namespace_annotations == FLB_TRUE) { + ret_namespace_meta = flb_kube_namespace_meta_get(ctx, tag, tag_len, data, + data_size, namespace_out_buf, namespace_out_size, namespace_meta); + } + + if(ctx->labels == FLB_TRUE || ctx->annotations == FLB_TRUE) { + ret_pod_meta = flb_kube_pod_meta_get(ctx, tag, tag_len, data, data_size, + out_buf, out_size, meta, props); + } + + // If we get metadata from either namespace or pod info, return success + if( ret_pod_meta == 0 || ret_namespace_meta == 0) { + return 0; + } + + return -1; +} + int flb_kube_meta_release(struct flb_kube_meta *meta) { int r = 0; diff --git a/plugins/filter_kubernetes/kube_meta.h b/plugins/filter_kubernetes/kube_meta.h index c6f425ca13a..e4e3b06685a 100644 --- a/plugins/filter_kubernetes/kube_meta.h +++ b/plugins/filter_kubernetes/kube_meta.h @@ -52,7 +52,8 @@ struct flb_kube_meta { #define FLB_KUBE_CA "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" #define FLB_KUBE_API_HOST "kubernetes.default.svc" #define FLB_KUBE_API_PORT 443 -#define FLB_KUBE_API_FMT "/api/v1/namespaces/%s/pods/%s" +#define FLB_KUBE_API_POD_FMT "/api/v1/namespaces/%s/pods/%s" +#define FLB_KUBE_API_NAMESPACE_FMT "/api/v1/namespaces/%s" #define FLB_KUBELET_PODS "/pods" int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config); @@ -62,8 +63,11 @@ int flb_kube_meta_get(struct flb_kube *ctx, const char *tag, int tag_len, const char *data, size_t data_size, const char **out_buf, size_t *out_size, + const char **namespace_out_buf, + size_t *namespace_out_size, struct flb_kube_meta *meta, - struct flb_kube_props *props); + struct flb_kube_props *props, + struct flb_kube_meta *namespace_meta); int flb_kube_meta_release(struct flb_kube_meta *meta); #endif diff --git a/plugins/filter_kubernetes/kubernetes.c b/plugins/filter_kubernetes/kubernetes.c index 65d619160ba..877c1f7ac1d 100644 --- a/plugins/filter_kubernetes/kubernetes.c +++ b/plugins/filter_kubernetes/kubernetes.c @@ -214,7 +214,8 @@ static int cb_kube_init(struct flb_filter_instance *f_ins, static int pack_map_content(struct flb_log_event_encoder *log_encoder, msgpack_object source_map, const char *kube_buf, size_t kube_size, - struct flb_kube_meta *meta, + const char *namespace_kube_buf, + size_t namespace_kube_size, struct flb_time *time_lookup, struct flb_parser *parser, struct flb_kube *ctx) @@ -512,6 +513,29 @@ static int pack_map_content(struct flb_log_event_encoder *log_encoder, return -8; } + /* Kubernetes Namespace */ + if (namespace_kube_buf && namespace_kube_size > 0) { + ret = flb_log_event_encoder_append_body_cstring( + log_encoder, + "kubernetes_namespace"); + + off = 0; + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, namespace_kube_buf, + namespace_kube_size, &off); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_raw_msgpack(log_encoder, + (char *) namespace_kube_buf, off); + } + + msgpack_unpacked_destroy(&result); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -8; + } + return 0; } @@ -529,11 +553,14 @@ static int cb_kube_filter(const void *data, size_t bytes, char *dummy_cache_buf = NULL; const char *cache_buf = NULL; size_t cache_size = 0; + const char *namespace_cache_buf = NULL; + size_t namespace_cache_size = 0; msgpack_object map; struct flb_parser *parser = NULL; struct flb_kube *ctx = filter_context; struct flb_kube_meta meta = {0}; struct flb_kube_props props = {0}; + struct flb_kube_meta namespace_meta = {0}; struct flb_log_event_encoder log_encoder; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; @@ -552,7 +579,10 @@ static int cb_kube_filter(const void *data, size_t bytes, ret = flb_kube_meta_get(ctx, tag, tag_len, data, bytes, - &cache_buf, &cache_size, &meta, &props); + &cache_buf, &cache_size, + &namespace_cache_buf, &namespace_cache_size, + &meta, &props, + &namespace_meta); } if (ret == -1) { return FLB_FILTER_NOTOUCH; @@ -567,6 +597,7 @@ static int cb_kube_filter(const void *data, size_t bytes, flb_kube_meta_release(&meta); flb_kube_prop_destroy(&props); + flb_kube_meta_release(&namespace_meta); return FLB_FILTER_NOTOUCH; } @@ -581,6 +612,7 @@ static int cb_kube_filter(const void *data, size_t bytes, flb_log_event_decoder_destroy(&log_decoder); flb_kube_meta_release(&meta); flb_kube_prop_destroy(&props); + flb_kube_meta_release(&namespace_meta); return FLB_FILTER_NOTOUCH; } @@ -600,7 +632,10 @@ static int cb_kube_filter(const void *data, size_t bytes, ret = flb_kube_meta_get(ctx, tag, tag_len, (char *) data + pre, off - pre, - &cache_buf, &cache_size, &meta, &props); + &cache_buf, &cache_size, + &namespace_cache_buf, &namespace_cache_size, + &meta, &props, + &namespace_meta); if (ret == -1) { continue; } @@ -618,6 +653,7 @@ static int cb_kube_filter(const void *data, size_t bytes, if (ctx->use_journal == FLB_TRUE) { flb_kube_meta_release(&meta); flb_kube_prop_destroy(&props); + flb_kube_meta_release(&namespace_meta); } continue; } @@ -633,6 +669,7 @@ static int cb_kube_filter(const void *data, size_t bytes, if (ctx->use_journal == FLB_TRUE) { flb_kube_meta_release(&meta); flb_kube_prop_destroy(&props); + flb_kube_meta_release(&namespace_meta); } continue; } @@ -667,7 +704,8 @@ static int cb_kube_filter(const void *data, size_t bytes, ret = pack_map_content(&log_encoder, map, cache_buf, cache_size, - &meta, &log_event.timestamp, parser, ctx); + namespace_cache_buf, namespace_cache_size, + &log_event.timestamp, parser, ctx); if (ret != 0) { flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); @@ -678,6 +716,7 @@ static int cb_kube_filter(const void *data, size_t bytes, flb_kube_meta_release(&meta); flb_kube_prop_destroy(&props); + flb_kube_meta_release(&namespace_meta); return FLB_FILTER_NOTOUCH; } @@ -693,6 +732,7 @@ static int cb_kube_filter(const void *data, size_t bytes, if (ctx->use_journal == FLB_TRUE) { flb_kube_meta_release(&meta); flb_kube_prop_destroy(&props); + flb_kube_meta_release(&namespace_meta); } } @@ -700,6 +740,7 @@ static int cb_kube_filter(const void *data, size_t bytes, if (ctx->use_journal == FLB_FALSE) { flb_kube_meta_release(&meta); flb_kube_prop_destroy(&props); + flb_kube_meta_release(&namespace_meta); } if (ctx->dummy_meta == FLB_TRUE) { @@ -861,6 +902,19 @@ static struct flb_config_map config_map[] = { "include Kubernetes annotations on every record" }, + /* Include Kubernetes Namespace Labels in the final record ? */ + { + FLB_CONFIG_MAP_BOOL, "namespace_labels", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, namespace_labels), + "include Kubernetes namespace labels on every record" + }, + /* Include Kubernetes Namespace Annotations in the final record ? */ + { + FLB_CONFIG_MAP_BOOL, "namespace_annotations", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, namespace_annotations), + "include Kubernetes namespace annotations on every record" + }, + /* * The Application may 'propose' special configuration keys * to the logging agent (Fluent Bit) through the annotations @@ -985,6 +1039,14 @@ static struct flb_config_map config_map[] = { "For example, set this value to 60 or 60s and cache entries " "which have been created more than 60s will be evicted" }, + { + FLB_CONFIG_MAP_TIME, "kube_meta_namespace_cache_ttl", "15m", + 0, FLB_TRUE, offsetof(struct flb_kube, kube_meta_namespace_cache_ttl), + "configurable TTL for K8s cached namespace metadata. " + "By default, it is set to 15m and cached entries will be evicted after 15m." + "Setting this to 0 will disable the cache TTL and " + "will evict entries once the cache reaches capacity." + }, /* EOF */ {0} }; diff --git a/tests/runtime/data/kubernetes/log/core/core_base-with-namespace-labels-and-annotations_fluent-bit.log b/tests/runtime/data/kubernetes/log/core/core_base-with-namespace-labels-and-annotations_fluent-bit.log new file mode 100644 index 00000000000..259723131be --- /dev/null +++ b/tests/runtime/data/kubernetes/log/core/core_base-with-namespace-labels-and-annotations_fluent-bit.log @@ -0,0 +1 @@ +{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"} diff --git a/tests/runtime/data/kubernetes/meta/core.namespace_meta b/tests/runtime/data/kubernetes/meta/core.namespace_meta new file mode 100644 index 00000000000..12e10df0307 --- /dev/null +++ b/tests/runtime/data/kubernetes/meta/core.namespace_meta @@ -0,0 +1,19 @@ +{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": { + "annotations": { + "fake-annotation1": "test1", + "fake-annotation2": "test2" + }, + "creationTimestamp": "2019-04-03T09:29:00Z", + "labels": { + "fake-namespace-label1": "label1", + "fake-namespace-label2": "label2" + }, + "name": "core", + "resourceVersion": "74466568", + "uid": "e9f2963f-55f2-11e9-84c5-02e422b8a84b" + }, + "spec": {} +} diff --git a/tests/runtime/data/kubernetes/meta/core_base-with-namespace-labels-and-annotations.meta b/tests/runtime/data/kubernetes/meta/core_base-with-namespace-labels-and-annotations.meta new file mode 100644 index 00000000000..3ccca44357e --- /dev/null +++ b/tests/runtime/data/kubernetes/meta/core_base-with-namespace-labels-and-annotations.meta @@ -0,0 +1,116 @@ +{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "annotations": { + "prometheus.io/path": "/api/v1/metrics/prometheus", + "prometheus.io/port": "2020", + "prometheus.io/scrape": "true" + }, + "creationTimestamp": "2019-04-03T09:29:00Z", + "labels": { + "app.kubernetes.io/name": "fluent-bit" + }, + "name": "base", + "namespace": "core", + "resourceVersion": "74466568", + "selfLink": "/api/v1/namespaces/core/pods/base", + "uid": "e9f2963f-55f2-11e9-84c5-02e422b8a84a" + }, + "spec": { + "containers": [ + { + "image": "fluent/fluent-bit", + "imagePullPolicy": "Always", + "name": "fluent-bit", + "resources": {}, + "stdin": true, + "stdinOnce": true, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "tty": true, + "volumeMounts": [ + { + "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount", + "name": "default-token-9ffht", + "readOnly": true + } + ] + } + ], + "dnsPolicy": "ClusterFirst", + "nodeName": "ip-10-49-18-80.eu-west-1.compute.internal", + "restartPolicy": "Never", + "schedulerName": "default-scheduler", + "securityContext": {}, + "serviceAccount": "default", + "serviceAccountName": "default", + "terminationGracePeriodSeconds": 30, + "tolerations": [ + { + "effect": "NoExecute", + "key": "node.kubernetes.io/not-ready", + "operator": "Exists", + "tolerationSeconds": 300 + }, + { + "effect": "NoExecute", + "key": "node.kubernetes.io/unreachable", + "operator": "Exists", + "tolerationSeconds": 300 + } + ], + "volumes": [ + { + "name": "default-token-9ffht", + "secret": { + "defaultMode": 420, + "secretName": "default-token-9ffht" + } + } + ] + }, + "status": { + "conditions": [ + { + "lastProbeTime": null, + "lastTransitionTime": "2019-04-03T09:29:00Z", + "status": "True", + "type": "Initialized" + }, + { + "lastProbeTime": null, + "lastTransitionTime": "2019-04-03T09:29:06Z", + "status": "True", + "type": "Ready" + }, + { + "lastProbeTime": null, + "lastTransitionTime": "2019-04-03T09:29:00Z", + "status": "True", + "type": "PodScheduled" + } + ], + "containerStatuses": [ + { + "containerID": "docker://c9898099f6d235126d564ed38a020007ea7a6fac6e25e718de683c9dd0076c16", + "image": "fluent/fluent-bit:latest", + "imageID": "docker-pullable://fluent/fluent-bit@sha256:7ac0fd3569af866e9a6a22eb592744200d2dbe098cf066162453f8d0b06c531f", + "lastState": {}, + "name": "fluent-bit", + "ready": true, + "restartCount": 0, + "state": { + "running": { + "startedAt": "2019-04-03T09:29:05Z" + } + } + } + ], + "hostIP": "10.49.18.80", + "phase": "Running", + "podIP": "100.116.192.42", + "qosClass": "BestEffort", + "startTime": "2019-04-03T09:29:00Z" + } +} diff --git a/tests/runtime/data/kubernetes/out/core/core_base-with-namespace-labels-and-annotations_fluent-bit.out b/tests/runtime/data/kubernetes/out/core/core_base-with-namespace-labels-and-annotations_fluent-bit.out new file mode 100644 index 00000000000..c5063d17300 --- /dev/null +++ b/tests/runtime/data/kubernetes/out/core/core_base-with-namespace-labels-and-annotations_fluent-bit.out @@ -0,0 +1 @@ +[1554141513.598656,{"log":"Fluent Bit is logging\n","stream":"stdout","kubernetes":{"pod_name":"base-with-namespace-labels-and-annotations","namespace_name":"core","pod_id":"e9f2963f-55f2-11e9-84c5-02e422b8a84a","labels":{"app.kubernetes.io/name":"fluent-bit"},"annotations":{"prometheus.io/path":"/api/v1/metrics/prometheus","prometheus.io/port":"2020","prometheus.io/scrape":"true"},"host":"ip-10-49-18-80.eu-west-1.compute.internal","container_name":"fluent-bit","docker_id":"c9898099f6d235126d564ed38a020007ea7a6fac6e25e718de683c9dd0076c16","container_hash":"fluent/fluent-bit@sha256:7ac0fd3569af866e9a6a22eb592744200d2dbe098cf066162453f8d0b06c531f","container_image":"fluent/fluent-bit:latest"},"kubernetes_namespace":{"name":"core","labels":{"fake-namespace-label1":"label1","fake-namespace-label2":"label2"},"annotations":{"fake-annotation1":"test1","fake-annotation2":"test2"}}}] diff --git a/tests/runtime/filter_kubernetes.c b/tests/runtime/filter_kubernetes.c index 46c9a966dff..0693f601ba4 100644 --- a/tests/runtime/filter_kubernetes.c +++ b/tests/runtime/filter_kubernetes.c @@ -400,6 +400,16 @@ static void flb_test_core_unescaping_json() flb_test_core("core_unescaping_json", NULL, 1); } +#define flb_test_namespace_labels_and_annotations(target, suffix, nExpected) \ + kube_test("core/" target, KUBE_TAIL, suffix, nExpected, \ + "Namespace_labels", "On", \ + "Namespace_annotations", "On", \ + NULL); \ + +static void flb_test_core_base_with_namespace_labels_and_annotations() +{ + flb_test_namespace_labels_and_annotations("core_base-with-namespace-labels-and-annotations_fluent-bit", NULL, 1); +} #define flb_test_options_use_kubelet_enabled(target, suffix, nExpected) \ kube_test("options/" target, KUBE_TAIL, suffix, nExpected, \ @@ -994,6 +1004,7 @@ TEST_LIST = { {"kube_core_no_meta", flb_test_core_no_meta}, {"kube_core_unescaping_text", flb_test_core_unescaping_text}, {"kube_core_unescaping_json", flb_test_core_unescaping_json}, + {"kube_core_base_with_namespace_labels_and_annotations", flb_test_core_base_with_namespace_labels_and_annotations}, {"kube_options_use-kubelet_enabled_json", flb_test_options_use_kubelet_enabled_json}, {"kube_options_use-kubelet_disabled_json", flb_test_options_use_kubelet_disabled_json}, {"kube_options_merge_log_enabled_text", flb_test_options_merge_log_enabled_text},