diff --git a/bin/parallel_sync b/bin/parallel_sync
index 187c1d9d..072b5f88 100755
--- a/bin/parallel_sync
+++ b/bin/parallel_sync
@@ -1,42 +1,42 @@
 #!/usr/bin/env python
 
 """
-Parallel sync is an innovative, experimental feature designed to optimize
-throughput by utilizing available CPUs/threads, particularly beneficial
+Parallel sync is an innovative, experimental feature designed to optimize
+throughput by utilizing available CPUs/threads, particularly beneficial
 in environments experiencing high network latency.
 
 Scenario & Challenge:
-In instances where your PG database, Elasticsearch/OpenSearch, and PGSync
-servers operate on divergent networks, a delay in request/response time is
-noticeable. The primary constraint emerges from the database query's roundtrip,
-which even server-side cursors can address only to a limited extent by fetching
-a certain number of records at a time. The consequent delay in fetching the
+In instances where your PG database, Elasticsearch/OpenSearch, and PGSync
+servers operate on divergent networks, a delay in request/response time is
+noticeable. The primary constraint emerges from the database query's roundtrip,
+which even server-side cursors can address only to a limited extent by fetching
+a certain number of records at a time. The consequent delay in fetching the
 next cursor significantly hampers the overall synchronization speed.
 
 Solution:
-To mitigate this, the strategy is to conduct an initial fast/parallel sync,
-thereby populating Elasticsearch/OpenSearch in a single iteration.
+To mitigate this, the strategy is to conduct an initial fast/parallel sync,
+thereby populating Elasticsearch/OpenSearch in a single iteration.
 Post this, the regular pgsync can continue running as a daemon.
 
 Approach and Technical Implementation:
-The approach centers around utilizing the Tuple identifier record of the table
-columns. Every table incorporates a system column – "ctid" of type "tid,"
+The approach centers around utilizing the Tuple identifier record of the table
+columns. Every table incorporates a system column – "ctid" of type "tid,"
 which helps identify the page record and the row number in each block.
 This element facilitates the pagination of the sync process.
 
-Technically, pagination implies dividing each paged record amongst the
-available CPUs/threads. This division enables the parallel execution of
-Elasticsearch/OpenSearch bulk inserts. The "ctid" serves as a tuple
+Technically, pagination implies dividing each paged record amongst the
+available CPUs/threads. This division enables the parallel execution of
+Elasticsearch/OpenSearch bulk inserts. The "ctid" serves as a tuple
 (for instance, (1, 5)), pinpointing the row in a disk page.
 
-By leveraging this method, all paged row records are retrieved upfront and
-allocated as work units across the worker threads/CPUs.
-Each work unit, defined by the BLOCK_SIZE, denotes the number of root node
+By leveraging this method, all paged row records are retrieved upfront and
+allocated as work units across the worker threads/CPUs.
+Each work unit, defined by the BLOCK_SIZE, denotes the number of root node
 records assigned for each worker to process.
 
-Subsequently, the workers execute queries for each assigned chunk of work,
-filtered based on the page number and row numbers.
-This systematic and parallel approach optimizes the synchronization process,
+Subsequently, the workers execute queries for each assigned chunk of work,
+filtered based on the page number and row numbers.
+This systematic and parallel approach optimizes the synchronization process,
 especially in environments challenged by network latency.
 """
 
@@ -56,40 +56,34 @@ import sqlalchemy as sa
 from pgsync.settings import BLOCK_SIZE, CHECKPOINT_PATH
 from pgsync.sync import Sync
-from pgsync.utils import (
-    compiled_query,
-    config_loader,
-    get_config,
-    show_settings,
-    timeit,
-)
+from pgsync.utils import config_loader, get_config, show_settings, timeit
 
 
-def save_ctid(page: int, row: int, name: str) -> None:
+def save_ctid(page: int, row: int, filename: str) -> None:
     """
     Save the checkpoint for a given page and row in a file with the given name.
 
     Args:
         page (int): The page number to save.
         row (int): The row number to save.
-        name (str): The name of the file to save the checkpoint in.
+        filename (str): The name of the file to save the checkpoint in.
     """
-    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
+    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid")
     with open(checkpoint_file, "w+") as fp:
         fp.write(f"{page},{row}\n")
 
 
-def read_ctid(name: str) -> None:
+def read_ctid(filename: str) -> t.Tuple[t.Optional[int], t.Optional[int]]:
     """
     Reads the checkpoint file for the given name and returns the page and row numbers.
 
     Args:
-        name (str): The name of the checkpoint file.
+        filename (str): The name of the checkpoint file.
 
     Returns:
         tuple: A tuple containing the page and row numbers.
        If the checkpoint file does not exist, returns (None, None).
     """
-    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
+    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid")
     if os.path.exists(checkpoint_file):
         with open(checkpoint_file, "r") as fp:
             pairs: str = fp.read().split()[0].split(",")
@@ -141,10 +135,10 @@ def fetch_tasks(
     sync: Sync = Sync(doc)
     page: t.Optional[int] = None
     row: t.Optional[int] = None
-    name: str = re.sub(
+    filename: str = re.sub(
         "[^0-9a-zA-Z_]+", "", f"{sync.database.lower()}_{sync.index}"
     )
-    page, row = read_ctid(name=name)
+    page, row = read_ctid(filename)
     statement: sa.sql.Select = sa.select(
         *[
             sa.literal_column("1").label("x"),
@@ -374,10 +368,10 @@ def run_task(
     if len(task) > 0:
         page: int = max(task.keys())
         row: int = max(task[page])
-        name: str = re.sub(
+        filename: str = re.sub(
             "[^0-9a-zA-Z_]+", "", f"{sync.database.lower()}_{sync.index}"
         )
-        save_ctid(page=page, row=row, name=name)
+        save_ctid(page, row, filename)
 
     return 1
 
diff --git a/requirements/base.txt b/requirements/base.txt
index 4e37e9d5..00d4b780 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile with Python 3.9
+# This file is autogenerated by pip-compile with Python 3.8
 # by the following command:
 #
 #    pip-compile --output-file=requirements/base.txt requirements/base.in
diff --git a/requirements/dev.in b/requirements/dev.in
index 18036c99..c35238d4 100644
--- a/requirements/dev.in
+++ b/requirements/dev.in
@@ -7,7 +7,8 @@ flake8
 freezegun
 isort
 mock
-pre-commit
+# pinned to 3.5.0 because of python 3.8 support
+pre-commit==3.5.0
 pytest
 pytest-cov
 pytest-mock
diff --git a/requirements/dev.txt b/requirements/dev.txt
index 447fccb9..795a9d09 100644
--- a/requirements/dev.txt
+++ b/requirements/dev.txt
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile with Python 3.9
+# This file is autogenerated by pip-compile with Python 3.8
 # by the following command:
 #
 #    pip-compile --output-file=requirements/dev.txt requirements/dev.in
@@ -112,7 +112,7 @@ platformdirs==4.1.0
     #   virtualenv
 pluggy==1.3.0
     # via pytest
-pre-commit==3.6.0
+pre-commit==3.5.0
     # via -r requirements/dev.in
 psycopg2-binary==2.9.9
     # via -r requirements/base.in
@@ -171,6 +171,7 @@ tomli==2.0.1
 typing-extensions==4.9.0
     # via
     #   black
+    #   faker
     #   sqlalchemy
 urllib3==1.26.18
     # via
diff --git a/scripts/lint.sh b/scripts/lint.sh
index 9ecf11e6..04ac80f0 100755
--- a/scripts/lint.sh
+++ b/scripts/lint.sh
@@ -3,6 +3,6 @@
 set -e
 set -x
 
-isort --profile black --check-only bin demo examples pgsync plugins scripts tests
-black --check bin demo examples pgsync plugins scripts tests
-flake8 pgsync tests examples plugins demo
+isort --profile black --check-only bin/* demo examples pgsync plugins scripts tests
+black --check bin/* demo examples pgsync plugins scripts tests
+flake8 pgsync tests examples plugins demo bin/*
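For reference, the module docstring in bin/parallel_sync above describes the ctid-based pagination scheme only in prose. The sketch below is a minimal, standalone illustration of that idea and is not pgsync's implementation: the connection string DSN, the table name TABLE, and the helpers paged_ctids/fetch_chunk are hypothetical placeholders, and the Elasticsearch/OpenSearch bulk-insert step is elided as a comment. It assumes plain psycopg2 and multiprocessing: collect every row's ctid up front, slice the list into BLOCK_SIZE work units, and let each worker fetch only the rows assigned to it.

#!/usr/bin/env python
"""Minimal, hypothetical sketch of ctid-based parallel fetching (not pgsync's code)."""
import multiprocessing
import typing as t

import psycopg2

DSN: str = "postgresql://localhost/mydb"  # hypothetical connection string
TABLE: str = "book"                       # hypothetical root-node table
BLOCK_SIZE: int = 2048                    # rows per work unit, mirrors the setting above


def paged_ctids() -> t.List[t.List[str]]:
    """Fetch every row's ctid up front and slice the list into work units."""
    with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
        cur.execute(f"SELECT ctid FROM {TABLE}")
        ctids: t.List[str] = [row[0] for row in cur.fetchall()]  # e.g. '(12,3)'
    return [
        ctids[i : i + BLOCK_SIZE] for i in range(0, len(ctids), BLOCK_SIZE)
    ]


def fetch_chunk(ctids: t.List[str]) -> int:
    """Fetch only the rows addressed by this chunk's ctid literals."""
    with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
        # tid equality works on every PostgreSQL version, so '= ANY(...)' is a
        # safe way to address specific (page, row) positions.
        cur.execute(
            f"SELECT * FROM {TABLE} WHERE ctid = ANY(%s::tid[])", (ctids,)
        )
        rows = cur.fetchall()
    # ... hand `rows` to the Elasticsearch/OpenSearch bulk helper here ...
    return len(rows)


if __name__ == "__main__":
    # Each work unit runs in its own process, mimicking the parallel
    # bulk inserts described in the docstring above.
    with multiprocessing.Pool() as pool:
        counts = pool.map(fetch_chunk, paged_ctids())
    print(f"fetched {sum(counts)} rows across {len(counts)} work units")

Because ctids are physical row addresses that can move under VACUUM or updates, a pass like this is only sound for the initial bulk load; afterwards the regular event-driven pgsync daemon takes over, as the docstring notes.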