
Merge branch 'main' into 238-generate-fully-variant-json-schema-type-for-columns-of-type-jsonjsonb
visch authored Oct 16, 2023
2 parents 15af16b + 7c4d8b8 commit d951737
Showing 5 changed files with 379 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/constraints.txt
@@ -1,2 +1,2 @@
-pip==23.2.1
+pip==23.3
 poetry==1.6.1
69 changes: 69 additions & 0 deletions README.md
@@ -193,3 +193,72 @@ To connect through SSH, you will need to determine the following pieces of information:
- The private key you use for authentication with the bastion server, provided in the `ssh.private_key` configuration option. If your private key is protected by a password (alternatively called a "private key passphrase"), provide it in the `ssh.private_key_password` configuration option. If your private key doesn't have a password, you can safely leave this field blank.
After everything has been configured, be sure to indicate your use of an ssh tunnel to the tap by configuring the `ssh.enable` configuration option to be `True`. Then, you should be able to connect to your privately accessible Postgres database through the bastion server.
## Log-Based Replication
Log-based replication is an alternative to full-table and incremental syncs; it syncs all changes to the database, including deletes. This feature is built on [postgres replication slots](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS).
### Negatives of Log Based Replication
1. Managing replication slots - Log-based replication has to be set up and maintained on the database. This tap attempts to abstract away as much complexity as possible, but some manual effort may still be needed.
2. Log files - When a replication slot is set up, the WAL file that holds these logs will continue to grow until it is consumed; this can cause disk-space issues if the tap doesn't ingest the logs quickly enough due to outages, etc.
If and when someone finds more, please add them to this list!
### Implementation Details
Log-based replication will modify the schemas output by the tap. Specifically, all fields will be made nullable and non-required. The reason for this is that when the tap sends a message indicating that a record has been deleted, that message will leave all fields for that record (except primary keys) as null. The stream's schema must be capable of accommodating these messages, even if a source field in the database is not nullable. As a result, log-based schemas will have all fields nullable.
Note that changing what streams are selected after already beginning log-based replication can have unexpected consequences. To ensure consistent output, it is best to keep selected streams the same across invocations of the tap.
Note also that using log-based replication will cause the replication key for all streams to be set to `_sdc_lsn`, which is the Postgres LSN for the record in question.
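For illustration, a stream whose source table has a `NOT NULL` integer `id` column and a text `name` column (hypothetical names) would be emitted with a widened schema along these lines:
```json
{
  "properties": {
    "id": {"type": ["integer", "null"]},
    "name": {"type": ["string", "null"]},
    "_sdc_deleted_at": {"type": ["string"]},
    "_sdc_lsn": {"type": ["integer"]}
  }
}
```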
### How to Set Up Log-Based Replication
1. Ensure you are using PostgreSQL 9.4 or higher.
1. Ensure you have access to the master Postgres instance.
1. Install the wal2json plugin for your database. Example instructions are given below for a Postgres 15.0 database running on Ubuntu 22.04. For more information, or for alternative versions/operating systems, refer to the [wal2json documentation](https://github.com/eulerto/wal2json).
- Update and upgrade apt if necessary.
```bash
sudo apt update
sudo apt upgrade -y
```
- Prepare by making prerequisite installations.
```bash
sudo apt install curl ca-certificates
sudo install -d /usr/share/postgresql-common/pgdg
```
- Import the repository keys for the Postgres Apt repository.
```bash
sudo curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc
```
- Create the pgdg.list file.
```bash
sudo sh -c 'echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt jammy-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
```
- Use the Postgres Apt repository to install wal2json.
```bash
sudo apt update
sudo apt-get install postgresql-server-dev-15
export PATH=/usr/lib/postgresql/15/bin:$PATH
sudo apt-get install postgresql-15-wal2json
```
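- Optionally, confirm the plugin was installed. A quick sanity check, assuming the standard package layout for Postgres 15 on Ubuntu:
```bash
ls /usr/lib/postgresql/15/lib/wal2json.so
```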
1. Configure your database with wal2json enabled.
- Edit your `postgresql.conf` configuration file so the following parameters are appropriately set.
```
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
```
- Restart PostgreSQL.
- Create a replication slot for tap-postgres.
```sql
SELECT * FROM pg_create_logical_replication_slot('tappostgres', 'wal2json');
```
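- Optionally, verify the settings and the new slot with standard Postgres catalog queries.
```sql
SHOW wal_level;
SELECT slot_name, plugin, active FROM pg_replication_slots;
```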
1. Ensure your configuration for tap-postgres specifies host, port, user, password, and database manually, without relying on a SQLAlchemy URL (see the example configuration after this list).
1. Use the following metadata modification in your `meltano.yml` for the streams you wish to sync with log-based replication. Note that during log-based replication, we do not support any replication key other than `_sdc_lsn`.
```yml
metadata:
"*":
replication_method: LOG_BASED
replication_key: _sdc_lsn
```
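For reference, a minimal sketch of the corresponding connection settings in `meltano.yml` (all values below are placeholders):
```yml
config:
  host: localhost
  port: 5432
  user: my_user
  password: my_password
  database: my_database
```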
6 changes: 3 additions & 3 deletions poetry.lock


208 changes: 207 additions & 1 deletion tap_postgres/client.py
@@ -5,13 +5,20 @@
from __future__ import annotations

import datetime
import json
import select
import typing
from functools import cached_property
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Type, Union

import psycopg2
import singer_sdk.helpers._typing
import sqlalchemy
from psycopg2 import extras
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk.helpers._state import increment_state
from singer_sdk.helpers._typing import TypeConformanceLevel
from sqlalchemy import nullsfirst
from sqlalchemy.engine import Engine
@@ -275,3 +282,202 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
with self.connector._connect() as con:
for row in con.execute(query):
yield dict(row)


class PostgresLogBasedStream(SQLStream):
"""Stream class for Postgres log-based streams."""

connector_class = PostgresConnector

# JSONB objects won't be selected unless TYPE_CONFORMANCE_LEVEL is set to ROOT_ONLY
TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY

replication_key = "_sdc_lsn"

@property
def config(self) -> Mapping[str, Any]:
"""Return a read-only config dictionary."""
return MappingProxyType(self._config)

@cached_property
def schema(self) -> dict:
"""Override schema for log-based replication adding _sdc columns."""
schema_dict = typing.cast(dict, self._singer_catalog_entry.schema.to_dict())
for property in schema_dict["properties"].values():
if "null" not in property["type"]:
property["type"].append("null")
if "required" in schema_dict:
schema_dict.pop("required")
schema_dict["properties"].update({"_sdc_deleted_at": {"type": ["string"]}})
schema_dict["properties"].update({"_sdc_lsn": {"type": ["integer"]}})
return schema_dict

def _increment_stream_state(
self,
latest_record: dict[str, Any],
*,
context: dict | None = None,
) -> None:
"""Update state of stream or partition with data from the provided record.
The default implementation does not advance any bookmarks unless
`self.replication_method == 'INCREMENTAL'`. For us, `self.replication_method ==
'LOG_BASED'`, so an override is required.
"""
# This also creates a state entry if one does not yet exist:
state_dict = self.get_context_state(context)

# Advance state bookmark values if applicable
if latest_record: # This is the only line that has been overridden.
if not self.replication_key:
msg = (
    f"Could not detect replication key for '{self.name}' "
    f"stream (replication method={self.replication_method})"
)
raise ValueError(msg)
treat_as_sorted = self.is_sorted
if not treat_as_sorted and self.state_partitioning_keys is not None:
# Streams with custom state partitioning are not resumable.
treat_as_sorted = False
increment_state(
state_dict,
replication_key=self.replication_key,
latest_record=latest_record,
is_sorted=treat_as_sorted,
check_sorted=self.check_sorted,
)

def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
"""Return a generator of row-type dictionary objects."""
status_interval = 5.0 # if no records in 5 seconds the tap can exit
start_lsn = self.get_starting_replication_key_value(context=context)
if start_lsn is None:
start_lsn = 0
logical_replication_connection = self.logical_replication_connection()
logical_replication_cursor = logical_replication_connection.cursor()

# Flush logs from the previous sync. send_feedback() will only flush LSNs before
# the value of flush_lsn, not including the value of flush_lsn, so this is safe
# even though we still want logs with an LSN == start_lsn.
logical_replication_cursor.send_feedback(flush_lsn=start_lsn)

logical_replication_cursor.start_replication(
slot_name="tappostgres",
decode=True,
start_lsn=start_lsn,
status_interval=status_interval,
options={
"format-version": 2,
"include-transaction": False,
"add-tables": self.fully_qualified_name,
},
)

# Using scaffolding layout from:
# https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor
while True:
message = logical_replication_cursor.read_message()
if message:
row = self.consume(message)
if row:
yield row
else:
timeout = (
status_interval
- (
datetime.datetime.now()
- logical_replication_cursor.feedback_timestamp
).total_seconds()
)
try:
# If the timeout has passed and the cursor still has no new
# messages, the sync has completed.
if (
select.select(
[logical_replication_cursor], [], [], max(0, timeout)
)[0]
== []
):
break
except InterruptedError:
pass

logical_replication_cursor.close()
logical_replication_connection.close()

def consume(self, message) -> dict | None:
"""Ingest WAL message."""
try:
message_payload = json.loads(message.payload)
except json.JSONDecodeError:
self.logger.warning(
"A message payload of %s could not be converted to JSON",
message.payload,
)
return

row = {}

upsert_actions = {"I", "U"}
delete_actions = {"D"}
truncate_actions = {"T"}
transaction_actions = {"B", "C"}
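
        # Illustrative wal2json format-version 2 payloads (see the wal2json
        # docs; the table and values below are made up):
        #   insert: {"action": "I", "schema": "public", "table": "t",
        #            "columns": [{"name": "id", "type": "integer", "value": 1}]}
        #   delete: {"action": "D", "schema": "public", "table": "t",
        #            "identity": [{"name": "id", "type": "integer", "value": 1}]}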

if message_payload["action"] in upsert_actions:
for column in message_payload["columns"]:
row.update({column["name"]: column["value"]})
row.update({"_sdc_deleted_at": None})
row.update({"_sdc_lsn": message.data_start})
elif message_payload["action"] in delete_actions:
for column in message_payload["identity"]:
row.update({column["name"]: column["value"]})
row.update(
{
"_sdc_deleted_at": datetime.datetime.utcnow().strftime(
r"%Y-%m-%dT%H:%M:%SZ"
)
}
)
row.update({"_sdc_lsn": message.data_start})
elif message_payload["action"] in truncate_actions:
self.logger.debug(
(
"A message payload of %s (corresponding to a truncate action) "
"could not be processed."
),
message.payload,
)
elif message_payload["action"] in transaction_actions:
self.logger.debug(
(
"A message payload of %s (corresponding to a transaction beginning "
"or commit) could not be processed."
),
message.payload,
)
        else:
            msg = (
                f"A message payload of {message.payload} (corresponding to "
                "an unknown action type) could not be processed."
            )
            raise RuntimeError(msg)

return row

def logical_replication_connection(self):
"""A logical replication connection to the database.
Uses a direct psycopg2 implementation rather than through sqlalchemy.
"""
connection_string = (
f"dbname={self.config['database']} user={self.config['user']} password="
f"{self.config['password']} host={self.config['host']} port="
f"{self.config['port']}"
)
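        # NOTE: values are interpolated directly into a libpq DSN string, so
        # credentials containing spaces or quotes would need escaping.
        # psycopg2.connect() also accepts keyword arguments (dbname=, user=,
        # host=, ...), which avoids DSN quoting issues.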
return psycopg2.connect(
connection_string,
application_name="tap_postgres",
connection_factory=extras.LogicalReplicationConnection,
)
