python 3.8 support for pre-commit
toluaina committed Dec 22, 2023
1 parent 765307d commit f771ff7
Showing 5 changed files with 40 additions and 44 deletions.
68 changes: 31 additions & 37 deletions bin/parallel_sync
@@ -1,42 +1,42 @@
#!/usr/bin/env python

"""
Parallel sync is an innovative, experimental feature designed to optimize
throughput by utilizing available CPUs/threads, particularly beneficial
in environments experiencing high network latency.
Scenario & Challenge:
In instances where your PG database, Elasticsearch/OpenSearch, and PGSync
servers operate on divergent networks, a delay in request/response time is
noticeable. The primary constraint emerges from the database query's roundtrip,
which even server-side cursors can address only to a limited extent by fetching
a certain number of records at a time. The consequent delay in fetching the
next cursor significantly hampers the overall synchronization speed.
Solution:
To mitigate this, the strategy is to conduct an initial fast/parallel sync,
thereby populating Elasticsearch/OpenSearch in a single iteration.
Post this, the regular pgsync can continue running as a daemon.
Approach and Technical Implementation:
The approach centers around utilizing the Tuple identifier record of the table
columns. Every table incorporates a system column – "ctid" of type "tid,"
which helps identify the page record and the row number in each block.
This element facilitates the pagination of the sync process.
Technically, pagination implies dividing each paged record amongst the
available CPUs/threads. This division enables the parallel execution of
Elasticsearch/OpenSearch bulk inserts. The "ctid" serves as a tuple
(for instance, (1, 5)), pinpointing the row in a disk page.
By leveraging this method, all paged row records are retrieved upfront and
allocated as work units across the worker threads/CPUs.
Each work unit, defined by the BLOCK_SIZE, denotes the number of root node
records assigned for each worker to process.
Subsequently, the workers execute queries for each assigned chunk of work,
filtered based on the page number and row numbers.
especially in environments challenged by network latency.
"""

@@ -56,40 +56,34 @@ import sqlalchemy as sa

from pgsync.settings import BLOCK_SIZE, CHECKPOINT_PATH
from pgsync.sync import Sync
-from pgsync.utils import (
-    compiled_query,
-    config_loader,
-    get_config,
-    show_settings,
-    timeit,
-)
+from pgsync.utils import config_loader, get_config, show_settings, timeit


-def save_ctid(page: int, row: int, name: str) -> None:
+def save_ctid(page: int, row: int, filename: str) -> None:
    """
    Save the checkpoint for a given page and row in a file with the given name.
    Args:
        page (int): The page number to save.
        row (int): The row number to save.
-        name (str): The name of the file to save the checkpoint in.
+        filename (str): The name of the file to save the checkpoint in.
    """
-    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
+    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid")
    with open(checkpoint_file, "w+") as fp:
        fp.write(f"{page},{row}\n")


-def read_ctid(name: str) -> None:
+def read_ctid(filename: str) -> t.Tuple[t.Optional[int], t.Optional[int]]:
    """
    Reads the checkpoint file for the given name and returns the page and row numbers.
    Args:
-        name (str): The name of the checkpoint file.
+        filename (str): The name of the checkpoint file.
    Returns:
        tuple: A tuple containing the page and row numbers. If the checkpoint file does not exist, returns (None, None).
    """
-    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
+    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{filename}.ctid")
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, "r") as fp:
            pairs: str = fp.read().split()[0].split(",")
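
For illustration, these two checkpoint helpers round-trip the last synced
position through a dot-file under CHECKPOINT_PATH. A usage sketch with
hypothetical values, assuming the truncated tail of read_ctid casts the
stored pair back to int:

save_ctid(page=42, row=7, filename="mydb_myindex")
page, row = read_ctid("mydb_myindex")  # -> (42, 7)
# If no checkpoint file exists yet, read_ctid returns (None, None).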
@@ -141,10 +135,10 @@ def fetch_tasks(
    sync: Sync = Sync(doc)
    page: t.Optional[int] = None
    row: t.Optional[int] = None
-    name: str = re.sub(
+    filename: str = re.sub(
        "[^0-9a-zA-Z_]+", "", f"{sync.database.lower()}_{sync.index}"
    )
-    page, row = read_ctid(name=name)
+    page, row = read_ctid(filename)
    statement: sa.sql.Select = sa.select(
        *[
            sa.literal_column("1").label("x"),
@@ -374,10 +368,10 @@ def run_task(
    if len(task) > 0:
        page: int = max(task.keys())
        row: int = max(task[page])
-        name: str = re.sub(
+        filename: str = re.sub(
            "[^0-9a-zA-Z_]+", "", f"{sync.database.lower()}_{sync.index}"
        )
-        save_ctid(page=page, row=row, name=name)
+        save_ctid(page, row, filename)

    return 1
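
The work units themselves can be fanned out with an ordinary process pool.
This is only a sketch of the dispatch pattern the module docstring describes,
not this commit's code; run_one is a hypothetical stand-in for run_task:

from multiprocessing import Pool

def run_one(chunk: dict) -> int:
    # Hypothetical per-chunk worker: bulk-insert the chunk into
    # Elasticsearch/OpenSearch, then persist the max (page, row) as the
    # ctid checkpoint so an interrupted sync can resume where it stopped.
    return 1

def dispatch(chunks: list, num_workers: int) -> None:
    # Fan the BLOCK_SIZE-sized work units out across worker processes.
    with Pool(processes=num_workers) as pool:
        pool.map(run_one, chunks)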

2 changes: 1 addition & 1 deletion requirements/base.txt
@@ -1,5 +1,5 @@
#
-# This file is autogenerated by pip-compile with Python 3.9
+# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --output-file=requirements/base.txt requirements/base.in
3 changes: 2 additions & 1 deletion requirements/dev.in
@@ -7,7 +7,8 @@ flake8
freezegun
isort
mock
-pre-commit
+# pinned to 3.5.0 because of python 3.8 support
+pre-commit==3.5.0
pytest
pytest-cov
pytest-mock
5 changes: 3 additions & 2 deletions requirements/dev.txt
@@ -1,5 +1,5 @@
#
-# This file is autogenerated by pip-compile with Python 3.9
+# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --output-file=requirements/dev.txt requirements/dev.in
@@ -112,7 +112,7 @@ platformdirs==4.1.0
    #   virtualenv
pluggy==1.3.0
    # via pytest
-pre-commit==3.6.0
+pre-commit==3.5.0
    # via -r requirements/dev.in
psycopg2-binary==2.9.9
    # via -r requirements/base.in
@@ -171,6 +171,7 @@ tomli==2.0.1
typing-extensions==4.9.0
    # via
    #   black
+    #   faker
    #   sqlalchemy
urllib3==1.26.18
    # via
6 changes: 3 additions & 3 deletions scripts/lint.sh
@@ -3,6 +3,6 @@
set -e
set -x

-isort --profile black --check-only bin demo examples pgsync plugins scripts tests
-black --check bin demo examples pgsync plugins scripts tests
-flake8 pgsync tests examples plugins demo
+isort --profile black --check-only bin/* demo examples pgsync plugins scripts tests
+black --check bin/* demo examples pgsync plugins scripts tests
+flake8 pgsync tests examples plugins demo bin/*
