Skip to content

Commit

Permalink
out_s3: Make temporary directory for parquet processing to be configu…
Browse files Browse the repository at this point in the history
…rable

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Jun 3, 2024
1 parent c2388c4 commit 734ad8d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
33 changes: 14 additions & 19 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -1457,34 +1457,23 @@ static int s3_compress_parquet(struct flb_s3 *ctx,
}

#else
static const char *get_tmpdir()
{
const char* tmp;
#ifdef __ANDROID__
tmp = "/data/local/tmp";
#else
tmp = "/tmp";
#endif
return tmp;
}

static int create_tmpfile(char *file_path, char *template, size_t template_len)
static int create_tmpfile(struct flb_s3 *ctx, char *file_path, char *template, size_t template_len)
{
int ret;
int result;
flb_sds_t path_buf;
const char *tmpdir;
size_t tmpdir_len;
const char *process_dir;
size_t process_dir_len;

path_buf = flb_sds_create_size(PATH_MAX);
if (path_buf == NULL) {
goto error;
}

tmpdir = get_tmpdir();
tmpdir_len = strlen(tmpdir);
process_dir = ctx->parquet_process_dir;
process_dir_len = flb_sds_len(ctx->parquet_process_dir);

result = flb_sds_cat_safe(&path_buf, tmpdir, tmpdir_len);
result = flb_sds_cat_safe(&path_buf, process_dir, process_dir_len);
if (result < 0) {
ret = -1;
goto error;
Expand Down Expand Up @@ -1546,13 +1535,13 @@ static int s3_compress_parquet(struct flb_s3 *ctx,
goto error;
}

result = create_tmpfile(infile, template_in_suffix, strlen(template_in_suffix));
result = create_tmpfile(ctx, infile, template_in_suffix, strlen(template_in_suffix));
if (result < 0) {
ret = -1;
goto error;
}

result = create_tmpfile(outfile, template_out_suffix, strlen(template_out_suffix));
result = create_tmpfile(ctx, outfile, template_out_suffix, strlen(template_out_suffix));
if (result < 0) {
ret = -1;
goto error;
Expand Down Expand Up @@ -3120,6 +3109,12 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Schema file for parquet objects. "
},
{
FLB_CONFIG_MAP_STR, "parquet.process_dir", DEFAULT_PARQUET_PROCESS_DIR,
0, FLB_TRUE, offsetof(struct flb_s3, parquet_process_dir),
"Specify a temporary directory for processing parquet objects. "
"This paramater is effective for non Windows platforms. "
},
{
FLB_CONFIG_MAP_STR, "content_type", NULL,
0, FLB_FALSE, 0,
Expand Down
9 changes: 9 additions & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@
#define DEFAULT_PARQUET_COMMAND_CHECK "columnify -h > /dev/null 2>&1"
#endif

#if !defined(FLB_SYSTEM_WINDOWS)
#ifdef __ANDROID__
#define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp"
#else
#define DEFAULT_PARQUET_PROCESS_DIR "/tmp"
#endif
#endif

/*
* If we see repeated errors on an upload/chunk, we will discard it
* This saves us from scenarios where something goes wrong and an upload can
Expand Down Expand Up @@ -165,6 +173,7 @@ struct flb_s3 {
flb_sds_t parquet_record_type;
flb_sds_t parquet_schema_type;
flb_sds_t parquet_schema_file;
flb_sds_t parquet_process_dir;

/*
* used to track that unset buffers were found on startup that have not
Expand Down

0 comments on commit 734ad8d

Please sign in to comment.