forked from bytemaster/disruptor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread.cpp
128 lines (108 loc) · 3.18 KB
/
thread.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include <disruptor/thread.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <thread>
#include <vector>

#include <boost/thread.hpp>
namespace disruptor
{
struct cursor_handler
{
int64_t pos;
int64_t end;
int64_t max_batch;
read_cursor_ptr cur;
thread::handler call;
cursor_handler( read_cursor_ptr c, thread::handler h )
:pos(c->begin()),
end(c->end()),
max_batch(10),cur(c),call(h){}
};
namespace detail {
class thread_impl
{
public:
thread* _self;
std::unique_ptr<boost::thread> _thread;
bool _done;
std::vector<cursor_handler> _handlers;
read_cursor_ptr _read_post_cursor;
void run()
{
uint64_t spin_count = 0;
while( !_done )
{
bool inc_spin = true;
for( uint32_t i = 0; i < _handlers.size(); ++i )
{
cursor_handler& current = _handlers[i];
if( current.pos == current.end )
{
current.cur->publish( current.pos - 1 );
current.end = current.cur->check_end();
}
if( current.pos < current.end )
{
auto next = current.call( current.pos,
std::min(current.end,
current.pos+current.max_batch) );
if( next > current.pos )
{
// we made progress
current.pos = next;
inc_spin = false;
spin_count = 0;
}
}
}
spin_count += inc_spin;
// progress backoff... until a maximum of 10 ms
if( spin_count > 1000 ) usleep( std::min<int64_t>( spin_count >> 4, 40*1000) );
}
}
};
} // namespace detail
/**
 *  Builds the implementation object in the "not running" state and wires up
 *  the post queue: a shared write cursor (post_cursor) and a read cursor
 *  that follow each other, plus a handler that executes posted entries.
 */
thread::thread()
:my( new detail::thread_impl() )
{
    my->_self = this;
    // add_cursor() asserts on this flag, so it must be set before the call below.
    my->_done = true;

    // The post queue's writer and reader gate each other.
    my->_read_post_cursor = std::make_shared<read_cursor>();
    post_cursor = std::make_shared<shared_write_cursor>(post_buffer.get_buffer_size());
    post_cursor->follows( my->_read_post_cursor );
    my->_read_post_cursor->follows( post_cursor );

    // Handler for the post queue: runs exactly one entry per invocation and
    // ignores the batch end it is offered.
    auto run_posted = [this]( int64_t begin, int64_t /*end*/ ) -> int64_t
    {
        try
        {
            post_buffer.at(begin).call();
            return begin + 1;
        }
        catch ( ... )
        {
            // Record the failure on the read cursor and report no progress.
            my->_read_post_cursor->set_alert( std::current_exception() );
            return begin;
        }
    };
    add_cursor( my->_read_post_cursor, run_posted );
}
/**
 *  Stops and joins the worker before the implementation state goes away.
 *
 *  The worker started by start() runs my->run() and therefore keeps using
 *  the implementation object.  Destroying a still-joinable boost::thread
 *  either detaches it or calls std::terminate (depending on the Boost.Thread
 *  configuration), so leaving the worker running here would end in a
 *  use-after-free or an abort.
 */
thread::~thread()
{
    if( !my || !my->_thread || !my->_thread->joinable() )
        return; // never started, or already joined

    // Ask the worker loop to exit.
    my->_done = true;

    // A thread cannot join itself; this only matters if the object is
    // destroyed from inside one of its own handlers.
    if( my->_thread->get_id() == boost::this_thread::get_id() )
        return;

    try
    {
        my->_thread->join();
    }
    catch( ... )
    {
        // join() can throw (e.g. boost::thread_interrupted); a destructor
        // must not let that escape.
    }
}
/**
 *  Registers a read cursor together with the callback that consumes it.
 *
 *  Must be called while the thread is not running: the worker loop iterates
 *  over the handler list without any locking.
 *
 *  @param c  read cursor to service.
 *  @param h  callback invoked with the [begin, end) positions to process.
 */
void thread::add_cursor( read_cursor_ptr c, handler h )
{
    assert( my->_done && "handlers must be added before starting the thread" );
    // Build the entry directly inside the handler list.
    my->_handlers.emplace_back( c, h );
}
/**
 *  Launches the worker thread, which executes the implementation's run()
 *  loop until stop() is called.  Asserts that the thread is not already
 *  running.
 */
void thread::start()
{
    assert( my->_done && "thread already running" );

    // Clear the stop flag first so the new worker does not exit immediately.
    my->_done = false;
    my->_thread.reset( new boost::thread( [this]{ my->run(); } ) );
}
/**
 *  Requests the worker loop to exit.  This only sets the stop flag and
 *  returns immediately; use join() to wait for the worker to finish.
 */
void thread::stop()
{
    auto& impl = *my;
    impl._done = true;
}
/**
 *  Blocks until the worker thread has finished.  Asserts that start() has
 *  created a worker.  This does not request a stop by itself: the worker
 *  loop only exits once stop() has been called.
 */
void thread::join()
{
    auto& worker = my->_thread;
    assert( worker );
    worker->join();
}
} // namespace disruptor