diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index a54a1acd5196..585ae0f47fa4 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.4.0 + dockerImageTag: 3.4.1 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java index 21c3d1ba78b9..52d911a81433 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java @@ -190,16 +190,23 @@ public static List> cdcCtidIteratorsCombin streamsUnderVacuum.addAll(streamsUnderVacuum(database, ctidStreams.streamsForCtidSync(), quoteString).result()); - final List finalListOfStreamsToBeSyncedViaCtid = + List finalListOfStreamsToBeSyncedViaCtid = streamsUnderVacuum.isEmpty() ? ctidStreams.streamsForCtidSync() : ctidStreams.streamsForCtidSync().stream() .filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c))) .toList(); - LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size()); final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database, finalListOfStreamsToBeSyncedViaCtid, quoteString); final PostgresCtidHandler ctidHandler; + if (!fileNodeHandler.getFailedToQuery().isEmpty()) { + finalListOfStreamsToBeSyncedViaCtid = finalListOfStreamsToBeSyncedViaCtid.stream() + .filter(stream -> !fileNodeHandler.getFailedToQuery().contains( + new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()))) + .collect(Collectors.toList()); + } + LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size()); + try { ctidHandler = createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, quoteString, ctidStateManager, diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 68c3e43e7b58..b96981e97b0b 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -308,6 +308,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.4.1 | 2024-05-10 | [38130](https://github.com/airbytehq/airbyte/pull/38130) | Bug fix on old PG where ctid column not found when stream is a view. | | 3.4.0 | 2024-04-29 | [37112](https://github.com/airbytehq/airbyte/pull/37112) | resumeable full refresh. | | 3.3.33 | 2024-05-07 | [38030](https://github.com/airbytehq/airbyte/pull/38030) | Mark PG hot standby error as transient. | | 3.3.32 | 2024-04-30 | [37758](https://github.com/airbytehq/airbyte/pull/37758) | Correct previous release to disable debezium retries | @@ -471,13 +472,13 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | 0.4.43 | 2022-08-03 | [15226](https://github.com/airbytehq/airbyte/pull/15226) | Make connectionTimeoutMs configurable through JDBC url parameters | | 0.4.42 | 2022-08-03 | [15273](https://github.com/airbytehq/airbyte/pull/15273) | Fix a bug in `0.4.36` and correctly parse the CDC initial record waiting time | | 0.4.41 | 2022-08-03 | [15077](https://github.com/airbytehq/airbyte/pull/15077) | Sync data from beginning if the LSN is no longer valid in CDC | -| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently (⛔ this version has a bug; use `1.0.1` instead | +| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently (⛔ this version has a bug; use `1.0.1` instead | | 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps | | | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers | | 0.4.39 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings | | 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | | 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected | -| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) | +| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) | | 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors | | 0.4.34 | 2022-07-17 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. | | 0.4.33 | 2022-07-14 | [14586](https://github.com/airbytehq/airbyte/pull/14586) | Validate source JDBC url parameters |