Skip to content

Commit

Permalink
Add Keyspace/Shard properties to VEvent
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
rohit-nayak-ps authored and DeathBorn committed Apr 8, 2024
1 parent 2cafb43 commit 73a871c
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 254 deletions.
255 changes: 19 additions & 236 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

90 changes: 90 additions & 0 deletions go/vt/proto/binlogdata/binlogdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ func TestVStreamSharded(t *testing.T) {
switch err {
case nil:
for _, event := range events {
// check for Keyspace/Shard values first and then reset them so that we don't have to add them to each expected event proto string
require.Equal(t, "ks", event.Keyspace, "event has an incorrect keyspace attribute: %s", event.Keyspace)
require.True(t, event.Shard == "-80" || event.Shard == "80-", "event has an incorrect shard attribute: %s", event.Shard)
event.Keyspace = ""
event.Shard = ""
switch event.Type {
case binlogdatapb.VEventType_ROW, binlogdatapb.VEventType_FIELD:
evs = append(evs, event)
Expand Down
12 changes: 8 additions & 4 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,10 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
// Update the VGtid and send that instead.
sgtid.Gtid = event.Gtid
events[j] = &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_VGTID,
Vgtid: proto.Clone(vs.vgtid).(*binlogdatapb.VGtid),
Type: binlogdatapb.VEventType_VGTID,
Vgtid: proto.Clone(vs.vgtid).(*binlogdatapb.VGtid),
Keyspace: event.Keyspace,
Shard: event.Shard,
}
} else if event.Type == binlogdatapb.VEventType_LASTPK {
var foundIndex = -1
Expand All @@ -631,8 +633,10 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
}
}
events[j] = &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_VGTID,
Vgtid: proto.Clone(vs.vgtid).(*binlogdatapb.VGtid),
Type: binlogdatapb.VEventType_VGTID,
Vgtid: proto.Clone(vs.vgtid).(*binlogdatapb.VGtid),
Keyspace: event.Keyspace,
Shard: event.Shard,
}
}
}
Expand Down
24 changes: 17 additions & 7 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import (
var mu sync.Mutex

func TestVStreamSkew(t *testing.T) {
stream := func(conn *sandboxconn.SandboxConn, shard string, count, idx int64) {
vevents := getVEvents(shard, count, idx)
stream := func(conn *sandboxconn.SandboxConn, keyspace, shard string, count, idx int64) {
vevents := getVEvents(keyspace, shard, count, idx)
for _, ev := range vevents {
conn.VStreamCh <- ev
time.Sleep(time.Duration(idx*100) * time.Millisecond)
Expand Down Expand Up @@ -94,15 +94,15 @@ func TestVStreamSkew(t *testing.T) {
sbc0.VStreamCh = make(chan *binlogdatapb.VEvent)
want += 2 * tcase.numEventsPerShard
vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "-20"})
go stream(sbc0, "-20", tcase.numEventsPerShard, tcase.shard0idx)
go stream(sbc0, ks, "-20", tcase.numEventsPerShard, tcase.shard0idx)
}
if tcase.shard1idx != 0 {
sbc1 = hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_MASTER, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet())
sbc1.VStreamCh = make(chan *binlogdatapb.VEvent)
want += 2 * tcase.numEventsPerShard
vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "20-40"})
go stream(sbc1, "20-40", tcase.numEventsPerShard, tcase.shard1idx)
go stream(sbc1, ks, "20-40", tcase.numEventsPerShard, tcase.shard1idx)
}
ch := startVStream(ctx, t, vsm, vgtid, &vtgatepb.VStreamFlags{MinimizeSkew: true})
var receivedEvents []*binlogdatapb.VEvent
Expand Down Expand Up @@ -529,7 +529,10 @@ func TestVStreamJournalOneToMany(t *testing.T) {
}},
},
}
if !proto.Equal(got.Events[0], wantevent) {
gotEvent := got.Events[0]
gotEvent.Keyspace = ""
gotEvent.Shard = ""
if !proto.Equal(gotEvent, wantevent) {
t.Errorf("vgtid: %v, want %v", got.Events[0], wantevent)
}
}
Expand Down Expand Up @@ -643,7 +646,10 @@ func TestVStreamJournalManyToOne(t *testing.T) {
}},
},
}
if !proto.Equal(got.Events[0], wantevent) {
gotEvent := got.Events[0]
gotEvent.Keyspace = ""
gotEvent.Shard = ""
if !proto.Equal(gotEvent, wantevent) {
t.Errorf("vgtid: %v, want %v", got.Events[0], wantevent)
}
verifyEvents(t, ch, want1)
Expand Down Expand Up @@ -1096,7 +1102,7 @@ func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants .
}
}

