-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy paththreadpool.h
118 lines (109 loc) · 4.58 KB
/
threadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
* Copyright 2016 Jakub Klama <[email protected]>
* All rights reserved
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted providing that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef LIB9P_THREADPOOL_H
#define LIB9P_THREADPOOL_H
#include <stdbool.h>
#include <pthread.h>
#include <sys/queue.h>
#include "lib9p.h"
STAILQ_HEAD(l9p_request_queue, l9p_request);
/*
* Most of the workers in the threadpool run requests.
*
* One distinguished worker delivers responses from the
* response queue. The reason this worker exists is to
* guarantee response order, so that flush responses go
* after their flushed requests.
*/
struct l9p_threadpool {
struct l9p_connection * ltp_conn; /* the connection */
struct l9p_request_queue ltp_workq; /* requests awaiting a worker */
struct l9p_request_queue ltp_replyq; /* requests that are done */
pthread_mutex_t ltp_mtx; /* locks queues and cond vars */
pthread_cond_t ltp_work_cv; /* to signal regular workers */
pthread_cond_t ltp_reply_cv; /* to signal reply-worker */
LIST_HEAD(, l9p_worker) ltp_workers; /* list of all workers */
};
/*
* All workers, including the responder, use this as their
* control structure. (The only thing that distinguishes the
* responder is that it runs different code and waits on the
* reply_cv.)
*/
struct l9p_worker {
struct l9p_threadpool * ltw_tp;
pthread_t ltw_thread;
bool ltw_exiting;
bool ltw_responder;
LIST_ENTRY(l9p_worker) ltw_link;
};
/*
* Each request has a "work state" telling where the request is,
* in terms of workers working on it. That is, this tells us
* which threadpool queue, if any, the request is in now or would
* go in, or what's happening with it.
*/
enum l9p_workstate {
L9P_WS_NOTSTARTED, /* not yet started */
L9P_WS_IMMEDIATE, /* Tflush being done sans worker */
L9P_WS_INPROGRESS, /* worker is working on it */
L9P_WS_RESPQUEUED, /* worker is done, response queued */
L9P_WS_REPLYING, /* responder is in final reply path */
};
/*
* Each request has a "flush state", initally NONE meaning no
* Tflush affected the request.
*
* If a Tflush comes in before we ever assign a work thread,
* the flush state goes to FLUSH_REQUESTED_PRE_START.
*
* If a Tflush comes in after we assign a work thread, the
* flush state goes to FLUSH_REQUESTED_POST_START. The flush
* request may be too late: the request might finish anyway.
* Or it might be soon enough to abort. In all cases, though, the
* operation requesting the flush (the "flusher") must wait for
* the other request (the "flushee") to go through the respond
* path. The respond routine gets to decide whether to send a
* normal response, send an error, or drop the request
* entirely.
*
* There's one especially annoying case: what if a Tflush comes in
* *while* we're sending a response? In this case it's too late:
* the flush just waits for the fully-composed response.
*/
enum l9p_flushstate {
L9P_FLUSH_NONE = 0, /* must be zero */
L9P_FLUSH_REQUESTED_PRE_START, /* not even started before flush */
L9P_FLUSH_REQUESTED_POST_START, /* started, then someone said flush */
L9P_FLUSH_TOOLATE /* too late, already responding */
};
void l9p_threadpool_flushee_done(struct l9p_request *);
int l9p_threadpool_init(struct l9p_threadpool *, int);
void l9p_threadpool_run(struct l9p_threadpool *, struct l9p_request *);
int l9p_threadpool_shutdown(struct l9p_threadpool *);
int l9p_threadpool_tflush(struct l9p_request *);
#endif /* LIB9P_THREADPOOL_H */