Skip to content

Commit

Permalink
Merge pull request #15 from sendbird/feat/brpop_timeout
Browse files Browse the repository at this point in the history
Add brpop timeout option
  • Loading branch information
dlunch authored Aug 22, 2023
2 parents a7cece2 + 18cf462 commit c3ea070
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
10 changes: 5 additions & 5 deletions kombu/transport/redis_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ def _register_BRPOP(self, channel):
except Exception as e:
logger.error('Error while registering BRPOP', extra={"e": e, "key": conn.key})

channel._brpop_start()
timeout = channel.connection.client.transport_options.get('brpop_timeout', 1)
channel._brpop_start(timeout)

def on_poll_init(self, poller):
self.poller = poller
Expand Down Expand Up @@ -317,6 +318,7 @@ class Channel(RedisChannel):
'namespace',
'keyprefix_queue',
'keyprefix_fanout',
'brpop_timeout'
)

def __init__(self, conn, *args, **kwargs):
Expand Down Expand Up @@ -364,13 +366,11 @@ def close(self):

RedisClusterConnection.close(self.client)

def _brpop_start(self, timeout=1):
def _brpop_start(self, timeout):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return

timeout = timeout or 0

for key in queues:
for _, _, conn, _ in self.connection.cycle._chan_to_sock:
if conn.key == key and conn.in_poll == False:
Expand All @@ -396,7 +396,7 @@ def _brpop_read(self, **options):
# We should not throw error on this method to make kombu to continue operation
raise Empty()

conn.client.connection.send_command('BRPOP', conn.key, conn.timeout) # schedule next BRPOP
conn.client.connection.send_command('BRPOP', conn.key, conn.timeout) # schedule next BRPOP

if resp:
self.deliver_response(resp)
Expand Down
18 changes: 18 additions & 0 deletions t/integration/test_redis_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@ def invalid_connection():
return kombu.Connection('redis-cluster://localhost:12345')


def test_brpop_timeout():
def patched_brpop_start(self, timeout):
assert timeout == 10


with patch('kombu.transport.redis_cluster.Channel._brpop_start', patched_brpop_start):
conn = kombu.Connection('redis-cluster://localhost:7000', transport_options={'brpop_timeout': 10})

queue = conn.SimpleQueue('test_connectionerror')
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
try:
_ = queue.get(timeout=1)
except queue.Empty:
pass

conn.close()


def test_connection_reuse(connection):
from kombu.transport.redis_cluster import RedisClusterConnection

Expand Down

0 comments on commit c3ea070

Please sign in to comment.