Skip to content

Commit

Permalink
fix snowflake bugs
Browse files Browse the repository at this point in the history
Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
DeathBorn committed Nov 28, 2024
1 parent 558a200 commit d9c92d2
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 72 deletions.
77 changes: 50 additions & 27 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,53 +336,76 @@ func (ins *Insert) processGenerate(vcursor VCursor, bindVars map[string]*querypb

// If generation is needed, generate the requested number of values (as one call).
if count != 0 {
rss, _, err := vcursor.ResolveDestinations(ins.Generate.Keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
return 0, err
}
// TODO: place where to decide routing maybe for snowflake
if len(rss) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "auto sequence generation can happen through single shard only, it is getting routed to %d shards", len(rss))
}
bindVars := map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(count)}
qr, err := vcursor.ExecuteStandalone(ins.Generate.Query, bindVars, rss[0])
if err != nil {
return 0, err
}
// If no rows are returned, it's an internal error, and the code
// must panic, which will be caught and reported.
insertID, err = evalengine.ToInt64(qr.Rows[0][0])
if err != nil {
return 0, err
if ins.Generate.Type == vindexes.TypeSnowflake {
// We will pick any shard
rss, _, err := vcursor.ResolveDestinations(ins.Generate.Keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
return 0, err
}
if len(rss) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "auto snowflake generation can happen with at least one shard this keyspace: %s", ins.Generate.Keyspace.Name)
}
bindVars := map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(count)}
qr, err := vcursor.ExecuteStandalone(ins.Generate.Query, bindVars, rss[0])
if err != nil {
return 0, err
}
// If no rows are returned, it's an internal error, and the code
// must panic, which will be caught and reported.
insertID, err = evalengine.ToInt64(qr.Rows[0][0])
if err != nil {
return 0, err
}
} else {
rss, _, err := vcursor.ResolveDestinations(ins.Generate.Keyspace.Name, nil, []key.Destination{key.DestinationAnyShard{}})
if err != nil {
return 0, err
}
if len(rss) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "auto sequence generation can happen through single shard only, it is getting routed to %d shards", len(rss))
}
bindVars := map[string]*querypb.BindVariable{"n": sqltypes.Int64BindVariable(count)}
qr, err := vcursor.ExecuteStandalone(ins.Generate.Query, bindVars, rss[0])
if err != nil {
return 0, err
}
// If no rows are returned, it's an internal error, and the code
// must panic, which will be caught and reported.
insertID, err = evalengine.ToInt64(qr.Rows[0][0])
if err != nil {
return 0, err
}
}
}

// Fill the holes where no value was supplied.
// For Snowflake
// Fill the holes where no value was supplied depending on the type of sequence: snowflake or sequence
cur := insertID
// for Snowflake
if ins.Generate.Type == vindexes.TypeSnowflake {
cur := insertID
var totalInc int64
ts := (cur >> int64(SequenceLength+MachineIDLength)) + SnowflakeStartTime.UTC().UnixNano()/1e6
sequence := cur & int64(MaxSequence)
machineID := (cur & (int64(MaxMachineID) << SequenceLength)) >> SequenceLength
for i, v := range resolved {
fmt.Println(fmt.Sprintf("Generating Snowflake, %s id %d", ins.GetTableName(), cur))
if shouldGenerate(v) {
bindVars[SeqVarName+strconv.Itoa(i)] = sqltypes.Int64BindVariable(cur)
// calculate next id and advance ts and sequence
totalInc := sequence + 1
ts := ts + totalInc/MaxSequence
totalInc = sequence + 1
ts = ts + totalInc/MaxSequence
sequence = totalInc % MaxSequence
// TODO: generate next id properly for snowflake
df := elapsedTime(ts, SnowflakeStartTime)
cur = int64((uint64(df) << uint64(timestampMoveLength)) | (uint64(machineID) << uint64(machineIDMoveLength)) | uint64(sequence))
cur = (df << timestampMoveLength) | (machineID << machineIDMoveLength) | sequence
fmt.Println(
((df << timestampMoveLength) | (machineID << machineIDMoveLength) | sequence),
)
} else {
bindVars[SeqVarName+strconv.Itoa(i)] = sqltypes.ValueBindVariable(v)
}
}
return insertID, nil
}
// For Sequence
cur := insertID
// for Sequence
for i, v := range resolved {
if shouldGenerate(v) {
bindVars[SeqVarName+strconv.Itoa(i)] = sqltypes.Int64BindVariable(cur)
Expand Down
110 changes: 110 additions & 0 deletions go/vt/vtgate/engine/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,116 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

func TestInsertShardedSnoflakeGenerate(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
"sharded": {
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Columns: []string{"id"},
}},
},
},
},
},
}
vs := vindexes.BuildVSchema(invschema)
ks := vs.Keyspaces["sharded"]

