Postgres replication #392
Merged
Commits (44 total; changes shown from 4 commits)
cb58fbc
WIP
d218a3a
WIP
945da6d
Merge branch 'master' of https://github.com/dlt-hub/verified-sources …
fa9a4c1
move config to correct position
4cdf823
extend SQLAlchemy type mapping
8808812
add initial support for postgres replication
1914acf
add credentials instruction
36739ec
Merge branch 'master' of https://github.com/dlt-hub/verified-sources …
cc6a11d
undo adding secret
f815361
add module docstring
a318fee
use from import to prevent AttributeError when running test_dlt_init.py
8aed399
enable multiple tables per publication
9fc3c39
add support for schema replication
8c2f905
add support for unmapped data types
a0af605
add test for init_replication
051830c
update docstrings
656989a
return resource instead of single-element list
d014645
add example pipeline
269422e
add more example pipelines
c674f24
add nullability hints
a919c82
add README
57b5e1e
add sql_database dependency instruction
5636e07
batch data items per table and yield hints only once
2713464
postpone replication column hints to preserve order
eec75f0
refactor to use resource decorator
aae3754
Merge branch 'master' of https://github.com/dlt-hub/verified-sources …
493147d
add support for table schema changes
7bd211b
optimize message type detection for performance
48442ba
upgrade dlt to 0.4.8
d303efd
Merge branch 'master' of https://github.com/dlt-hub/verified-sources …
524945f
enables to run tests in parallel
rudolfix ab005a1
Merge branch 'master' into 933-postgres-replication
rudolfix c596180
fixes format
rudolfix 34610b6
make test more specific to handle postgres version differences
7a07045
add postgres server version requirement for schema replication functi…
61712b4
removed whitespace
fd1d973
explicitly fetch credentials from pg_replication source
8bc4da3
add superuser check
796c980
Merge branch 'master' into 933-postgres-replication
rudolfix 77fb1dd
updates lock file
rudolfix 8a1d910
use psycopg2-binary instead of psycopg2
b0d2abb
use destination-specific escape identifier
f63ceff
replace string literal with int literal
22758fe
include pypgoutput decoders in library
@@ -0,0 +1,13 @@
## Prerequisites

The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigned:

```sql
CREATE ROLE replication_user WITH LOGIN REPLICATION;
```

It also needs `CREATE` privilege on the database:

```sql
GRANT CREATE ON DATABASE dlt_data TO replication_user;
```
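The prerequisites above can be verified from Python before any replication is set up. The following is a minimal sketch, not part of the PR: `check_replication_prerequisites` is a hypothetical helper, and it assumes a DB-API 2.0 cursor (for example from psycopg2) that is already connected to the database as the replication user. It reads the `rolcanlogin` and `rolreplication` flags from the `pg_roles` view and calls `has_database_privilege` for the `CREATE` privilege.

```python
from typing import Any, Dict

# Hypothetical helper, not part of the PR: checks the README prerequisites
# for the user the cursor is connected as.
ROLE_ATTRS_SQL = """
SELECT rolcanlogin,
       rolreplication,
       has_database_privilege(current_database(), 'CREATE')
FROM pg_roles
WHERE rolname = current_user
"""


def check_replication_prerequisites(cursor: Any) -> Dict[str, bool]:
    """Return which of the three prerequisites the connected user satisfies.

    `cursor` is assumed to be a DB-API 2.0 cursor on a Postgres connection.
    """
    cursor.execute(ROLE_ATTRS_SQL)
    row = cursor.fetchone()
    if row is None:
        raise RuntimeError("current user not found in pg_roles")
    can_login, can_replicate, can_create = row
    return {
        "LOGIN": bool(can_login),
        "REPLICATION": bool(can_replicate),
        "CREATE": bool(can_create),
    }
```

Any entry that comes back `False` points at the corresponding `CREATE ROLE` or `GRANT` statement that still has to be run.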
@@ -0,0 +1,64 @@
from typing import Optional, Sequence

import dlt

from dlt.common.schema.typing import TColumnNames
from dlt.sources import DltResource
from dlt.sources.credentials import ConnectionStringCredentials

from .helpers import table_replication_items, TableChangesResourceConfiguration


@dlt.sources.config.with_config(
    sections=("sources", "sql_database"),
    spec=TableChangesResourceConfiguration,
)
def table_changes(
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    table: str = dlt.config.value,
    primary_key: TColumnNames = None,
    include_columns: Optional[Sequence[str]] = dlt.config.value,
    slot_name: str = dlt.config.value,
    publication_name: str = dlt.config.value,
    upto_lsn: Optional[int] = None,
) -> DltResource:
    """Returns a dlt resource that yields data items for changes in a postgres table.

    Relies on a dedicated replication slot and publication that publishes DML
    operations (i.e. `insert`, `update`, and/or `delete`) for the table (helper
    method `init_table_replication` can be used to set this up).
    Uses `merge` write disposition to merge changes into destination table(s).

    Args:
        credentials (ConnectionStringCredentials): Postgres database credentials.
        table (str): Name of the table that is replicated.
        primary_key (TColumnNames): Names of one or multiple columns serving as
            primary key on the table. Used to deduplicate data items in the `merge`
            operation.
        include_columns (Optional[Sequence[str]]): Optional sequence of names of
            columns to include in the generated data items. Any columns not in the
            sequence are excluded. If not provided, all columns are included.
        slot_name (str): Name of the replication slot to consume replication
            messages from. Each table is expected to have a dedicated slot.
        publication_name (str): Name of the publication that publishes DML operations
            for the table. Each table is expected to have a dedicated publication.
        upto_lsn (Optional[int]): Optional integer LSN value up to which the replication
            slot is consumed. If not provided, all messages in the slot are consumed,
            ensuring all new changes in the source table are included.

    Returns:
        DltResource that yields data items for changes in the postgres table.
    """
    return dlt.resource(
        table_replication_items,
        name=table,
        write_disposition="merge",
        primary_key=primary_key,
        columns={"lsn": {"dedup_sort": "desc"}},
    )(
        credentials=credentials,
        slot_name=slot_name,
        publication_name=publication_name,
        include_columns=include_columns,
        upto_lsn=upto_lsn,
    )

rudolfix marked this conversation as resolved.
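Two details of `table_changes` may be easier to follow with a small example: `upto_lsn` is an integer, while Postgres prints LSNs as two hexadecimal numbers separated by a slash, and the `dedup_sort` hint on `lsn` is meant to keep the most recent change per primary key during `merge`. The sketch below is plain Python and is not how dlt or this PR implements either step. `lsn_to_int` and `keep_latest_per_key` are hypothetical names, the sample rows are made up, and the reading of `"desc"` as "the highest `lsn` wins" is an assumption.

```python
from typing import Any, Dict, Hashable, Iterable, List


def lsn_to_int(lsn: str) -> int:
    """Convert a textual Postgres LSN ('X/Y', two hex numbers) to an integer.

    X holds the high 32 bits and Y the low 32 bits of the 64-bit position.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def keep_latest_per_key(
    items: Iterable[Dict[str, Any]], primary_key: str
) -> List[Dict[str, Any]]:
    """Keep, for every primary key value, only the item with the highest `lsn`.

    Illustrates the intent of a descending dedup sort on `lsn`; the actual
    deduplication happens in the destination during the merge.
    """
    latest: Dict[Hashable, Dict[str, Any]] = {}
    for item in items:
        key = item[primary_key]
        if key not in latest or item["lsn"] > latest[key]["lsn"]:
            latest[key] = item
    return list(latest.values())


# Made-up change records: row 1 was changed twice, row 2 once.
changes = [
    {"id": 1, "val": "a", "lsn": lsn_to_int("0/16B3748")},
    {"id": 1, "val": "b", "lsn": lsn_to_int("0/16B3800")},
    {"id": 2, "val": "c", "lsn": lsn_to_int("0/16B3780")},
]
deduplicated = keep_latest_per_key(changes, primary_key="id")
```

After deduplication only the later version of row 1 (`val == "b"`) remains next to row 2.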
For tables with INSERT only (i.e. logs), `append` will also work?

Yes, should work, didn't think of that. I can fetch details about the publication using the `pg_publication` catalog and use `append` if it only publishes `insert`. Will be more efficient than `merge`.
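The idea from the reply could be sketched roughly as follows. This is an illustration only, not the code that ended up in the PR: `choose_write_disposition` is a hypothetical helper, the cursor is assumed to be a DB-API 2.0 cursor using the `%s` parameter style (as psycopg2 does), and the `pubtruncate` flag available in newer Postgres versions is ignored here.

```python
from typing import Any

# pubinsert, pubupdate and pubdelete are boolean columns of pg_publication.
PUBLICATION_SQL = (
    "SELECT pubinsert, pubupdate, pubdelete "
    "FROM pg_publication WHERE pubname = %s"
)


def choose_write_disposition(cursor: Any, publication_name: str) -> str:
    """Return 'append' for insert-only publications, otherwise 'merge'.

    Hypothetical helper: `cursor` is assumed to be a DB-API 2.0 cursor
    on a Postgres connection.
    """
    cursor.execute(PUBLICATION_SQL, (publication_name,))
    row = cursor.fetchone()
    if row is None:
        raise ValueError(f"publication {publication_name!r} does not exist")
    pub_insert, pub_update, pub_delete = row
    # Without updates or deletes there is nothing to deduplicate or remove,
    # so new rows can simply be appended.
    if pub_insert and not pub_update and not pub_delete:
        return "append"
    return "merge"
```

The returned value could then be passed as `write_disposition` when the resource is created, instead of the hard-coded `"merge"`.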