Skip to content

Commit

Permalink
output_thread: add support for scheduled timer jobs with coroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley committed Nov 29, 2023
1 parent bbaa4d8 commit 6132ad8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 8 deletions.
10 changes: 8 additions & 2 deletions include/fluent-bit/flb_output_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ struct flb_out_thread_instance {
* 'flushes' running by a threaded instance, then the access to the 'flush_list'
* must be protected: we use 'flush_mutex for that purpose.
*/
pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */
pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */

/* Same as flush_mutex but for timer coros */
struct mk_list timer_coro_list; /* flush context list */
struct mk_list timer_coro_list_destroy; /* flust context destroy list */
pthread_mutex_t timer_mutex; /* mutex for 'flush_list' */

/* List of mapped 'upstream' contexts */
struct mk_list upstreams;
Expand All @@ -100,7 +105,8 @@ int flb_output_thread_pool_start(struct flb_output_instance *ins);
int flb_output_thread_pool_flush(struct flb_task *task,
struct flb_output_instance *out_ins,
struct flb_config *config);

int flb_output_thread_pool_timer_coros_size(struct flb_output_instance *ins);
void flb_output_thread_pool_timer_coros_print(struct flb_output_instance *ins);

void flb_output_thread_instance_init();
struct flb_out_thread_instance *flb_output_thread_instance_get();
Expand Down
66 changes: 60 additions & 6 deletions src/flb_output_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ static void output_thread(void *data)
struct flb_output_instance *ins;
struct flb_output_flush *out_flush;
struct flb_out_thread_instance *th_ins = data;
struct flb_out_flush_params *params;
struct flb_out_flush_params *flush_params = NULL;
struct flb_out_timer_coro_params *timer_params = NULL;
struct flb_net_dns dns_ctx;

/* Register thread instance */
Expand Down Expand Up @@ -333,7 +334,7 @@ static void output_thread(void *data)
flb_sched_timer_cleanup(sched);

/* Check if we should stop the event loop */
if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0) {
if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0 && mk_list_size(&th_ins->timer_coro_list) == 0) {
/*
* If there are no busy network connections (and no coroutines) its
* safe to stop it.
Expand All @@ -359,9 +360,13 @@ static void output_thread(void *data)
flb_upstream_conn_pending_destroy_list(&th_ins->upstreams);

flb_sched_destroy(sched);
params = FLB_TLS_GET(out_flush_params);
if (params) {
flb_free(params);
flush_params = FLB_TLS_GET(out_flush_params);
if (flush_params) {
flb_free(flush_params);
}
timer_params = FLB_TLS_GET(timer_coro_params);
if (timer_params) {
flb_free(timer_params);
}
mk_event_loop_destroy(th_ins->evl);
flb_bucket_queue_destroy(th_ins->evl_bktq);
Expand Down Expand Up @@ -437,7 +442,10 @@ int flb_output_thread_pool_create(struct flb_config *config,
th_ins->flush_id = 0;
mk_list_init(&th_ins->flush_list);
mk_list_init(&th_ins->flush_list_destroy);
mk_list_init(&th_ins->timer_coro_list);
mk_list_init(&th_ins->timer_coro_list_destroy);
pthread_mutex_init(&th_ins->flush_mutex, NULL);
pthread_mutex_init(&th_ins->timer_mutex, NULL);
mk_list_init(&th_ins->upstreams);

upstream_thread_create(th_ins, ins);
Expand Down Expand Up @@ -495,6 +503,53 @@ int flb_output_thread_pool_create(struct flb_config *config,
return 0;
}

int flb_output_thread_pool_timer_coros_size(struct flb_output_instance *ins)
{
int n;
int size = 0;
struct mk_list *head;
struct flb_tp *tp = ins->tp;
struct flb_tp_thread *th;
struct flb_out_thread_instance *th_ins;

mk_list_foreach(head, &tp->list_threads) {
th = mk_list_entry(head, struct flb_tp_thread, _head);
if (th->status != FLB_THREAD_POOL_RUNNING) {
continue;
}

th_ins = th->params.data;

pthread_mutex_lock(&th_ins->flush_mutex);
n = mk_list_size(&th_ins->timer_coro_list);
pthread_mutex_unlock(&th_ins->flush_mutex);
size += n;
}

return size;
}

void flb_output_thread_pool_timer_coros_print(struct flb_output_instance *ins)
{
struct mk_list *head;
struct mk_list *tmp;
struct flb_tp *tp = ins->tp;
struct flb_tp_thread *th;
struct flb_out_thread_instance *th_ins;

mk_list_foreach_safe(head, tmp, &tp->list_threads) {
th = mk_list_entry(head, struct flb_tp_thread, _head);
if (th->status != FLB_THREAD_POOL_RUNNING) {
continue;
}

th_ins = th->params.data;
pthread_mutex_lock(&th_ins->timer_mutex);
flb_timer_coros_print(&th_ins->timer_coro_list);
pthread_mutex_unlock(&th_ins->timer_mutex);
}
}

int flb_output_thread_pool_coros_size(struct flb_output_instance *ins)
{
int n;
Expand All @@ -504,7 +559,6 @@ int flb_output_thread_pool_coros_size(struct flb_output_instance *ins)
struct flb_tp_thread *th;
struct flb_out_thread_instance *th_ins;

/* Signal each worker thread that needs to stop doing work */
mk_list_foreach(head, &tp->list_threads) {
th = mk_list_entry(head, struct flb_tp_thread, _head);
if (th->status != FLB_THREAD_POOL_RUNNING) {
Expand Down

0 comments on commit 6132ad8

Please sign in to comment.