func getVEvents(shard string, count, idx int64) []*binlogdatapb.VEvent {
func getVEvents(keyspace, shard string, count, idx int64) []*binlogdatapb.VEvent {
mu.Lock()
defer mu.Unlock()
var vevents []*binlogdatapb.VEvent
Expand All @@ -1108,12 +1114,16 @@ func getVEvents(shard string, count, idx int64) []*binlogdatapb.VEvent {
Type: binlogdatapb.VEventType_GTID, Gtid: fmt.Sprintf("gtid-%s-%d", shard, j),
Timestamp: currentTime - j,
CurrentTime: currentTime * 1e9,
Keyspace: keyspace,
Shard: shard,
})

vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COMMIT,
Timestamp: currentTime - j,
CurrentTime: currentTime * 1e9,
Keyspace: keyspace,
Shard: shard,
})
}
return vevents
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func (uvs *uvstreamer) sendEventsForRows(ctx context.Context, tableName string,
var evs []*binlogdatapb.VEvent
for _, row := range rows.Rows {
ev := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_ROW,
Type: binlogdatapb.VEventType_ROW,
Keyspace: uvs.vse.keyspace,
Shard: uvs.vse.shard,
RowEvent: &binlogdatapb.RowEvent{
TableName: tableName,
Keyspace: uvs.vse.keyspace,
Expand All @@ -154,11 +156,15 @@ func (uvs *uvstreamer) sendEventsForRows(ctx context.Context, tableName string,

ev := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_LASTPK,
Keyspace: uvs.vse.keyspace,
Shard: uvs.vse.shard,
LastPKEvent: lastPKEvent,
}
evs = append(evs, ev)
evs = append(evs, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COMMIT,
Type: binlogdatapb.VEventType_COMMIT,
Keyspace: uvs.vse.keyspace,
Shard: uvs.vse.shard,
})

if err := uvs.send(evs); err != nil {
Expand Down
22 changes: 18 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se
}
send2 := func(evs []*binlogdatapb.VEvent) error {
vse.vstreamerEventsStreamed.Add(int64(len(evs)))
for _, ev := range evs {
ev.Keyspace = vse.keyspace
ev.Shard = vse.shard
}
return send(evs)
}
uvs := &uvstreamer{
Expand Down Expand Up @@ -464,17 +468,27 @@ func (uvs *uvstreamer) setPosition(gtid string, isInTx bool) error {
return nil
}
gtidEvent := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_GTID,
Gtid: gtid,
Type: binlogdatapb.VEventType_GTID,
Gtid: gtid,
Keyspace: uvs.vse.keyspace,
Shard: uvs.vse.shard,
}

var evs []*binlogdatapb.VEvent
if !isInTx {
evs = append(evs, &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_BEGIN})
evs = append(evs, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_BEGIN,
Keyspace: uvs.vse.keyspace,
Shard: uvs.vse.shard,
})
}
evs = append(evs, gtidEvent)
if !isInTx {
evs = append(evs, &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_COMMIT})
evs = append(evs, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COMMIT,
Keyspace: uvs.vse.keyspace,
Shard: uvs.vse.shard,
})
}
if err := uvs.send(evs); err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
// all existing rows are sent without the new row.
// If a single row exceeds the packet size, it will be in its own packet.
bufferAndTransmit := func(vevent *binlogdatapb.VEvent) error {
vevent.Keyspace = vs.vse.keyspace
vevent.Shard = vs.vse.shard
switch vevent.Type {
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD,
binlogdatapb.VEventType_JOURNAL:
Expand Down Expand Up @@ -417,7 +419,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Type: binlogdatapb.VEventType_BEGIN,
})
}
vs.pos = mysql.AppendGTID(vs.pos, gtid) //TODO: #sugu why Append?
vs.pos = mysql.AppendGTID(vs.pos, gtid)
case ev.IsXID():
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_GTID,
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2072,6 +2072,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
for i, want := range wantset {
// CurrentTime is not testable.
evs[i].CurrentTime = 0
evs[i].Keyspace = ""
evs[i].Shard = ""
switch want {
case "begin":
if evs[i].Type != binlogdatapb.VEventType_BEGIN {
Expand Down
5 changes: 5 additions & 0 deletions proto/binlogdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ message VEvent {
int64 current_time = 20;
// LastPK is the last PK for a table
LastPKEvent last_p_k_event = 21;
// the source keyspace
string keyspace = 22;
// the source shard
string shard = 23;

}

message MinimalTable {
Expand Down

0 comments on commit 73a871c

Please sign in to comment.