Multithreading via C++ thread pool of clients #125

Merged Jun 5, 2024 (95 commits)
a6e238b
switching machines
kentslaney Dec 11, 2023
22b1e47
switch back...
kentslaney Dec 11, 2023
9fe14b3
LockPool
kentslaney Dec 13, 2023
02c941d
ClientPool
kentslaney Dec 13, 2023
3fb5dd7
missed renamed symbol
kentslaney Dec 13, 2023
b7305ca
cppcheck fixes
kentslaney Dec 13, 2023
c936fdf
minor build fixes
kentslaney Dec 13, 2023
a4662c2
std::execution::par[_unseq] requires -fexceptions
kentslaney Dec 14, 2023
f5eca1c
act support
kentslaney Dec 14, 2023
448458b
python bindings
kentslaney Dec 15, 2023
b5410a3
avoiding iota
kentslaney Dec 17, 2023
2324b0f
switching machines
kentslaney Dec 17, 2023
1e9f944
still broken...
kentslaney Dec 20, 2023
6c860c3
...with a different error though
kentslaney Dec 20, 2023
9ce5327
stranger error
kentslaney Dec 21, 2023
07964a8
well that feels silly after the fact
kentslaney Dec 21, 2023
550ef71
redundancies
kentslaney Dec 21, 2023
f099e68
partially fix cython integration
kentslaney Dec 21, 2023
cfdd1b4
reset cmake
kentslaney Dec 21, 2023
e4af40d
python class
kentslaney Dec 21, 2023
f538b6e
remove header...
kentslaney Dec 21, 2023
f0b37f1
__cinit__ semantics
kentslaney Dec 21, 2023
4d7ecdb
split up tests
kentslaney Dec 21, 2023
87b834f
flustered and cygdb can't find debugging logs
kentslaney Dec 21, 2023
df5f3d3
mild QOL patch
kentslaney Dec 21, 2023
5eff5b3
trailing whitespace
kentslaney Dec 22, 2023
6285a81
forgot to init ClientPool
kentslaney Dec 22, 2023
167d123
pre-commit still fails on macos
kentslaney Dec 22, 2023
9416571
made applying debug patch easier
kentslaney Dec 22, 2023
80d19c6
forgot to update method reference
kentslaney Dec 22, 2023
c675ec5
TCP capture
kentslaney Dec 24, 2023
a25a7cd
cython debug script
kentslaney Dec 24, 2023
5f2c427
messed up setup.py
kentslaney Dec 24, 2023
6e40df0
python versioning and resolve symlinks
kentslaney Dec 24, 2023
d440b51
ubuntu compat
kentslaney Dec 26, 2023
e1a5de4
missed _update_servers ref
kentslaney Dec 26, 2023
5369e41
stricter inheritance
kentslaney Dec 26, 2023
4079333
invalid keys in test
kentslaney Jan 13, 2024
2d05017
removed race conditions
kentslaney Jan 14, 2024
1e9c2bc
hung threads illustration
kentslaney Jan 15, 2024
8362719
commit for easier copying to a docker environment with a development …
kentslaney Jan 15, 2024
7afa87e
QoL nit
kentslaney Jan 15, 2024
4673c90
fresh environment fixes
kentslaney Jan 15, 2024
15a705e
threaded traceback bug
kentslaney Jan 17, 2024
03cb651
excepthook behavior
kentslaney Jan 17, 2024
71d44e4
debugging tools
kentslaney Jan 17, 2024
9f3f4ef
ln fails if symlink exists but is broken
kentslaney Jan 17, 2024
3303ad3
venv for both debug and test
kentslaney Jan 17, 2024
0b2346f
support for <3.11
kentslaney Jan 17, 2024
f54eb9b
WIP debug build flags
kentslaney Jan 18, 2024
e367a3f
debug build flags
kentslaney Jan 18, 2024
c3ff9ae
can't assign to python scope variables w/o GIL and underlying functio…
kentslaney Jan 19, 2024
bdaf276
sequential benchmark, but measures overhead
kentslaney Jan 20, 2024
d8d0014
remove debugging statements and shared cython resources
kentslaney Jan 20, 2024
f8d4728
cython threading hangs under load
kentslaney Jan 20, 2024
804a18e
Revert "can't assign to python scope variables w/o GIL and underlying…
kentslaney Jan 23, 2024
340a8e5
revert reverted debug setup removal
kentslaney Jan 23, 2024
f7964e5
debugging conviences
kentslaney Jan 23, 2024
a82f34b
cython gil misunderstanding
kentslaney Jan 23, 2024
aff16cf
benchmark setup
kentslaney Jan 23, 2024
e06cafe
greenify initial pass
kentslaney Jan 23, 2024
1def670
greenlet misunderstanding
kentslaney Jan 23, 2024
ecc5ec7
eventlet unittest
kentslaney Jan 23, 2024
59662fc
eventlet name instead of thread name
kentslaney Jan 23, 2024
723addc
threaded python benchmarks
kentslaney Jan 27, 2024
afa71a0
typo
kentslaney Jan 27, 2024
3787008
missed gevent failure
kentslaney Jan 27, 2024
ad0bd7a
not sure why this one hangs
kentslaney Jan 27, 2024
37eb65c
add py FIFO thread pool
kentslaney Jan 27, 2024
31d8569
pytest stdout
kentslaney Feb 2, 2024
7a92e7b
config typo
kentslaney Feb 4, 2024
f767931
allow pytest stdout for run-test
kentslaney Feb 4, 2024
63ab7f5
remove greenify debug setup and internal threading metrics
kentslaney Feb 6, 2024
5cbc847
switch hash function back
kentslaney Feb 6, 2024
39a87a5
fail fast
kentslaney Feb 6, 2024
299508a
cppcheck lint
kentslaney Feb 6, 2024
2722b56
build fixes
kentslaney Feb 6, 2024
af0240f
remove dead code
kentslaney Feb 6, 2024
d80eff1
add c++ std flags
kentslaney Feb 6, 2024
b1263d1
update cgo CXXFLAGS
kentslaney Feb 6, 2024
36f1644
fix double acquire attempt for m_pool_lock
kentslaney Feb 6, 2024
ccd20a5
explicit irange constructor
kentslaney Feb 7, 2024
7e7dfae
README update
kentslaney Feb 7, 2024
1c8c6bc
link format
kentslaney Feb 7, 2024
399e4e6
typo
kentslaney Feb 7, 2024
728ddca
md -> rst
kentslaney Feb 7, 2024
d561b84
remove dead link
kentslaney Feb 7, 2024
1f8dd88
clarifying statistics
kentslaney Feb 7, 2024
87c4e64
better footnote about traffic stats
kentslaney Feb 7, 2024
78a5431
wording
kentslaney Feb 7, 2024
f8ecc8e
gevent support explanation
kentslaney May 31, 2024
51e627f
version bump
kentslaney May 31, 2024
1327704
remove gevent test for ThreadedClient
kentslaney May 31, 2024
f0263ad
Separation of concerns
kentslaney Jun 4, 2024
081db1d
mistake in benchmark thread spawning
kentslaney Jun 4, 2024
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -4,7 +4,7 @@ set(CMAKE_MACOSX_RPATH 1)

 set (MC_VERSION_MAJOR 1)
 set (MC_VERSION_MINOR 4)
-set (MC_VERSION_PATCH 1)
+set (MC_VERSION_PATCH 4)

 set (MC_VERSION ${MC_VERSION_MAJOR}.${MC_VERSION_MINOR})
 set (MC_APIVERSION ${MC_VERSION}.${MC_VERSION_PATCH})
@@ -15,7 +15,7 @@ if (NOT CMAKE_BUILD_TYPE)
     set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build, options are: Debug Release." FORCE)
 endif (NOT CMAKE_BUILD_TYPE)

-set(CMAKE_CXX_FLAGS_COMMON "-Wall -fno-rtti -fno-exceptions")
+set(CMAKE_CXX_FLAGS_COMMON "-Wall -fno-rtti -fno-exceptions -std=c++17")
 set(CMAKE_CXX_FLAGS_DEBUG "-DDEBUG -g2 ${CMAKE_CXX_FLAGS_COMMON}" CACHE STRING "CXX DEBUG FLAGS" FORCE)
 set(CMAKE_CXX_FLAGS_RELEASE "-DNDEBUG -O3 ${CMAKE_CXX_FLAGS_COMMON}" CACHE STRING "CXX RELEASE FLAGS" FORCE)
 set(CMAKE_INSTALL_INCLUDE include CACHE PATH "Output directory for header files")
106 changes: 106 additions & 0 deletions include/ClientPool.h
@@ -0,0 +1,106 @@
#pragma once

#include <shared_mutex>
#include <iterator>
#include <array>

#include "Client.h"
#include "LockPool.h"

namespace douban {
namespace mc {

template <size_t N>
void duplicate_strings(const char* const * strs, const size_t n,
    std::deque<std::array<char, N> >& out, std::vector<char*>& refs) {
  out.resize(n);
  refs.resize(n);
  for (size_t i = 0; i < n; i++) {
    if (strs == NULL || strs[i] == NULL) {
      out[i][0] = '\0';
      refs[i] = NULL;
      continue;
    }
    std::snprintf(out[i].data(), N, "%s", strs[i]);
    refs[i] = out[i].data();
  }
}

class irange {
  int i;

public:
  using value_type = int;
  using pointer = const int*;
  using reference = const int&;
  using difference_type = int;
  using iterator_category = std::random_access_iterator_tag;

  explicit irange(int i) : i(i) {}

  reference operator*() const { return i; }
  pointer operator->() const { return &i; }
  value_type operator[](int n) const { return i + n; }
  friend bool operator< (const irange& lhs, const irange& rhs) { return lhs.i < rhs.i; }
  friend bool operator> (const irange& lhs, const irange& rhs) { return rhs < lhs; }
  friend bool operator<=(const irange& lhs, const irange& rhs) { return !(lhs > rhs); }
  friend bool operator>=(const irange& lhs, const irange& rhs) { return !(lhs < rhs); }
  friend bool operator==(const irange& lhs, const irange& rhs) { return lhs.i == rhs.i; }
  friend bool operator!=(const irange& lhs, const irange& rhs) { return !(lhs == rhs); }
  irange& operator++() { ++i; return *this; }
  irange& operator--() { --i; return *this; }
  irange operator++(int) { irange tmp = *this; ++tmp; return tmp; }
  irange operator--(int) { irange tmp = *this; --tmp; return tmp; }
  irange& operator+=(difference_type n) { i += n; return *this; }
  irange& operator-=(difference_type n) { i -= n; return *this; }
  friend irange operator+(const irange& lhs, difference_type n) { irange tmp = lhs; tmp += n; return tmp; }
  friend irange operator+(difference_type n, const irange& rhs) { return rhs + n; }
  friend irange operator-(const irange& lhs, difference_type n) { irange tmp = lhs; tmp -= n; return tmp; }
  friend difference_type operator-(const irange& lhs, const irange& rhs) { return lhs.i - rhs.i; }
};

typedef struct {
  Client c;
  int index;
} IndexedClient;

class ClientPool : LockPool {
public:
  ClientPool();
  ~ClientPool();
  void config(config_options_t opt, int val);
  int init(const char* const * hosts, const uint32_t* ports,
           const size_t n, const char* const * aliases = NULL);
  int updateServers(const char* const * hosts, const uint32_t* ports,
                    const size_t n, const char* const * aliases = NULL);
  IndexedClient* _acquire();
  void _release(const IndexedClient* idx);
  Client* acquire();
  void release(const Client* ref);

private:
  int growPool(size_t by);
  int setup(Client* c);
  inline bool shouldGrowUnsafe();
  int autoGrow();

  bool m_opt_changed[CLIENT_CONFIG_OPTION_COUNT];
  int m_opt_value[CLIENT_CONFIG_OPTION_COUNT];
  std::deque<IndexedClient> m_clients;
  size_t m_initial_clients;
  size_t m_max_clients;
  size_t m_max_growth;

  std::deque<std::array<char, MC_NI_MAXHOST> > m_hosts_data;
  std::deque<std::array<char, MC_NI_MAXHOST + 1 + MC_NI_MAXSERV> > m_aliases_data;
  std::vector<uint32_t> m_ports;

  std::vector<char*> m_hosts;
  std::vector<char*> m_aliases;

  std::mutex m_pool_lock;
  mutable std::shared_mutex m_acquiring_growth;
};

} // namespace mc
} // namespace douban
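
Note on the header above: `IndexedClient` pairs each `Client` with its slot index, and the pool exposes matching acquire/release calls plus growth limits (`m_initial_clients`, `m_max_clients`, `m_max_growth`). The implementation file is not part of this excerpt, so the following is only a rough Python sketch of how an index-tagged pool with bounded growth could behave. `IndexedPool`, its `factory` argument, and the grow-when-empty policy are assumptions made for illustration, not the PR's actual logic.

```python
import threading
from collections import deque


class IndexedPool:
    """Hypothetical pool: every slot keeps its index so release needs no search."""

    def __init__(self, factory, initial=1, max_clients=4, max_growth=2):
        self._factory = factory
        self._max_clients = max_clients
        self._max_growth = max_growth
        self._clients = []          # slot index -> client object
        self._available = deque()   # indices of idle slots
        self._cond = threading.Condition()
        self._grow(initial)

    def _grow(self, by):
        # Called from the constructor or with self._cond held.
        start = len(self._clients)
        by = min(by, self._max_clients - start)
        for offset in range(by):
            self._clients.append(self._factory())
            self._available.append(start + offset)
        return by

    def acquire(self):
        with self._cond:
            while not self._available:
                # Assumed policy: grow when empty, otherwise wait for a release.
                if self._grow(self._max_growth) == 0:
                    self._cond.wait()
            index = self._available.popleft()
            return index, self._clients[index]

    def release(self, index):
        with self._cond:
            # Most recently released slot is reused first, as in releaseWorker().
            self._available.appendleft(index)
            self._cond.notify()
```

Returning the index together with the client is what lets `release` hand the slot back in constant time, which appears to be the purpose of `IndexedClient` in the header.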
14 changes: 12 additions & 2 deletions include/Export.h
@@ -9,11 +9,21 @@


 typedef enum {
-  CFG_POLL_TIMEOUT,
+  // Client config options
+  CFG_POLL_TIMEOUT = 0,
   CFG_CONNECT_TIMEOUT,
   CFG_RETRY_TIMEOUT,
   CFG_HASH_FUNCTION,
-  CFG_MAX_RETRIES
+  CFG_MAX_RETRIES,
+  CFG_SET_FAILOVER,
+
+  // type separator to track number of Client config options to save
+  CLIENT_CONFIG_OPTION_COUNT,
+
+  // ClientPool config options
+  CFG_INITIAL_CLIENTS,
+  CFG_MAX_CLIENTS,
+  CFG_MAX_GROWTH
 } config_options_t;


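
Note on the enum above: `CLIENT_CONFIG_OPTION_COUNT` sits between the two groups, so it doubles as the number of `Client` options, and `ClientPool.h` sizes `m_opt_changed` and `m_opt_value` with it. A small Python sketch of that idea follows. The numeric values follow from C enum numbering starting at `CFG_POLL_TIMEOUT = 0`; `is_client_option` and `SavedOptions` are hypothetical names, and the routing inside `config` is an assumption about how such a separator might be used.

```python
from enum import IntEnum


class ConfigOption(IntEnum):
    # Client config options
    CFG_POLL_TIMEOUT = 0
    CFG_CONNECT_TIMEOUT = 1
    CFG_RETRY_TIMEOUT = 2
    CFG_HASH_FUNCTION = 3
    CFG_MAX_RETRIES = 4
    CFG_SET_FAILOVER = 5
    # Separator: equals the number of Client config options
    CLIENT_CONFIG_OPTION_COUNT = 6
    # ClientPool config options
    CFG_INITIAL_CLIENTS = 7
    CFG_MAX_CLIENTS = 8
    CFG_MAX_GROWTH = 9


def is_client_option(opt):
    """Hypothetical helper: True when opt belongs to the Client group."""
    return 0 <= opt < ConfigOption.CLIENT_CONFIG_OPTION_COUNT


class SavedOptions:
    """Hypothetical stand-in for the m_opt_changed / m_opt_value arrays."""

    def __init__(self):
        count = int(ConfigOption.CLIENT_CONFIG_OPTION_COUNT)
        self.changed = [False] * count
        self.value = [0] * count
        self.pool_settings = {}

    def config(self, opt, val):
        if is_client_option(opt):
            # Remember the setting so it can be replayed on clients created later.
            self.changed[opt] = True
            self.value[opt] = val
        elif opt != ConfigOption.CLIENT_CONFIG_OPTION_COUNT:
            self.pool_settings[ConfigOption(opt)] = val
```

Because the separator is an ordinary enumerator, adding a new `Client` option before it grows the saved arrays without any other change.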
100 changes: 100 additions & 0 deletions include/LockPool.h
@@ -0,0 +1,100 @@
#pragma once

#include <mutex>
#include <condition_variable>
#include <queue>
#include <deque>
#include <vector>
#include <atomic>

namespace douban {
namespace mc {

// https://stackoverflow.com/a/14792685/3476782
class OrderedLock {
  std::queue<std::condition_variable*> m_fifo_locks;
protected:
  std::mutex m_fifo_access;
  std::atomic<bool> m_locked;

protected:
  OrderedLock() : m_locked(true) {};
  std::unique_lock<std::mutex> lock() {
    std::unique_lock<std::mutex> acquire(m_fifo_access);
    if (m_locked) {
      std::condition_variable signal;
      m_fifo_locks.emplace(&signal);
      signal.wait(acquire);
      m_fifo_locks.pop();
    } else {
      m_locked = true;
    }
    return acquire;
  }

  void unlock() {
    if (m_fifo_locks.empty()) {
      m_locked = false;
    } else {
      m_fifo_locks.front()->notify_all();
    }
  }
};

class LockPool : public OrderedLock {
  std::deque<size_t> m_available;
  std::list<std::mutex*> m_muxes;
  std::list<std::mutex*> m_mux_mallocs;

protected:
  std::deque<std::mutex*> m_thread_workers;

  LockPool() {}
  ~LockPool() {
    std::lock_guard<std::mutex> freeing(m_fifo_access);
    for (auto worker : m_thread_workers) {
      std::lock_guard<std::mutex> freeing_worker(*worker);
    }
    for (auto mem : m_muxes) {
      mem->std::mutex::~mutex();
    }
    for (auto mem : m_mux_mallocs) {
      delete[] mem;
    }
  }

  void addWorkers(size_t n) {
    std::unique_lock<std::mutex> growing_pool(m_fifo_access);
    const auto from = m_thread_workers.size();
    const auto muxes = new std::mutex[n];
    m_mux_mallocs.push_back(muxes);
    for (size_t i = 0; i < n; i++) {
      m_available.push_back(from + i);
      m_muxes.push_back(&muxes[i]);
    }
    // static_cast needed for some versions of C++
    std::transform(
      muxes, muxes + n, std::back_inserter(m_thread_workers),
      static_cast<std::mutex*(*)(std::mutex&)>(std::addressof<std::mutex>));
    unlock();
  }

  int acquireWorker() {
    auto fifo_lock = lock();
    const auto res = m_available.front();
    m_available.pop_front();
    if (!m_available.empty()) {
      unlock();
    }
    return res;
  }

  void releaseWorker(int worker) {
    std::unique_lock<std::mutex> growing_pool(m_fifo_access);
    m_available.push_front(worker);
    unlock();
  }
};

} // namespace mc
} // namespace douban
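
Note on the header above: `OrderedLock` queues one condition variable per waiting thread so that waiters are woken in arrival order. The same first-in, first-out hand-off can be sketched in Python. This is not a port of the C++ class: it swaps the per-waiter condition variable for a per-waiter `threading.Event`, and `FifoLock` is a hypothetical name used only for this illustration.

```python
import threading
from collections import deque


class FifoLock:
    """Grants the lock to waiting threads strictly in arrival order."""

    def __init__(self):
        self._guard = threading.Lock()  # protects the fields below
        self._waiters = deque()         # one Event per blocked thread
        self._locked = False

    def acquire(self):
        with self._guard:
            if not self._locked:
                self._locked = True
                return
            turn = threading.Event()
            self._waiters.append(turn)
        # Block outside the guard; release() hands ownership over directly.
        turn.wait()

    def release(self):
        with self._guard:
            if self._waiters:
                # The lock stays held; the oldest waiter becomes the owner.
                self._waiters.popleft().set()
            else:
                self._locked = False
```

Handing ownership directly to the oldest waiter, instead of unlocking and letting threads race, is what makes the ordering deterministic.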
41 changes: 37 additions & 4 deletions libmc/__init__.py
@@ -1,13 +1,19 @@
 import os
+import functools
 from ._client import (
     PyClient, ThreadUnsafe,
     encode_value,
     decode_value,
+    PyClientPool, PyClientUnsafe as ClientUnsafe,

     MC_DEFAULT_EXPTIME,
     MC_POLL_TIMEOUT,
     MC_CONNECT_TIMEOUT,
     MC_RETRY_TIMEOUT,
+    MC_SET_FAILOVER,
+    MC_INITIAL_CLIENTS,
+    MC_MAX_CLIENTS,
+    MC_MAX_GROWTH,

     MC_HASH_MD5,
     MC_HASH_FNV1_32,
@@ -27,25 +33,52 @@
     __file__ as _libmc_so_file
 )

-__VERSION__ = "1.4.3"
-__version__ = "v1.4.3"
+__VERSION__ = "1.4.4"
+__version__ = "v1.4.4"
 __author__ = "mckelvin"
 __email__ = "[email protected]"
-__date__ = "Fri Dec 1 07:43:12 2023 +0800"
+__date__ = "Sat Jun 1 05:10:05 2024 +0800"


 class Client(PyClient):
     pass

+class ClientPool(PyClientPool):
+    pass
+
+class ThreadedClient:
+    def __init__(self, *args, **kwargs):
+        self._client_pool = ClientPool(*args, **kwargs)
+
+    def update_servers(self, servers):
+        return self._client_pool.update_servers(servers)
+
+    def config(self, opt, val):
+        self._client_pool.config(opt, val)
+
+    def __getattr__(self, key):
+        if not hasattr(Client, key):
+            raise AttributeError
+        result = getattr(Client, key)
+        if callable(result):
+            @functools.wraps(result)
+            def wrapper(*args, **kwargs):
+                with self._client_pool.client() as mc:
+                    return getattr(mc, key)(*args, **kwargs)
+            return wrapper
+        return result
+

 DYNAMIC_LIBRARIES = [os.path.abspath(_libmc_so_file)]


 __all__ = [
     'Client', 'ThreadUnsafe', '__VERSION__', 'encode_value', 'decode_value',
+    'ClientUnsafe', 'ClientPool', 'ThreadedClient',

     'MC_DEFAULT_EXPTIME', 'MC_POLL_TIMEOUT', 'MC_CONNECT_TIMEOUT',
-    'MC_RETRY_TIMEOUT',
+    'MC_RETRY_TIMEOUT', 'MC_SET_FAILOVER', 'MC_INITIAL_CLIENTS',
+    'MC_MAX_CLIENTS', 'MC_MAX_GROWTH',

     'MC_HASH_MD5', 'MC_HASH_FNV1_32', 'MC_HASH_FNV1A_32', 'MC_HASH_CRC_32',

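
Note on `ThreadedClient` above: `__getattr__` looks the name up on `Client`, and for callables returns a wrapper that checks a client out of the pool for the duration of one call. The pattern can be tried without a memcached server using stand-ins. `FakeClient`, `FakePool`, and `PooledProxy` below are hypothetical classes written for this sketch; only the body of `__getattr__` mirrors the diff.

```python
import functools
from contextlib import contextmanager


class FakeClient:
    """Stand-in for libmc.Client that stores values in a dict."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value
        return True

    def get(self, key):
        return self._data.get(key)


class FakePool:
    """Stand-in for libmc.ClientPool holding one shared client."""

    def __init__(self):
        self._client = FakeClient()
        self.checkouts = 0

    @contextmanager
    def client(self):
        self.checkouts += 1  # count how often a client is borrowed
        yield self._client


class PooledProxy:
    """Same delegation pattern as ThreadedClient, applied to the stand-ins."""

    def __init__(self):
        self._client_pool = FakePool()

    def __getattr__(self, key):
        # Only reached for names not found on the proxy itself.
        if not hasattr(FakeClient, key):
            raise AttributeError(key)
        result = getattr(FakeClient, key)
        if callable(result):
            @functools.wraps(result)
            def wrapper(*args, **kwargs):
                with self._client_pool.client() as mc:
                    return getattr(mc, key)(*args, **kwargs)
            return wrapper
        return result
```

With these stand-ins, `PooledProxy().set("k", 1)` followed by `get("k")` borrows the client twice, once per call, which is the per-call checkout the wrapper is meant to provide.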