-
Notifications
You must be signed in to change notification settings - Fork 0
/
thread_specific.h
176 lines (146 loc) · 3.45 KB
/
thread_specific.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#ifndef MDB_THREAD_SPECIFIC_H
#define MDB_THREAD_SPECIFIC_H
#include <stdlib.h>
#include <pthread.h>
#include <mutex>
#include <deque>
#include <vector>
#include <atomic>
// pthread key 上限个数
extern unsigned long mdb_hash_index_max_num;
namespace mdb {
class thread_specific
{
private:
typedef void (*destr_func)(void *);
struct key_struct
{
std::atomic<uintptr_t> seq;
destr_func destr;
key_struct() = default;
key_struct(const key_struct &other) :
seq(other.seq.load()),
destr(other.destr)
{}
key_struct(key_struct &&other):
seq(other.seq.load()),
destr(other.destr)
{
}
};
struct key_data
{
uintptr_t seq = 0;
void * data = nullptr;
};
typedef std::vector<key_struct> key_container_t;
typedef std::vector<key_data> value_container_t;
inline bool key_unsed(uintptr_t seq) const
{
return (seq & 1) == 0;
}
inline bool key_usable(uintptr_t seq) const
{
return seq < (seq + 2); // in case of overflow
}
private:
thread_specific()
{
int rc = pthread_key_create(&m_thread_key, on_thread_exit);
MDB_ASSERT2(rc == 0, "key create return %d", rc);
m_keys.resize(mdb_hash_index_max_num * 2);
}
public:
~thread_specific()
{
pthread_key_delete(m_thread_key);
}
static thread_specific &instance()
{
static thread_specific obj;
return obj;
}
public:
ssize_t key_create(void (*destr)(void *) )
{
size_t size = m_keys.size();
for (size_t i = 0; i < size; i++)
{
key_struct &key = m_keys[i];
uintptr_t seq = key.seq.load();
if (key_unsed(seq) && key_usable(seq) &&
key.seq.compare_exchange_weak(seq, seq+1))
{
key.destr = destr;
return i;
}
}
return -1;
}
int key_delete(ssize_t key)
{
if (key >= 0 && key < (ssize_t)m_keys.size())
{
auto &k = m_keys[key];
uintptr_t seq = k.seq.load();
if (!key_unsed(seq) &&
k.seq.compare_exchange_weak(seq, seq+1))
return 0;
}
return EINVAL;
}
void *getspecific(ssize_t key)
{
void *ptr = pthread_getspecific(m_thread_key);
if (!ptr)
return nullptr;
value_container_t *values = (value_container_t *)ptr;
if (key >= (ssize_t)values->size())
return nullptr;
auto & value = (*values)[key];
if (value.data == nullptr ||
value.seq != m_keys[key].seq.load())
return nullptr;
return value.data;
}
void setspecific(ssize_t key, void *ptr)
{
value_container_t *values = (value_container_t *)pthread_getspecific(m_thread_key);
if (!values)
{
values = new (std::nothrow) value_container_t;
MDB_ASSERT2(values != nullptr, "alloc deque fail");
int rc = pthread_setspecific(m_thread_key, values);
MDB_ASSERT2(0 == rc, "setspecific return %d, thread key %u", rc, m_thread_key);
}
if (key >= (ssize_t)values->size())
values->resize(key + 1); // 不做key的范围校验
auto & value = (*values)[key];
value.seq = m_keys[key].seq.load();
value.data = ptr;
}
private:
static void on_thread_exit(void *ptr)
{
if (!ptr)
return ;
value_container_t *values = (value_container_t *)ptr;
auto &ts = thread_specific::instance();
const size_t size = values->size();
for (size_t i = 0; i < size; i++)
{
destr_func destr = ts.m_keys[i].destr;
if (!destr)
continue;
auto &thread_value = (*values)[i];
if (thread_value.seq == ts.m_keys[i].seq.load())
(*destr)(thread_value.data);
}
delete values;
}
private:
key_container_t m_keys;
pthread_key_t m_thread_key;
};
} // namespace mdb
#endif // MDB_THREAD_SPECIFIC_H