Skip to content

Commit

Permalink
removed max lsn
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Jul 3, 2024
1 parent 71fd7b4 commit da99dca
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 52 deletions.
50 changes: 11 additions & 39 deletions pgsync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ def _logical_slot_changes(
func: sa.sql.functions._FunctionGenerator,
txmin: t.Optional[int] = None,
txmax: t.Optional[int] = None,
upto_lsn: t.Optional[int] = None,
upto_lsn: t.Optional[str] = None,
upto_nchanges: t.Optional[int] = None,
limit: t.Optional[int] = None,
offset: t.Optional[int] = None,
Expand All @@ -486,7 +486,7 @@ def _logical_slot_changes(
func (sa.sql.functions._FunctionGenerator): The function to use to read from the slot.
txmin (Optional[int], optional): The minimum transaction ID to read from. Defaults to None.
txmax (Optional[int], optional): The maximum transaction ID to read from. Defaults to None.
upto_lsn (Optional[int], optional): The maximum LSN to read up to. Defaults to None.
upto_lsn (Optional[str], optional): The maximum LSN to read up to. Defaults to None.
upto_nchanges (Optional[int], optional): The maximum number of changes to read. Defaults to None.
limit (Optional[int], optional): The maximum number of rows to return. Defaults to None.
offset (Optional[int], optional): The number of rows to skip before returning. Defaults to None.
Expand Down Expand Up @@ -529,48 +529,20 @@ def _logical_slot_changes(
statement = statement.offset(offset)
return statement

def max_lsn(
self,
slot_name: str,
txmin: t.Optional[int] = None,
txmax: t.Optional[int] = None,
):
filters: list = []
statement: sa.sql.Select = sa.select(
sa.func.MAX(sa.text("lsn")),
).select_from(
sa.func.PG_LOGICAL_SLOT_PEEK_CHANGES(
slot_name,
None,
None,
)
)
if txmin is not None:
filters.append(
sa.cast(
sa.cast(sa.column("xid"), sa.Text),
sa.BigInteger,
)
>= txmin
)
if txmax is not None:
filters.append(
sa.cast(
sa.cast(sa.column("xid"), sa.Text),
sa.BigInteger,
)
< txmax
@property
def current_wal_lsn(self) -> str:
return self.fetchone(
sa.select(sa.func.MAX(sa.text("pg_current_wal_lsn"))).select_from(
sa.func.PG_CURRENT_WAL_LSN()
)
if filters:
statement = statement.where(sa.and_(*filters))
return self.fetchone(statement)[0]
)[0]

def logical_slot_get_changes(
self,
slot_name: str,
txmin: t.Optional[int] = None,
txmax: t.Optional[int] = None,
upto_lsn: t.Optional[int] = None,
upto_lsn: t.Optional[str] = None,
upto_nchanges: t.Optional[int] = None,
limit: t.Optional[int] = None,
offset: t.Optional[int] = None,
Expand Down Expand Up @@ -600,7 +572,7 @@ def logical_slot_peek_changes(
slot_name: str,
txmin: t.Optional[int] = None,
txmax: t.Optional[int] = None,
upto_lsn: t.Optional[int] = None,
upto_lsn: t.Optional[str] = None,
upto_nchanges: t.Optional[int] = None,
limit: t.Optional[int] = None,
offset: t.Optional[int] = None,
Expand All @@ -626,7 +598,7 @@ def logical_slot_count_changes(
slot_name: str,
txmin: t.Optional[int] = None,
txmax: t.Optional[int] = None,
upto_lsn: t.Optional[int] = None,
upto_lsn: t.Optional[str] = None,
upto_nchanges: t.Optional[int] = None,
) -> int:
statement: sa.sql.Select = self._logical_slot_changes(
Expand Down
23 changes: 14 additions & 9 deletions pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ def logical_slot_changes(
txmin: t.Optional[int] = None,
txmax: t.Optional[int] = None,
upto_nchanges: t.Optional[int] = None,
upto_lsn: t.Optional[str] = None,
) -> None:
"""
Process changes from the db logical replication logs.
Expand Down Expand Up @@ -377,19 +378,13 @@ def logical_slot_changes(
# minimize the tmp file disk usage when calling
# PG_LOGICAL_SLOT_PEEK_CHANGES and PG_LOGICAL_SLOT_GET_CHANGES
# by limiting to a smaller batch size.

upto_nchanges: int = upto_nchanges or settings.LOGICAL_SLOT_CHUNK_SIZE

# this is the max lsn we can go upto
max_lsn: int = self.max_lsn(self.__name, txmin=txmin, txmax=txmax)

while True:
changes: int = self.logical_slot_peek_changes(
self.__name,
txmin=txmin,
txmax=txmax,
upto_nchanges=upto_nchanges,
upto_lsn=max_lsn,
upto_lsn=upto_lsn,
)
if not changes:
break
Expand Down Expand Up @@ -446,7 +441,7 @@ def logical_slot_changes(
txmin=txmin,
txmax=txmax,
upto_nchanges=upto_nchanges,
upto_lsn=max_lsn,
upto_lsn=upto_lsn,
)
self.count["xlog"] += len(rows)

Expand Down Expand Up @@ -1220,13 +1215,22 @@ def pull(self) -> None:
"""Pull data from db."""
txmin: int = self.checkpoint
txmax: int = self.txid_current
# this is the max lsn we should go upto
upto_lsn: str = self.current_wal_lsn
upto_nchanges: int = settings.LOGICAL_SLOT_CHUNK_SIZE

logger.debug(f"pull txmin: {txmin} - txmax: {txmax}")
# forward pass sync
self.search_client.bulk(
self.index, self.sync(txmin=txmin, txmax=txmax)
)
# now sync up to txmax to capture everything we may have missed
self.logical_slot_changes(txmin=txmin, txmax=txmax, upto_nchanges=None)
self.logical_slot_changes(
txmin=txmin,
txmax=txmax,
upto_nchanges=upto_nchanges,
upto_lsn=upto_lsn,
)
self.checkpoint: int = txmax or self.txid_current
self._truncate = True

Expand Down Expand Up @@ -1263,6 +1267,7 @@ async def async_status(self) -> None:
await asyncio.sleep(settings.LOG_INTERVAL)

def _status(self, label: str) -> None:
# TODO: indicate if we are processing logical logs or not
sys.stdout.write(
f"{label} {self.database}:{self.index} "
f"Xlog: [{self.count['xlog']:,}] => "
Expand Down
11 changes: 7 additions & 4 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_logical_slot_changes(self, mock_logger, sync):
"testdb_testdb",
txmin=None,
txmax=None,
upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE,
upto_nchanges=None,
upto_lsn=None,
)
mock_sync.assert_not_called()
Expand All @@ -97,7 +97,7 @@ def test_logical_slot_changes(self, mock_logger, sync):
"testdb_testdb",
txmin=None,
txmax=None,
upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE,
upto_nchanges=None,
upto_lsn=None,
)
mock_sync.assert_not_called()
Expand Down Expand Up @@ -125,7 +125,7 @@ def test_logical_slot_changes(self, mock_logger, sync):
"testdb_testdb",
txmin=None,
txmax=None,
upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE,
upto_nchanges=None,
upto_lsn=None,
)
mock_get.assert_called_once()
Expand Down Expand Up @@ -353,7 +353,10 @@ def test_pull(self, mock_logger, mock_es, sync):
txmin = None
txmax = sync.txid_current - 1
mock_get.assert_called_once_with(
txmin=txmin, txmax=txmax, upto_nchanges=None
txmin=txmin,
txmax=txmax,
upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE,
upto_lsn=ANY,
)
mock_logger.debug.assert_called_once_with(
f"pull txmin: {txmin} - txmax: {txmax}"
Expand Down

0 comments on commit da99dca

Please sign in to comment.