Skip to content

Commit

Permalink
change REDIS_CHUNK_SIZE to REDIS_READ_CHUNK_SIZE
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Jan 30, 2022
1 parent a32f117 commit 002c998
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 23 deletions.
4 changes: 3 additions & 1 deletion .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@
# REDIS_DB=0
# REDIS_AUTH=PLEASE_REPLACE_ME
# number of items to read from Redis at a time
# REDIS_CHUNK_SIZE=1000
# REDIS_READ_CHUNK_SIZE=1000
# number of items to write to Redis at a time
# REDIS_WRITE_CHUNK_SIZE=1000
# redis socket connection timeout
# REDIS_SOCKET_TIMEOUT=5
# REDIS_POLL_INTERVAL=0.01
Expand Down
7 changes: 4 additions & 3 deletions pgsync/redisqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from redis import Redis
from redis.exceptions import ConnectionError

from .settings import REDIS_CHUNK_SIZE, REDIS_SOCKET_TIMEOUT
from .settings import REDIS_READ_CHUNK_SIZE, REDIS_SOCKET_TIMEOUT
from .urls import get_redis_url

logger = logging.getLogger(__name__)
Expand All @@ -32,13 +32,14 @@ def __init__(self, name: str, namespace: str = "queue", **kwargs):
logger.exception(f"Redis server is not running: {e}")
raise

@property
def qsize(self) -> int:
    """Approximate number of items currently pending in the queue.

    Delegates to Redis LLEN on this queue's key; the value is only
    approximate because producers/consumers may race with the call.
    """
    length: int = self.__db.llen(self.key)
    return length

def empty(self) -> bool:
    """Return True if the queue is empty, False otherwise.

    NOTE: ``qsize`` is a property (not a method), so it is referenced
    without calling it.  The span previously contained both the old
    ``self.qsize()`` call and the new property access; only the
    property access is valid.
    """
    return self.qsize == 0

def push(self, item) -> None:
"""Push item into the queue."""
Expand All @@ -60,7 +61,7 @@ def pop(self, block: bool = True, timeout: int = None) -> dict:

def bulk_pop(self, chunk_size: Optional[int] = None) -> List[dict]:
"""Remove and return multiple items from the queue."""
chunk_size: int = chunk_size or REDIS_CHUNK_SIZE
chunk_size: int = chunk_size or REDIS_READ_CHUNK_SIZE
pipeline = self.__db.pipeline()
pipeline.lrange(self.key, 0, chunk_size - 1)
pipeline.ltrim(self.key, chunk_size, -1)
Expand Down
2 changes: 1 addition & 1 deletion pgsync/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
# Redis connection/tuning settings, overridable via environment variables.
REDIS_DB = env.int("REDIS_DB", default=0)
REDIS_AUTH = env.str("REDIS_AUTH", default=None)
# number of items to read from Redis at a time
# (renamed from REDIS_CHUNK_SIZE; the old assignment is removed so a
# stale name is not left defined alongside the new one)
REDIS_READ_CHUNK_SIZE = env.int("REDIS_READ_CHUNK_SIZE", default=1000)
# number of items to write to Redis at a time
REDIS_WRITE_CHUNK_SIZE = env.int("REDIS_WRITE_CHUNK_SIZE", default=1000)
# redis socket connection timeout
Expand Down
21 changes: 11 additions & 10 deletions pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
CHECKPOINT_PATH,
LOG_INTERVAL,
POLL_TIMEOUT,
REDIS_WRITE_CHUNK_SIZE,
REDIS_POLL_INTERVAL,
REDIS_WRITE_CHUNK_SIZE,
REPLICATION_SLOT_CLEANUP_INTERVAL,
)
from .transform import get_private_keys, transform
Expand Down Expand Up @@ -992,14 +992,15 @@ def poll_db(self) -> None:
channel: str = self.database
cursor.execute(f'LISTEN "{channel}"')
logger.debug(f'Listening for notifications on channel "{channel}"')
item_queue = []
items: list = []

while True:
# NB: consider reducing POLL_TIMEOUT to increase throughput
if select.select([conn], [], [], POLL_TIMEOUT) == ([], [], []):
# Catch any hanging items from the last poll
if len(item_queue)>0:
self.redis.bulk_push(item_queue)
item_queue = []
if items:
self.redis.bulk_push(items)
items = []
continue

try:
Expand All @@ -1009,13 +1010,13 @@ def poll_db(self) -> None:
os._exit(-1)

while conn.notifies:
if len(item_queue)>=REDIS_WRITE_CHUNK_SIZE:
self.redis.bulk_push(item_queue)
item_queue=[]
if len(items) >= REDIS_WRITE_CHUNK_SIZE:
self.redis.bulk_push(items)
items = []
notification: AnyStr = conn.notifies.pop(0)
if notification.channel == channel:
payload = json.loads(notification.payload)
item_queue.append(payload)
items.append(payload)
logger.debug(f"on_notify: {payload}")
self.count["db"] += 1

Expand Down Expand Up @@ -1094,7 +1095,7 @@ def status(self):
f"Xlog: [{self.count['xlog']:,}] => "
f"Db: [{self.count['db']:,}] => "
f"Redis: [total = {self.count['redis']:,} "
f"pending = {self.redis.qsize():,}] => "
f"pending = {self.redis.qsize:,}] => "
f"Elastic: [{self.es.doc_count:,}] ...\n"
)
sys.stdout.flush()
Expand Down
16 changes: 8 additions & 8 deletions pgsync/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@

def timeit(func):
    """Decorator that reports a function's wall-clock runtime to stdout.

    The wrapped function's return value is passed through unchanged.
    (Reconstructed post-commit version: the diff span interleaved the
    old untyped lines with the new annotated ones and hid the trailing
    returns behind an expand marker.)
    """

    def timed(*args, **kwargs):
        since: float = time()
        retval = func(*args, **kwargs)
        until: float = time()
        sys.stdout.write(
            f"{func.__name__} ({args}, {kwargs}) {until-since} secs\n"
        )
        # hand the original result back to the caller
        return retval

    return timed
Expand All @@ -41,18 +41,18 @@ def timed(*args, **kwargs):

class Timer:
    """Context manager that prints elapsed wall-clock time on exit.

    Usage::

        with Timer("loading"):
            do_work()
    """

    def __init__(self, message: Optional[str] = None):
        # Optional label prepended to the timing report; empty string
        # when no message is supplied.
        self.message: str = message or ""

    def __enter__(self):
        # Record the start timestamp and return self so callers can
        # inspect the timer inside the with-block.
        self.start = time()
        return self

    def __exit__(self, *args):
        # Compute elapsed seconds locally; nothing is stored on the
        # instance in the post-commit version.
        end: float = time()
        elapsed: float = end - self.start
        sys.stdout.write(
            f"{self.message} {(timedelta(seconds=elapsed))} "
            f"({elapsed:2.2f} sec)\n"
        )


Expand Down Expand Up @@ -90,7 +90,7 @@ def show_settings(schema: str = None, **kwargs) -> None:
def threaded(fn):
    """Decorator for threaded code execution.

    Calling the decorated function starts ``fn`` on a new thread and
    returns the started :class:`Thread` so callers can ``join()`` it.
    (Reconstructed post-commit version: the diff span interleaved the
    old and new ``wrapper`` signatures and hid ``return wrapper``
    behind an expand marker.)
    """

    def wrapper(*args, **kwargs) -> Thread:
        thread: Thread = Thread(target=fn, args=args, kwargs=kwargs)
        thread.start()
        return thread

    return wrapper
Expand Down

0 comments on commit 002c998

Please sign in to comment.