Skip to content

Commit

Permalink
process PG->YDB replication PK Updates correctly
Browse files Browse the repository at this point in the history
process pk updates in ydb sink correctly
commit_hash:33e02c1e00b22df1f4f0a39b150a2ad2712d3c8c
  • Loading branch information
KosovGrigorii committed Dec 17, 2024
1 parent b2b38c6 commit 6dcb467
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 21 deletions.
4 changes: 4 additions & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -2723,6 +2723,10 @@
"tests/e2e/pg2s3/snapshot/dump/type_check.sql":"transfer_manager/go/tests/e2e/pg2s3/snapshot/dump/type_check.sql",
"tests/e2e/pg2s3/snapshot_with_layout/check_db_test.go":"transfer_manager/go/tests/e2e/pg2s3/snapshot_with_layout/check_db_test.go",
"tests/e2e/pg2s3/snapshot_with_layout/dump/type_check.sql":"transfer_manager/go/tests/e2e/pg2s3/snapshot_with_layout/dump/type_check.sql",
"tests/e2e/pg2ydb/replication_toasted/check_db_test.go":"transfer_manager/go/tests/e2e/pg2ydb/replication_toasted/check_db_test.go",
"tests/e2e/pg2ydb/replication_toasted/source/dump.sql":"transfer_manager/go/tests/e2e/pg2ydb/replication_toasted/source/dump.sql",
"tests/e2e/pg2ydb/snapshot_replication_pk_update/check_db_test.go":"transfer_manager/go/tests/e2e/pg2ydb/snapshot_replication_pk_update/check_db_test.go",
"tests/e2e/pg2ydb/snapshot_replication_pk_update/source/dump.sql":"transfer_manager/go/tests/e2e/pg2ydb/snapshot_replication_pk_update/source/dump.sql",
"tests/e2e/pg2yt/alters/check_db_test.go":"transfer_manager/go/tests/e2e/pg2yt/alters/check_db_test.go",
"tests/e2e/pg2yt/alters/dump/type_check.sql":"transfer_manager/go/tests/e2e/pg2yt/alters/dump/type_check.sql",
"tests/e2e/pg2yt/bulk_jsonb_pkey/bulk_json_generator.go":"transfer_manager/go/tests/e2e/pg2yt/bulk_jsonb_pkey/bulk_json_generator.go",
Expand Down
1 change: 0 additions & 1 deletion pkg/abstract/change_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ var RawMessageWriteTime = changeitem.RawMessageWriteTime
var NewTableSchema = changeitem.NewTableSchema
var Dump = changeitem.Dump
var Collapse = changeitem.Collapse
var CollapseNoKeysChanged = changeitem.CollapseNoKeysChanged
var SplitByID = changeitem.SplitByID
var SplitUpdatedPKeys = changeitem.SplitUpdatedPKeys
var NewPartition = changeitem.NewPartition
Expand Down
15 changes: 0 additions & 15 deletions pkg/abstract/changeitem/change_item_collapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package changeitem

import (
"sort"

"github.com/doublecloud/transfer/library/go/core/xerrors"
)

func compareColumns(old, new []string) (bool, map[string]int, map[string]int, []string) {
Expand Down Expand Up @@ -35,19 +33,6 @@ func compareColumns(old, new []string) (bool, map[string]int, map[string]int, []
return false, oldM, newM, total
}

// CollapseNoKeysChanged behaves like Collapse but rejects input outright when
// any change item updates a primary key (see https://st.yandex-team.ru/TM-8239:
// PK-changing events are the most fragile part of Collapse).
// Temporary guard used by the YDB sink only.
// TODO: Remove and replace with Collapse once the ticket above is implemented.
func CollapseNoKeysChanged(input []ChangeItem) ([]ChangeItem, error) {
	for i := range input {
		if !input[i].KeysChanged() {
			continue
		}
		return nil, xerrors.Errorf("Attempt to update primary keys is detected")
	}
	return Collapse(input), nil
}

// Collapse collapses (possible) multiple items in the input into a single (or none) items in the output.
// Currently, it preserves the order of items in the result.
// It should only be applied by sinks which support PRIMARY KEYs. For them the order of items is considered to be of no importance.
Expand Down
19 changes: 14 additions & 5 deletions pkg/providers/ydb/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,11 +621,11 @@ func (s *sinker) Push(input []abstract.ChangeItem) error {
errs = append(errs, err)
}
}

