From 734ad8dde3929a81802487a5280d28c7db2a1f0a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 3 Jun 2024 19:13:16 +0900 Subject: [PATCH] out_s3: Make temporary directory for parquet processing to be configurable Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 33 ++++++++++++++------------------- plugins/out_s3/s3.h | 9 +++++++++ 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index e190131f68a..487085ba2d4 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1457,34 +1457,23 @@ static int s3_compress_parquet(struct flb_s3 *ctx, } #else -static const char *get_tmpdir() -{ - const char* tmp; -#ifdef __ANDROID__ - tmp = "/data/local/tmp"; -#else - tmp = "/tmp"; -#endif - return tmp; -} - -static int create_tmpfile(char *file_path, char *template, size_t template_len) +static int create_tmpfile(struct flb_s3 *ctx, char *file_path, char *template, size_t template_len) { int ret; int result; flb_sds_t path_buf; - const char *tmpdir; - size_t tmpdir_len; + const char *process_dir; + size_t process_dir_len; path_buf = flb_sds_create_size(PATH_MAX); if (path_buf == NULL) { goto error; } - tmpdir = get_tmpdir(); - tmpdir_len = strlen(tmpdir); + process_dir = ctx->parquet_process_dir; + process_dir_len = flb_sds_len(ctx->parquet_process_dir); - result = flb_sds_cat_safe(&path_buf, tmpdir, tmpdir_len); + result = flb_sds_cat_safe(&path_buf, process_dir, process_dir_len); if (result < 0) { ret = -1; goto error; @@ -1546,13 +1535,13 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - result = create_tmpfile(infile, template_in_suffix, strlen(template_in_suffix)); + result = create_tmpfile(ctx, infile, template_in_suffix, strlen(template_in_suffix)); if (result < 0) { ret = -1; goto error; } - result = create_tmpfile(outfile, template_out_suffix, strlen(template_out_suffix)); + result = create_tmpfile(ctx, outfile, template_out_suffix, strlen(template_out_suffix)); if (result < 0) { ret = -1; goto error; @@ -3120,6 +3109,12 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Schema file for parquet objects. " }, + { + FLB_CONFIG_MAP_STR, "parquet.process_dir", DEFAULT_PARQUET_PROCESS_DIR, + 0, FLB_TRUE, offsetof(struct flb_s3, parquet_process_dir), + "Specify a temporary directory for processing parquet objects. " + "This paramater is effective for non Windows platforms. " + }, { FLB_CONFIG_MAP_STR, "content_type", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 41c7e062ca7..e9e004d10c1 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -58,6 +58,14 @@ #define DEFAULT_PARQUET_COMMAND_CHECK "columnify -h > /dev/null 2>&1" #endif +#if !defined(FLB_SYSTEM_WINDOWS) +#ifdef __ANDROID__ +#define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp" +#else +#define DEFAULT_PARQUET_PROCESS_DIR "/tmp" +#endif +#endif + /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can @@ -165,6 +173,7 @@ struct flb_s3 { flb_sds_t parquet_record_type; flb_sds_t parquet_schema_type; flb_sds_t parquet_schema_file; + flb_sds_t parquet_process_dir; /* * used to track that unset buffers were found on startup that have not