Skip to content

Commit

Permalink
Add retention period YDB setting
Browse files Browse the repository at this point in the history
commit_hash:7c55bd4d1a192cd693f9dfcab0569541ac5f2560
  • Loading branch information
boooec committed Dec 17, 2024
1 parent 6dcb467 commit 8f59106
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 12 deletions.
1 change: 1 addition & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,7 @@
"pkg/util/backoff.go":"transfer_manager/go/pkg/util/backoff.go",
"pkg/util/bool.go":"transfer_manager/go/pkg/util/bool.go",
"pkg/util/castx/caste.go":"transfer_manager/go/pkg/util/castx/caste.go",
"pkg/util/castx/caste_test.go":"transfer_manager/go/pkg/util/castx/caste_test.go",
"pkg/util/channel.go":"transfer_manager/go/pkg/util/channel.go",
"pkg/util/channel_reader.go":"transfer_manager/go/pkg/util/channel_reader.go",
"pkg/util/cli/spinner.go":"transfer_manager/go/pkg/util/cli/spinner.go",
Expand Down
4 changes: 3 additions & 1 deletion pkg/providers/ydb/model_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"path/filepath"
"strings"
"time"

"github.com/doublecloud/transfer/library/go/core/metrics"
"github.com/doublecloud/transfer/pkg/abstract"
Expand Down Expand Up @@ -55,7 +56,8 @@ type YdbSource struct {

// replication stuff:
ChangeFeedMode ChangeFeedModeType
ChangeFeedCustomName string // user can specify pre-created feed's name, otherwise it will created with name == transferID
ChangeFeedRetentionPeriod *time.Duration // not suitable for pre-created (custom) changefeed
ChangeFeedCustomName string // user can specify pre-created feed's name, otherwise it will created with name == transferID
ChangeFeedCustomConsumerName string
BufferSize model.BytesSize // it's not some real buffer size - see comments to waitLimits() method in kafka-source
VerboseSDKLogs bool
Expand Down
23 changes: 17 additions & 6 deletions pkg/providers/ydb/source_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/util"
"github.com/doublecloud/transfer/pkg/util/castx"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
Expand Down Expand Up @@ -50,8 +51,18 @@ func dropChangeFeedIfExistsOneTable(ctx context.Context, ydbClient *ydb.Driver,
return true, nil
}

func createChangeFeedOneTable(ctx context.Context, ydbClient *ydb.Driver, tablePath, transferID, changeFeedMode string) error {
query := fmt.Sprintf(ydbV1+"ALTER TABLE `%s` ADD CHANGEFEED %s WITH (FORMAT = 'JSON', MODE = '%s')", tablePath, transferID, changeFeedMode)
func createChangeFeedOneTable(ctx context.Context, ydbClient *ydb.Driver, tablePath, transferID string, cfg *YdbSource) error {
queryParams := fmt.Sprintf("FORMAT = 'JSON', MODE = '%s'", string(cfg.ChangeFeedMode))

if period := cfg.ChangeFeedRetentionPeriod; period != nil {
asIso, err := castx.DurationToIso8601(*period)
if err != nil {
return xerrors.Errorf("unable to represent retention period as ISO 8601: %w", err)
}
queryParams += fmt.Sprintf(", RETENTION_PERIOD = Interval('%s')", asIso)
}

query := fmt.Sprintf(ydbV1+"ALTER TABLE `%s` ADD CHANGEFEED %s WITH (%s)", tablePath, transferID, queryParams)
err := execQuery(ctx, ydbClient, query)
if err != nil {
return xerrors.Errorf("unable to add changefeed, err: %w", err)
Expand Down Expand Up @@ -106,16 +117,16 @@ func CreateChangeFeed(cfg *YdbSource, transferID string) error {
return nil // User already created changefeed and specified its name.
}

clientCtx, cancel := context.WithTimeout(context.Background(), time.Minute*3)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3)
defer cancel()

ydbClient, err := newYDBSourceDriver(clientCtx, cfg)
ydbClient, err := newYDBSourceDriver(ctx, cfg)
if err != nil {
return xerrors.Errorf("unable to create ydb, err: %w", err)
}

for _, tablePath := range cfg.Tables {
err = createChangeFeedOneTable(clientCtx, ydbClient, tablePath, transferID, string(cfg.ChangeFeedMode))
err = createChangeFeedOneTable(ctx, ydbClient, tablePath, transferID, cfg)
if err != nil {
return xerrors.Errorf("unable to create changeFeed for table %s, err: %w", tablePath, err)
}
Expand Down Expand Up @@ -144,7 +155,7 @@ func CreateChangeFeedIfNotExists(cfg *YdbSource, transferID string) error {
if isOnline {
continue
}
err = createChangeFeedOneTable(clientCtx, ydbClient, tablePath, transferID, string(cfg.ChangeFeedMode))
err = createChangeFeedOneTable(clientCtx, ydbClient, tablePath, transferID, cfg)
if err != nil {
return xerrors.Errorf("unable to create changeFeed for table %s, err: %w", tablePath, err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/providers/ydb/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func TestSourceCDC(t *testing.T) {
})
}

func checkSchemaUpdateWithMode(t *testing.T, db *ydb.Driver, transferID, mode string, srcCfgTemplate YdbSource) {
tableName := "schema_up_to_date_new_image" + "_" + mode
func checkSchemaUpdateWithMode(t *testing.T, db *ydb.Driver, transferID string, mode ChangeFeedModeType, srcCfgTemplate YdbSource) {
tableName := "schema_up_to_date_new_image" + "_" + string(mode)
tablePath := formTablePath(tableName)
createTableAndFeedWithMode(t, db, transferID, tablePath, mode,
options.WithColumn("id", types.Optional(types.TypeUint64)),
Expand Down Expand Up @@ -352,17 +352,17 @@ func execQueries(t *testing.T, db *ydb.Driver, queries []string) {
}

func createTableAndFeed(t *testing.T, db *ydb.Driver, feedName, tablePath string, opts ...options.CreateTableOption) {
createTableAndFeedWithMode(t, db, feedName, tablePath, "NEW_IMAGE", opts...)
createTableAndFeedWithMode(t, db, feedName, tablePath, ChangeFeedModeNewImage, opts...)
}

func createTableAndFeedWithMode(t *testing.T, db *ydb.Driver, feedName, tablePath, mode string, opts ...options.CreateTableOption) {
func createTableAndFeedWithMode(t *testing.T, db *ydb.Driver, feedName, tablePath string, mode ChangeFeedModeType, opts ...options.CreateTableOption) {
opts = append(opts, options.WithPartitions(options.WithUniformPartitions(partitionsCount)))

require.NoError(t, db.Table().Do(context.Background(), func(ctx context.Context, s table.Session) error {
return s.CreateTable(ctx, tablePath, opts...)
}))

require.NoError(t, createChangeFeedOneTable(context.Background(), db, tablePath, feedName, mode))
require.NoError(t, createChangeFeedOneTable(context.Background(), db, tablePath, feedName, &YdbSource{ChangeFeedMode: mode}))
}

func formTablePath(tableName string) string {
Expand Down
31 changes: 31 additions & 0 deletions pkg/util/castx/caste.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"html/template"
"strconv"
"time"

"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/goccy/go-json"
Expand Down Expand Up @@ -103,3 +104,33 @@ func ToStringE(i interface{}) (string, error) {
return cast.ToStringE(i)
}
}

// DurationToIso8601 renders a non-negative duration as an ISO 8601 duration string
// composed of whole days, hours, minutes and seconds, e.g. P750DT23H59M59S.
// More on the format: https://en.wikipedia.org/wiki/ISO_8601#Durations.
//
// Components equal to zero are omitted. Any sub-second remainder is truncated, so a
// duration shorter than one second (including zero) is rendered as "P0D".
// A negative duration is rejected with an error.
func DurationToIso8601(duration time.Duration) (string, error) {
	if duration < 0 {
		return "", xerrors.Errorf("unable to cast negative duration '%s' to ISO 8601", duration.String())
	}

	// Split using integer arithmetic on the Duration itself. The float64-based
	// Hours()/Minutes()/Seconds() helpers may round up to the next whole unit for
	// long durations with a remainder just below it, and converting their result
	// to int may overflow on 32-bit platforms.
	days := int64(duration / (24 * time.Hour))
	hours := int64(duration % (24 * time.Hour) / time.Hour)
	minutes := int64(duration % time.Hour / time.Minute)
	seconds := int64(duration % time.Minute / time.Second)

	// Time-of-day part; the "T" designator is added only when the part is non-empty.
	timePart := ""
	if hours > 0 {
		timePart += strconv.FormatInt(hours, 10) + "H"
	}
	if minutes > 0 {
		timePart += strconv.FormatInt(minutes, 10) + "M"
	}
	if seconds > 0 {
		timePart += strconv.FormatInt(seconds, 10) + "S"
	}

	res := "P"
	if days > 0 {
		res += strconv.FormatInt(days, 10) + "D"
	}
	if timePart != "" {
		res += "T" + timePart
	}
	if res == "P" {
		// Nothing but the period designator: the duration is below one second.
		res = "P0D"
	}
	return res, nil
}
39 changes: 39 additions & 0 deletions pkg/util/castx/caste_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package castx

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestDurationToIso8601 verifies the ISO 8601 rendering of durations: the "normal"
// subtest covers supported inputs, the "error" subtest covers a negative duration.
func TestDurationToIso8601(t *testing.T) {
	t.Run("normal", func(t *testing.T) {
		// A duration that exercises every component at once.
		composite := 115*24*time.Hour + 17*time.Hour + 46*time.Minute + 39*time.Second

		cases := []struct {
			input    string // duration in time.ParseDuration syntax
			expected string // expected ISO 8601 representation
		}{
			{input: "1s", expected: "PT1S"},
			{input: "1m", expected: "PT1M"},
			{input: "1h", expected: "PT1H"},
			{input: "24h", expected: "P1D"},
			{input: "1ms", expected: "P0D"},
			{input: "1000ms", expected: "PT1S"},
			{input: fmt.Sprint(composite), expected: "P115DT17H46M39S"},
		}

		for _, tc := range cases {
			parsed, parseErr := time.ParseDuration(tc.input)
			require.NoError(t, parseErr)

			got, castErr := DurationToIso8601(parsed)
			require.NoError(t, castErr)
			require.Equal(t, tc.expected, got)
		}
	})

	t.Run("error", func(t *testing.T) {
		negative, parseErr := time.ParseDuration("-1h")
		require.NoError(t, parseErr)

		_, castErr := DurationToIso8601(negative)
		require.Error(t, castErr)
	})
}

0 comments on commit 8f59106

Please sign in to comment.