batch, err := abstract.CollapseNoKeysChanged(batch)
if err != nil {
return xerrors.Errorf("was unable to collapse received changeitems: %w", errs)
}
// The most fragile part of Collapse is processing PK-changing events.
// Here we transform such changes into a Delete + Insert pair and only then send the batch to Collapse.
// As a result, the potentially dangerous part of Collapse is avoided and PK updates are processed correctly (it is impossible to update a PK in YDB explicitly).
// Ticket about rewriting Collapse: https://st.yandex-team.ru/TM-8239
batch = abstract.Collapse(s.processPKUpdate(batch))
for i := 0; i < len(batch); i += batchSize {
end := i + batchSize
if end > len(batch) {
Expand All @@ -648,6 +648,15 @@ func (s *sinker) Push(input []abstract.ChangeItem) error {
return nil
}

// processPKUpdate rewrites primary-key-updating change items so the batch can
// be safely collapsed: abstract.SplitUpdatedPKeys splits the batch into parts
// in which PK updates have been turned into Delete+Insert pairs, and this
// method flattens those parts back into a single ordered slice.
func (s *sinker) processPKUpdate(batch []abstract.ChangeItem) []abstract.ChangeItem {
	parts := abstract.SplitUpdatedPKeys(batch)
	// Preallocate the exact capacity to avoid repeated slice growth while
	// flattening (the original appended into a zero-capacity slice).
	total := 0
	for _, part := range parts {
		total += len(part)
	}
	result := make([]abstract.ChangeItem, 0, total)
	for _, part := range parts {
		result = append(result, part...)
	}
	return result
}

func (s *sinker) pushBatch(tablePath ydbPath, batch []abstract.ChangeItem) error {
retries := uint64(5)
regular := make([]abstract.ChangeItem, 0)
Expand Down
88 changes: 88 additions & 0 deletions tests/e2e/pg2ydb/replication_toasted/check_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"context"
"os"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
pgcommon "github.com/doublecloud/transfer/pkg/providers/postgres"
"github.com/doublecloud/transfer/pkg/providers/postgres/pgrecipe"
"github.com/doublecloud/transfer/pkg/providers/ydb"
"github.com/doublecloud/transfer/tests/helpers"
"github.com/stretchr/testify/require"
)

var (
	// databaseName is the PG schema the test table lives in; the YDB table
	// name is derived from it ("public_test" below).
	databaseName = "public"
	// TransferType runs a snapshot first, then switches to replication.
	TransferType = abstract.TransferTypeSnapshotAndIncrement
	// tableName is the source table created by source/dump.sql.
	tableName = "test"
)

// TestSnapshotAndIncrement checks PG->YDB replication of TOASTed values:
// after snapshot activation it applies INSERT/UPDATE/DELETE on the PG source
// and verifies that the single surviving row reaches YDB with both the small
// updated column (t2) and the untouched large column (vc1) intact.
func TestSnapshotAndIncrement(t *testing.T) {
	Source := pgrecipe.RecipeSource(pgrecipe.WithPrefix(""))
	Target := &ydb.YdbDestination{
		Token:    model.SecretString(os.Getenv("YDB_TOKEN")),
		Database: helpers.GetEnvOfFail(t, "YDB_DATABASE"),
		Instance: helpers.GetEnvOfFail(t, "YDB_ENDPOINT"),
	}

	_ = os.Setenv("YC", "1")                                             // to not go to vanga
	helpers.InitSrcDst(helpers.TransferID, Source, Target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable

	defer func() {
		// targetPort: extracted from the YDB target endpoint (the original
		// name sourcePort was misleading — this is the target's port).
		targetPort, err := helpers.GetPortFromStr(Target.Instance)
		require.NoError(t, err)
		require.NoError(t, helpers.CheckConnections(
			helpers.LabeledPort{Label: "PG source", Port: Source.Port},
			helpers.LabeledPort{Label: "YDB target", Port: targetPort},
		))
	}()

	connConfig, err := pgcommon.MakeConnConfigFromSrc(logger.Log, Source)
	require.NoError(t, err)
	conn, err := pgcommon.NewPgConnPool(connConfig, logger.Log)
	require.NoError(t, err)

	transfer := helpers.MakeTransfer(helpers.TransferID, Source, Target, TransferType)
	worker := helpers.Activate(t, transfer)
	defer worker.Close(t)

	time.Sleep(5 * time.Second) // for the worker to start

	_, err = conn.Exec(context.Background(), "INSERT INTO test (i1, t1, i2, t2, vc1) VALUES (3, '3', 3, 'c', '3'), (4, '4', 4, 'd', '4')")
	require.NoError(t, err)
	// Update only t2 so vc1 stays TOASTed/unchanged on the source side.
	_, err = conn.Exec(context.Background(), "UPDATE test SET t2 = 'test_update' WHERE i1 = 1")
	require.NoError(t, err)
	_, err = conn.Exec(context.Background(), "DELETE FROM test WHERE i1 != 1")
	require.NoError(t, err)

	require.NoError(t, helpers.WaitEqualRowsCount(t, databaseName, tableName, helpers.GetSampleableStorageByModel(t, Source), helpers.GetSampleableStorageByModel(t, Target), 60*time.Second))
	require.NoError(t, helpers.WaitDestinationEqualRowsCount(databaseName, tableName, helpers.GetSampleableStorageByModel(t, Target), 60*time.Second, 1))

	var large string
	var small string
	// FIX: a bare ConstantBackOff never returns backoff.Stop, so the original
	// Retry could spin forever if the row never became readable; cap the
	// number of attempts so the test fails instead of hanging.
	err = backoff.Retry(func() error {
		return conn.QueryRow(context.Background(), "SELECT t2, vc1 FROM public.test WHERE i1 = 1").Scan(&small, &large)
	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 30))
	require.NoError(t, err)

	dump := helpers.YDBPullDataFromTable(t,
		os.Getenv("YDB_TOKEN"),
		helpers.GetEnvOfFail(t, "YDB_DATABASE"),
		helpers.GetEnvOfFail(t, "YDB_ENDPOINT"),
		"public_test")
	require.Equal(t, 1, len(dump))

	idx := dump[0].ColumnNameIndex("vc1")
	require.True(t, idx != -1)
	require.Equal(t, large, dump[0].ColumnValues[idx])

	idx = dump[0].ColumnNameIndex("t2")
	require.True(t, idx != -1)
	require.Equal(t, small, dump[0].ColumnValues[idx])
}
17 changes: 17 additions & 0 deletions tests/e2e/pg2ydb/replication_toasted/source/dump.sql

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions tests/e2e/pg2ydb/snapshot_replication_pk_update/check_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/abstract/model"
pgcommon "github.com/doublecloud/transfer/pkg/providers/postgres"
"github.com/doublecloud/transfer/pkg/providers/postgres/pgrecipe"
"github.com/doublecloud/transfer/pkg/providers/ydb"
"github.com/doublecloud/transfer/tests/helpers"
"github.com/stretchr/testify/require"
)

var (
	// databaseName is the PG schema the test table lives in.
	databaseName = "public"
	// TransferType runs a snapshot first, then switches to replication.
	TransferType = abstract.TransferTypeSnapshotAndIncrement
	// tableName is the source table created by source/dump.sql.
	tableName = "people"
	// primaryKey is the PK column updated during replication; unquoted, so
	// PG folds it to lowercase "id" — matches the dump's definition.
	primaryKey = "ID"
)

// TestSnapshotAndIncrement verifies that an UPDATE of a primary-key column on
// the PG source is replicated to the YDB target correctly: after the snapshot
// (4 rows from dump.sql) it inserts one row and then changes that row's PK,
// expecting 5 rows on the target once replication catches up.
func TestSnapshotAndIncrement(t *testing.T) {
	Source := pgrecipe.RecipeSource(pgrecipe.WithPrefix(""))
	Target := &ydb.YdbDestination{
		Token:    model.SecretString(os.Getenv("YDB_TOKEN")),
		Database: helpers.GetEnvOfFail(t, "YDB_DATABASE"),
		Instance: helpers.GetEnvOfFail(t, "YDB_ENDPOINT"),
	}

	_ = os.Setenv("YC", "1")                                             // to not go to vanga
	helpers.InitSrcDst(helpers.TransferID, Source, Target, TransferType) // to WithDefaults() & FillDependentFields(): IsHomo, helpers.TransferID, IsUpdateable

	defer func() {
		// NOTE(review): despite the name, this holds the YDB target's port.
		sourcePort, err := helpers.GetPortFromStr(Target.Instance)
		require.NoError(t, err)
		require.NoError(t, helpers.CheckConnections(
			helpers.LabeledPort{Label: "PG source", Port: Source.Port},
			helpers.LabeledPort{Label: "YDB target", Port: sourcePort},
		))
	}()

	transfer := helpers.MakeTransfer(
		tableName,
		Source,
		Target,
		TransferType,
	)
	worker := helpers.Activate(t, transfer)
	defer worker.Close(t)

	// NOTE(review): the pool is never closed; acceptable for a short-lived test.
	conn, err := pgcommon.MakeConnPoolFromSrc(Source, logger.Log)
	require.NoError(t, err)
	_, err = conn.Exec(context.Background(), fmt.Sprintf(`insert into %s values(6, 'You')`, tableName))
	require.NoError(t, err)
	// The key step: update the PK itself (6 -> 7) during replication.
	_, err = conn.Exec(context.Background(), fmt.Sprintf(`update %s set %s = 7 where %s = 6`, tableName, primaryKey, primaryKey))
	require.NoError(t, err)
	// Expect 4 snapshot rows + 1 inserted row = 5 on the target.
	require.NoError(t, helpers.WaitEqualRowsCount(t, databaseName, tableName, helpers.GetSampleableStorageByModel(t, Source), helpers.GetSampleableStorageByModel(t, Target), 60*time.Second))
	require.NoError(t, helpers.WaitDestinationEqualRowsCount(databaseName, tableName, helpers.GetSampleableStorageByModel(t, Target), 60*time.Second, 5))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Seed data for the PG->YDB snapshot + replication PK-update e2e test.
-- Unquoted identifiers are case-folded by PostgreSQL, so this is
-- equivalent to the original mixed-case DDL.
create table people (
    id       int not null,
    lastname varchar(255),
    primary key (id)
);

insert into people values (1, 'Masha');
insert into people values (2, 'Maxim');
insert into people values (3, 'Misha');
insert into people values (4, 'Marina');

0 comments on commit 6dcb467

Please sign in to comment.