From e649f02b0b140b9144d41996e050b837010f8f80 Mon Sep 17 00:00:00 2001 From: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com> Date: Fri, 27 May 2022 10:11:53 +0200 Subject: [PATCH] VStream API: allow cells to be specified for picking source tablets to stream from (#10294) * Add parameter Cells in vstream api flags, for tablet picker to look for candidates in Signed-off-by: Rohit Nayak * Add e2e test for the new Cells vstream flag Signed-off-by: Rohit Nayak * Self-review Signed-off-by: Rohit Nayak * Address review comments Fix gofmt Signed-off-by: Rohit Nayak Signed-off-by: Vilius Okockis --- go/test/endtoend/vreplication/helper.go | 42 ++++-- .../vreplication/vreplication_test.go | 89 +++++++++++- go/vt/discovery/healthcheck.go | 27 ++-- go/vt/discovery/legacy_healthcheck.go | 14 +- go/vt/discovery/legacy_replicationlag.go | 8 +- go/vt/discovery/replicationlag.go | 8 +- go/vt/discovery/tablet_picker.go | 2 + go/vt/proto/vtgate/vtgate.pb.go | 133 ++++++++++-------- go/vt/proto/vtgate/vtgate_vtproto.pb.go | 43 ++++++ go/vt/vtgate/discoverygateway.go | 4 +- go/vt/vtgate/executor.go | 2 +- go/vt/vtgate/safe_session.go | 2 +- go/vt/vtgate/tx_conn.go | 6 +- go/vt/vtgate/vcursor_impl.go | 4 +- go/vt/vtgate/vstream_manager.go | 24 +++- proto/vtgate.proto | 3 + 16 files changed, 305 insertions(+), 106 deletions(-) diff --git a/go/test/endtoend/vreplication/helper.go b/go/test/endtoend/vreplication/helper.go index bee3c214015..a22b49cbf02 100644 --- a/go/test/endtoend/vreplication/helper.go +++ b/go/test/endtoend/vreplication/helper.go @@ -5,11 +5,14 @@ import ( "fmt" "io/ioutil" "net/http" + "os/exec" "regexp" "strconv" "strings" "testing" + "vitess.io/vitess/go/vt/log" + "github.com/buger/jsonparser" "github.com/stretchr/testify/require" @@ -98,24 +101,29 @@ func validateThatQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *c return newCount == count+1 } -func getQueryCount(url string, query string) int { - var headings, row []string - var rows [][]string +func getHTTPBody(url string) string { resp, err := http.Get(url) if err != nil { - fmt.Printf("http Get returns %+v\n", err) - return 0 + log.Infof("http Get returns %+v", err) + return "" } if resp.StatusCode != 200 { - fmt.Printf("http Get returns status %d\n", resp.StatusCode) - return 0 + log.Infof("http Get returns status %d", resp.StatusCode) + return "" } respByte, _ := ioutil.ReadAll(resp.Body) defer resp.Body.Close() body := string(respByte) + return body +} + +func getQueryCount(url string, query string) int { + var headings, row []string + var rows [][]string + body := getHTTPBody(url) doc, err := goquery.NewDocumentFromReader(strings.NewReader(body)) if err != nil { - fmt.Printf("goquery parsing returns %+v\n", err) + log.Infof("goquery parsing returns %+v\n", err) return 0 } @@ -143,7 +151,7 @@ func getQueryCount(url string, query string) int { }) }) if queryIndex == -1 || countIndex == -1 { - fmt.Println("Queryz response is incorrect") + log.Infof("Queryz response is incorrect") return 0 } for _, row := range rows { @@ -267,3 +275,19 @@ func printRoutingRules(t *testing.T, vc *VitessCluster, msg string) error { fmt.Printf("Routing Rules::%s:\n%s\n", msg, output) return nil } + +func osExec(t *testing.T, command string, args []string) (string, error) { + cmd := exec.Command(command, args...) + output, err := cmd.CombinedOutput() + return string(output), err +} + +func getDebugVar(t *testing.T, port int, varPath []string) (string, error) { + var val []byte + var err error + url := fmt.Sprintf("http://localhost:%d/debug/vars", port) + body := getHTTPBody(url) + val, _, _, err = jsonparser.Get([]byte(body), varPath...) + require.NoError(t, err) + return string(val), nil +} diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 2b57020d88c..34f557fc0b5 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -17,15 +17,22 @@ limitations under the License. package vreplication import ( + "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "strings" + "sync" "testing" "time" "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -142,7 +149,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { func TestMultiCellVreplicationWorkflow(t *testing.T) { cells := []string{"zone1", "zone2"} - allCellNames = "zone1,zone2" + allCellNames = strings.Join(cells, ",") vc = NewVitessCluster(t, "TestMultiCellVreplicationWorkflow", cells, mainClusterConfig) require.NotNil(t, vc) @@ -165,6 +172,86 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) { verifyClusterHealth(t, vc) insertInitialData(t) shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name, true) + + // we tag along this test so as not to create the overhead of creating another cluster + testVStreamCellFlag(t) +} + +func testVStreamCellFlag(t *testing.T) { + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "product", + Shard: "0", + Gtid: "", + }}} + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "product", + Filter: "select * from product", + }}, + } + ctx := context.Background() + + type vstreamTestCase struct { + cells string + expectError bool + } + nonExistingCell := "zone7" + vstreamTestCases := []vstreamTestCase{ + {"zone1,zone2", false}, + {nonExistingCell, true}, + {"", false}, + } + + for _, tc := range vstreamTestCases { + t.Run("VStreamCellsFlag/"+tc.cells, func(t *testing.T) { + conn, err := vtgateconn.Dial(ctx, fmt.Sprintf("localhost:%d", vc.ClusterConfig.vtgateGrpcPort)) + require.NoError(t, err) + defer conn.Close() + + flags := &vtgatepb.VStreamFlags{} + if tc.cells != "" { + flags.Cells = tc.cells + } + + ctx2, cancel := context.WithTimeout(ctx, 30*time.Second) + reader, err := conn.VStream(ctx2, topodatapb.TabletType_REPLICA, vgtid, filter, flags) + require.NoError(t, err) + + rowsReceived := false + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + events, err := reader.Recv() + switch err { + case nil: + if len(events) > 0 { + log.Infof("received %d events", len(events)) + rowsReceived = true + } + case io.EOF: + log.Infof("stream ended without data") + default: + log.Infof("%s:: remote error: %v", time.Now(), err) + } + }() + wg.Wait() + + if tc.expectError { + require.False(t, rowsReceived) + + // if no tablet was found the tablet picker adds a key which includes the cell name to the vtgate TabletPickerNoTabletFoundErrorCount stat + pickerErrorStat, err := getDebugVar(t, vc.ClusterConfig.vtgatePort, []string{"TabletPickerNoTabletFoundErrorCount"}) + require.NoError(t, err) + require.Contains(t, pickerErrorStat, nonExistingCell) + } else { + require.True(t, rowsReceived) + } + }) + } } func TestCellAliasVreplicationWorkflow(t *testing.T) { diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 9a34c45e620..31aace540d8 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -164,7 +164,7 @@ type TabletRecorder interface { type keyspaceShardTabletType string type tabletAliasString string -//HealthCheck declares what the TabletGateway needs from the HealthCheck +// HealthCheck declares what the TabletGateway needs from the HealthCheck type HealthCheck interface { // CacheStatus returns a displayable version of the health check cache. CacheStatus() TabletsCacheStatusList @@ -245,18 +245,27 @@ type HealthCheckImpl struct { // NewHealthCheck creates a new HealthCheck object. // Parameters: // retryDelay. -// The duration to wait before retrying to connect (e.g. after a failed connection -// attempt). +// +// The duration to wait before retrying to connect (e.g. after a failed connection +// attempt). +// // healthCheckTimeout. -// The duration for which we consider a health check response to be 'fresh'. If we don't get -// a health check response from a tablet for more than this duration, we consider the tablet -// not healthy. +// +// The duration for which we consider a health check response to be 'fresh'. If we don't get +// a health check response from a tablet for more than this duration, we consider the tablet +// not healthy. +// // topoServer. -// The topology server that this healthcheck object can use to retrieve cell or tablet information +// +// The topology server that this healthcheck object can use to retrieve cell or tablet information +// // localCell. -// The localCell for this healthcheck +// +// The localCell for this healthcheck +// // callback. -// A function to call when there is a master change. Used to notify vtgate's buffer to stop buffering. +// +// A function to call when there is a master change. Used to notify vtgate's buffer to stop buffering. func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl { log.Infof("loading tablets for cells: %v", cellsToWatch) diff --git a/go/vt/discovery/legacy_healthcheck.go b/go/vt/discovery/legacy_healthcheck.go index ae3754942c8..0fa00c3cf0b 100644 --- a/go/vt/discovery/legacy_healthcheck.go +++ b/go/vt/discovery/legacy_healthcheck.go @@ -378,13 +378,13 @@ func NewLegacyDefaultHealthCheck() LegacyHealthCheck { // NewLegacyHealthCheck creates a new LegacyHealthCheck object. // Parameters: -// retryDelay. -// The duration to wait before retrying to connect (e.g. after a failed connection -// attempt). -// healthCheckTimeout. -// The duration for which we consider a health check response to be 'fresh'. If we don't get -// a health check response from a tablet for more than this duration, we consider the tablet -// not healthy. +// - retryDelay +// The duration to wait before retrying to connect (e.g. after a failed connection +// attempt). +// - healthCheckTimeout +// The duration for which we consider a health check response to be 'fresh'. If we don't get +// a health check response from a tablet for more than this duration, we consider the tablet +// not healthy. func NewLegacyHealthCheck(retryDelay, healthCheckTimeout time.Duration) LegacyHealthCheck { hc := &LegacyHealthCheckImpl{ addrToHealth: make(map[string]*legacyTabletHealth), diff --git a/go/vt/discovery/legacy_replicationlag.go b/go/vt/discovery/legacy_replicationlag.go index 7b21c78ddc3..405737441a2 100644 --- a/go/vt/discovery/legacy_replicationlag.go +++ b/go/vt/discovery/legacy_replicationlag.go @@ -54,10 +54,10 @@ func LegacyIsReplicationLagVeryHigh(tabletStats *LegacyTabletStats) bool { // lags of (30m, 35m, 40m, 45m) return all. // // One thing to know about this code: vttablet also has a couple flags that impact the logic here: -// * unhealthy_threshold: if replication lag is higher than this, a tablet will be reported as unhealthy. -// The default for this is 2h, same as the discovery_high_replication_lag_minimum_serving here. -// * degraded_threshold: this is only used by vttablet for display. It should match -// discovery_low_replication_lag here, so the vttablet status display matches what vtgate will do of it. +// - unhealthy_threshold: if replication lag is higher than this, a tablet will be reported as unhealthy. +// The default for this is 2h, same as the discovery_high_replication_lag_minimum_serving here. +// - degraded_threshold: this is only used by vttablet for display. It should match +// discovery_low_replication_lag here, so the vttablet status display matches what vtgate will do of it. func FilterLegacyStatsByReplicationLag(tabletStatsList []*LegacyTabletStats) []*LegacyTabletStats { if !*legacyReplicationLagAlgorithm { return filterLegacyStatsByLag(tabletStatsList) diff --git a/go/vt/discovery/replicationlag.go b/go/vt/discovery/replicationlag.go index a78765e1d4c..efa6a1fbc4a 100644 --- a/go/vt/discovery/replicationlag.go +++ b/go/vt/discovery/replicationlag.go @@ -94,10 +94,10 @@ func IsReplicationLagVeryHigh(tabletHealth *TabletHealth) bool { // lags of (30m, 35m, 40m, 45m) return all. // // One thing to know about this code: vttablet also has a couple flags that impact the logic here: -// * unhealthy_threshold: if replication lag is higher than this, a tablet will be reported as unhealthy. -// The default for this is 2h, same as the discovery_high_replication_lag_minimum_serving here. -// * degraded_threshold: this is only used by vttablet for display. It should match -// discovery_low_replication_lag here, so the vttablet status display matches what vtgate will do of it. +// - unhealthy_threshold: if replication lag is higher than this, a tablet will be reported as unhealthy. +// The default for this is 2h, same as the discovery_high_replication_lag_minimum_serving here. +// - degraded_threshold: this is only used by vttablet for display. It should match +// discovery_low_replication_lag here, so the vttablet status display matches what vtgate will do of it. func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHealth { if !*legacyReplicationLagAlgorithm { return filterStatsByLag(tabletHealthList) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 74010998523..8f06d60d172 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -196,6 +196,8 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn // if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue if err == nil { actualCells = append(actualCells, alias.Cells...) + } else { + log.Infof("Unable to resolve cell %s, ignoring", cell) } } else { // valid cell, add it to our list diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index 90ced00d24f..31babb3f4d5 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -1064,6 +1064,9 @@ type VStreamFlags struct { HeartbeatInterval uint32 `protobuf:"varint,2,opt,name=heartbeat_interval,json=heartbeatInterval,proto3" json:"heartbeat_interval,omitempty"` // stop streams on a reshard (journal event) StopOnReshard bool `protobuf:"varint,3,opt,name=stop_on_reshard,json=stopOnReshard,proto3" json:"stop_on_reshard,omitempty"` + // if specified, these cells (comma-separated) are used to pick source tablets from. + // defaults to the cell of the vtgate serving the VStream API. + Cells string `protobuf:"bytes,4,opt,name=cells,proto3" json:"cells,omitempty"` } func (x *VStreamFlags) Reset() { @@ -1119,6 +1122,13 @@ func (x *VStreamFlags) GetStopOnReshard() bool { return false } +func (x *VStreamFlags) GetCells() string { + if x != nil { + return x.Cells + } + return "" +} + // VStreamRequest is the payload for VStream. type VStreamRequest struct { state protoimpl.MessageState @@ -1770,7 +1780,7 @@ var file_vtgate_proto_rawDesc = []byte{ 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x74, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x74, 0x69, 0x64, 0x22, 0x1c, 0x0a, 0x1a, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x8a, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa0, 0x01, 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x5f, 0x73, 0x6b, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, @@ -1779,68 +1789,69 @@ var file_vtgate_proto_rawDesc = []byte{ 0x52, 0x11, 0x68, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x26, 0x0a, 0x0f, 0x73, 0x74, 0x6f, 0x70, 0x5f, 0x6f, 0x6e, 0x5f, 0x72, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x73, 0x74, - 0x6f, 0x70, 0x4f, 0x6e, 0x52, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x22, 0xf6, 0x01, 0x0a, 0x0e, - 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, - 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, - 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x35, 0x0a, 0x0b, - 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0e, 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, - 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, - 0x79, 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, - 0x56, 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x06, - 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, - 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, - 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, - 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, - 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x52, 0x05, 0x66, - 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x0f, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, - 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, - 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, - 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, - 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, - 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, 0x6f, 0x75, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, - 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x22, 0x89, 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x65, - 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, - 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, - 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, - 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, - 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, - 0x65, 0x6c, 0x64, 0x73, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, - 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, - 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, - 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, + 0x6f, 0x70, 0x4f, 0x6e, 0x52, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, + 0x65, 0x6c, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x63, 0x65, 0x6c, 0x6c, + 0x73, 0x22, 0xf6, 0x01, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, + 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, + 0x49, 0x64, 0x12, 0x35, 0x0a, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, + 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x74, + 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x67, 0x74, + 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, + 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, 0x76, 0x67, 0x74, + 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, + 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x2a, + 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, + 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, + 0x61, 0x67, 0x73, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x0f, 0x56, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, + 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, + 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, 0x50, 0x72, + 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, + 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, + 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, + 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, 0x6f, 0x75, + 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x22, 0x89, + 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, + 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, - 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, - 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, - 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x10, 0x02, 0x12, 0x09, - 0x0a, 0x05, 0x54, 0x57, 0x4f, 0x50, 0x43, 0x10, 0x03, 0x2a, 0x3c, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, - 0x6d, 0x69, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, - 0x41, 0x4c, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x52, 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, - 0x04, 0x50, 0x4f, 0x53, 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x41, 0x55, 0x54, 0x4f, 0x43, - 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x42, 0x36, 0x0a, 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, - 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x23, 0x76, 0x69, 0x74, 0x65, - 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, - 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x03, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, + 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x6c, + 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, + 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, 0x43, 0x6c, + 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, + 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, + 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, + 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, + 0x54, 0x49, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x57, 0x4f, 0x50, 0x43, 0x10, 0x03, 0x2a, + 0x3c, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0a, + 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x52, + 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x4f, 0x53, 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, + 0x0a, 0x41, 0x55, 0x54, 0x4f, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x42, 0x36, 0x0a, + 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x5a, 0x23, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, + 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, + 0x74, 0x67, 0x61, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/proto/vtgate/vtgate_vtproto.pb.go b/go/vt/proto/vtgate/vtgate_vtproto.pb.go index b8fd70d9975..03da20424fb 100644 --- a/go/vt/proto/vtgate/vtgate_vtproto.pb.go +++ b/go/vt/proto/vtgate/vtgate_vtproto.pb.go @@ -1017,6 +1017,13 @@ func (m *VStreamFlags) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.Cells) > 0 { + i -= len(m.Cells) + copy(dAtA[i:], m.Cells) + i = encodeVarint(dAtA, i, uint64(len(m.Cells))) + i-- + dAtA[i] = 0x22 + } if m.StopOnReshard { i-- if m.StopOnReshard { @@ -1816,6 +1823,10 @@ func (m *VStreamFlags) SizeVT() (n int) { if m.StopOnReshard { n += 2 } + l = len(m.Cells) + if l > 0 { + n += 1 + l + sov(uint64(l)) + } if m.unknownFields != nil { n += len(m.unknownFields) } @@ -4509,6 +4520,38 @@ func (m *VStreamFlags) UnmarshalVT(dAtA []byte) error { } } m.StopOnReshard = bool(v != 0) + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Cells", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Cells = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index bde96830154..8dd4194c4c5 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -45,7 +45,7 @@ const ( GatewayImplementationDiscovery = "discoverygateway" ) -//UsingLegacyGateway returns true when legacy +// UsingLegacyGateway returns true when legacy func UsingLegacyGateway() bool { return *GatewayImplementation == GatewayImplementationDiscovery } @@ -78,7 +78,7 @@ type DiscoveryGateway struct { buffer *buffer.Buffer } -//TabletsCacheStatus is not implemented for this struct +// TabletsCacheStatus is not implemented for this struct func (dg *DiscoveryGateway) TabletsCacheStatus() discovery.TabletsCacheStatusList { return nil } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index f458223eb86..e03c3fa723e 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -398,7 +398,7 @@ func (e *Executor) handleCommit(ctx context.Context, safeSession *SafeSession, l return &sqltypes.Result{}, err } -//Commit commits the existing transactions +// Commit commits the existing transactions func (e *Executor) Commit(ctx context.Context, safeSession *SafeSession) error { return e.txConn.Commit(ctx, safeSession) } diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index a9520abc114..cf3a5ce96ce 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -322,7 +322,7 @@ func (session *SafeSession) SetTargetString(target string) { session.TargetString = target } -//SetSystemVariable sets the system variable in th session. +// SetSystemVariable sets the system variable in th session. func (session *SafeSession) SetSystemVariable(name string, expr string) { session.mu.Lock() defer session.mu.Unlock() diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index b056b8db508..2927b17f367 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -222,7 +222,7 @@ func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error { return err } -//Release releases the reserved connection and/or rollbacks the transaction +// Release releases the reserved connection and/or rollbacks the transaction func (txc *TxConn) Release(ctx context.Context, session *SafeSession) error { if !session.InTransaction() && !session.InReservedConn() { return nil @@ -250,7 +250,7 @@ func (txc *TxConn) Release(ctx context.Context, session *SafeSession) error { }) } -//ReleaseLock releases the reserved connection used for locking. +// ReleaseLock releases the reserved connection used for locking. func (txc *TxConn) ReleaseLock(ctx context.Context, session *SafeSession) error { if !session.InLockSession() { return nil @@ -274,7 +274,7 @@ func (txc *TxConn) ReleaseLock(ctx context.Context, session *SafeSession) error } -//ReleaseAll releases all the shard sessions and lock session. +// ReleaseAll releases all the shard sessions and lock session. func (txc *TxConn) ReleaseAll(ctx context.Context, session *SafeSession) error { if !session.InTransaction() && !session.InReservedConn() && !session.InLockSession() { return nil diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 16207f002ca..788c240a0fc 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -72,7 +72,7 @@ type iExecute interface { VSchema() *vindexes.VSchema } -//VSchemaOperator is an interface to Vschema Operations +// VSchemaOperator is an interface to Vschema Operations type VSchemaOperator interface { GetCurrentSrvVschema() *vschemapb.SrvVSchema UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error @@ -510,7 +510,7 @@ func (vc *vcursorImpl) SetSysVar(name string, expr string) { vc.safeSession.SetSystemVariable(name, expr) } -//NeedsReservedConn implements the SessionActions interface +// NeedsReservedConn implements the SessionActions interface func (vc *vcursorImpl) NeedsReservedConn() { vc.safeSession.SetReservedConn(true) } diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 65131f89c40..a3e53c8d7c3 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "strings" "sync" "time" @@ -69,6 +70,7 @@ type vstream struct { tabletType topodatapb.TabletType filter *binlogdatapb.Filter resolver *srvtopo.Resolver + optCells string cancel context.CancelFunc wg sync.WaitGroup @@ -136,6 +138,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta vs := &vstream{ vgtid: vgtid, tabletType: tabletType, + optCells: flags.Cells, filter: filter, send: send, resolver: vsm.resolver, @@ -407,6 +410,21 @@ func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent, } } +func (vs *vstream) getCells() []string { + var cells []string + if vs.optCells != "" { + for _, cell := range strings.Split(strings.TrimSpace(vs.optCells), ",") { + cells = append(cells, strings.TrimSpace(cell)) + } + } + + if len(cells) == 0 { + // use the vtgate's cell by default + cells = append(cells, vs.vsm.cell) + } + return cells +} + // streamFromTablet streams from one shard. If transactions come in separate chunks, they are grouped and sent. func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.ShardGtid) error { // journalDone is assigned a channel when a journal event is encountered. @@ -428,7 +446,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error - tp, err := discovery.NewTabletPicker(vs.ts, []string{vs.vsm.cell}, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) + cells := vs.getCells() + tp, err := discovery.NewTabletPicker(vs.ts, cells, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) if err != nil { log.Errorf(err.Error()) return err @@ -438,7 +457,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha log.Errorf(err.Error()) return err } - log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) + log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), strings.Join(cells, ","), + sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) target := &querypb.Target{ Keyspace: sgtid.Keyspace, Shard: sgtid.Shard, diff --git a/proto/vtgate.proto b/proto/vtgate.proto index 0248d333189..923a41ae6d5 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -277,6 +277,9 @@ message VStreamFlags { uint32 heartbeat_interval = 2; // stop streams on a reshard (journal event) bool stop_on_reshard = 3; + // if specified, these cells (comma-separated) are used to pick source tablets from. + // defaults to the cell of the vtgate serving the VStream API. + string cells = 4; } // VStreamRequest is the payload for VStream.