ins := NewInsert(
InsertSharded,
ks.Keyspace,
[]sqltypes.PlanValue{{
// colVindex columns: id
Values: []sqltypes.PlanValue{{
// 5 rows.
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(2),
}, {
Value: sqltypes.NewInt64(3),
}, {
Value: sqltypes.NewInt64(4),
}, {
Value: sqltypes.NewInt64(5),
}},
}},
}},
ks.Tables["t1"],
"prefix",
[]string{" mid1", " mid2", " mid3", " mid4", " mid5"},
" suffix",
)

ins.Generate = &Generate{
Keyspace: &vindexes.Keyspace{
Name: "ks2",
Sharded: true,
},
Type: "snowflake",
Query: "dummy_snowflake_generate",
Values: sqltypes.PlanValue{
Values: []sqltypes.PlanValue{
{Value: sqltypes.NewInt64(1)},
{Value: sqltypes.NULL},
{Value: sqltypes.NewInt64(2)},
{Value: sqltypes.NULL},
{Value: sqltypes.NULL},
},
},
}

vc := newDMLTestVCursor("-20", "20-")
vc.shardForKsid = []string{"20-", "-20", "20-", "-20", "-20"}

// Snowflake ids
// | 2124054243676528637 | 1732771994712 | 2024-11-28 05:33:14.7120 | 4093 | 1 |
// | 2124054243676528638 | 1732771994712 | 2024-11-28 05:33:14.7120 | 4094 | 1 |
// | 2124054243680718848 | 1732771994713 | 2024-11-28 05:33:14.7130 | 0 | 1 |
vc.results = []*sqltypes.Result{
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"nextval",
"int64",
),
"2124054243676528637",
),
{InsertID: 1},
}

