diff --git a/kombu/transport/redis_cluster.py b/kombu/transport/redis_cluster.py index babe9a020..05998ab5a 100644 --- a/kombu/transport/redis_cluster.py +++ b/kombu/transport/redis_cluster.py @@ -221,7 +221,8 @@ def _register_BRPOP(self, channel): except Exception as e: logger.error('Error while registering BRPOP', extra={"e": e, "key": conn.key}) - channel._brpop_start() + timeout = channel.connection.client.transport_options.get('brpop_timeout', 1) + channel._brpop_start(timeout) def on_poll_init(self, poller): self.poller = poller @@ -317,6 +318,7 @@ class Channel(RedisChannel): 'namespace', 'keyprefix_queue', 'keyprefix_fanout', + 'brpop_timeout' ) def __init__(self, conn, *args, **kwargs): @@ -364,13 +366,11 @@ def close(self): RedisClusterConnection.close(self.client) - def _brpop_start(self, timeout=1): + def _brpop_start(self, timeout): queues = self._queue_cycle.consume(len(self.active_queues)) if not queues: return - timeout = timeout or 0 - for key in queues: for _, _, conn, _ in self.connection.cycle._chan_to_sock: if conn.key == key and conn.in_poll == False: @@ -396,7 +396,7 @@ def _brpop_read(self, **options): # We should not throw error on this method to make kombu to continue operation raise Empty() - conn.client.connection.send_command('BRPOP', conn.key, conn.timeout) # schedule next BRPOP + conn.client.connection.send_command('BRPOP', conn.key, conn.timeout) # schedule next BRPOP if resp: self.deliver_response(resp) diff --git a/t/integration/test_redis_cluster.py b/t/integration/test_redis_cluster.py index 2fa2bb5f5..5d4b7d733 100644 --- a/t/integration/test_redis_cluster.py +++ b/t/integration/test_redis_cluster.py @@ -42,6 +42,24 @@ def invalid_connection(): return kombu.Connection('redis-cluster://localhost:12345') +def test_brpop_timeout(): + def patched_brpop_start(self, timeout): + assert timeout == 10 + + + with patch('kombu.transport.redis_cluster.Channel._brpop_start', patched_brpop_start): + conn = kombu.Connection('redis-cluster://localhost:7000', transport_options={'brpop_timeout': 10}) + + queue = conn.SimpleQueue('test_connectionerror') + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + try: + _ = queue.get(timeout=1) + except queue.Empty: + pass + + conn.close() + + def test_connection_reuse(connection): from kombu.transport.redis_cluster import RedisClusterConnection