Skip to content

Commit

Permalink
fix: protocol out of sync when mixing sunsubscribe requests and slot …
Browse files Browse the repository at this point in the history
…migrations (#691)

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian authored Dec 10, 2024
1 parent 5c29d32 commit 723761f
Show file tree
Hide file tree
Showing 6 changed files with 431 additions and 52 deletions.
17 changes: 14 additions & 3 deletions hack/cmds/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func main() {
}

checkAllUsed("noRetCMDs", noRetCMDs)
checkAllUsed("unsubCMDs", unsubCMDs)
checkAllUsed("blockingCMDs", blockingCMDs)
checkAllUsed("cacheableCMDs", cacheableCMDs)
checkAllUsed("readOnlyCMDs", readOnlyCMDs)
Expand Down Expand Up @@ -727,6 +728,13 @@ func rootCf(root goStruct) (tag string) {
tag = "noRetTag"
}

if within(root, unsubCMDs) {
if tag != "" {
panic("root cf collision")
}
tag = "unsubTag"
}

if within(root, mtGetCMDs) {
if tag != "" {
panic("root cf collision")
Expand Down Expand Up @@ -1022,11 +1030,14 @@ func within(cmd goStruct, cmds map[string]bool) bool {
}

var noRetCMDs = map[string]bool{
"subscribe": false,
"psubscribe": false,
"subscribe": false,
"psubscribe": false,
"ssubscribe": false,
}

var unsubCMDs = map[string]bool{
"unsubscribe": false,
"punsubscribe": false,
"ssubscribe": false,
"sunsubscribe": false,
}

Expand Down
14 changes: 10 additions & 4 deletions internal/cmds/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const (
noRetTag = uint16(1<<12) | readonly // make noRetTag can also be retried
mtGetTag = uint16(1<<11) | readonly // make mtGetTag can also be retried
scrRoTag = uint16(1<<10) | readonly // make scrRoTag can also be retried
unsubTag = uint16(1<<9) | noRetTag
// InitSlot indicates that the command be sent to any redis node in cluster
// When SendToReplicas is set, InitSlot command will be sent to primary node
InitSlot = uint16(1 << 14)
Expand Down Expand Up @@ -38,17 +39,17 @@ var (
// UnsubscribeCmd is predefined UNSUBSCRIBE
UnsubscribeCmd = Completed{
cs: newCommandSlice([]string{"UNSUBSCRIBE"}),
cf: noRetTag,
cf: unsubTag,
}
// PUnsubscribeCmd is predefined PUNSUBSCRIBE
PUnsubscribeCmd = Completed{
cs: newCommandSlice([]string{"PUNSUBSCRIBE"}),
cf: noRetTag,
cf: unsubTag,
}
// SUnsubscribeCmd is predefined SUNSUBSCRIBE
SUnsubscribeCmd = Completed{
cs: newCommandSlice([]string{"SUNSUBSCRIBE"}),
cf: noRetTag,
cf: unsubTag,
}
// PingCmd is predefined PING
PingCmd = Completed{
Expand All @@ -74,7 +75,7 @@ var (
// SentinelUnSubscribe is predefined UNSUBSCRIBE ASKING
SentinelUnSubscribe = Completed{
cs: newCommandSlice([]string{"UNSUBSCRIBE", "+sentinel", "+slave", "-sdown", "+sdown", "+switch-master", "+reboot"}),
cf: noRetTag,
cf: unsubTag,
}

// DiscardCmd is predefined DISCARD
Expand Down Expand Up @@ -128,6 +129,11 @@ func (c *Completed) NoReply() bool {
return c.cf&noRetTag == noRetTag
}

// IsUnsub checks if it is one of the UNSUBSCRIBE, PUNSUBSCRIBE, or SUNSUBSCRIBE commands.
func (c *Completed) IsUnsub() bool {
return c.cf&unsubTag == unsubTag
}

// IsReadOnly checks if it is readonly command and can be retried when network error.
func (c *Completed) IsReadOnly() bool {
return c.cf&readonly == readonly
Expand Down
31 changes: 31 additions & 0 deletions internal/cmds/cmds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,37 @@ func TestCompleted_NoReply(t *testing.T) {
if cmd := builder.Punsubscribe().Pattern("").Build(); !cmd.NoReply() {
t.Fatalf("should be no reply command")
}
if cmd := builder.Ssubscribe().Channel("").Build(); !cmd.NoReply() {
t.Fatalf("should be no reply command")
}
if cmd := builder.Sunsubscribe().Channel("").Build(); !cmd.NoReply() {
t.Fatalf("should be no reply command")
}
}

func TestCompleted_IsUnsub(t *testing.T) {
if cmd := NewCompleted([]string{"a", "b"}); cmd.IsUnsub() {
t.Fatalf("should not be no reply command")
}
builder := NewBuilder(InitSlot)
if cmd := builder.Subscribe().Channel("").Build(); cmd.IsUnsub() {
t.Fatalf("should be not be unsub command")
}
if cmd := builder.Unsubscribe().Channel("").Build(); !cmd.IsUnsub() {
t.Fatalf("should be unsub command")
}
if cmd := builder.Psubscribe().Pattern("").Build(); cmd.IsUnsub() {
t.Fatalf("should be not be unsub command")
}
if cmd := builder.Punsubscribe().Pattern("").Build(); !cmd.IsUnsub() {
t.Fatalf("should be unsub command")
}
if cmd := builder.Ssubscribe().Channel("").Build(); cmd.IsUnsub() {
t.Fatalf("should be not be unsub command")
}
if cmd := builder.Sunsubscribe().Channel("").Build(); !cmd.IsUnsub() {
t.Fatalf("should be unsub command")
}
}

func TestComplete_IsReadOnly(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions internal/cmds/gen_pubsub.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 35 additions & 2 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ const LibVer = "1.0.51"

var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?")

// See https://github.com/redis/rueidis/pull/691
func isUnsubReply(msg *RedisMessage) bool {
// ex. NOPERM User limiteduser has no permissions to run the 'ping' command
if msg.typ == '-' && strings.Contains(msg.string, "'ping'") {
msg.typ = '+'
msg.string = "PONG"
return true
}
return msg.string == "PONG" || (len(msg.values) != 0 && msg.values[0].string == "pong")
}

type wire interface {
Do(ctx context.Context, cmd Completed) RedisResult
DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) RedisResult
Expand Down Expand Up @@ -438,6 +449,9 @@ func (p *pipe) _backgroundWrite() (err error) {
}
for _, cmd := range multi {
err = writeCmd(p.w, cmd.Commands())
if cmd.IsUnsub() { // See https://github.com/redis/rueidis/pull/691
err = writeCmd(p.w, cmds.PingCmd.Commands())
}
}
}
return
Expand All @@ -456,7 +470,10 @@ func (p *pipe) _backgroundRead() (err error) {
ver = p.version
prply bool // push reply
unsub bool // unsubscribe notification
r2ps = p.r2ps

skipUnsubReply bool // if unsubscribe is replied

r2ps = p.r2ps
)

defer func() {
Expand Down Expand Up @@ -516,11 +533,16 @@ func (p *pipe) _backgroundRead() (err error) {
// We should ignore them and go fetch next message.
// We also treat all the other unsubscribe notifications just like sunsubscribe,
// so that we don't need to track how many channels we have subscribed to deal with wildcard unsubscribe command
// See https://github.com/redis/rueidis/pull/691
if unsub {
prply = false
unsub = false
continue
}
if skipUnsubReply && isUnsubReply(&msg) {
skipUnsubReply = false
continue
}
panic(protocolbug)
}
if multi == nil {
Expand Down Expand Up @@ -555,7 +577,8 @@ func (p *pipe) _backgroundRead() (err error) {
// We should ignore them and go fetch next message.
// We also treat all the other unsubscribe notifications just like sunsubscribe,
// so that we don't need to track how many channels we have subscribed to deal with wildcard unsubscribe command
if unsub && (!multi[ff].NoReply() || !strings.HasSuffix(multi[ff].Commands()[0], "UNSUBSCRIBE")) {
// See https://github.com/redis/rueidis/pull/691
if unsub {
prply = false
unsub = false
continue
Expand All @@ -569,6 +592,16 @@ func (p *pipe) _backgroundRead() (err error) {
msg = RedisMessage{} // override successful subscribe/unsubscribe response to empty
} else if multi[ff].NoReply() && msg.string == "QUEUED" {
panic(multiexecsub)
} else if multi[ff].IsUnsub() && !isUnsubReply(&msg) {
// See https://github.com/redis/rueidis/pull/691
skipUnsubReply = true
} else if skipUnsubReply {
// See https://github.com/redis/rueidis/pull/691
if !isUnsubReply(&msg) {
panic(protocolbug)
}
skipUnsubReply = false
continue
}
resp := newResult(msg, err)
if resps != nil {
Expand Down
Loading

0 comments on commit 723761f

Please sign in to comment.