From 46408f40c298f0e493454ee4676daa70644fc21e Mon Sep 17 00:00:00 2001 From: Kai Cao Date: Fri, 3 Jan 2025 13:52:06 +0800 Subject: [PATCH] fix handle empty startTs/endTs (#21060) fix handle empty startTs/endTs Approved by: @daviszhen --- pkg/cdc/types.go | 3 +++ pkg/frontend/cdc.go | 20 +++++++++------- pkg/frontend/cdc_test.go | 49 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/pkg/cdc/types.go b/pkg/cdc/types.go index d632c16533114..d19ee3c46ed9f 100644 --- a/pkg/cdc/types.go +++ b/pkg/cdc/types.go @@ -60,6 +60,9 @@ const ( MaxSqlLength = "MaxSqlLength" DefaultMaxSqlLength = 4 * 1024 * 1024 + + StartTs = "StartTS" + EndTs = "EndTS" ) var ( diff --git a/pkg/frontend/cdc.go b/pkg/frontend/cdc.go index 5c265d801349a..8fd0f22185379 100644 --- a/pkg/frontend/cdc.go +++ b/pkg/frontend/cdc.go @@ -318,17 +318,21 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err } var ts time.Time - startTs := cdcTaskOptionsMap["StartTS"] - if ts, err = parseTimestamp(startTs, ses.timeZone); err != nil { - return moerr.NewInternalErrorf(ctx, "invalid startTs: %s, supported timestamp format: `%s`, or `%s`", startTs, time.DateTime, time.RFC3339) + startTs := cdcTaskOptionsMap[cdc2.StartTs] + if startTs != "" { + if ts, err = parseTimestamp(startTs, ses.timeZone); err != nil { + return moerr.NewInternalErrorf(ctx, "invalid startTs: %s, supported timestamp format: `%s`, or `%s`", startTs, time.DateTime, time.RFC3339) + } + startTs = ts.Format(time.RFC3339) } - startTs = ts.Format(time.RFC3339) - endTs := cdcTaskOptionsMap["EndTS"] - if ts, err = parseTimestamp(endTs, ses.timeZone); err != nil { - return moerr.NewInternalErrorf(ctx, "invalid endTs: %s, supported timestamp format: `%s`, or `%s`", endTs, time.DateTime, time.RFC3339) + endTs := cdcTaskOptionsMap[cdc2.EndTs] + if endTs != "" { + if ts, err = parseTimestamp(endTs, ses.timeZone); err != nil { + return moerr.NewInternalErrorf(ctx, "invalid endTs: %s, supported timestamp format: `%s`, or `%s`", endTs, time.DateTime, time.RFC3339) + } + endTs = ts.Format(time.RFC3339) } - endTs = ts.Format(time.RFC3339) //step 4: check source uri format and strip password jsonSrcUri, _, err := extractUriInfo(ctx, create.SourceUri, cdc2.SourceUriPrefix) diff --git a/pkg/frontend/cdc_test.go b/pkg/frontend/cdc_test.go index e53ab38b07579..c1f54c858a065 100644 --- a/pkg/frontend/cdc_test.go +++ b/pkg/frontend/cdc_test.go @@ -404,6 +404,10 @@ func Test_handleCreateCdc(t *testing.T) { fmt.Sprintf("%d", cdc2.DefaultMaxSqlLength), cdc2.SendSqlTimeout, cdc2.DefaultSendSqlTimeout, + cdc2.StartTs, + "2025-01-03 15:20:00", + cdc2.EndTs, + "2025-01-03 16:20:00", }, } @@ -449,6 +453,51 @@ func Test_handleCreateCdc(t *testing.T) { } } +func Test_doCreateCdc_invalidStartTs(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ses := newTestSession(t, ctrl) + defer ses.Close() + + pu := config.ParameterUnit{} + pu.TaskService = &testTaskService{} + setPu("", &pu) + + stubCheckPitr := gostub.Stub(&checkPitr, func(ctx context.Context, bh BackgroundExec, accName string, pts *cdc2.PatternTuples) error { + return nil + }) + defer stubCheckPitr.Reset() + + create := &tree.CreateCDC{ + IfNotExists: false, + TaskName: "task1", + SourceUri: "mysql://root:111@127.0.0.1:6001", + SinkType: cdc2.MysqlSink, + SinkUri: "mysql://root:111@127.0.0.1:3306", + Tables: "db1.t1:db1.t1,db1.t2", + Option: []string{ + "Level", + cdc2.TableLevel, + "Account", + sysAccountName, + "Exclude", + "db2.t3,db2.t4", + cdc2.InitSnapshotSplitTxn, + "false", + cdc2.MaxSqlLength, + fmt.Sprintf("%d", cdc2.DefaultMaxSqlLength), + cdc2.SendSqlTimeout, + cdc2.DefaultSendSqlTimeout, + cdc2.StartTs, + "123456", + }, + } + + err := doCreateCdc(context.Background(), ses, create) + assert.Error(t, err) +} + type testTaskData struct { metadata task.TaskMetadata details *task.Details