Skip to content

Commit

Permalink
batching redis writes (#234)
Browse files Browse the repository at this point in the history
* batching redis writes

* properly clearing item queue
  • Loading branch information
JacobReynolds authored Jan 30, 2022
1 parent a08fcb4 commit a32f117
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pgsync/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
REDIS_AUTH = env.str("REDIS_AUTH", default=None)
# number of items to read from Redis at a time
REDIS_CHUNK_SIZE = env.int("REDIS_CHUNK_SIZE", default=1000)
# number of items to write to Redis at a time
REDIS_WRITE_CHUNK_SIZE = env.int("REDIS_WRITE_CHUNK_SIZE", default=1000)
# redis socket connection timeout
REDIS_SOCKET_TIMEOUT = env.int("REDIS_SOCKET_TIMEOUT", default=5)
# redis poll interval (in secs)
Expand Down
12 changes: 10 additions & 2 deletions pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
CHECKPOINT_PATH,
LOG_INTERVAL,
POLL_TIMEOUT,
REDIS_WRITE_CHUNK_SIZE,
REDIS_POLL_INTERVAL,
REPLICATION_SLOT_CLEANUP_INTERVAL,
)
Expand Down Expand Up @@ -991,10 +992,14 @@ def poll_db(self) -> None:
channel: str = self.database
cursor.execute(f'LISTEN "{channel}"')
logger.debug(f'Listening for notifications on channel "{channel}"')

item_queue = []
while True:
# NB: consider reducing POLL_TIMEOUT to increase throughout
if select.select([conn], [], [], POLL_TIMEOUT) == ([], [], []):
# Catch any hanging items from the last poll
if len(item_queue)>0:
self.redis.bulk_push(item_queue)
item_queue = []
continue

try:
Expand All @@ -1004,10 +1009,13 @@ def poll_db(self) -> None:
os._exit(-1)

while conn.notifies:
if len(item_queue)>=REDIS_WRITE_CHUNK_SIZE:
self.redis.bulk_push(item_queue)
item_queue=[]
notification: AnyStr = conn.notifies.pop(0)
if notification.channel == channel:
payload = json.loads(notification.payload)
self.redis.push(payload)
item_queue.append(payload)
logger.debug(f"on_notify: {payload}")
self.count["db"] += 1

Expand Down

0 comments on commit a32f117

Please sign in to comment.