result, err := ins.Execute(vc, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ResolveDestinations ks2 [] Destinations:DestinationAnyShard()`,
`ExecuteStandalone dummy_snowflake_generate n: type:INT64 value:"3" ks2 -20`,
// Based on shardForKsid, values returned will be 20-, -20, 20-.
`ResolveDestinations sharded [value:"0" value:"1" value:"2" value:"3" value:"4"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c),DestinationKeyspaceID(d2fd8867d50d2dfe),DestinationKeyspaceID(70bb023c810ca87a)`,
// Row 2 will go to -20, rows 1 & 3 will go to 20-
`ExecuteMultiShard ` +
`sharded.20-: prefix mid1, mid3 suffix ` +
`{__seq0: type:INT64 value:"1" __seq1: type:INT64 value:"2124054243676528637" __seq2: type:INT64 value:"2" __seq3: type:INT64 value:"2124054243676528638" __seq4: type:INT64 value:"2124054243680718848" ` +
`_id_0: type:INT64 value:"1" _id_1: type:INT64 value:"2" _id_2: type:INT64 value:"3" _id_3: type:INT64 value:"4" _id_4: type:INT64 value:"5"} ` +
`sharded.-20: prefix mid2, mid4, mid5 suffix ` +
`{__seq0: type:INT64 value:"1" __seq1: type:INT64 value:"2124054243676528637" __seq2: type:INT64 value:"2" __seq3: type:INT64 value:"2124054243676528638" __seq4: type:INT64 value:"2124054243680718848" ` +
`_id_0: type:INT64 value:"1" _id_1: type:INT64 value:"2" _id_2: type:INT64 value:"3" _id_3: type:INT64 value:"4" _id_4: type:INT64 value:"5"} ` +
`true false`,
})

// The insert id returned by ExecuteMultiShard should be overwritten by processGenerate.
expectResult(t, "Execute", result, &sqltypes.Result{InsertID: 2124054243676528637})
}

func TestInsertUnshardedSnowflakeGenerate(t *testing.T) {
ins := NewQueryInsert(
InsertUnsharded,
Expand Down
4 changes: 1 addition & 3 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,7 @@ func buildTables(ks *vschemapb.Keyspace, vschema *VSchema, ksvschema *KeyspaceSc
}
t.Type = table.Type
case TypeSnowflake:
if keyspace.Sharded && table.Pinned == "" {
return fmt.Errorf("snowflake table has to be in an unsharded keyspace or must be pinned: %s", tname)
}
// Snowflake should be ok to use with multiple shards
t.Type = table.Type
default:
return fmt.Errorf("unidentified table type %s", table.Type)
Expand Down
7 changes: 2 additions & 5 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,6 @@ func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
return nil, err
}
tableName := qre.plan.TableName()
// check if snowflake

if inc < 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid increment for sequence or snowflake %s: %d", tableName, inc)
Expand Down Expand Up @@ -599,7 +598,6 @@ func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
}
ret = t.SequenceInfo.NextVal
t.SequenceInfo.NextVal += inc

} else if qre.plan.Table.SnowflakeInfo != nil {
t := qre.plan.Table
t.SnowflakeInfo.Lock()
Expand Down Expand Up @@ -628,12 +626,11 @@ func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
}
}

// Generate new id here, return it and update last val with overflow
nextID, err := t.SnowflakeInfo.NextNID(inc, currentMillis())
// generate first Snowflake id from requested range
ret, err = t.SnowflakeInfo.NextNID(inc, currentMillis())
if err != nil {
return nil, vterrors.Wrapf(err, "error generating snowflake with NextNID(%d) %s", inc, tableName)
}
ret = int64(nextID)
}
return &sqltypes.Result{
Fields: sequenceFields,
Expand Down
8 changes: 1 addition & 7 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,7 @@ func (se *Engine) MakeNonMaster() {
t.SequenceInfo.LastVal = 0
t.SequenceInfo.Unlock()
}
if t.SnowflakeInfo != nil {
t.SnowflakeInfo.Lock()
// We don't care about this, since each tablet has its own machine ID.
// t.SnowflakeInfo.NextVal = 0
// t.SnowflakeInfo.LastVal = 0
t.SnowflakeInfo.Unlock()
}
// We don't care about Snowflake, since each tablet has its own machine ID.
}
}

Expand Down
2 changes: 0 additions & 2 deletions go/vt/vttablet/tabletserver/schema/load_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ func LoadTable(conn *connpool.DBConn, tableName string, comment string) (*Table,
if err := fetchColumns(ta, conn, sqlTableName); err != nil {
return nil, err
}
fmt.Println("fff comment", comment, "tableName", tableName)
switch {
case strings.Contains(comment, "vitess_sequence"):
ta.Type = Sequence
ta.SequenceInfo = &SequenceInfo{}
case strings.Contains(comment, "vitess_snowflake"):
ta.Type = Snowflake
ta.SnowflakeInfo = &SnowflakeInfo{}
fmt.Println("loaded snowflake table: ", tableName)
case strings.Contains(comment, "vitess_message"):
if err := loadMessageInfo(ta, comment); err != nil {
return nil, err
Expand Down
37 changes: 14 additions & 23 deletions go/vt/vttablet/tabletserver/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,12 @@ const (
)

var (
// default starttime
// default Snowflake start time
SnowflakeStartTime = time.Date(2008, 11, 10, 23, 0, 0, 0, time.UTC)
)

// SnowflakeInfo contains info specific to sequence tabels.
// It must be locked before accessing the values inside.
// If CurVal==LastVal, we have to cache new values.
// When the schema is first loaded, the values are all 0,
// which will trigger caching on first use.
type SnowflakeInfo struct {
Expand All @@ -113,37 +112,43 @@ func elapsedTime(noms int64, s time.Time) int64 {
}

func (s *SnowflakeInfo) NextNID(inc int64, currentTimestamp int64) (int64, error) {
fmt.Println("----------")
// need to pass timestamo in order to make it more testable
// currentTimestamp := currentMillis()
var firstSequence, firstTimestamp int64
if s.LastTimestamp < currentTimestamp {
// calculate timestamp and sequence for first id
firstTimestamp = currentTimestamp
firstSequence = 0
s.LastTimestamp = currentTimestamp
s.Sequence = 0
// // calculate timestamp and sequence for last id
// s.LastTimestamp = currentTimestamp
// s.Sequence = 0
// calculate timestamp and sequence for last id
lastInc := inc - 1
s.LastTimestamp = currentTimestamp + lastInc/MaxSequence // add overflow to timestamp as ms
s.Sequence = lastInc % MaxSequence // set last sequence
} else {
if s.LastTimestamp > currentTimestamp {
fmt.Println("current timestamp is less than last timestamp, so we are overflowing again")
currentTimestamp = s.LastTimestamp
} else {
fmt.Println("Same timestamp", currentTimestamp)
}
// calculate first id values
// calculate timestamp and sequence for first id
firstInc := s.Sequence + 1
firstTimestamp = currentTimestamp + firstInc/MaxSequence // add overflow to timestamp as ms
firstSequence = firstInc % MaxSequence // set first sequence
// calculate last id values
// calculate timestamp and sequence for last id
lastInc := s.Sequence + inc
s.LastTimestamp = currentTimestamp + lastInc/MaxSequence // add overflow to timestamp as ms
s.Sequence = lastInc % MaxSequence // set last sequence
}

fmt.Println("firstSequence", firstSequence, "firstTimestamp", firstTimestamp)
fmt.Println("lastSequence", s.Sequence, "lastTimestamp", s.LastTimestamp)

firstDF := elapsedTime(firstTimestamp, SnowflakeStartTime)
firstId := (uint64(firstDF) << uint64(timestampMoveLength)) | (uint64(s.MachineID) << uint64(machineIDMoveLength)) | uint64(firstSequence)
return int64(firstId), nil
firstId := (firstDF << timestampMoveLength) | (s.MachineID << machineIDMoveLength) | firstSequence
return firstId, nil
}

// SetMachineID specify the machine ID. It will panic when machined > max limit for 2^10-1.
Expand All @@ -156,20 +161,6 @@ func (s *SnowflakeInfo) SetMachineID(m int64) error {
return nil
}

// // ParseID parse snowflake it to SID struct.
// func ParseSnowflakeID(id uint64) SnowflakeID {
// t := id >> uint64(SequenceLength+MachineIDLength)
// sequence := id & uint64(MaxSequence)
// mID := (id & (uint64(MaxMachineID) << SequenceLength)) >> SequenceLength

// return SnowflakeID{
// ID: id,
// Sequence: sequence,
// MachineID: mID,
// Timestamp: t,
// }
// }

// MessageInfo contains info specific to message tables.
type MessageInfo struct {
// Fields stores the field info to be
Expand Down
Loading

0 comments on commit d9c92d2

Please sign in to comment.