Skip to content

Commit

Permalink
fix handle empty startTs/endTs (#21060)
Browse files Browse the repository at this point in the history
fix handle empty startTs/endTs

Approved by: @daviszhen
  • Loading branch information
ck89119 authored Jan 3, 2025
1 parent 6e3b3ca commit 46408f4
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 8 deletions.
3 changes: 3 additions & 0 deletions pkg/cdc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const (

MaxSqlLength = "MaxSqlLength"
DefaultMaxSqlLength = 4 * 1024 * 1024

StartTs = "StartTS"
EndTs = "EndTS"
)

var (
Expand Down
20 changes: 12 additions & 8 deletions pkg/frontend/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,17 +318,21 @@ func doCreateCdc(ctx context.Context, ses *Session, create *tree.CreateCDC) (err
}

var ts time.Time
startTs := cdcTaskOptionsMap["StartTS"]
if ts, err = parseTimestamp(startTs, ses.timeZone); err != nil {
return moerr.NewInternalErrorf(ctx, "invalid startTs: %s, supported timestamp format: `%s`, or `%s`", startTs, time.DateTime, time.RFC3339)
startTs := cdcTaskOptionsMap[cdc2.StartTs]
if startTs != "" {
if ts, err = parseTimestamp(startTs, ses.timeZone); err != nil {
return moerr.NewInternalErrorf(ctx, "invalid startTs: %s, supported timestamp format: `%s`, or `%s`", startTs, time.DateTime, time.RFC3339)
}
startTs = ts.Format(time.RFC3339)
}
startTs = ts.Format(time.RFC3339)

endTs := cdcTaskOptionsMap["EndTS"]
if ts, err = parseTimestamp(endTs, ses.timeZone); err != nil {
return moerr.NewInternalErrorf(ctx, "invalid endTs: %s, supported timestamp format: `%s`, or `%s`", endTs, time.DateTime, time.RFC3339)
endTs := cdcTaskOptionsMap[cdc2.EndTs]
if endTs != "" {
if ts, err = parseTimestamp(endTs, ses.timeZone); err != nil {
return moerr.NewInternalErrorf(ctx, "invalid endTs: %s, supported timestamp format: `%s`, or `%s`", endTs, time.DateTime, time.RFC3339)
}
endTs = ts.Format(time.RFC3339)
}
endTs = ts.Format(time.RFC3339)

//step 4: check source uri format and strip password
jsonSrcUri, _, err := extractUriInfo(ctx, create.SourceUri, cdc2.SourceUriPrefix)
Expand Down
49 changes: 49 additions & 0 deletions pkg/frontend/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ func Test_handleCreateCdc(t *testing.T) {
fmt.Sprintf("%d", cdc2.DefaultMaxSqlLength),
cdc2.SendSqlTimeout,
cdc2.DefaultSendSqlTimeout,
cdc2.StartTs,
"2025-01-03 15:20:00",
cdc2.EndTs,
"2025-01-03 16:20:00",
},
}

Expand Down Expand Up @@ -449,6 +453,51 @@ func Test_handleCreateCdc(t *testing.T) {
}
}

func Test_doCreateCdc_invalidStartTs(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ses := newTestSession(t, ctrl)
defer ses.Close()

pu := config.ParameterUnit{}
pu.TaskService = &testTaskService{}
setPu("", &pu)

stubCheckPitr := gostub.Stub(&checkPitr, func(ctx context.Context, bh BackgroundExec, accName string, pts *cdc2.PatternTuples) error {
return nil
})
defer stubCheckPitr.Reset()

create := &tree.CreateCDC{
IfNotExists: false,
TaskName: "task1",
SourceUri: "mysql://root:[email protected]:6001",
SinkType: cdc2.MysqlSink,
SinkUri: "mysql://root:[email protected]:3306",
Tables: "db1.t1:db1.t1,db1.t2",
Option: []string{
"Level",
cdc2.TableLevel,
"Account",
sysAccountName,
"Exclude",
"db2.t3,db2.t4",
cdc2.InitSnapshotSplitTxn,
"false",
cdc2.MaxSqlLength,
fmt.Sprintf("%d", cdc2.DefaultMaxSqlLength),
cdc2.SendSqlTimeout,
cdc2.DefaultSendSqlTimeout,
cdc2.StartTs,
"123456",
},
}

err := doCreateCdc(context.Background(), ses, create)
assert.Error(t, err)
}

type testTaskData struct {
metadata task.TaskMetadata
details *task.Details
Expand Down

0 comments on commit 46408f4

Please sign in to comment.