From 287251423a4ed07e4eb0aa8d724f9a8f44592fb9 Mon Sep 17 00:00:00 2001 From: snower Date: Mon, 20 May 2024 09:57:05 +0800 Subject: [PATCH] text redis protocol adds setting default timeout command --- protocol/textcommand.go | 436 +++++++++++++++++----------------------- server/protocol.go | 38 +++- server/transparency.go | 11 +- 3 files changed, 229 insertions(+), 256 deletions(-) diff --git a/protocol/textcommand.go b/protocol/textcommand.go index 074cdd3..4c12ca9 100644 --- a/protocol/textcommand.go +++ b/protocol/textcommand.go @@ -13,6 +13,7 @@ import ( type ITextProtocol interface { GetDBId() uint8 GetLockId() [16]byte + GetTimeout() uint16 GetLockCommand() *LockCommand FreeLockCommand(lockCommand *LockCommand) error GetParser() *TextParser @@ -34,25 +35,27 @@ func (self *TextCommandConverter) FindHandler(name string) (ConvertTextCommand, self.handlers = make(map[string]ConvertTextCommand, 64) self.handlers["LOCK"] = self.ConvertTextLockAndUnLockCommand self.handlers["UNLOCK"] = self.ConvertTextLockAndUnLockCommand + self.handlers["DEL"] = self.ConvertTextDelCommand self.handlers["SET"] = self.ConvertTextSetCommand - self.handlers["APPEND"] = self.ConvertTextAppendCommand - self.handlers["GETSET"] = self.ConvertTextGetSetCommand self.handlers["SETEX"] = self.ConvertTextSetEXCommand self.handlers["PSETEX"] = self.ConvertTextSetEXCommand self.handlers["SETNX"] = self.ConvertTextSetNXCommand - self.handlers["GET"] = self.ConvertTextGetCommand + self.handlers["APPEND"] = self.ConvertTextAppendCommand + self.handlers["GETSET"] = self.ConvertTextGetSetCommand self.handlers["INCR"] = self.ConvertTextIncrCommand self.handlers["INCRBY"] = self.ConvertTextIncrCommand self.handlers["DECR"] = self.ConvertTextDecrCommand self.handlers["DECRBY"] = self.ConvertTextDecrCommand - self.handlers["STRLEN"] = self.ConvertTextStrlenCommand - self.handlers["EXISTS"] = self.ConvertTextExistsCommand self.handlers["EXPIRE"] = self.ConvertTextExpireCommand self.handlers["PEXPIREAT"] = self.ConvertTextExpireCommand self.handlers["PEXPIRE"] = self.ConvertTextExpireCommand self.handlers["PEXPIREAT"] = self.ConvertTextExpireCommand self.handlers["PERSIST"] = self.ConvertTextExpireCommand + + self.handlers["GET"] = self.ConvertTextGetCommand + self.handlers["STRLEN"] = self.ConvertTextStrlenCommand + self.handlers["EXISTS"] = self.ConvertTextExistsCommand self.handlers["TYPE"] = self.ConvertTextTypeCommand self.handlers["DUMP"] = self.ConvertTextDumpCommand } @@ -396,13 +399,10 @@ func (self *TextCommandConverter) WriteTextLockAndUnLockCommandResult(textProtoc bufIndex += copy(wbuf[bufIndex:], []byte("\r\n")) err := stream.WriteBytes(wbuf[:bufIndex]) - if err == nil { - if lockCommandResult.Flag&UNLOCK_FLAG_CONTAINS_DATA != 0 { - data := lockCommandResult.Data.GetStringValue() - err = stream.WriteBytes([]byte(fmt.Sprintf("$4\r\nDATA\r\n$%d\r\n%s\r\n", len(data), data))) - } + if err == nil && lockCommandResult.Flag&UNLOCK_FLAG_CONTAINS_DATA != 0 { + data := lockCommandResult.Data.GetStringValue() + return stream.WriteBytes([]byte(fmt.Sprintf("$4\r\nDATA\r\n$%d\r\n%s\r\n", len(data), data))) } - lockCommandResult.Data = nil return err } @@ -429,13 +429,9 @@ func (self *TextCommandConverter) ConvertTextDelCommand(textProtocol ITextProtoc func (self *TextCommandConverter) WriteTextDelCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { if lockCommandResult.Result != 0 { - err := stream.WriteBytes([]byte(":0\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(":0\r\n")) } - err := stream.WriteBytes([]byte(":1\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(":1\r\n")) } func (self *TextCommandConverter) ConvertTextSetCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { @@ -457,7 +453,7 @@ func (self *TextCommandConverter) ConvertTextSetCommand(textProtocol ITextProtoc } } if lockCommand.Flag&LOCK_FLAG_UPDATE_WHEN_LOCKED == 0 && lockCommand.Timeout == 0 && lockCommand.TimeoutFlag == 0 { - lockCommand.Timeout = 15 + lockCommand.Timeout = textProtocol.GetTimeout() } if lockCommand.Expried == 0 && lockCommand.ExpriedFlag == 0 { lockCommand.Expried = 0x7fff @@ -475,71 +471,11 @@ func (self *TextCommandConverter) ConvertTextSetCommand(textProtocol ITextProtoc func (self *TextCommandConverter) WriteTextSetCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { if lockCommandResult.Result != 0 && lockCommandResult.Result != RESULT_LOCKED_ERROR { if lockCommandResult.Result == RESULT_TIMEOUT { - err := stream.WriteBytes([]byte("$-1\r\n")) - lockCommandResult.Data = nil - return err - } - err := stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) - lockCommandResult.Data = nil - return err - } - err := stream.WriteBytes([]byte("+OK\r\n")) - lockCommandResult.Data = nil - return err -} - -func (self *TextCommandConverter) ConvertTextAppendCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { - if len(args) < 3 { - return nil, nil, errors.New("Command Parse Args Count Error") - } - - lockCommand := self.GetAndResetLockCommand(textProtocol) - lockCommand.CommandType = COMMAND_LOCK - self.ConvertArgId2LockId(args[1], &lockCommand.LockKey) - lockCommand.LockId = lockCommand.LockKey - lockCommand.Flag = LOCK_FLAG_UPDATE_WHEN_LOCKED | LOCK_FLAG_CONTAINS_DATA - lockCommand.Data = NewLockCommandDataAppendStringWithProperty(args[2], []*LockCommandDataProperty{NewLockCommandDataProperty(LOCK_DATA_PROPERTY_CODE_KEY, []byte(args[1]))}) - - if len(args) > 3 { - err := self.ConvertArgs2Flag(lockCommand, args[3:]) - if err != nil { - _ = textProtocol.FreeLockCommand(lockCommand) - return nil, nil, err - } - } - if lockCommand.Expried == 0 && lockCommand.ExpriedFlag == 0 { - lockCommand.Expried = 0xffff - lockCommand.ExpriedFlag = EXPRIED_FLAG_UNLIMITED_EXPRIED_TIME | EXPRIED_FLAG_ZEOR_AOF_TIME | EXPRIED_FLAG_UPDATE_NO_RESET_EXPRIED_CHECKED_COUNT - } else { - if lockCommand.ExpriedFlag&EXPRIED_FLAG_UNLIMITED_AOF_TIME != 0 { - lockCommand.ExpriedFlag |= EXPRIED_FLAG_UPDATE_NO_RESET_EXPRIED_CHECKED_COUNT - } else { - lockCommand.ExpriedFlag |= EXPRIED_FLAG_ZEOR_AOF_TIME | EXPRIED_FLAG_UPDATE_NO_RESET_EXPRIED_CHECKED_COUNT - } - } - return lockCommand, func(textProtocol ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { - if lockCommandResult.Result != 0 && lockCommandResult.Result != RESULT_LOCKED_ERROR { - if lockCommandResult.Result == RESULT_TIMEOUT { - werr := stream.WriteBytes([]byte("$-1\r\n")) - lockCommandResult.Data = nil - return werr - } - werr := stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) - lockCommandResult.Data = nil - return werr + return stream.WriteBytes([]byte("$-1\r\n")) } - werr := stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", lockCommandResult.Data.GetValueSize()+len(args[2])))) - lockCommandResult.Data = nil - return werr - }, nil -} - -func (self *TextCommandConverter) ConvertTextGetSetCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { - lockCommand, _, err := self.ConvertTextSetCommand(textProtocol, args) - if err != nil { - return nil, nil, err + return stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) } - return lockCommand, self.WriteTextGetCommandResult, nil + return stream.WriteBytes([]byte("+OK\r\n")) } func (self *TextCommandConverter) ConvertTextSetNXCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { @@ -553,7 +489,7 @@ func (self *TextCommandConverter) ConvertTextSetNXCommand(textProtocol ITextProt lockCommand.LockId = GenLockId() lockCommand.Flag = LOCK_FLAG_CONTAINS_DATA lockCommand.Data = NewLockCommandDataSetStringWithProperty(args[2], []*LockCommandDataProperty{NewLockCommandDataProperty(LOCK_DATA_PROPERTY_CODE_KEY, []byte(args[1]))}) - lockCommand.Timeout = 15 + lockCommand.Timeout = textProtocol.GetTimeout() if len(args) > 3 { err := self.ConvertArgs2Flag(lockCommand, args[3:]) if err != nil { @@ -577,17 +513,11 @@ func (self *TextCommandConverter) ConvertTextSetNXCommand(textProtocol ITextProt func (self *TextCommandConverter) WriteTextSetNXCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { if lockCommandResult.Result != 0 && lockCommandResult.Result != RESULT_LOCKED_ERROR { if lockCommandResult.Result == RESULT_TIMEOUT { - err := stream.WriteBytes([]byte(":0\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(":0\r\n")) } - err := stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) } - err := stream.WriteBytes([]byte(":1\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(":1\r\n")) } func (self *TextCommandConverter) ConvertTextSetEXCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { @@ -603,6 +533,7 @@ func (self *TextCommandConverter) ConvertTextSetEXCommand(textProtocol ITextProt lockCommand.Data = NewLockCommandDataSetStringWithProperty(args[3], []*LockCommandDataProperty{NewLockCommandDataProperty(LOCK_DATA_PROPERTY_CODE_KEY, []byte(args[1]))}) expried, err := strconv.ParseInt(args[2], 10, 64) if err != nil { + _ = textProtocol.FreeLockCommand(lockCommand) return nil, nil, errors.New("Command Parse EX Value Error") } lockCommand.Expried = uint16(expried & 0xffff) @@ -647,8 +578,16 @@ func (self *TextCommandConverter) ConvertTextSetEXCommand(textProtocol ITextProt return lockCommand, self.WriteTextSetCommandResult, nil } -func (self *TextCommandConverter) ConvertTextGetCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { - if len(args) < 2 { +func (self *TextCommandConverter) ConvertTextGetSetCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { + lockCommand, _, err := self.ConvertTextSetCommand(textProtocol, args) + if err != nil { + return nil, nil, err + } + return lockCommand, self.WriteTextGetCommandResult, nil +} + +func (self *TextCommandConverter) ConvertTextAppendCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { + if len(args) < 3 { return nil, nil, errors.New("Command Parse Args Count Error") } @@ -656,48 +595,35 @@ func (self *TextCommandConverter) ConvertTextGetCommand(textProtocol ITextProtoc lockCommand.CommandType = COMMAND_LOCK self.ConvertArgId2LockId(args[1], &lockCommand.LockKey) lockCommand.LockId = lockCommand.LockKey - lockCommand.Flag = LOCK_FLAG_SHOW_WHEN_LOCKED - return lockCommand, self.WriteTextGetCommandResult, nil -} - -func (self *TextCommandConverter) WriteTextGetCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { - if (lockCommandResult.Result != RESULT_UNOWN_ERROR && lockCommandResult.Result != RESULT_LOCKED_ERROR) || lockCommandResult.Data == nil { - err := stream.WriteBytes([]byte("$-1\r\n")) - lockCommandResult.Data = nil - return err - } + lockCommand.Flag = LOCK_FLAG_UPDATE_WHEN_LOCKED | LOCK_FLAG_CONTAINS_DATA + lockCommand.Data = NewLockCommandDataAppendStringWithProperty(args[2], []*LockCommandDataProperty{NewLockCommandDataProperty(LOCK_DATA_PROPERTY_CODE_KEY, []byte(args[1]))}) - lockResultCommandData := lockCommandResult.Data - lockCommandResult.Data = nil - if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_NUMBER != 0 { - return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", lockResultCommandData.GetIncrValue()))) - } - if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_ARRAY != 0 { - values := lockResultCommandData.GetArrayValue() - if values == nil || len(values) == 0 { - return stream.WriteBytes([]byte("$-1\r\n")) + if len(args) > 3 { + err := self.ConvertArgs2Flag(lockCommand, args[3:]) + if err != nil { + _ = textProtocol.FreeLockCommand(lockCommand) + return nil, nil, err } - value := string(values[0]) - return stream.WriteBytes([]byte(fmt.Sprintf("$%d\r\n%s\r\n", len(value), value))) } - if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_KV != 0 { - values := lockResultCommandData.GetKVValue() - if values == nil || len(values) == 0 { - return stream.WriteBytes([]byte("$-1\r\n")) - } - var value string - for _, v := range values { - value = string(v) - break + if lockCommand.Expried == 0 && lockCommand.ExpriedFlag == 0 { + lockCommand.Expried = 0xffff + lockCommand.ExpriedFlag = EXPRIED_FLAG_UNLIMITED_EXPRIED_TIME | EXPRIED_FLAG_ZEOR_AOF_TIME | EXPRIED_FLAG_UPDATE_NO_RESET_EXPRIED_CHECKED_COUNT + } else { + if lockCommand.ExpriedFlag&EXPRIED_FLAG_UNLIMITED_AOF_TIME != 0 { + lockCommand.ExpriedFlag |= EXPRIED_FLAG_UPDATE_NO_RESET_EXPRIED_CHECKED_COUNT + } else { + lockCommand.ExpriedFlag |= EXPRIED_FLAG_ZEOR_AOF_TIME | EXPRIED_FLAG_UPDATE_NO_RESET_EXPRIED_CHECKED_COUNT } - return stream.WriteBytes([]byte(fmt.Sprintf("$%d\r\n%s\r\n", len(value), value))) } - - if len(lockResultCommandData.Data) <= 6 { - return stream.WriteBytes([]byte("$-1\r\n")) - } - value := lockResultCommandData.GetStringValue() - return stream.WriteBytes([]byte(fmt.Sprintf("$%d\r\n%s\r\n", len(value), value))) + return lockCommand, func(textProtocol ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { + if lockCommandResult.Result != 0 && lockCommandResult.Result != RESULT_LOCKED_ERROR { + if lockCommandResult.Result == RESULT_TIMEOUT { + return stream.WriteBytes([]byte("$-1\r\n")) + } + return stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) + } + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", lockCommandResult.Data.GetValueSize()+len(args[2])))) + }, nil } func (self *TextCommandConverter) ConvertTextIncrCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { @@ -741,18 +667,12 @@ func (self *TextCommandConverter) ConvertTextIncrCommand(textProtocol ITextProto } return lockCommand, func(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { if lockCommandResult.Result != 0 && lockCommandResult.Result != RESULT_LOCKED_ERROR { - werr := stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) - lockCommandResult.Data = nil - return werr + return stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) } if lockCommandResult.Data == nil { - werr := stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", incrValue))) - lockCommandResult.Data = nil - return werr + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", incrValue))) } - werr := stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", lockCommandResult.Data.GetIncrValue()+incrValue))) - lockCommandResult.Data = nil - return werr + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", lockCommandResult.Data.GetIncrValue()+incrValue))) }, nil } @@ -797,98 +717,15 @@ func (self *TextCommandConverter) ConvertTextDecrCommand(textProtocol ITextProto } return lockCommand, func(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { if lockCommandResult.Result != 0 && lockCommandResult.Result != RESULT_LOCKED_ERROR { - werr := stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) - lockCommandResult.Data = nil - return werr + return stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) } if lockCommandResult.Data == nil { - werr := stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", incrValue))) - lockCommandResult.Data = nil - return werr + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", incrValue))) } - werr := stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", lockCommandResult.Data.GetIncrValue()+incrValue))) - lockCommandResult.Data = nil - return werr + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", lockCommandResult.Data.GetIncrValue()+incrValue))) }, nil } -func (self *TextCommandConverter) ConvertTextStrlenCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { - if len(args) < 2 { - return nil, nil, errors.New("Command Parse Args Count Error") - } - - lockCommand := self.GetAndResetLockCommand(textProtocol) - lockCommand.CommandType = COMMAND_LOCK - self.ConvertArgId2LockId(args[1], &lockCommand.LockKey) - lockCommand.LockId = lockCommand.LockKey - lockCommand.Flag = LOCK_FLAG_SHOW_WHEN_LOCKED - return lockCommand, self.WriteTextStrlenCommandResult, nil -} - -func (self *TextCommandConverter) WriteTextStrlenCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { - if lockCommandResult.Result != RESULT_UNOWN_ERROR || lockCommandResult.Data == nil { - err := stream.WriteBytes([]byte(":0\r\n")) - lockCommandResult.Data = nil - return err - } - - lockResultCommandData := lockCommandResult.Data - lockCommandResult.Data = nil - if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_NUMBER != 0 { - return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", len(fmt.Sprintf("%d", lockResultCommandData.GetIncrValue()))))) - } - if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_ARRAY != 0 { - values := lockResultCommandData.GetArrayValue() - if values == nil || len(values) == 0 { - return stream.WriteBytes([]byte(":0\r\n")) - } - value := string(values[0]) - return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", len(value)))) - } - if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_KV != 0 { - values := lockResultCommandData.GetKVValue() - if values == nil || len(values) == 0 { - return stream.WriteBytes([]byte(":0\r\n")) - } - var value string - for _, v := range values { - value = string(v) - break - } - return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", len(value)))) - } - - if len(lockResultCommandData.Data) <= 6 { - return stream.WriteBytes([]byte(":0\r\n")) - } - value := lockResultCommandData.GetStringValue() - return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", len(value)))) -} - -func (self *TextCommandConverter) ConvertTextExistsCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { - if len(args) < 2 { - return nil, nil, errors.New("Command Parse Args Count Error") - } - - lockCommand := self.GetAndResetLockCommand(textProtocol) - lockCommand.CommandType = COMMAND_LOCK - self.ConvertArgId2LockId(args[1], &lockCommand.LockKey) - lockCommand.LockId = lockCommand.LockKey - lockCommand.Flag = LOCK_FLAG_SHOW_WHEN_LOCKED - return lockCommand, self.WriteTextExistsCommandResult, nil -} - -func (self *TextCommandConverter) WriteTextExistsCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { - if lockCommandResult.Result != RESULT_UNOWN_ERROR || lockCommandResult.Data == nil { - err := stream.WriteBytes([]byte(":0\r\n")) - lockCommandResult.Data = nil - return err - } - err := stream.WriteBytes([]byte(":1\r\n")) - lockCommandResult.Data = nil - return err -} - func (self *TextCommandConverter) ConvertTextExpireCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { if len(args) < 3 { return nil, nil, errors.New("Command Parse Args Count Error") @@ -901,6 +738,7 @@ func (self *TextCommandConverter) ConvertTextExpireCommand(textProtocol ITextPro lockCommand.Flag = LOCK_FLAG_UPDATE_WHEN_LOCKED expried, err := strconv.ParseInt(args[2], 10, 64) if err != nil { + _ = textProtocol.FreeLockCommand(lockCommand) return nil, nil, errors.New("Command Parse EX Value Error") } switch strings.ToUpper(args[0]) { @@ -967,17 +805,131 @@ func (self *TextCommandConverter) ConvertTextExpireCommand(textProtocol ITextPro func (self *TextCommandConverter) WriteTextExpireCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { if lockCommandResult.Result != 0 && lockCommandResult.Result != RESULT_LOCKED_ERROR { if lockCommandResult.Result == RESULT_TIMEOUT { - err := stream.WriteBytes([]byte(":0\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(":0\r\n")) } - err := stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(fmt.Sprintf("-ERR %d\r\n", lockCommandResult.Result))) } - err := stream.WriteBytes([]byte(":1\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(":1\r\n")) +} + +func (self *TextCommandConverter) ConvertTextGetCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { + if len(args) < 2 { + return nil, nil, errors.New("Command Parse Args Count Error") + } + + lockCommand := self.GetAndResetLockCommand(textProtocol) + lockCommand.CommandType = COMMAND_LOCK + self.ConvertArgId2LockId(args[1], &lockCommand.LockKey) + lockCommand.LockId = lockCommand.LockKey + lockCommand.Flag = LOCK_FLAG_SHOW_WHEN_LOCKED + return lockCommand, self.WriteTextGetCommandResult, nil +} + +func (self *TextCommandConverter) WriteTextGetCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { + if (lockCommandResult.Result != RESULT_UNOWN_ERROR && lockCommandResult.Result != RESULT_LOCKED_ERROR) || lockCommandResult.Data == nil { + return stream.WriteBytes([]byte("$-1\r\n")) + } + + lockResultCommandData := lockCommandResult.Data + if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_NUMBER != 0 { + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", lockResultCommandData.GetIncrValue()))) + } + if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_ARRAY != 0 { + values := lockResultCommandData.GetArrayValue() + if values == nil || len(values) == 0 { + return stream.WriteBytes([]byte("$-1\r\n")) + } + value := string(values[0]) + return stream.WriteBytes([]byte(fmt.Sprintf("$%d\r\n%s\r\n", len(value), value))) + } + if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_KV != 0 { + values := lockResultCommandData.GetKVValue() + if values == nil || len(values) == 0 { + return stream.WriteBytes([]byte("$-1\r\n")) + } + var value string + for _, v := range values { + value = string(v) + break + } + return stream.WriteBytes([]byte(fmt.Sprintf("$%d\r\n%s\r\n", len(value), value))) + } + + if len(lockResultCommandData.Data) <= 6 { + return stream.WriteBytes([]byte("$-1\r\n")) + } + value := lockResultCommandData.GetStringValue() + return stream.WriteBytes([]byte(fmt.Sprintf("$%d\r\n%s\r\n", len(value), value))) +} + +func (self *TextCommandConverter) ConvertTextStrlenCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { + if len(args) < 2 { + return nil, nil, errors.New("Command Parse Args Count Error") + } + + lockCommand := self.GetAndResetLockCommand(textProtocol) + lockCommand.CommandType = COMMAND_LOCK + self.ConvertArgId2LockId(args[1], &lockCommand.LockKey) + lockCommand.LockId = lockCommand.LockKey + lockCommand.Flag = LOCK_FLAG_SHOW_WHEN_LOCKED + return lockCommand, self.WriteTextStrlenCommandResult, nil +} + +func (self *TextCommandConverter) WriteTextStrlenCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { + if lockCommandResult.Result != RESULT_UNOWN_ERROR || lockCommandResult.Data == nil { + return stream.WriteBytes([]byte(":0\r\n")) + } + + lockResultCommandData := lockCommandResult.Data + if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_NUMBER != 0 { + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", len(fmt.Sprintf("%d", lockResultCommandData.GetIncrValue()))))) + } + if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_ARRAY != 0 { + values := lockResultCommandData.GetArrayValue() + if values == nil || len(values) == 0 { + return stream.WriteBytes([]byte(":0\r\n")) + } + value := string(values[0]) + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", len(value)))) + } + if lockResultCommandData.DataFlag&LOCK_DATA_FLAG_VALUE_TYPE_KV != 0 { + values := lockResultCommandData.GetKVValue() + if values == nil || len(values) == 0 { + return stream.WriteBytes([]byte(":0\r\n")) + } + var value string + for _, v := range values { + value = string(v) + break + } + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", len(value)))) + } + + if len(lockResultCommandData.Data) <= 6 { + return stream.WriteBytes([]byte(":0\r\n")) + } + value := lockResultCommandData.GetStringValue() + return stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", len(value)))) +} + +func (self *TextCommandConverter) ConvertTextExistsCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { + if len(args) < 2 { + return nil, nil, errors.New("Command Parse Args Count Error") + } + + lockCommand := self.GetAndResetLockCommand(textProtocol) + lockCommand.CommandType = COMMAND_LOCK + self.ConvertArgId2LockId(args[1], &lockCommand.LockKey) + lockCommand.LockId = lockCommand.LockKey + lockCommand.Flag = LOCK_FLAG_SHOW_WHEN_LOCKED + return lockCommand, self.WriteTextExistsCommandResult, nil +} + +func (self *TextCommandConverter) WriteTextExistsCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { + if lockCommandResult.Result != RESULT_UNOWN_ERROR || lockCommandResult.Data == nil { + return stream.WriteBytes([]byte(":0\r\n")) + } + return stream.WriteBytes([]byte(":1\r\n")) } func (self *TextCommandConverter) ConvertTextTypeCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { @@ -995,13 +947,9 @@ func (self *TextCommandConverter) ConvertTextTypeCommand(textProtocol ITextProto func (self *TextCommandConverter) WriteTextTypeCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { if lockCommandResult.Result != RESULT_UNOWN_ERROR || lockCommandResult.Data == nil { - err := stream.WriteBytes([]byte("+none\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte("+none\r\n")) } - err := stream.WriteBytes([]byte("+string\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte("+string\r\n")) } func (self *TextCommandConverter) ConvertTextDumpCommand(textProtocol ITextProtocol, args []string) (*LockCommand, WriteTextCommandResultFunc, error) { @@ -1019,11 +967,7 @@ func (self *TextCommandConverter) ConvertTextDumpCommand(textProtocol ITextProto func (self *TextCommandConverter) WriteTextDumpCommandResult(_ ITextProtocol, stream ISteam, lockCommandResult *LockResultCommand) error { if lockCommandResult.Result != RESULT_UNOWN_ERROR || lockCommandResult.Data == nil { - err := stream.WriteBytes([]byte("$-1\r\n")) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte("$-1\r\n")) } - err := stream.WriteBytes([]byte(fmt.Sprintf("$%d\r\n%s\r\n", len(lockCommandResult.Data.Data), lockCommandResult.Data.Data))) - lockCommandResult.Data = nil - return err + return stream.WriteBytes([]byte(fmt.Sprintf("$%d\r\n%s\r\n", len(lockCommandResult.Data.Data), lockCommandResult.Data.Data))) } diff --git a/server/protocol.go b/server/protocol.go index 3bd65ce..8e96c40 100755 --- a/server/protocol.go +++ b/server/protocol.go @@ -1812,6 +1812,7 @@ type TextServerProtocol struct { lockId [16]byte willCommands *LockCommandQueue totalCommandCount uint64 + timeout uint16 dbId uint8 closed bool } @@ -1821,7 +1822,7 @@ func NewTextServerProtocol(slock *SLock, stream *Stream) *TextServerProtocol { parser := protocol.NewTextParser(make([]byte, 1024), make([]byte, 1024)) serverProtocol := &TextServerProtocol{slock, &sync.Mutex{}, stream, nil, make([]*ProxyServerProtocol, 0), make([]*protocol.LockCommand, FREE_COMMAND_MAX_SIZE), 0, NewLockCommandQueue(4, 64, FREE_COMMAND_QUEUE_INIT_SIZE), nil, parser, protocol.NewTextCommandConverter(), - nil, make(chan *protocol.LockResultCommand, 4), [16]byte{}, [16]byte{}, nil, 0, 0, false} + nil, make(chan *protocol.LockResultCommand, 4), [16]byte{}, [16]byte{}, nil, 0, 15, 0, false} proxy.serverProtocol = serverProtocol serverProtocol.InitLockCommand() serverProtocol.session = slock.addServerProtocol(serverProtocol) @@ -1834,6 +1835,7 @@ func (self *TextServerProtocol) FindHandler(name string) (TextServerProtocolComm if self.handlers == nil { self.handlers = make(map[string]TextServerProtocolCommandHandler, 64) self.handlers["SELECT"] = self.commandHandlerSelectDB + self.handlers["TIMEOUT"] = self.commandHandlerTimeoutDB self.handlers["LOCK"] = self.commandHandlerLock self.handlers["UNLOCK"] = self.commandHandlerUnlock self.handlers["PUSH"] = self.commandHandlerPush @@ -1942,6 +1944,10 @@ func (self *TextServerProtocol) GetLockId() [16]byte { return self.lockId } +func (self *TextServerProtocol) GetTimeout() uint16 { + return self.timeout +} + func (self *TextServerProtocol) GetParser() *protocol.TextParser { return self.parser } @@ -2490,6 +2496,22 @@ func (self *TextServerProtocol) commandHandlerSelectDB(_ *TextServerProtocol, ar return self.stream.WriteBytes(self.parser.BuildResponse(true, "OK", nil)) } +func (self *TextServerProtocol) commandHandlerTimeoutDB(_ *TextServerProtocol, args []string) error { + if len(args) < 3 { + return self.stream.WriteBytes(self.parser.BuildResponse(false, "ERR Command Parse Len Error", nil)) + } + if strings.ToUpper(args[1]) != "SET" { + return self.stream.WriteBytes([]byte(fmt.Sprintf(":%d\r\n", self.timeout))) + } + + timeout, err := strconv.Atoi(args[2]) + if err != nil { + return self.stream.WriteBytes(self.parser.BuildResponse(false, "ERR Command Parse Timeout Value Error", nil)) + } + self.timeout = uint16(timeout) + return self.stream.WriteBytes(self.parser.BuildResponse(true, "OK", nil)) +} + func (self *TextServerProtocol) commandHandlerLock(_ *TextServerProtocol, args []string) error { lockCommand, writeTextCommandResultFunc, err := self.commandConverter.ConvertTextLockAndUnLockCommand(self, args) if err != nil { @@ -2531,7 +2553,7 @@ func (self *TextServerProtocol) commandHandlerLock(_ *TextServerProtocol, args [ self.lockId = lockCommandResult.LockId } err = writeTextCommandResultFunc(self, self.stream, lockCommandResult) - self.freeCommandResult = lockCommandResult + self.freeCommandResult, lockCommandResult.Data = lockCommandResult, nil return err } @@ -2581,7 +2603,7 @@ func (self *TextServerProtocol) commandHandlerUnlock(_ *TextServerProtocol, args 0, 0, 0, 0, 0, 0, 0, 0 } err = writeTextCommandResultFunc(self, self.stream, lockCommandResult) - self.freeCommandResult = lockCommandResult + self.freeCommandResult, lockCommandResult.Data = lockCommandResult, nil return err } @@ -2635,11 +2657,11 @@ func (self *TextServerProtocol) commandHandlerKeyWriteValueCommand(_ *TextServer self.lockRequestId[8], self.lockRequestId[9], self.lockRequestId[10], self.lockRequestId[11], self.lockRequestId[12], self.lockRequestId[13], self.lockRequestId[14], self.lockRequestId[15] = 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 - return self.stream.WriteBytes(self.parser.BuildResponse(false, "ERR Lock Error", nil)) + return self.stream.WriteBytes(self.parser.BuildResponse(false, err.Error(), nil)) } lockCommandResult := <-self.lockWaiter err = writeTextCommandResultFunc(self, self.stream, lockCommandResult) - self.freeCommandResult = lockCommandResult + self.freeCommandResult, lockCommandResult.Data = lockCommandResult, nil return err } @@ -2670,8 +2692,8 @@ func (self *TextServerProtocol) commandHandlerKeyReadValueCommand(_ *TextServerP lockCommandResult := self.freeCommandResult if lockCommandResult == nil { lockCommandResult = protocol.NewLockResultCommand(lockCommand, result, 0, lcount, count, lrcount, rcount, data) - self.freeCommandResult = lockCommandResult } else { + self.freeCommandResult = nil lockCommandResult.CommandType = lockCommand.CommandType lockCommandResult.RequestId = lockCommand.RequestId lockCommandResult.Result = result @@ -2693,8 +2715,10 @@ func (self *TextServerProtocol) commandHandlerKeyReadValueCommand(_ *TextServerP } else { lockCommandResult.Data = nil } + err = writeTextCommandResultFunc(self, self.stream, lockCommandResult) _ = self.FreeLockCommand(lockCommand) - return writeTextCommandResultFunc(self, self.stream, lockCommandResult) + self.freeCommandResult, lockCommandResult.Data = lockCommandResult, nil + return err } func (self *TextServerProtocol) commandHandlerKeysCommand(_ *TextServerProtocol, args []string) error { diff --git a/server/transparency.go b/server/transparency.go index 17ce6cf..aa08e95 100644 --- a/server/transparency.go +++ b/server/transparency.go @@ -864,6 +864,7 @@ func (self *TransparencyTextServerProtocol) FindHandler(name string) (TextServer if self.handlers == nil { self.handlers = make(map[string]TextServerProtocolCommandHandler, 64) self.handlers["SELECT"] = self.serverProtocol.commandHandlerSelectDB + self.handlers["TIMEOUT"] = self.serverProtocol.commandHandlerTimeoutDB self.handlers["LOCK"] = self.commandHandlerLock self.handlers["UNLOCK"] = self.commandHandlerUnlock self.handlers["PUSH"] = self.commandHandlerPush @@ -959,6 +960,10 @@ func (self *TransparencyTextServerProtocol) GetLockId() [16]byte { return self.serverProtocol.GetLockId() } +func (self *TransparencyTextServerProtocol) GetTimeout() uint16 { + return self.serverProtocol.GetTimeout() +} + func (self *TransparencyTextServerProtocol) GetParser() *protocol.TextParser { return self.serverProtocol.GetParser() } @@ -1204,7 +1209,7 @@ func (self *TransparencyTextServerProtocol) commandHandlerLock(serverProtocol *T } err = writeTextCommandResultFunc(self, self.stream, lockCommandResult) _ = self.serverProtocol.FreeLockCommand(lockCommand) - self.serverProtocol.freeCommandResult = lockCommandResult + self.serverProtocol.freeCommandResult, lockCommandResult.Data = lockCommandResult, nil return err } @@ -1264,7 +1269,7 @@ func (self *TransparencyTextServerProtocol) commandHandlerUnlock(serverProtocol } err = writeTextCommandResultFunc(self, self.stream, lockCommandResult) _ = self.serverProtocol.FreeLockCommand(lockCommand) - self.serverProtocol.freeCommandResult = lockCommandResult + self.serverProtocol.freeCommandResult, lockCommandResult.Data = lockCommandResult, nil return err } @@ -1332,7 +1337,7 @@ func (self *TransparencyTextServerProtocol) commandHandlerKeyWriteValueCommand(s lockCommandResult := <-self.lockWaiter err = writeTextCommandResultFunc(self, self.stream, lockCommandResult) _ = self.serverProtocol.FreeLockCommand(lockCommand) - self.serverProtocol.freeCommandResult = lockCommandResult + self.serverProtocol.freeCommandResult, lockCommandResult.Data = lockCommandResult, nil return err }