From 4368319fb668204f345da05b9dd79762506c67b6 Mon Sep 17 00:00:00 2001 From: sewenew Date: Sat, 29 Apr 2023 11:32:23 +0800 Subject: [PATCH] support AsyncRedisCluster::redis, so that we can send command that has no key to cluster in async mode --- README.md | 7 ++++ src/sw/redis++/async_connection.h | 2 ++ src/sw/redis++/async_connection_pool.cpp | 5 ++- src/sw/redis++/async_redis.cpp | 4 +++ src/sw/redis++/async_redis.h | 41 ++++++++++++++++++++---- src/sw/redis++/async_redis_cluster.cpp | 12 +++++++ src/sw/redis++/async_redis_cluster.h | 3 ++ 7 files changed, 66 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index ef90ec0..feb975f 100644 --- a/README.md +++ b/README.md @@ -2597,8 +2597,15 @@ auto mget_res = async_cluster.mget>({"{hashtag}key1" unordered_map m = {{"a", "b"}, {"c", "d"}}; Future hmset_res = async_redis.hmset("hash", m.begin(), m.end()); + +// Create an AsyncRedis object with hash-tag, so that we can send commands that has no key. +// It connects to Redis instance that holds the given key, i.e. hash-tag. +auto r = async_cluster.redis("hash-tag"); +Future ping_res = r.command("ping"); ``` +**NOTE**: By default, when you use `AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connection = true)` to create an `AsyncRedis` object, instead of picking a connection from the underlying connection pool, it creates a new connection to the corresponding Redis server. So this is NOT a cheap operation, and you should try to reuse this newly created `AsyncRedis` object as much as possible. If you pass `false` as the second parameter, you can create a `AsyncRedis` object without creating a new connection. However, in this case, you should be very careful, otherwise, you might get bad performance or even dead lock. Please carefully check the related [pipeline section](#very-important-notes) before using this feature. Also the returned `AsyncRedis` object is NOT thread-safe, and if it throws exception, you need to destroy it, and create a new one with the `AsyncRedisCluster::redis` method. + #### Async Subscriber **NOTE**: I'm not quite satisfied with the interface of `AsyncSubscriber`. If you have a better idea, feel free to open an issue for discussion. diff --git a/src/sw/redis++/async_connection.h b/src/sw/redis++/async_connection.h index 34a8b62..e62a40a 100644 --- a/src/sw/redis++/async_connection.h +++ b/src/sw/redis++/async_connection.h @@ -470,6 +470,8 @@ class GuardedAsyncConnection { std::shared_ptr _connection; }; +using GuardedAsyncConnectionSPtr = std::shared_ptr; + namespace detail { // We seperate this function from ClusterEvent to avoid diff --git a/src/sw/redis++/async_connection_pool.cpp b/src/sw/redis++/async_connection_pool.cpp index b849231..ad371c2 100644 --- a/src/sw/redis++/async_connection_pool.cpp +++ b/src/sw/redis++/async_connection_pool.cpp @@ -110,7 +110,10 @@ AsyncConnectionPool& AsyncConnectionPool::operator=(AsyncConnectionPool &&that) } AsyncConnectionPool::~AsyncConnectionPool() { - assert(_loop); + if (!_loop) { + // This pool has been moved. + return; + } // TODO: what if the connection has been borrowed but not returned? // Or we dont' need to worry about that, since it's destructing and diff --git a/src/sw/redis++/async_redis.cpp b/src/sw/redis++/async_redis.cpp index b6e3f45..d7ebf83 100644 --- a/src/sw/redis++/async_redis.cpp +++ b/src/sw/redis++/async_redis.cpp @@ -50,6 +50,10 @@ AsyncRedis::AsyncRedis(const std::shared_ptr &sentinel, opts); } +AsyncRedis::AsyncRedis(const GuardedAsyncConnectionSPtr &connection) : _connection(connection) { + assert(_connection); +} + AsyncSubscriber AsyncRedis::subscriber() { // TODO: maybe we don't need to check this, // since there's no Transaction or Pipeline for AsyncRedis diff --git a/src/sw/redis++/async_redis.h b/src/sw/redis++/async_redis.h index de33d03..381a017 100644 --- a/src/sw/redis++/async_redis.h +++ b/src/sw/redis++/async_redis.h @@ -1017,6 +1017,10 @@ class AsyncRedis { } private: + friend class AsyncRedisCluster; + + explicit AsyncRedis(const GuardedAsyncConnectionSPtr &connection); + explicit AsyncRedis(const Uri &uri); template @@ -1029,10 +1033,20 @@ class AsyncRedis { Future _command_with_parser(Formatter formatter, Args &&...args) { auto formatted_cmd = formatter(std::forward(args)...); - assert(_pool); - SafeAsyncConnection connection(*_pool); + if (_connection) { + // Single connection mode. + auto &connection = _connection->connection(); + if (connection.broken()) { + throw Error("connection is broken"); + } + + return connection.send(std::move(formatted_cmd)); + } else { + assert(_pool); + SafeAsyncConnection connection(*_pool); - return connection.connection().send(std::move(formatted_cmd)); + return connection.connection().send(std::move(formatted_cmd)); + } } template @@ -1065,16 +1079,29 @@ class AsyncRedis { void _callback_command_with_parser(Callback &&cb, Formatter formatter, Args &&...args) { auto formatted_cmd = formatter(std::forward(args)...); - assert(_pool); - SafeAsyncConnection connection(*_pool); + if (_connection) { + // Single connection mode. + auto &connection = _connection->connection(); + if (connection.broken()) { + throw Error("connection is broken"); + } - connection.connection().send( - std::move(formatted_cmd), std::forward(cb)); + connection.send( + std::move(formatted_cmd), std::forward(cb)); + } else { + assert(_pool); + SafeAsyncConnection connection(*_pool); + + connection.connection().send( + std::move(formatted_cmd), std::forward(cb)); + } } EventLoopSPtr _loop; AsyncConnectionPoolSPtr _pool; + + GuardedAsyncConnectionSPtr _connection; }; } diff --git a/src/sw/redis++/async_redis_cluster.cpp b/src/sw/redis++/async_redis_cluster.cpp index f90a776..fa8468d 100644 --- a/src/sw/redis++/async_redis_cluster.cpp +++ b/src/sw/redis++/async_redis_cluster.cpp @@ -35,6 +35,18 @@ AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts, _pool = std::make_shared(_loop, pool_opts, opts, role); } +AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connection) { + assert(_pool); + + auto pool = _pool->fetch(hash_tag); + if (new_connection) { + // Create a new pool. + pool = std::make_shared(pool->clone()); + } + + return AsyncRedis(std::make_shared(pool)); +} + AsyncSubscriber AsyncRedisCluster::subscriber() { assert(_pool); diff --git a/src/sw/redis++/async_redis_cluster.h b/src/sw/redis++/async_redis_cluster.h index 949c716..1612873 100644 --- a/src/sw/redis++/async_redis_cluster.h +++ b/src/sw/redis++/async_redis_cluster.h @@ -21,6 +21,7 @@ #include "sw/redis++/utils.h" #include "sw/redis++/async_connection.h" #include "sw/redis++/async_connection_pool.h" +#include "sw/redis++/async_redis.h" #include "sw/redis++/async_shards_pool.h" #include "sw/redis++/async_subscriber.h" #include "sw/redis++/event_loop.h" @@ -48,6 +49,8 @@ class AsyncRedisCluster { ~AsyncRedisCluster() = default; + AsyncRedis redis(const StringView &hash_tag, bool new_connection = true); + AsyncSubscriber subscriber(); template