Skip to content

Commit

Permalink
added support for UNIX domain sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
kentslaney committed Nov 19, 2023
1 parent fbf4acf commit ce0d6bc
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 14 deletions.
4 changes: 3 additions & 1 deletion include/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class Connection {
size_t m_counter;

protected:
int connectPoll(int fd, struct addrinfo* ai_ptr);
int connectPoll(int fd, const sockaddr* ai_ptr, const socklen_t ai_addrlen);
inline int local();

char m_name[MC_NI_MAXHOST + 1 + MC_NI_MAXSERV];
char m_host[MC_NI_MAXHOST];
Expand All @@ -70,6 +71,7 @@ class Connection {
int m_socketFd;
bool m_alive;
bool m_hasAlias;
bool m_local;
time_t m_deadUntil;
io::BufferWriter* m_buffer_writer; // for send
io::BufferReader* m_buffer_reader; // for recv
Expand Down
20 changes: 13 additions & 7 deletions libmc/_client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -378,19 +378,25 @@ cdef class PyClient:

servers_ = []
for srv in servers:
addr_alias = srv.split(' ')
addr_alias = srv.rsplit(' ', 1)
addr = addr_alias[0]
if len(addr_alias) == 1:
alias = None
elif addr.endswith("\\"):
addr = srv
alias = None
else:
alias = addr_alias[1]

host_port = addr.split(':')
host = host_port[0]
if len(host_port) == 1:
port = MC_DEFAULT_PORT

if addr.startswith("/"):
port = 0
else:
port = int(host_port[1])
host_port = addr.split(':')
host = host_port[0]
if len(host_port) == 1:
port = MC_DEFAULT_PORT
else:
port = int(host_port[1])
if PY_MAJOR_VERSION > 2:
host = PyUnicode_AsUTF8String(host)
alias = PyUnicode_AsUTF8String(alias) if alias else None
Expand Down
25 changes: 25 additions & 0 deletions misc/memcached_server
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ function start()
fi
}

function unix()
{
name="${1:-unix_test}"
if [ ! -f $basedir/var/log/${name}.log ]; then
mkdir -p $basedir/var/log
touch $basedir/var/log/${name}.log
fi
mkdir -p $basedir/var/run
$cmd -d -u $USER -s $basedir/var/run/${name}.socket -t $threads -m ${memory} -P $basedir/var/run/${name}.pid > $basedir/var/log/${name}.log 2>&1
echo "Starting the memcached server on '$basedir/var/run/${name}.socket'... "
}

function stop()
{
port="$1"
Expand Down Expand Up @@ -78,6 +90,19 @@ case "$1" in
wait
fi
;;
unix)
shift
unix $@
;;
unixstop)
if [ `ls $basedir/var/run/ | grep -c .pid` -ge 1 ]; then
names="`basename $basedir/var/run/*.pid | cut -d. -f1`"
for name in $names; do
stop $name &
done
fi
wait
;;
*)
printf 'Usage: %s {start|stop|restart} <port>\n' "$prog"
exit 1
Expand Down
52 changes: 46 additions & 6 deletions src/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>
#include <sys/types.h>

Expand All @@ -21,8 +22,8 @@ namespace mc {

Connection::Connection()
: m_counter(0), m_port(0), m_socketFd(-1),
m_alive(false), m_hasAlias(false), m_deadUntil(0),
m_connectTimeout(MC_DEFAULT_CONNECT_TIMEOUT),
m_alive(false), m_hasAlias(false), m_local(false),
m_deadUntil(0), m_connectTimeout(MC_DEFAULT_CONNECT_TIMEOUT),
m_retryTimeout(MC_DEFAULT_RETRY_TIMEOUT),
m_maxRetries(MC_DEFAULT_MAX_RETRIES), m_retires(0) {
m_name[0] = '\0';
Expand All @@ -45,9 +46,14 @@ Connection::~Connection() {
int Connection::init(const char* host, uint32_t port, const char* alias) {
snprintf(m_host, sizeof m_host, "%s", host);
m_port = port;
m_local = m_host[0] == '/'; // un.h UNIX_PATH_MAX < netdb.h NI_MAXHOST
if (alias == NULL) {
m_hasAlias = false;
snprintf(m_name, sizeof m_name, "%s:%u", m_host, m_port);
if (m_local) {
snprintf(m_name, sizeof m_name, "%s", m_host);
} else {
snprintf(m_name, sizeof m_name, "%s:%u", m_host, m_port);
}
} else {
m_hasAlias = true;
snprintf(m_name, sizeof m_name, "%s", alias);
Expand All @@ -59,6 +65,10 @@ int Connection::init(const char* host, uint32_t port, const char* alias) {
int Connection::connect() {
assert(!m_alive);
this->close();
if (m_local) {
return local();
}

struct addrinfo hints, *server_addrinfo = NULL, *ai_ptr = NULL;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET;
Expand Down Expand Up @@ -104,7 +114,7 @@ int Connection::connect() {
}

// make sure the connection is established
if (connectPoll(fd, ai_ptr) == 0) {
if (connectPoll(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen) == 0) {
m_socketFd = fd;
m_alive = true;
break;
Expand All @@ -121,8 +131,38 @@ int Connection::connect() {
return m_alive ? 0 : -1;
}

int Connection::connectPoll(int fd, struct addrinfo* ai_ptr) {
int conn_rv = ::connect(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
inline int Connection::local() {
int fd, flags, opt_keepalive = 1;

if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
log_err("socket()");
return -1;
}

if ((flags = fcntl(fd, F_GETFL, 0)) < 0 ||
fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
log_err("setting O_NONBLOCK");
::close(fd);
return -1;
}

setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&opt_keepalive, sizeof opt_keepalive);

struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, m_host, sizeof(addr.sun_path) - 1);
assert(strcmp(addr.sun_path, m_host) == 0);
if (connectPoll(fd, (const struct sockaddr *)&addr, sizeof addr) != 0) {
return -1;
}
m_socketFd = fd;
m_alive = true;
return 0;
}

int Connection::connectPoll(int fd, const sockaddr* ai_addr, const socklen_t ai_addrlen) {
int conn_rv = ::connect(fd, ai_addr, ai_addrlen);
if (conn_rv == 0) {
return 0;
}
Expand Down
11 changes: 11 additions & 0 deletions tests/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ mc::Client* newClient(int n) {
"sierra",
"tango"
};
return md5Client(hosts, ports, n, aliases);
}

mc::Client* md5Client(const char* const * hosts, const uint32_t* ports, const size_t n,
const char* const * aliases = NULL) {
mc::Client* client = new mc::Client();
client->config(CFG_HASH_FUNCTION, OPT_HASH_MD5);
client->init(hosts, ports, n, aliases);
Expand All @@ -87,6 +92,12 @@ mc::Client* newClient(int n) {
return client;
}

mc::Client* newUnixClient() {
const char * hosts[] = { "/tmp/env_mc_dev/var/run/unix_test.socket" };
const uint32_t ports[] = { 0 };
return md5Client(hosts, ports, 1);
}


std::string get_resource_path(const char* basename) {
std::string this_path(__FILE__);
Expand Down
14 changes: 14 additions & 0 deletions tests/test_unix.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "Client.h"
#include "test_common.h"

#include <cassert>
#include "gtest/gtest.h"

using douban::mc::Client;
using douban::mc::tests::newUnixClient;

TEST(test_unix, establish_connection) {
Client* client = newUnixClient();
EXPECT_TRUE(client != NULL);
delete client;
}

0 comments on commit ce0d6bc

Please sign in to comment.