Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres replication #392

Merged
merged 44 commits into from
May 2, 2024
Merged

Postgres replication #392

merged 44 commits into from
May 2, 2024

Conversation

jorritsandbrink
Copy link
Collaborator

@jorritsandbrink jorritsandbrink commented Mar 4, 2024

Tell us what you do here

  • implementing verified source (please link a relevant issue labeled as verified source)
  • fixing a bug (please link a relevant bug report)
  • improving, documenting, or customizing an existing source (please link an issue or describe below)
  • anything else (please link an issue or describe below)

Relevant issue

dlt-hub/dlt#933

More PR info

Adds initial support for postgres replication. Some things are still missing, but this is a good time to get feedback.

  • Implements replication functionality based on logical decoding
  • Processes changes in batch (not streaming)
  • Uses built-in pgoutput plugin
  • Uses psycopg2's support for logical replication—this streams messages from pgoutput into Python in an endless loop
  • Uses pypgoutput to decode pgoutput's binary messages—the library's functionality to consume messages and transform them into "change events" (Pydantic models) is not used because it only works on Linux
  • Uses "text mode", meaning data is provided as string values. These string values are coerced into dlt-compatible Python objects, e.g. the string "t" becomes the boolean True. "Binary mode" would be faster, but less robust.
  • Relies on a dedicated replication slot and publication for a table. I.e. two tables means two slots and two publications. This provides granular control and does not intruduce significant overhead if I'm not mistaken. No longer the case, changed because of user feedback. It is now possible to replicate one table, multiple tables, or an entire schema using a single publication.
  • Maintains resource-scoped state to keep track of progress. At beginning of run, last-seen LSN of previous run is retrieved from state and used to advance the replication slot (flush messages from it).
  • Adds two resource types: table_snapshot for initial load, and table_changes for CDC.
    • table_snapshot persists the state of the table in the snapshot that gets exported when creating a replication slot into a physical table, and then uses sql_table resource to do the rest
    • table_changes generates and yields "data items" (TDataItem) and "metadata items" (DataItemWithMeta) from decoded replication messages. Items are first stored in-memory in a list, before they are yielded from this list.
  • Handles both initial load and change propagation.
    • Provides init_replication to setup a replication slot and publication. This function optionally persists snapshot tables representing the state of the exact moment the replication slot got created. It then returns sql_table resources to enable an initial load. Users do not need to use init_replication—they can create a slot and publication in any other way they see fit.
    • Provides replication_resource to create a DltResource that consumes a slot/publication and generates data items with metadata (DataItemWithMeta). It dispatches data to multiple tables if the publication publishes changes for multiple tables.
  • Uses include_columns argument to exclude any columns not provided as input (or includes all columns if not provided)
  • Organizes code in subfolder under sql_database: /sources/sql_database/pg_replication Moved to its own top-level folder.

What's not (yet) included:

  • Chunking mechanism to limit batch size in table_changes implemented
  • DltSource to handle multiple tables / an entire database no longer applies—multiple tables are now handled at the resource level
  • Support for the truncate operation
  • Perhaps some more data type mapping done—common types are handled and exotic types default to text
  • Deletion of snapshot table after it has been consumed not implemented—couldn't find a good way to do this
  • Example pipeline done
  • More tests done
  • User docs

@jorritsandbrink jorritsandbrink requested a review from rudolfix March 4, 2024 01:13
Copy link
Contributor

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks super solid already. I had a few questions, mostly about the transition from syncing the snapshot table to when we start replication from a slot (do we lose data?)

do you think that going for binary pgout is a good idea after all? I think json one will be simpler and faster

  • messages are created AFAIK on the server so you do not need to decode them
  • the structure is almost good to be loaded
    we can actually keep both but we add a lot of code that should probably be tested

return dlt.resource(
table_replication_items,
name=table,
write_disposition="merge",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for tables with INSERT only (ie. logs) append will also work?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, should work, didn't think of that. I can fetch details about the publication using pg_publication catalog and use append if it only publishes insert. Will be more efficient than merge.

sources/sql_database/pg_replication/__init__.py Outdated Show resolved Hide resolved
from copy import deepcopy
from dataclasses import dataclass, field

import psycopg2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there will be people using sql_database that are not using postgres replication. we are not surfacing this into top level init? anyway... I think we may land it in separate verified source (if we do not share anything with sql alchemy source)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate source makes sense to me. Only thing is that the table_snapshot resource for initial loading uses the sql_table resource under the hood. How should we handle one verified source depending on another?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just assume that user did dlt init twice: for replication and for sql_database. if you cannot import sql_table you can raise exception and tell user to use dlt init. we have nothing better now :)

