diff --git a/.github/workflows/constraints.txt b/.github/workflows/constraints.txt index 3fe057e2..a2ba2c6c 100644 --- a/.github/workflows/constraints.txt +++ b/.github/workflows/constraints.txt @@ -1,2 +1,2 @@ -pip==23.2.1 +pip==23.3 poetry==1.6.1 diff --git a/README.md b/README.md index 629d9ea9..7f538dc1 100644 --- a/README.md +++ b/README.md @@ -193,3 +193,72 @@ To connect through SSH, you will need to determine the following pieces of infor - The private key you use for authentication with the bastion server, provided in the `ssh.private_key` configuration option. If your private key is protected by a password (alternatively called a "private key passphrase"), provide it in the `ssh.private_key_password` configuration option. If your private key doesn't have a password, you can safely leave this field blank. After everything has been configured, be sure to indicate your use of an ssh tunnel to the tap by configuring the `ssh.enable` configuration option to be `True`. Then, you should be able to connect to your privately accessible Postgres database through the bastion server. + +## Log-Based Replication + +Log-based replication is an alternative to full-table and incremental syncs and syncs all changes to the database, including deletes. This feature is built on [Postgres replication slots](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS). + +### Negatives of Log-Based Replication + +1. Managing replication slots - Log-based replication has to be set up and maintained on the database. This tap attempts to abstract away as much complexity as possible, but some manual effort may still be needed. +2. Log files - When a replication slot is set up, the file that holds its logs will continue to grow until the logs are consumed. This can cause issues if the tap doesn't ingest them quickly enough, for example due to outages. + +If and when you find more, please add them to this list! + +### Implementation Details +Log-based replication will modify the schemas output by the tap. Specifically, all fields will be made nullable and non-required. The reason for this is that when the tap sends a message indicating that a record has been deleted, that message will leave all fields for that record (except primary keys) as null. The stream's schema must be capable of accommodating these messages, even if a source field in the database is not nullable. As a result, log-based schemas will have all fields nullable. + +Note that changing which streams are selected after log-based replication has already begun can have unexpected consequences. To ensure consistent output, it is best to keep the selected streams the same across invocations of the tap. + +Note also that using log-based replication will cause the replication key for all streams to be set to "_sdc_lsn", which is the Postgres LSN for the record in question. + +### How to Set Up Log-Based Replication + +1. Ensure you are using PostgreSQL 9.4 or higher. +1. Ensure you have access to the master Postgres instance. +1. Install the wal2json plugin for your database. Example instructions are given below for a Postgres 15.0 database running on Ubuntu 22.04. For more information, or for alternative versions/operating systems, refer to the [wal2json documentation](https://github.com/eulerto/wal2json). + - Update and upgrade apt if necessary. + ```bash + sudo apt update + sudo apt upgrade -y + ``` + - Prepare by making prerequisite installations.
+ ```bash + sudo apt install curl ca-certificates + sudo install -d /usr/share/postgresql-common/pgdg + ``` + - Import the repository keys for the Postgres Apt repository. + ```bash + sudo curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc + ``` + - Create the pgdg.list file. + ```bash + sudo sh -c 'echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt jammy-pgdg main" > /etc/apt/sources.list.d/pgdg.list' + ``` + - Use the Postgres Apt repository to install wal2json. + ```bash + sudo apt update + sudo apt-get install postgresql-server-dev-15 + export PATH=/usr/lib/postgresql/15/bin:$PATH + sudo apt-get install postgresql-15-wal2json + ``` +1. Configure your database with wal2json enabled. + - Edit your `postgresql.conf` configuration file so the following parameters are appropriately set. + ``` + wal_level = logical + max_replication_slots = 10 + max_wal_senders = 10 + ``` + - Restart PostgreSQL. + - Create a replication slot for tap-postgres. + ```sql + SELECT * FROM pg_create_logical_replication_slot('tappostgres', 'wal2json'); + ``` +1. Ensure your configuration for tap-postgres specifies host, port, user, password, and database manually, without relying on an `sqlalchemy_url`. +1. Use the following metadata modification in your `meltano.yml` for the streams you wish to replicate with log-based replication. Note that during log-based replication, we do not support any replication key other than `_sdc_lsn`. + ```yml + metadata: + "*": + replication_method: LOG_BASED + replication_key: _sdc_lsn + ``` \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index cb1a130c..0dc0176e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -929,13 +929,13 @@ files = [ [[package]] name = "pre-commit" -version = "3.4.0" +version = "3.5.0" description = "A framework for managing and maintaining multi-language pre-commit hooks."
optional = false python-versions = ">=3.8" files = [ - {file = "pre_commit-3.4.0-py2.py3-none-any.whl", hash = "sha256:96d529a951f8b677f730a7212442027e8ba53f9b04d217c4c67dc56c393ad945"}, - {file = "pre_commit-3.4.0.tar.gz", hash = "sha256:6bbd5129a64cad4c0dfaeeb12cd8f7ea7e15b77028d985341478c8af3c759522"}, + {file = "pre_commit-3.5.0-py2.py3-none-any.whl", hash = "sha256:841dc9aef25daba9a0238cd27984041fa0467b4199fc4852e27950664919f660"}, + {file = "pre_commit-3.5.0.tar.gz", hash = "sha256:5804465c675b659b0862f07907f96295d490822a450c4c40e747d0b1c6ebcb32"}, ] [package.dependencies] diff --git a/tap_postgres/client.py b/tap_postgres/client.py index 5c9731a1..90dfdf78 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -5,13 +5,20 @@ from __future__ import annotations import datetime -from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Type, Union +import json +import select +import typing +from functools import cached_property +from types import MappingProxyType +from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Type, Union import psycopg2 import singer_sdk.helpers._typing import sqlalchemy +from psycopg2 import extras from singer_sdk import SQLConnector, SQLStream from singer_sdk import typing as th +from singer_sdk.helpers._state import increment_state from singer_sdk.helpers._typing import TypeConformanceLevel from sqlalchemy import nullsfirst from sqlalchemy.engine import Engine @@ -275,3 +282,202 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: with self.connector._connect() as con: for row in con.execute(query): yield dict(row) + + +class PostgresLogBasedStream(SQLStream): + """Stream class for Postgres log-based streams.""" + + connector_class = PostgresConnector + + # JSONB objects won't be selected without type_conformance_level set to ROOT_ONLY + TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY + + replication_key = "_sdc_lsn" + + @property + def config(self) -> Mapping[str, Any]: + """Return a read-only config dictionary.""" + return MappingProxyType(self._config) + + @cached_property + def schema(self) -> dict: + """Override schema for log-based replication, adding _sdc columns.""" + schema_dict = typing.cast(dict, self._singer_catalog_entry.schema.to_dict()) + for property in schema_dict["properties"].values(): + if "null" not in property["type"]: + property["type"].append("null") + if "required" in schema_dict: + schema_dict.pop("required") + schema_dict["properties"].update({"_sdc_deleted_at": {"type": ["string"]}}) + schema_dict["properties"].update({"_sdc_lsn": {"type": ["integer"]}}) + return schema_dict + + def _increment_stream_state( + self, + latest_record: dict[str, Any], + *, + context: dict | None = None, + ) -> None: + """Update state of stream or partition with data from the provided record. + + The default implementation does not advance any bookmarks unless + `self.replication_method == 'INCREMENTAL'`. For us, `self.replication_method == + 'LOG_BASED'`, so an override is required. + """ + # This also creates a state entry if one does not yet exist: + state_dict = self.get_context_state(context) + + # Advance state bookmark values if applicable + if latest_record: # This is the only line that has been overridden.
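+            # Advance the "_sdc_lsn" bookmark for every non-empty record, including delete records, which also carry an LSN.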
+ if not self.replication_key: + msg = ( + f"Could not detect replication key for '{self.name}' " + f"stream(replication method={self.replication_method})" + ) + raise ValueError(msg) + treat_as_sorted = self.is_sorted + if not treat_as_sorted and self.state_partitioning_keys is not None: + # Streams with custom state partitioning are not resumable. + treat_as_sorted = False + increment_state( + state_dict, + replication_key=self.replication_key, + latest_record=latest_record, + is_sorted=treat_as_sorted, + check_sorted=self.check_sorted, + ) + + def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: + """Return a generator of row-type dictionary objects.""" + status_interval = 5.0 # if no records in 5 seconds the tap can exit + start_lsn = self.get_starting_replication_key_value(context=context) + if start_lsn is None: + start_lsn = 0 + logical_replication_connection = self.logical_replication_connection() + logical_replication_cursor = logical_replication_connection.cursor() + + # Flush logs from the previous sync. send_feedback() will only flush LSNs before + # the value of flush_lsn, not including the value of flush_lsn, so this is safe + # even though we still want logs with an LSN == start_lsn. + logical_replication_cursor.send_feedback(flush_lsn=start_lsn) + + logical_replication_cursor.start_replication( + slot_name="tappostgres", + decode=True, + start_lsn=start_lsn, + status_interval=status_interval, + options={ + "format-version": 2, + "include-transaction": False, + "add-tables": self.fully_qualified_name, + }, + ) + + # Using scaffolding layout from: + # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor + while True: + message = logical_replication_cursor.read_message() + if message: + row = self.consume(message) + if row: + yield row + else: + timeout = ( + status_interval + - ( + datetime.datetime.now() + - logical_replication_cursor.feedback_timestamp + ).total_seconds() + ) + try: + # If the timeout has passed and the cursor still has no new + # messages, the sync has completed. + if ( + select.select( + [logical_replication_cursor], [], [], max(0, timeout) + )[0] + == [] + ): + break + except InterruptedError: + pass + + logical_replication_cursor.close() + logical_replication_connection.close() + + def consume(self, message) -> dict | None: + """Ingest WAL message.""" + try: + message_payload = json.loads(message.payload) + except json.JSONDecodeError: + self.logger.warning( + "A message payload of %s could not be converted to JSON", + message.payload, + ) + return + + row = {} + + upsert_actions = {"I", "U"} + delete_actions = {"D"} + truncate_actions = {"T"} + transaction_actions = {"B", "C"} + + if message_payload["action"] in upsert_actions: + for column in message_payload["columns"]: + row.update({column["name"]: column["value"]}) + row.update({"_sdc_deleted_at": None}) + row.update({"_sdc_lsn": message.data_start}) + elif message_payload["action"] in delete_actions: + for column in message_payload["identity"]: + row.update({column["name"]: column["value"]}) + row.update( + { + "_sdc_deleted_at": datetime.datetime.utcnow().strftime( + r"%Y-%m-%dT%H:%M:%SZ" + ) + } + ) + row.update({"_sdc_lsn": message.data_start}) + elif message_payload["action"] in truncate_actions: + self.logger.debug( + ( + "A message payload of %s (corresponding to a truncate action) " + "could not be processed." 
+ ), + message.payload, + ) + elif message_payload["action"] in transaction_actions: + self.logger.debug( + ( + "A message payload of %s (corresponding to a transaction beginning " + "or commit) could not be processed." + ), + message.payload, + ) + else: + raise RuntimeError( + ( + "A message payload of %s (corresponding to an unknown action type) " + "could not be processed." + ), + message.payload, + ) + + return row + + def logical_replication_connection(self): + """A logical replication connection to the database. + + Uses a direct psycopg2 implementation rather than through sqlalchemy. + """ + connection_string = ( + f"dbname={self.config['database']} user={self.config['user']} password=" + f"{self.config['password']} host={self.config['host']} port=" + f"{self.config['port']}" + ) + return psycopg2.connect( + connection_string, + application_name="tap_postgres", + connection_factory=extras.LogicalReplicationConnection, + ) diff --git a/tap_postgres/tap.py b/tap_postgres/tap.py index 5d7c7b95..c04ed08b 100644 --- a/tap_postgres/tap.py +++ b/tap_postgres/tap.py @@ -2,6 +2,7 @@ from __future__ import annotations import atexit +import copy import io import signal from functools import cached_property @@ -10,12 +11,21 @@ import paramiko from singer_sdk import SQLTap, Stream -from singer_sdk import typing as th # JSON schema typing helpers +from singer_sdk import typing as th +from singer_sdk._singerlib import ( # JSON schema typing helpers + Catalog, + Metadata, + Schema, +) from sqlalchemy.engine import URL from sqlalchemy.engine.url import make_url from sshtunnel import SSHTunnelForwarder -from tap_postgres.client import PostgresConnector, PostgresStream +from tap_postgres.client import ( + PostgresConnector, + PostgresLogBasedStream, + PostgresStream, +) class TapPostgres(SQLTap): @@ -45,6 +55,11 @@ def __init__( + " and password to be set" ) + # If log-based replication is used, sqlalchemy_url can't be used. + assert (self.config.get("sqlalchemy_url") is None) or ( + self.config.get("replication_mode") != "LOG_BASED" + ), "A sqlalchemy_url can't be used with log-based replication" + # If sqlalchemy_url is not being used and ssl_enable is on, ssl_mode must have # one of six allowable values. If ssl_mode is verify-ca or verify-full, a # certificate authority must be provided to verify against. @@ -282,6 +297,16 @@ def __init__( + " configuration option determines where that file is created." ), ), + th.Property( + "default_replication_method", + th.StringType, + default="FULL_TABLE", + allowed_values=["FULL_TABLE", "INCREMENTAL", "LOG_BASED"], + description=( + "Replication method to use if there is not a catalog entry to override " + "this choice. One of `FULL_TABLE`, `INCREMENTAL`, or `LOG_BASED`." + ), + ), ).to_dict() def get_sqlalchemy_url(self, config: Dict[Any, Any]) -> str: @@ -472,6 +497,8 @@ def catch_signal(self, signum, frame) -> None: def catalog_dict(self) -> dict: """Get catalog dictionary. + Override to prevent premature instantiation of the connector. + Returns: The tap's catalog as a dict """ @@ -487,13 +514,79 @@ def catalog_dict(self) -> dict: self._catalog_dict: dict = result return self._catalog_dict + @property + def catalog(self) -> Catalog: + """Get the tap's working catalog. + + Override to do LOG_BASED modifications. + + Returns: + A Singer catalog object. 
+ """ + new_catalog: Catalog = Catalog() + modified_streams: list = [] + for stream in super().catalog.streams: + stream_modified = False + new_stream = copy.deepcopy(stream) + if new_stream.replication_method == "LOG_BASED": + for property in new_stream.schema.properties.values(): + if "null" not in property.type: + stream_modified = True + property.type.append("null") + if new_stream.schema.required: + stream_modified = True + new_stream.schema.required = None + if "_sdc_deleted_at" not in new_stream.schema.properties: + stream_modified = True + new_stream.schema.properties.update( + {"_sdc_deleted_at": Schema(type=["string", "null"])} + ) + new_stream.metadata.update( + { + ("properties", "_sdc_deleted_at"): Metadata( + Metadata.InclusionType.AVAILABLE, True, None + ) + } + ) + if "_sdc_lsn" not in new_stream.schema.properties: + stream_modified = True + new_stream.schema.properties.update( + {"_sdc_lsn": Schema(type=["integer", "null"])} + ) + new_stream.metadata.update( + { + ("properties", "_sdc_lsn"): Metadata( + Metadata.InclusionType.AVAILABLE, True, None + ) + } + ) + if stream_modified: + modified_streams.append(new_stream.tap_stream_id) + new_catalog.add_stream(new_stream) + if modified_streams: + self.logger.info( + "One or more LOG_BASED catalog entries were modified " + f"({modified_streams=}) to allow nullability and include _sdc columns. " + "See README for further information." + ) + return new_catalog + def discover_streams(self) -> list[Stream]: """Initialize all available streams and return them as a list. Returns: List of discovered Stream objects. """ - return [ - PostgresStream(self, catalog_entry, connector=self.connector) - for catalog_entry in self.catalog_dict["streams"] - ] + streams = [] + for catalog_entry in self.catalog_dict["streams"]: + if catalog_entry["replication_method"] == "LOG_BASED": + streams.append( + PostgresLogBasedStream( + self, catalog_entry, connector=self.connector + ) + ) + else: + streams.append( + PostgresStream(self, catalog_entry, connector=self.connector) + ) + return streams