sources/sql_database/pg_replication/helpers.py Outdated Show resolved Hide resolved
sources/sql_database/pg_replication/helpers.py Outdated Show resolved Hide resolved
self.last_commit_ts: pendulum.DateTime
self.last_commit_lsn = None

def __call__(self, msg: ReplicationMessage) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do I understand correctly that we get a message one by one? I hope that our python code will keep up. also we need some batching (as you say in the ticket)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, messages come in one by one. I will implement batching and then do some load tests to see if performance is acceptable for larger volumes.

the table schema.
"""
# store table schema information
columns = {c.name: _to_dlt_column_schema(c) for c in decoded_msg.columns}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when do we get this message? is it coming with every change? it looked like it in jsonpgout

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my previous comment.

@jorritsandbrink
Copy link
Collaborator Author

@rudolfix see my replies on your comments.

Regarding pgoutput vs wal2json:

  • my main consideration to go with pypgoutput first was convenience for the user (no additional setup needed)—for what it's worth, Debezium also supports pypgoutput but not wal2json
  • another reason is that publications only work with pgoutput, meaning that additional client-side filtering is needed (selecting the right table, and the right DML operation if you want to only propagate inserts for example), offsetting some of the benefits of server-side decoding—see this SO on the topic
  • wal2json will probably be a little simpler, but not a lot—the message objects generated by pypgoutput (e.g. Insert, Update, ...) are almost as easy to work with as wal2json's output

That being said, wal2json might be better (necessary) when data volumns are large. I'd suggest we go with pgoutput first, then add wal2json support later if needed.

@jorritsandbrink
Copy link
Collaborator Author

image
Thanks for your feedback Martin, let's continue the discussion here for visibility.

  • I think using one slot + publication for multiple tables can be enabled with some minor modifications to the code.
  • If you have multiple tables in a single publication, it would make sense (be most efficient) if you process all tables concurrently while iterating over the replication messages once.
  • The alternative would be to iterate over the replication messages multiple times, once for each table, and skipping all messages that aren't relevant for that table. Less efficient, but resources/tables can be processed independently.

Can you process individual tables in fivetran / peardb / the other systems you're referring to, or do you have to process all of them simultaneously?

@rudolfix
Copy link
Contributor

@jorritsandbrink it should be easy. if there > 1 table we still have one LSN with which we track new messages right? if so the change is trivial: you can specify a table name which is resolved dynamically:
https://dlthub.com/docs/general-usage/resource#dispatch-data-to-many-tables

drawbacks:

  1. lambda needs table name in the data items. but you can also remove that name in the same lambda if we do not need this in the final table.
  2. full refresh of a table requires a reset of all tables in publication, right? also initial load will generate many sql_table resources

@jorritsandbrink jorritsandbrink marked this pull request as ready for review March 18, 2024 21:58
Copy link
Contributor

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is so good! I have a few code improvements and one optimization that will make this way faster. I love that we have so many tests.
it also looks to me that dlt-hub/dlt#1105 is crucial to support replication of many tables from a single resource

## Initialize the pipeline

```bash
dlt init pg_replication duckdb
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good place to tell the user to use dlt init sql_database ... if they want to use this initial resource

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extended the README: 57b5e1e

from .helpers import _gen_replication_resource_name, get_pub_ops, replication_items


@dlt.sources.config.with_config(sections=("sources", "pg_replication"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you need this trick because you want to generate a resource name dynamically. now this is supported without tricks. see ie. kinesis resource:

name=lambda args: args["stream_name"],
    primary_key="_kinesis_msg_id",
    standalone=True,
)
def kinesis_stream(
    stream_name: str = dlt.config.value,

I think you can move all the settings to decorator now

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in eec75f0

from .schema_types import _to_dlt_column_schema, _to_dlt_val


@dlt.sources.config.with_config(sections=("sources", "pg_replication"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can declare it as a dlt.source? it returns many resources now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but it only returns resources on the first call (when it creates the slot) and only if persist_snapshots is True. So I'd say it's primarily a helper to set up replication, rather than a source.

columns: Optional[Dict[str, TTableHintTemplate[TAnySchemaColumns]]] = None,
target_batch_size: int = 1000,
flush_slot: bool = True,
write_disposition: TWriteDisposition = "append", # TODO: remove after https://github.com/dlt-hub/dlt/issues/1031 has been released
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already merged and available in 0.4.7a0 (pre-release)

Copy link
Collaborator Author

@jorritsandbrink jorritsandbrink Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works nicely with 0.4.7 indeed. I removed the obsolete lines of code in 5636e07.

"""
# start where we left off in previous run
resource_name = _gen_replication_resource_name(slot_name, pub_name)
start_lsn = dlt.current.resource_state(resource_name).get("last_commit_lsn", 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason not to use Incremental? the code below is surely faster than incremental...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say it's simpler without Incremental, didn't see a reason to use it.

write_disposition=write_disposition, # TODO: remove after https://github.com/dlt-hub/dlt/issues/1031 has been released
)
yield from gen
if gen.generated_all:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is good! just one thing: if gen is closed ie by add_limit`) your cleanup code will not be executed because yield raises GeneratorExit... please do:

try
  yield from gen
finally:
  ...your finalization code

I do not like this because it is not intuitive... I do not see how to make it easier without rewriting code

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this case we need the cleanup code to only execute if the batch has been yielded completely. We can't split at any LSN, it needs to be the LSN of a Commit message. This is because transactions need to be processed atomically, and transactions can consist of multiple messages (e.g. 100 inserted records = 100 messages). The code has been set up such that batch boundaries are always Commit messages. Replication can also only start (start_lsn) at the beginning of a transaction. So if we have a transaction of 100 inserts, and add_limit(50) we don't want to persist resource state, because we haven't yielded the entire transaction.

# https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
cur.execute(
f"""
SELECT a.attname
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably good enough. but sql_table returns primary key (also compound one) in primary_key hint. is that not enough?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary_key hint on the sql_table resource gives an empty list, so perhaps something's going wrong there:
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a primary key or some kind of unique column? we use SqlAlchemy reflection to get the table schema. what is the schema o this table?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I get it now. sql_table creates a resource for the snapshot table, which doesn't have a primary key, because it is created with a CTAS statement:

  cur.execute(
      f"""
      START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
      SET TRANSACTION SNAPSHOT '{snapshot_name}';
      CREATE TABLE {snapshot_qual_name} AS SELECT {col_str} FROM {qual_name};
  """
  )

I point _get_pk at the original table (table_name instead of snapshot_table_name), which does have a primary key, hence the difference.

Stores table schema information from Relation message in object state.
"""
# store table schema information
columns = {c.name: _to_dlt_column_schema(c) for c in decoded_msg.columns}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a full table schema or just modified columns?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decoded_msg.columns has all columns, modified or not.

data_item["lsn"] = lsn
if for_delete:
data_item["deleted_ts"] = commit_ts
return dlt.mark.with_hints(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is all correct but will be very slow. what happens currently is

  • you yield items one by one
  • each item has full hints with it.
    this will generate a lot of unnecessary processing in the pipeline.

what you should do instead (I think it is not very complicated):

  • buffer data items per table name to be able to yield them together
  • keep schema changes per table and yield them once
    you yield stuff when we reach batch size:
  • for each table and schema change
    yield dlt.mark.with_hints([], dlt.mark.make_hints(...) - you can yield empty list just to update hints
  • for each table and data items
    yield dlt.mark.with_table_name(data_items[table_name], table_name)
    where data_items[table_name] is a list of data items collected for a particular table

the speedup will be significant. and this code structure also makes easier to yield arrow tables if we want to be even faster
tell me WDYT

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better indeed! I started building this, but faced the "hint accumulation issue" that's blocking progress. This is on hold until that issue has been completed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has now been implemented in 5636e07.

@jorritsandbrink
Copy link
Collaborator Author

@rudolfix I've addressed all your comments, please review.

To be done: increase dlt version in requirements.txt to include PR 1127.

rudolfix
rudolfix previously approved these changes Mar 25, 2024
Copy link
Contributor

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! I have one question to the missing PK in SqlAlchemy. let's take a look. PK should be really there.

I expect that for large loads this will be slow. we are doing a lot of operations per row. also the pgout decoder is quite heavy.

# https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
cur.execute(
f"""
SELECT a.attname
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a primary key or some kind of unique column? we use SqlAlchemy reflection to get the table schema. what is the schema o this table?

elif isinstance(decoded_msg, Delete):
column_data = decoded_msg.old_tuple.column_data
table_name = self.last_table_schema[decoded_msg.relation_id]["name"]
data_item = self.gen_data_item(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks good.

- `target_batch_size` is reached
- a table's schema has changed
"""
op = (msg.payload[:1]).decode("utf-8")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could compare binary strings without decoding to make it a little bit faster. also you could put the more frequent (I/U/D) operations to the top

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in 7bd211b.

@rudolfix
Copy link
Contributor

@jorritsandbrink I finally enabled tests on our CI, and most of them are passing but:

  • this one consistently fails
with src_pl.sql_client() as c:
            qual_name = src_pl.sql_client().make_qualified_table_name("items")
            c.execute_sql(f"UPDATE {qual_name} SET foo = 'baz' WHERE id = 2;")
            c.execute_sql(f"DELETE FROM {qual_name} WHERE id = 2;")
        extract_info = dest_pl.extract(changes)
>       assert extract_info.asdict()["job_metrics"] == []
E       AssertionError: assert [{'created': ...e': 501, ...}] == []
E         Left contains one more item: {'created': 1713123289.1342065, 'extract_idx': 1, 'file_path': '/home/rudolfix/src/pipelines/_storage/.dlt/pipelines/d...lize/a93aba1460a1d099/1713123287.6514962/new_jobs/_dlt_pipeline_state.bfcae69f3b.0.typed-jsonl', 'file_size': 501, ...}
E         Full diff:
E           [
E         -  ,
E         +  {'created': 1713123289.1342065,
E         +   'extract_idx': 1,
E         +   'file_path': '/home/rudolfix/src/pipelines/_storage/.dlt/pipelines/dest_pl/normalize/a93aba1460a1d099/1713123287.6514962/new_jobs/_dlt_pipeline_state.bfcae69f3b.0.typed-jsonl',...
E
E         ...Full output truncated (6 lines hidden), use '-vv' to show
  • those require postgres 15 while we are on 13 on CI. maybe you could take a look? is there a way to use the old syntax?
try:
>           cur.execute(
                f"ALTER PUBLICATION {esc_pub_name} ADD TABLES IN SCHEMA {esc_schema_name};"
            )
E           psycopg2.errors.SyntaxError: syntax error at or near "TABLES"
E           LINE 1: ALTER PUBLICATION "test_pub6589482f" ADD TABLES IN SCHEMA "s...

@jorritsandbrink
Copy link
Collaborator Author

@jorritsandbrink I finally enabled tests on our CI, and most of them are passing but:

  • this one consistently fails
with src_pl.sql_client() as c:
            qual_name = src_pl.sql_client().make_qualified_table_name("items")
            c.execute_sql(f"UPDATE {qual_name} SET foo = 'baz' WHERE id = 2;")
            c.execute_sql(f"DELETE FROM {qual_name} WHERE id = 2;")
        extract_info = dest_pl.extract(changes)
>       assert extract_info.asdict()["job_metrics"] == []
E       AssertionError: assert [{'created': ...e': 501, ...}] == []
E         Left contains one more item: {'created': 1713123289.1342065, 'extract_idx': 1, 'file_path': '/home/rudolfix/src/pipelines/_storage/.dlt/pipelines/d...lize/a93aba1460a1d099/1713123287.6514962/new_jobs/_dlt_pipeline_state.bfcae69f3b.0.typed-jsonl', 'file_size': 501, ...}
E         Full diff:
E           [
E         -  ,
E         +  {'created': 1713123289.1342065,
E         +   'extract_idx': 1,
E         +   'file_path': '/home/rudolfix/src/pipelines/_storage/.dlt/pipelines/dest_pl/normalize/a93aba1460a1d099/1713123287.6514962/new_jobs/_dlt_pipeline_state.bfcae69f3b.0.typed-jsonl',...
E
E         ...Full output truncated (6 lines hidden), use '-vv' to show
  • those require postgres 15 while we are on 13 on CI. maybe you could take a look? is there a way to use the old syntax?
try:
>           cur.execute(
                f"ALTER PUBLICATION {esc_pub_name} ADD TABLES IN SCHEMA {esc_schema_name};"
            )
E           psycopg2.errors.SyntaxError: syntax error at or near "TABLES"
E           LINE 1: ALTER PUBLICATION "test_pub6589482f" ADD TABLES IN SCHEMA "s...

@rudolfix The first issue is actually also version related. I was testing on Postgres 16 locally, but have been able to reproduce both issues on Postgres 13.

  1. It seems Postgres 13 publishes "empty transactions" for updates/deletes when they are excluded from the publish publication parameter (e.g. when publish = 'insert'). Postgres 16 does not do this. As a result we do find messages to process (the "empty transactions") when we've done an update or delete, even though we told Postgres we're not interested in them. last_commit_lsn in resource state needs to be updated accordingly. An item gets extracted for _dlt_pipeline_state because of the state update, where our test asserted nothing is extracted. Solved by making the test more specific and asserting nothing gets extracted for the items table: 34610b6

  2. Not really feasible to make this work for older Postgres versions. I could fetch all tables from the schema and add them one by one, but that wouldn't accomodate the case where a table gets added to the schema later. To keep things clean, I introduced a Postgres version requirement for schema replication instead: 7a07045

@rudolfix
Copy link
Contributor

@jorritsandbrink should we spawn another postgres instance just to test replication? I do not want to make it too complicated and I'm totally fine with 15. version.

@jorritsandbrink
Copy link
Collaborator Author

@rudolfix yes, using a separate instance for replication sounds good.

Copy link
Contributor

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@rudolfix rudolfix merged commit df0cb7e into master May 2, 2024
14 checks passed
@rudolfix rudolfix deleted the 933-postgres-replication branch May 2, 2024 15:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants