From c2dabfc4d5578f605c23215646f67123246e742d Mon Sep 17 00:00:00 2001 From: Julian Gutierrez Oschmann Date: Wed, 14 Mar 2012 02:57:40 -0300 Subject: [PATCH] Some minor changes to make gocached compile against RC1 --- cachestorage.go | 68 ++--- command.go | 565 ++++++++++++++++++++-------------------- eventnotifierstorage.go | 94 +++---- generationalstorage.go | 175 ++++++------- gocached.go | 85 +++--- hashingstorage.go | 60 ++--- heapexpiringstorage.go | 50 ++-- mapcachestorage.go | 80 +++--- 8 files changed, 586 insertions(+), 591 deletions(-) diff --git a/cachestorage.go b/cachestorage.go index 9553eba..50a379b 100644 --- a/cachestorage.go +++ b/cachestorage.go @@ -1,57 +1,57 @@ package main const ( - Ok = iota - KeyAlreadyInUse - KeyNotFound - IllegalParameter + Ok = iota + KeyAlreadyInUse + KeyNotFound + IllegalParameter ) -type ErrorCode uint; +type ErrorCode uint type StorageEntry struct { - exptime uint32 - flags uint32 - bytes uint32 - cas_unique uint64 - content []byte + exptime uint32 + flags uint32 + bytes uint32 + cas_unique uint64 + content []byte } type CacheStorageFactory func() CacheStorage type CacheStorage interface { - // Store this data. - Set(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (previous *StorageEntry, result *StorageEntry) + // Store this data. + Set(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (previous *StorageEntry, result *StorageEntry) - // Store this data, but only if the server *doesn't* already hold data for this key - Add(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (err ErrorCode, result *StorageEntry) + // Store this data, but only if the server *doesn't* already hold data for this key + Add(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (err ErrorCode, result *StorageEntry) - // Store this data, but only if the server *does* already hold data for this key - Replace(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (err ErrorCode, previous *StorageEntry, result *StorageEntry) + // Store this data, but only if the server *does* already hold data for this key + Replace(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (err ErrorCode, previous *StorageEntry, result *StorageEntry) - // Add this data to an existing key after existing data - Append(key string, bytes uint32, content []byte) (err ErrorCode, previous *StorageEntry, result *StorageEntry) + // Add this data to an existing key after existing data + Append(key string, bytes uint32, content []byte) (err ErrorCode, previous *StorageEntry, result *StorageEntry) - // Add this data to an existing key before existing data - Prepend(key string, bytes uint32, content []byte) (err ErrorCode, previous *StorageEntry, result *StorageEntry) + // Add this data to an existing key before existing data + Prepend(key string, bytes uint32, content []byte) (err ErrorCode, previous *StorageEntry, result *StorageEntry) - // Check and set (CAS) operation which means "store this data but - // only if no one else has updated since I last fetched it" - Cas(key string, flags uint32, exptime uint32, bytes uint32, cas_unique uint64, content []byte) (err ErrorCode, previous *StorageEntry, result *StorageEntry) + // Check and set (CAS) operation which means "store this data but + // only if no one else has updated since I last fetched it" + Cas(key string, flags uint32, exptime uint32, bytes uint32, cas_unique uint64, content []byte) (err ErrorCode, previous *StorageEntry, result *StorageEntry) - // Retrieve the stored data for a given key - Get(key string) (err ErrorCode, result *StorageEntry) + // Retrieve the stored data for a given key + Get(key string) (err ErrorCode, result *StorageEntry) - // Delete the stored data for a given key - Delete(key string) (err ErrorCode, deleted *StorageEntry) + // Delete the stored data for a given key + Delete(key string) (err ErrorCode, deleted *StorageEntry) - // Change data for some item in-place, incrementing or decrementing it. - // The data for the item is treated as decimal representation of a 64-bit unsigned integer. - // If the current data value does not conform to such a representation, returns an error. - // Also, the item must already exist for incr/decr to work; these commands won't pretend - // that a non-existent key exists with value 0; instead, they will fail. - Incr(key string, value uint64, incr bool) (err ErrorCode, previous *StorageEntry, result *StorageEntry) + // Change data for some item in-place, incrementing or decrementing it. + // The data for the item is treated as decimal representation of a 64-bit unsigned integer. + // If the current data value does not conform to such a representation, returns an error. + // Also, the item must already exist for incr/decr to work; these commands won't pretend + // that a non-existent key exists with value 0; instead, they will fail. + Incr(key string, value uint64, incr bool) (err ErrorCode, previous *StorageEntry, result *StorageEntry) - Expire(key string, check bool) + Expire(key string, check bool) } diff --git a/command.go b/command.go index 5f8c79b..7ccf94e 100644 --- a/command.go +++ b/command.go @@ -1,156 +1,155 @@ package main import ( - "os" - "net" - "bufio" - "strings" - "strconv" - "time" - "fmt" + "bufio" + "fmt" + "net" + "strconv" + "strings" + "time" ) type Session struct { - conn *net.TCPConn - bufreader *bufio.Reader - storage CacheStorage + conn *net.TCPConn + bufreader *bufio.Reader + storage CacheStorage } type Command interface { - parse(line []string) bool - Exec() + parse(line []string) bool + Exec() } type StorageCommand struct { - session *Session - command string - key string - flags uint32 - exptime uint32 - bytes uint32 - cas_unique uint64 - noreply bool - data []byte + session *Session + command string + key string + flags uint32 + exptime uint32 + bytes uint32 + cas_unique uint64 + noreply bool + data []byte } type RetrievalCommand struct { - session *Session - command string - keys []string + session *Session + command string + keys []string } type DeleteCommand struct { - session *Session - command string - key string - noreply bool + session *Session + command string + key string + noreply bool } type TouchCommand struct { - session *Session - command string - key string - exptime uint32 - noreply bool + session *Session + command string + key string + exptime uint32 + noreply bool } type IncrCommand struct { - session *Session - incr bool - key string - value uint64 - noreply bool + session *Session + incr bool + key string + value uint64 + noreply bool } type UnknownCommand struct { - session *Session - command string + session *Session + command string } type UninmplementedCommand struct { - session *Session - command string + session *Session + command string } const ( - NA = iota - InvalidCommand - ClientError - ServerError + NA = iota + InvalidCommand + ClientError + ServerError ) -func NewSession(conn *net.TCPConn, store CacheStorage) (*Session, os.Error) { - var s = &Session{conn, bufio.NewReader(conn), store} - return s, nil +func NewSession(conn *net.TCPConn, store CacheStorage) (*Session, error) { + var s = &Session{conn, bufio.NewReader(conn), store} + return s, nil } - /* Read a line and tokenize it */ func getTokenizedLine(r *bufio.Reader) []string { - if rawline, _, err := r.ReadLine(); err == nil { - return strings.Fields(string(rawline)) - } - return nil + if rawline, _, err := r.ReadLine(); err == nil { + return strings.Fields(string(rawline)) + } + return nil } - func (s *Session) CommandLoop() { - for line := getTokenizedLine(s.bufreader); - line != nil; line = getTokenizedLine(s.bufreader) { - var cmd Command = cmdSelect(line[0], s) - if cmd.parse(line) { - cmd.Exec() - } - } + for line := getTokenizedLine(s.bufreader); line != nil; line = getTokenizedLine(s.bufreader) { + var cmd Command = cmdSelect(line[0], s) + if cmd.parse(line) { + cmd.Exec() + } + } } func cmdSelect(name string, s *Session) Command { - switch name { - - case "set", "add", "replace", "append", "prepend", "cas": - return &StorageCommand{session: s} - case "get", "gets": - return &RetrievalCommand{session: s} - case "delete": - return &DeleteCommand{session: s} - case "touch": - return &TouchCommand{session: s} - case "incr", "decr": - return &IncrCommand{session: s} - case "stats", "flush_all", "version", "quit": - return &UninmplementedCommand{session: s, command: name} - default: - return &UnknownCommand{session: s, command: name} - } - //not reaching here - return nil + switch name { + + case "set", "add", "replace", "append", "prepend", "cas": + return &StorageCommand{session: s} + case "get", "gets": + return &RetrievalCommand{session: s} + case "delete": + return &DeleteCommand{session: s} + case "touch": + return &TouchCommand{session: s} + case "incr", "decr": + return &IncrCommand{session: s} + case "stats", "flush_all", "version", "quit": + return &UninmplementedCommand{session: s, command: name} + default: + return &UnknownCommand{session: s, command: name} + } + //not reaching here + return nil } ////////////////////////////// ERROR COMMANDS ////////////////////////////// /* a function to reply errors to client that always returns false */ func Error(s *Session, errtype int, errdesc string) bool { - var msg string - switch errtype { - case InvalidCommand: msg = "ERROR\r\n" - case ClientError: msg = "CLIENT_ERROR " + errdesc + "\r\n" - case ServerError: msg = "SERVER_ERROR " + errdesc + "\r\n" - } - s.conn.Write([]byte(msg)) - return false + var msg string + switch errtype { + case InvalidCommand: + msg = "ERROR\r\n" + case ClientError: + msg = "CLIENT_ERROR " + errdesc + "\r\n" + case ServerError: + msg = "SERVER_ERROR " + errdesc + "\r\n" + } + s.conn.Write([]byte(msg)) + return false } -func (self *UnknownCommand) parse(line []string) bool{ - return Error(self.session, InvalidCommand, "") +func (self *UnknownCommand) parse(line []string) bool { + return Error(self.session, InvalidCommand, "") } func (self *UnknownCommand) Exec() { } func (self *UninmplementedCommand) parse(line []string) bool { - return Error(self.session, ServerError, "Not Implemented") + return Error(self.session, ServerError, "Not Implemented") } func (self *UninmplementedCommand) Exec() { @@ -158,89 +157,89 @@ func (self *UninmplementedCommand) Exec() { ///////////////////////////// TOUCH COMMAND ////////////////////////////// -const secondsInMonth = 60*60*24*30 +const secondsInMonth = 60 * 60 * 24 * 30 func (self *TouchCommand) parse(line []string) bool { - var exptime uint64 - var err os.Error - if len(line) < 3 { - return Error(self.session, ClientError, "Bad touch command: missing parameters") - } else if exptime, err = strconv.Atoui64(line[2]); err != nil { - return Error(self.session, ClientError, "Bad touch command: bad expiration time") - } - self.command = line[0] - self.key = line[1] - if exptime == 0 || exptime > secondsInMonth { - self.exptime = uint32(exptime) - } else { - self.exptime = uint32(time.Seconds()) + uint32(exptime); - } - if line[len(line)-1] == "noreply" { - self.noreply = true - } - return true + var exptime uint64 + var err error + if len(line) < 3 { + return Error(self.session, ClientError, "Bad touch command: missing parameters") + } else if exptime, err = strconv.ParseUint(line[2], 10, 64); err != nil { + return Error(self.session, ClientError, "Bad touch command: bad expiration time") + } + self.command = line[0] + self.key = line[1] + if exptime == 0 || exptime > secondsInMonth { + self.exptime = uint32(exptime) + } else { + self.exptime = uint32(time.Now().Unix()) + uint32(exptime) + } + if line[len(line)-1] == "noreply" { + self.noreply = true + } + return true } func (self *TouchCommand) Exec() { - logger.Printf("Touch: command: %s, key: %s, , exptime %d, noreply: %t", - self.command, self.key, self.exptime, self.noreply) + logger.Printf("Touch: command: %s, key: %s, , exptime %d, noreply: %t", + self.command, self.key, self.exptime, self.noreply) } ///////////////////////////// DELETE COMMAND //////////////////////////// func (self *DeleteCommand) parse(line []string) bool { - if len(line) < 2 { - return Error(self.session, ClientError, "Bad delete command: missing parameters") - } - self.command = line[0] - self.key = line[1] - if line[len(line)-1] == "noreply" { - self.noreply = true - } - return true + if len(line) < 2 { + return Error(self.session, ClientError, "Bad delete command: missing parameters") + } + self.command = line[0] + self.key = line[1] + if line[len(line)-1] == "noreply" { + self.noreply = true + } + return true } func (self *DeleteCommand) Exec() { -// logger.Printf("Delete: command: %s, key: %s, noreply: %t", -// self.command, self.key, self.noreply) - var storage = self.session.storage - var conn = self.session.conn - if err, _ := storage.Delete(self.key) ; err != Ok && !self.noreply { - conn.Write([]byte("NOT_FOUND\r\n")) - } else if (err == Ok && !self.noreply) { - conn.Write([]byte("DELETED\r\n")) - } + // logger.Printf("Delete: command: %s, key: %s, noreply: %t", + // self.command, self.key, self.noreply) + var storage = self.session.storage + var conn = self.session.conn + if err, _ := storage.Delete(self.key); err != Ok && !self.noreply { + conn.Write([]byte("NOT_FOUND\r\n")) + } else if err == Ok && !self.noreply { + conn.Write([]byte("DELETED\r\n")) + } } ///////////////////////////// RETRIEVAL COMMANDS //////////////////////////// func (self *RetrievalCommand) parse(line []string) bool { - if len(line) < 2 { - return Error(self.session, ClientError, "Bad retrieval command: missing parameters") - } - self.command = line[0] - self.keys = line[1:] - return true + if len(line) < 2 { + return Error(self.session, ClientError, "Bad retrieval command: missing parameters") + } + self.command = line[0] + self.keys = line[1:] + return true } func (self *RetrievalCommand) Exec() { -// logger.Printf("Retrieval: command: %s, keys: %s", -// self.command, self.keys) - var storage = self.session.storage - var conn = self.session.conn - showAll := self.command == "gets" - for i := 0; i < len(self.keys); i++ { - if err, entry := storage.Get(self.keys[i]); err == Ok { - if showAll { - conn.Write([]byte(fmt.Sprintf("VALUE %s %d %d %d\r\n", self.keys[i], entry.flags, entry.bytes, entry.cas_unique))) - } else { - conn.Write([]byte(fmt.Sprintf("VALUE %s %d %d\r\n", self.keys[i], entry.flags, entry.bytes))) - } - conn.Write(entry.content) - conn.Write([]byte("\r\n")) - } - } - conn.Write([]byte("END\r\n")) + // logger.Printf("Retrieval: command: %s, keys: %s", + // self.command, self.keys) + var storage = self.session.storage + var conn = self.session.conn + showAll := self.command == "gets" + for i := 0; i < len(self.keys); i++ { + if err, entry := storage.Get(self.keys[i]); err == Ok { + if showAll { + conn.Write([]byte(fmt.Sprintf("VALUE %s %d %d %d\r\n", self.keys[i], entry.flags, entry.bytes, entry.cas_unique))) + } else { + conn.Write([]byte(fmt.Sprintf("VALUE %s %d %d\r\n", self.keys[i], entry.flags, entry.bytes))) + } + conn.Write(entry.content) + conn.Write([]byte("\r\n")) + } + } + conn.Write([]byte("END\r\n")) } ///////////////////////////// STORAGE COMMANDS ///////////////////////////// @@ -248,148 +247,148 @@ func (self *RetrievalCommand) Exec() { /* parse a storage command parameters and read the related data returns a flag indicating sucesss */ func (self *StorageCommand) parse(line []string) bool { - var flags, exptime, bytes, casuniq uint64 - var err os.Error - if len(line) < 5 { - return Error(self.session, ClientError, "Bad storage command: missing parameters") - } else if flags, err = strconv.Atoui64(line[2]); err != nil { - return Error(self.session, ClientError, "Bad storage command: bad flags") - } else if exptime, err = strconv.Atoui64(line[3]); err != nil { - return Error(self.session, ClientError, "Bad storage command: bad expiration time") - } else if bytes, err = strconv.Atoui64(line[4]); err != nil { - return Error(self.session, ClientError, "Bad storage command: bad byte-length") - } else if line[0] == "cas" { - if casuniq, err = strconv.Atoui64(line[5]); err != nil { - return Error(self.session, ClientError, "Bad storage command: bad cas value") - } - } - self.command = line[0] - self.key = line[1] - self.flags = uint32(flags) - if exptime == 0 || exptime > secondsInMonth { - self.exptime = uint32(exptime) - } else { - self.exptime = uint32(time.Seconds()) + uint32(exptime); - } - self.bytes = uint32(bytes) - self.cas_unique = casuniq - if line[len(line)-1] == "noreply" { - self.noreply = true - } - return self.readData() + var flags, exptime, bytes, casuniq uint64 + var err error + if len(line) < 5 { + return Error(self.session, ClientError, "Bad storage command: missing parameters") + } else if flags, err = strconv.ParseUint(line[2], 10, 64); err != nil { + return Error(self.session, ClientError, "Bad storage command: bad flags") + } else if exptime, err = strconv.ParseUint(line[3], 10, 64); err != nil { + return Error(self.session, ClientError, "Bad storage command: bad expiration time") + } else if bytes, err = strconv.ParseUint(line[4], 10, 64); err != nil { + return Error(self.session, ClientError, "Bad storage command: bad byte-length") + } else if line[0] == "cas" { + if casuniq, err = strconv.ParseUint(line[5], 10, 64); err != nil { + return Error(self.session, ClientError, "Bad storage command: bad cas value") + } + } + self.command = line[0] + self.key = line[1] + self.flags = uint32(flags) + if exptime == 0 || exptime > secondsInMonth { + self.exptime = uint32(exptime) + } else { + self.exptime = uint32(time.Now().Unix()) + uint32(exptime) + } + self.bytes = uint32(bytes) + self.cas_unique = casuniq + if line[len(line)-1] == "noreply" { + self.noreply = true + } + return self.readData() } /* read the data for a storage command and return a flag indicating success */ func (self *StorageCommand) readData() bool { - if self.bytes <= 0 { - return Error(self.session, ClientError, "Bad storage operation: trying to read 0 bytes") - } else { - self.data = make([]byte, self.bytes + 2) // \r\n is always present at the end - } - var reader = self.session.bufreader - // read all the data - for offset := 0; offset < int(self.bytes); { - if nread, err := reader.Read(self.data[offset:]); err != nil { - return Error(self.session, ServerError, "Failed to read data") - } else { - offset += nread - } - } - if string(self.data[len(self.data)-2:]) != "\r\n" { - return Error(self.session, ClientError, "Bad storage operation: bad data chunk") - } - self.data = self.data[:len(self.data)-2] // strip \n\r - return true + if self.bytes <= 0 { + return Error(self.session, ClientError, "Bad storage operation: trying to read 0 bytes") + } else { + self.data = make([]byte, self.bytes+2) // \r\n is always present at the end + } + var reader = self.session.bufreader + // read all the data + for offset := 0; offset < int(self.bytes); { + if nread, err := reader.Read(self.data[offset:]); err != nil { + return Error(self.session, ServerError, "Failed to read data") + } else { + offset += nread + } + } + if string(self.data[len(self.data)-2:]) != "\r\n" { + return Error(self.session, ClientError, "Bad storage operation: bad data chunk") + } + self.data = self.data[:len(self.data)-2] // strip \n\r + return true } func (self *StorageCommand) Exec() { -/* logger.Printf("Storage: key: %s, flags: %d, exptime: %d, " + - "bytes: %d, cas: %d, noreply: %t, content: %s\n", - self.key, self.flags, self.exptime, self.bytes, - self.cas_unique, self.noreply, string(self.data)) -*/ - var storage = self.session.storage - var conn = self.session.conn - - switch self.command { - - case "set": - storage.Set(self.key, self.flags, self.exptime, self.bytes, self.data) - if !self.noreply { - conn.Write([]byte("STORED\r\n")) - } - return - case "add": - if err, _ := storage.Add(self.key, self.flags, self.exptime, self.bytes, self.data); err != Ok && !self.noreply { - conn.Write([]byte("NOT_STORED\r\n")) - } else if err == Ok && !self.noreply { - conn.Write([]byte("STORED\r\n")) - } - case "replace": - if err, _, _ := storage.Replace(self.key, self.flags, self.exptime, self.bytes, self.data) ; err != Ok && !self.noreply { - conn.Write([]byte("NOT_STORED\r\n")) - } else if err == Ok && !self.noreply { - conn.Write([]byte("STORED\r\n")) - } - case "append": - if err, _, _ := storage.Append(self.key, self.bytes, self.data) ; err != Ok && !self.noreply { - conn.Write([]byte("NOT_STORED\r\n")) - } else if err == Ok && !self.noreply { - conn.Write([]byte("STORED\r\n")) - } - case "prepend": - if err, _, _ := storage.Prepend(self.key, self.bytes, self.data) ; err != Ok && !self.noreply { - conn.Write([]byte("NOT_STORED\r\n")) - } else if err == Ok && !self.noreply { - conn.Write([]byte("STORED\r\n")) - } - case "cas": - if err, prev, _ := storage.Cas(self.key, self.flags, self.exptime, self.bytes, self.cas_unique, self.data) ; err != Ok && !self.noreply { - if prev != nil { - conn.Write([]byte("EXISTS\r\n")) - } else { - conn.Write([]byte("NOT_STORED\r\n")) - } - } else if err == Ok && !self.noreply { - conn.Write([]byte("STORED\r\n")) - } - } + /* logger.Printf("Storage: key: %s, flags: %d, exptime: %d, " + + "bytes: %d, cas: %d, noreply: %t, content: %s\n", + self.key, self.flags, self.exptime, self.bytes, + self.cas_unique, self.noreply, string(self.data)) + */ + var storage = self.session.storage + var conn = self.session.conn + + switch self.command { + + case "set": + storage.Set(self.key, self.flags, self.exptime, self.bytes, self.data) + if !self.noreply { + conn.Write([]byte("STORED\r\n")) + } + return + case "add": + if err, _ := storage.Add(self.key, self.flags, self.exptime, self.bytes, self.data); err != Ok && !self.noreply { + conn.Write([]byte("NOT_STORED\r\n")) + } else if err == Ok && !self.noreply { + conn.Write([]byte("STORED\r\n")) + } + case "replace": + if err, _, _ := storage.Replace(self.key, self.flags, self.exptime, self.bytes, self.data); err != Ok && !self.noreply { + conn.Write([]byte("NOT_STORED\r\n")) + } else if err == Ok && !self.noreply { + conn.Write([]byte("STORED\r\n")) + } + case "append": + if err, _, _ := storage.Append(self.key, self.bytes, self.data); err != Ok && !self.noreply { + conn.Write([]byte("NOT_STORED\r\n")) + } else if err == Ok && !self.noreply { + conn.Write([]byte("STORED\r\n")) + } + case "prepend": + if err, _, _ := storage.Prepend(self.key, self.bytes, self.data); err != Ok && !self.noreply { + conn.Write([]byte("NOT_STORED\r\n")) + } else if err == Ok && !self.noreply { + conn.Write([]byte("STORED\r\n")) + } + case "cas": + if err, prev, _ := storage.Cas(self.key, self.flags, self.exptime, self.bytes, self.cas_unique, self.data); err != Ok && !self.noreply { + if prev != nil { + conn.Write([]byte("EXISTS\r\n")) + } else { + conn.Write([]byte("NOT_STORED\r\n")) + } + } else if err == Ok && !self.noreply { + conn.Write([]byte("STORED\r\n")) + } + } } ///////////////////////////// INCR/DECR COMMANDS ///////////////////////////// func (self *IncrCommand) parse(line []string) bool { - var err os.Error - if len(line) < 3 { - return Error(self.session, ClientError, "Bad incr/decr command: missing parameters") - } else if self.value, err = strconv.Atoui64(line[2]); err != nil { - return Error(self.session, ClientError, "Bad incr/decr command: bad value") - } - self.incr = (line[0] == "incr") - self.key = line[1] - - if len(line) == 4 && line[3] == "noreply" { - self.noreply = true - } else { - self.noreply = false - } - return true + var err error + if len(line) < 3 { + return Error(self.session, ClientError, "Bad incr/decr command: missing parameters") + } else if self.value, err = strconv.ParseUint(line[2], 10, 64); err != nil { + return Error(self.session, ClientError, "Bad incr/decr command: bad value") + } + self.incr = (line[0] == "incr") + self.key = line[1] + + if len(line) == 4 && line[3] == "noreply" { + self.noreply = true + } else { + self.noreply = false + } + return true } func (self *IncrCommand) Exec() { - var storage = self.session.storage - var conn = self.session.conn - err, _, current := storage.Incr(self.key, self.value, self.incr) - if self.noreply { return } - if err == Ok { - conn.Write(current.content) - conn.Write([]byte("\r\n")) - } else if err == KeyNotFound { - //not reaching here - conn.Write([]byte("NOT_FOUND\r\n")) - } else if err == IllegalParameter { - conn.Write([]byte(fmt.Sprintf("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n"))) - } + var storage = self.session.storage + var conn = self.session.conn + err, _, current := storage.Incr(self.key, self.value, self.incr) + if self.noreply { + return + } + if err == Ok { + conn.Write(current.content) + conn.Write([]byte("\r\n")) + } else if err == KeyNotFound { + //not reaching here + conn.Write([]byte("NOT_FOUND\r\n")) + } else if err == IllegalParameter { + conn.Write([]byte(fmt.Sprintf("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n"))) + } } - - diff --git a/eventnotifierstorage.go b/eventnotifierstorage.go index 2078004..1ab34e2 100644 --- a/eventnotifierstorage.go +++ b/eventnotifierstorage.go @@ -1,93 +1,93 @@ package main type EventNotifierStorage struct { - updatesChannel chan UpdateMessage - storage CacheStorage + updatesChannel chan UpdateMessage + storage CacheStorage } type UpdateMessage struct { - op int - key string - currentEpoch int64 - newEpoch int64 + op int + key string + currentEpoch int64 + newEpoch int64 } const ( - Delete = iota - Add - Change - Collect + Delete = iota + Add + Change + Collect ) func updateMessageLogger(updatesChannel chan UpdateMessage) { - for { - m := <-updatesChannel - logger.Printf("New message: op: %d, key: %s, currentEpoch: %d, newEpoch: %d", m.op, m.key, m.currentEpoch, m.newEpoch) - } + for { + m := <-updatesChannel + logger.Printf("New message: op: %d, key: %s, currentEpoch: %d, newEpoch: %d", m.op, m.key, m.currentEpoch, m.newEpoch) + } } func newEventNotifierStorage(storage CacheStorage, updatesChannel chan UpdateMessage) *EventNotifierStorage { - return &EventNotifierStorage{updatesChannel, storage} + return &EventNotifierStorage{updatesChannel, storage} } func (self *EventNotifierStorage) Set(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (*StorageEntry, *StorageEntry) { - previous, updated := self.storage.Set(key, flags, exptime, bytes, content) - if (previous != nil) { - self.updatesChannel <- UpdateMessage{Change, key, int64(previous.exptime), int64(exptime)} - } else { - self.updatesChannel <- UpdateMessage{Add, key, 0, int64(exptime)} - } - return previous, updated + previous, updated := self.storage.Set(key, flags, exptime, bytes, content) + if previous != nil { + self.updatesChannel <- UpdateMessage{Change, key, int64(previous.exptime), int64(exptime)} + } else { + self.updatesChannel <- UpdateMessage{Add, key, 0, int64(exptime)} + } + return previous, updated } func (self *EventNotifierStorage) Add(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (ErrorCode, *StorageEntry) { - err, updatedEntry := self.storage.Add(key, flags, exptime, bytes, content) - if (err == Ok) { - self.updatesChannel <- UpdateMessage{Add, key, 0, int64(exptime)} - } - return err, updatedEntry + err, updatedEntry := self.storage.Add(key, flags, exptime, bytes, content) + if err == Ok { + self.updatesChannel <- UpdateMessage{Add, key, 0, int64(exptime)} + } + return err, updatedEntry } func (self *EventNotifierStorage) Replace(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { - err, prev, updated := self.storage.Replace(key, flags, exptime, bytes, content) - if (err == Ok) { - self.updatesChannel <- UpdateMessage{Change, key, int64(prev.exptime), int64(exptime)} - } - return err, prev, updated + err, prev, updated := self.storage.Replace(key, flags, exptime, bytes, content) + if err == Ok { + self.updatesChannel <- UpdateMessage{Change, key, int64(prev.exptime), int64(exptime)} + } + return err, prev, updated } func (self *EventNotifierStorage) Append(key string, bytes uint32, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { - return self.storage.Append(key, bytes, content) + return self.storage.Append(key, bytes, content) } func (self *EventNotifierStorage) Prepend(key string, bytes uint32, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { - return self.storage.Prepend(key, bytes, content) + return self.storage.Prepend(key, bytes, content) } func (self *EventNotifierStorage) Cas(key string, flags uint32, exptime uint32, bytes uint32, cas_unique uint64, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { - err, prev, updated := self.storage.Cas(key, flags, exptime, bytes, cas_unique, content) - if (err == Ok) { - self.updatesChannel <- UpdateMessage{Change, key, int64(prev.exptime), int64(exptime)} - } - return err, prev, updated + err, prev, updated := self.storage.Cas(key, flags, exptime, bytes, cas_unique, content) + if err == Ok { + self.updatesChannel <- UpdateMessage{Change, key, int64(prev.exptime), int64(exptime)} + } + return err, prev, updated } func (self *EventNotifierStorage) Get(key string) (err ErrorCode, result *StorageEntry) { - return self.storage.Get(key) + return self.storage.Get(key) } func (self *EventNotifierStorage) Delete(key string) (ErrorCode, *StorageEntry) { - err, deleted := self.storage.Delete(key) - if (err == Ok) { - self.updatesChannel <- UpdateMessage{Delete, key, int64(deleted.exptime), 0} - } - return err, deleted + err, deleted := self.storage.Delete(key) + if err == Ok { + self.updatesChannel <- UpdateMessage{Delete, key, int64(deleted.exptime), 0} + } + return err, deleted } func (self *EventNotifierStorage) Incr(key string, value uint64, incr bool) (ErrorCode, *StorageEntry, *StorageEntry) { - return self.storage.Incr(key, value, incr) + return self.storage.Incr(key, value, incr) } func (self *EventNotifierStorage) Expire(key string, check bool) { - self.storage.Expire(key, check) + self.storage.Expire(key, check) } diff --git a/generationalstorage.go b/generationalstorage.go index 133284e..bcc7497 100644 --- a/generationalstorage.go +++ b/generationalstorage.go @@ -1,135 +1,132 @@ package main import ( - "time" - "fmt" + "fmt" + "time" ) const ( - GCDelay = 60 - GenerationSize = 60 - StorageThreshold = 5000 + GCDelay = 60 + GenerationSize = 60 + StorageThreshold = 5000 ) var timer = func(updatesChannel chan UpdateMessage, frequency int64) { - for { - time.Sleep(1e9 * frequency) // one second * GCDelay - updatesChannel <- UpdateMessage{Collect, "", time.Seconds(), 0} - } + for { + time.Sleep(time.Duration(1e9 * frequency)) // one second * GCDelay + updatesChannel <- UpdateMessage{Collect, "", time.Now().Unix(), 0} + } } func (self *UpdateMessage) getCurrentTimeSlot() int64 { - return roundTime(self.currentEpoch) + return roundTime(self.currentEpoch) } func (self *UpdateMessage) getNewTimeSlot() int64 { - return roundTime(self.newEpoch) + return roundTime(self.newEpoch) } func roundTime(time int64) int64 { - return time - (time % GenerationSize) + GenerationSize + return time - (time % GenerationSize) + GenerationSize } type Generation struct { - startEpoch int64 - inhabitants map[string] bool + startEpoch int64 + inhabitants map[string]bool } func (self *Generation) String() string { - r := fmt.Sprintf("Generation [%s-%s]", time.SecondsToUTC(self.startEpoch), time.SecondsToUTC(self.startEpoch + GenerationSize)) - for key,_ := range(self.inhabitants) { - r += fmt.Sprintf("\n %s", key) - } - return r + r := fmt.Sprintf("Generation [%s-%s]", time.Unix(self.startEpoch, 0).UTC(), time.Unix(self.startEpoch+GenerationSize, 0).UTC()) + for key, _ := range self.inhabitants { + r += fmt.Sprintf("\n %s", key) + } + return r } func newGeneration(epoch int64) *Generation { - return &Generation{epoch, make(map[string] bool)} + return &Generation{epoch, make(map[string]bool)} } type GenerationalStorage struct { - generations map[int64] *Generation - updatesChannel chan UpdateMessage - cacheStorage CacheStorage - lastCollected int64 - items uint64 + generations map[int64]*Generation + updatesChannel chan UpdateMessage + cacheStorage CacheStorage + lastCollected int64 + items uint64 } func newGenerationalStorage(expiring_frequency int64, cacheStorage CacheStorage, updatesChannel chan UpdateMessage) *GenerationalStorage { - storage := &GenerationalStorage{ make(map [int64] *Generation), updatesChannel, cacheStorage, roundTime(time.Seconds()) - GenerationSize, 0 } - go timer(updatesChannel, expiring_frequency) - go processNodeChanges(storage, updatesChannel) - return storage; + storage := &GenerationalStorage{make(map[int64]*Generation), updatesChannel, cacheStorage, roundTime(time.Now().Unix()) - GenerationSize, 0} + go timer(updatesChannel, expiring_frequency) + go processNodeChanges(storage, updatesChannel) + return storage } func (self *GenerationalStorage) removeGenerationToCollect(now int64) *Generation { - if now >= self.lastCollected + GenerationSize { - gen := self.generations[now] - self.generations[now] = nil, false - self.lastCollected += GenerationSize - return gen - } - return nil + if now >= self.lastCollected+GenerationSize { + gen := self.generations[now] + delete(self.generations, now) + self.lastCollected += GenerationSize + return gen + } + return nil } func (self *GenerationalStorage) findGeneration(timeSlot int64, createIfNotExists bool) *Generation { - generation := self.generations[timeSlot] - if generation == nil && createIfNotExists { - generation = newGeneration(timeSlot) - self.generations[timeSlot] = generation - } - return generation + generation := self.generations[timeSlot] + if generation == nil && createIfNotExists { + generation = newGeneration(timeSlot) + self.generations[timeSlot] = generation + } + return generation } func (self *Generation) addInhabitant(key string) { - self.inhabitants[key] = true + self.inhabitants[key] = true } func processNodeChanges(storage *GenerationalStorage, channel <-chan UpdateMessage /*, ticker *time.Ticker*/) { - for { - msg := <-channel - switch msg.op { - case Add: - timeSlot := msg.getNewTimeSlot() - generation := storage.findGeneration(timeSlot, true) - generation.inhabitants[msg.key] = true - storage.items += 1 - case Delete: - timeSlot := msg.getCurrentTimeSlot() - if generation := storage.findGeneration(timeSlot, false); generation != nil { - generation.inhabitants[msg.key] = false, false - storage.items -= 1 - } - case Change: - timeSlot := msg.getCurrentTimeSlot() - if generation := storage.findGeneration(timeSlot, false); generation != nil { - generation.inhabitants[msg.key] = false, false - } - newTimeSlot := msg.getNewTimeSlot() - generation := storage.findGeneration(newTimeSlot, true) - generation.addInhabitant(msg.key) - case Collect: - logger.Println("Processing Collect message") - for { - generation := storage.removeGenerationToCollect(msg.getCurrentTimeSlot()- GenerationSize) - if generation == nil { - break - } - for key , _ := range(generation.inhabitants) { - storage.cacheStorage.Expire(key, false) - storage.items -= 1 - } - } - if storage.items > StorageThreshold { - permGen := storage.findGeneration(GenerationSize, true) - storage.generations[GenerationSize] = nil, false - for key , _ := range(permGen.inhabitants) { - storage.cacheStorage.Expire(key, false) - storage.items -= 1 - } - logger.Printf("Memory pressure. Collecting %d expiring items. %d items on permanent generation", storage.items) - } - logger.Printf("No more items to collect. %d Items", storage.items) - } - } + for { + msg := <-channel + switch msg.op { + case Add: + timeSlot := msg.getNewTimeSlot() + generation := storage.findGeneration(timeSlot, true) + generation.inhabitants[msg.key] = true + storage.items += 1 + case Delete: + timeSlot := msg.getCurrentTimeSlot() + if generation := storage.findGeneration(timeSlot, false); generation != nil { + delete(generation.inhabitants, msg.key) + storage.items -= 1 + } + case Change: + timeSlot := msg.getCurrentTimeSlot() + if generation := storage.findGeneration(timeSlot, false); generation != nil { + delete(generation.inhabitants, msg.key) + } + newTimeSlot := msg.getNewTimeSlot() + generation := storage.findGeneration(newTimeSlot, true) + generation.addInhabitant(msg.key) + case Collect: + for { + generation := storage.removeGenerationToCollect(msg.getCurrentTimeSlot() - GenerationSize) + if generation == nil { + break + } + for key, _ := range generation.inhabitants { + storage.cacheStorage.Expire(key, false) + storage.items -= 1 + } + } + if storage.items > StorageThreshold { + permGen := storage.findGeneration(GenerationSize, true) + delete(storage.generations, GenerationSize) + for key, _ := range permGen.inhabitants { + storage.cacheStorage.Expire(key, false) + storage.items -= 1 + } + } + } + } } diff --git a/gocached.go b/gocached.go index 205462f..98d065e 100644 --- a/gocached.go +++ b/gocached.go @@ -2,53 +2,52 @@ package main import ( "flag" - "os" "log" "net" - /*"runtime"*/ - /*"runtime/pprof"*/ + "os" + /*"runtime"*/ + /*"runtime/pprof"*/ ) //global logger var logger = log.New(os.Stdout, "gocached: ", log.Lshortfile|log.LstdFlags) // specific typing for base storage factory, just build a map cache storage -func base_storage_factory () CacheStorage { return newMapCacheStorage() } +func base_storage_factory() CacheStorage { return newMapCacheStorage() } func main() { - /*runtime.GOMAXPROCS(1)*/ + /*runtime.GOMAXPROCS(1)*/ // command line flags and parsing var port = flag.String("port", "11212", "memcached port") - /*var memprofile = flag.String("memprofile", "", "write memory profile to this file")*/ + /*var memprofile = flag.String("memprofile", "", "write memory profile to this file")*/ - var storage_choice = flag.String("storage", "generational", + var storage_choice = flag.String("storage", "generational", "storage implementation (generational, heap, leak)") var expiring_frequency = flag.Int64("expiring-interval", 10, "expiring interval in seconds") - var partitions = flag.Int("partitions", 10, - "storage partitions (0 or 1 to disable)") + var partitions = flag.Int("partitions", 10, + "storage partitions (0 or 1 to disable)") flag.Parse() - - /*if *memprofile != "" {*/ - /*defer func() {*/ - /*f, err := os.Create(*memprofile)*/ - /*if err != nil {*/ - /*log.Fatal(err)*/ - /*}*/ - /*pprof.WriteHeapProfile(f)*/ - /*f.Close()*/ - /*}()*/ - /*}*/ + /*if *memprofile != "" {*/ + /*defer func() {*/ + /*f, err := os.Create(*memprofile)*/ + /*if err != nil {*/ + /*log.Fatal(err)*/ + /*}*/ + /*pprof.WriteHeapProfile(f)*/ + /*f.Close()*/ + /*}()*/ + /*}*/ // whether using partitioned or single storage - var partition_storage CacheStorage - var eventful_storage CacheStorage + var partition_storage CacheStorage + var eventful_storage CacheStorage if *partitions > 1 { - partition_storage = newHashingStorage(uint32(*partitions), base_storage_factory) + partition_storage = newHashingStorage(uint32(*partitions), base_storage_factory) } else { partition_storage = base_storage_factory() } @@ -57,16 +56,16 @@ func main() { switch *storage_choice { case "leak": logger.Print("warning, will not expire entries") - eventful_storage = partition_storage + eventful_storage = partition_storage case "generational": - updatesChannel := make(chan UpdateMessage, 5000) - eventful_storage = newEventNotifierStorage(partition_storage, updatesChannel) - newGenerationalStorage(*expiring_frequency, partition_storage, updatesChannel) + updatesChannel := make(chan UpdateMessage, 5000) + eventful_storage = newEventNotifierStorage(partition_storage, updatesChannel) + newGenerationalStorage(*expiring_frequency, partition_storage, updatesChannel) case "heap": - updatesChannel := make(chan UpdateMessage, 5000) - eventful_storage = newEventNotifierStorage(partition_storage, updatesChannel) - NewHeapExpiringStorage(*expiring_frequency, partition_storage, updatesChannel) - } + updatesChannel := make(chan UpdateMessage, 5000) + eventful_storage = newEventNotifierStorage(partition_storage, updatesChannel) + NewHeapExpiringStorage(*expiring_frequency, partition_storage, updatesChannel) + } // network setup if addr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:"+*port); err != nil { @@ -74,17 +73,17 @@ func main() { } else if listener, err := net.ListenTCP("tcp", addr); err != nil { logger.Fatalln("Unable to listen on requested port") } else { - // server loop - logger.Printf("Starting Gocached server") - /*for i := 0; i < 21; i++ {*/ - for { - if conn, err := listener.AcceptTCP(); err != nil { - logger.Println("An error ocurred accepting a new connection") - } else { - go clientHandler(conn, eventful_storage) - } - } - } + // server loop + logger.Printf("Starting Gocached server") + /*for i := 0; i < 21; i++ {*/ + for { + if conn, err := listener.AcceptTCP(); err != nil { + logger.Println("An error ocurred accepting a new connection") + } else { + go clientHandler(conn, eventful_storage) + } + } + } } func clientHandler(conn *net.TCPConn, store CacheStorage) { @@ -92,6 +91,6 @@ func clientHandler(conn *net.TCPConn, store CacheStorage) { if session, err := NewSession(conn, store); err != nil { logger.Println("An error ocurred creating a new session") } else { - session.CommandLoop() + session.CommandLoop() } } diff --git a/hashingstorage.go b/hashingstorage.go index e08176c..31cba5c 100644 --- a/hashingstorage.go +++ b/hashingstorage.go @@ -1,72 +1,72 @@ package main -type Hasher func (string) uint32 +type Hasher func(string) uint32 type HashingStorage struct { - size uint32 - hasher Hasher - storageBuckets []CacheStorage + size uint32 + hasher Hasher + storageBuckets []CacheStorage } func newHashingStorage(size uint32, factory CacheStorageFactory) *HashingStorage { - s := &HashingStorage{size, hornerHasher, make([]CacheStorage, size)} - for i := uint32(0); i < size; i++ { - s.storageBuckets[i] = factory() - } - return s + s := &HashingStorage{size, hornerHasher, make([]CacheStorage, size)} + for i := uint32(0); i < size; i++ { + s.storageBuckets[i] = factory() + } + return s } func (self *HashingStorage) Set(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (previous *StorageEntry, result *StorageEntry) { - return self.findBucket(key).Set(key, flags, exptime, bytes, content) + return self.findBucket(key).Set(key, flags, exptime, bytes, content) } func (self *HashingStorage) Add(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (err ErrorCode, result *StorageEntry) { - return self.findBucket(key).Add(key, flags, exptime, bytes, content) + return self.findBucket(key).Add(key, flags, exptime, bytes, content) } -func (self *HashingStorage) Replace(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (ErrorCode,*StorageEntry,*StorageEntry) { - return self.findBucket(key).Replace(key, flags, exptime, bytes, content) +func (self *HashingStorage) Replace(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { + return self.findBucket(key).Replace(key, flags, exptime, bytes, content) } -func (self *HashingStorage) Append(key string, bytes uint32, content []byte) (ErrorCode,*StorageEntry,*StorageEntry) { - return self.findBucket(key).Append(key, bytes, content) +func (self *HashingStorage) Append(key string, bytes uint32, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { + return self.findBucket(key).Append(key, bytes, content) } func (self *HashingStorage) Prepend(key string, bytes uint32, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { - return self.findBucket(key).Prepend(key, bytes, content) + return self.findBucket(key).Prepend(key, bytes, content) } func (self *HashingStorage) Cas(key string, flags uint32, exptime uint32, bytes uint32, cas_unique uint64, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { - return self.findBucket(key).Cas(key, flags, exptime, bytes, cas_unique, content) + return self.findBucket(key).Cas(key, flags, exptime, bytes, cas_unique, content) } func (self *HashingStorage) Get(key string) (ErrorCode, *StorageEntry) { - return self.findBucket(key).Get(key) + return self.findBucket(key).Get(key) } func (self *HashingStorage) Delete(key string) (ErrorCode, *StorageEntry) { - return self.findBucket(key).Delete(key) + return self.findBucket(key).Delete(key) } func (self *HashingStorage) Incr(key string, value uint64, incr bool) (ErrorCode, *StorageEntry, *StorageEntry) { - return self.findBucket(key).Incr(key, value, incr) + return self.findBucket(key).Incr(key, value, incr) } func (self *HashingStorage) Expire(key string, check bool) { - self.findBucket(key).Expire(key, check) + self.findBucket(key).Expire(key, check) } func (self *HashingStorage) findBucket(key string) CacheStorage { - storageIndex := self.hasher(key) % self.size - storage := self.storageBuckets[storageIndex] - // logger.Printf("Using storage %d", storageIndex) - return storage + storageIndex := self.hasher(key) % self.size + storage := self.storageBuckets[storageIndex] + // logger.Printf("Using storage %d", storageIndex) + return storage } var hornerHasher = func(value string) uint32 { - var hashcode uint32 = 1 - for i := 0; i < len(value); i++ { - hashcode += (hashcode * 31) + uint32(value[i]) - } - return hashcode + var hashcode uint32 = 1 + for i := 0; i < len(value); i++ { + hashcode += (hashcode * 31) + uint32(value[i]) + } + return hashcode } diff --git a/heapexpiringstorage.go b/heapexpiringstorage.go index 9cce921..fbde101 100644 --- a/heapexpiringstorage.go +++ b/heapexpiringstorage.go @@ -1,29 +1,29 @@ package main import ( - "expiry" - "time" "container/heap" + "gocached/expiry" + "time" ) //Implements a CacheStorage interface with entry expiration. type HeapExpiringStorage struct { - CacheStorage - updatesChannel chan UpdateMessage - heap *expiry.Heap + CacheStorage + updatesChannel chan UpdateMessage + heap *expiry.Heap } func (hs *HeapExpiringStorage) ProcessUpdates() { - for { - msg := <-hs.updatesChannel - switch msg.op { - case Add, Change: - hs.AddEntry(expiry.Entry{&msg.key, uint32(msg.newEpoch)}, uint32(msg.currentEpoch)) - case Collect: - logger.Println("Collecting expired entries") - hs.Collect(uint32(msg.currentEpoch)) - } - } + for { + msg := <-hs.updatesChannel + switch msg.op { + case Add, Change: + hs.AddEntry(expiry.Entry{&msg.key, uint32(msg.newEpoch)}, uint32(msg.currentEpoch)) + case Collect: + logger.Println("Collecting expired entries") + hs.Collect(uint32(msg.currentEpoch)) + } + } } //Update. Given an exptime update, stores a new entry in a exptime ordered heap @@ -53,22 +53,22 @@ func (hs *HeapExpiringStorage) Collect(now uint32) { //Allocate a new HeapExpiringStorage and Initialize it func NewHeapExpiringStorage(collect_frequency int64, cacheStorage CacheStorage, updatesChannel chan UpdateMessage) *HeapExpiringStorage { - hs := &HeapExpiringStorage{cacheStorage, updatesChannel, nil} - hs.Init(collect_frequency) - return hs + hs := &HeapExpiringStorage{cacheStorage, updatesChannel, nil} + hs.Init(collect_frequency) + return hs } + //Init an allocated HeapExpiringStorage func (hs *HeapExpiringStorage) Init(collect_frequency int64) { logger.Println("init heap notify storage") hs.heap = expiry.NewHeap(100) //TODO, size as config parameter go hs.CollectTicker(collect_frequency) - go hs.ProcessUpdates() + go hs.ProcessUpdates() } -func (hs *HeapExpiringStorage) CollectTicker(collect_frequency int64){ - for { - time.Sleep(1e9 * collect_frequency) - hs.updatesChannel <- UpdateMessage{Collect, "", time.Seconds(), 0} - } +func (hs *HeapExpiringStorage) CollectTicker(collect_frequency int64) { + for { + time.Sleep(time.Duration(1e9 * collect_frequency)) + hs.updatesChannel <- UpdateMessage{Collect, "", time.Now().Unix(), 0} + } } - diff --git a/mapcachestorage.go b/mapcachestorage.go index 6245fc3..71d7510 100644 --- a/mapcachestorage.go +++ b/mapcachestorage.go @@ -1,9 +1,9 @@ package main import ( - "sync" - "time" - "strconv" + "strconv" + "sync" + "time" ) type MapCacheStorage struct { @@ -12,9 +12,9 @@ type MapCacheStorage struct { } func newMapCacheStorage() *MapCacheStorage { - storage := &MapCacheStorage{} - storage.Init() - return storage + storage := &MapCacheStorage{} + storage.Init() + return storage } func (self *MapCacheStorage) Init() { @@ -22,11 +22,11 @@ func (self *MapCacheStorage) Init() { } func (self *StorageEntry) expired() bool { - if self.exptime == 0 { - return false - } - now := uint32(time.Seconds()) - return self.exptime <= now + if self.exptime == 0 { + return false + } + now := uint32(time.Now().Unix()) + return self.exptime <= now } func (self *MapCacheStorage) Set(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (previous *StorageEntry, result *StorageEntry) { @@ -36,8 +36,8 @@ func (self *MapCacheStorage) Set(key string, flags uint32, exptime uint32, bytes var newEntry *StorageEntry if present && !entry.expired() { newEntry = &StorageEntry{exptime, flags, bytes, entry.cas_unique + 1, content} - self.storageMap[key] = newEntry - return entry, newEntry + self.storageMap[key] = newEntry + return entry, newEntry } newEntry = &StorageEntry{exptime, flags, bytes, 0, content} self.storageMap[key] = newEntry @@ -51,12 +51,12 @@ func (self *MapCacheStorage) Add(key string, flags uint32, exptime uint32, bytes if present && !entry.expired() { return KeyAlreadyInUse, nil } - entry = &StorageEntry{exptime, flags, bytes, 0, content} + entry = &StorageEntry{exptime, flags, bytes, 0, content} self.storageMap[key] = entry return Ok, entry } -func (self *MapCacheStorage) Replace(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (ErrorCode,*StorageEntry,*StorageEntry) { +func (self *MapCacheStorage) Replace(key string, flags uint32, exptime uint32, bytes uint32, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { self.rwLock.Lock() defer self.rwLock.Unlock() entry, present := self.storageMap[key] @@ -68,7 +68,7 @@ func (self *MapCacheStorage) Replace(key string, flags uint32, exptime uint32, b return KeyNotFound, nil, nil } -func (self *MapCacheStorage) Append(key string, bytes uint32, content []byte) (ErrorCode,*StorageEntry,*StorageEntry) { +func (self *MapCacheStorage) Append(key string, bytes uint32, content []byte) (ErrorCode, *StorageEntry, *StorageEntry) { self.rwLock.Lock() defer self.rwLock.Unlock() entry, present := self.storageMap[key] @@ -122,7 +122,7 @@ func (self *MapCacheStorage) Get(key string) (ErrorCode, *StorageEntry) { if present && !entry.expired() { return Ok, entry } - return KeyNotFound, nil + return KeyNotFound, nil } func (self *MapCacheStorage) Delete(key string) (ErrorCode, *StorageEntry) { @@ -130,7 +130,7 @@ func (self *MapCacheStorage) Delete(key string) (ErrorCode, *StorageEntry) { defer self.rwLock.Unlock() entry, present := self.storageMap[key] if present && !entry.expired() { - self.storageMap[key] = nil, false + delete(self.storageMap, key) return Ok, entry } return KeyNotFound, nil @@ -140,26 +140,26 @@ func (self *MapCacheStorage) Incr(key string, value uint64, incr bool) (ErrorCod self.rwLock.Lock() defer self.rwLock.Unlock() entry, present := self.storageMap[key] - if present && !entry.expired() { - if addValue, err := strconv.Atoui64(string(entry.content)); err == nil { - var incrValue uint64 - if incr { - incrValue = uint64(addValue) + value - } else if value > addValue { - incrValue = 0 - } else { - incrValue = uint64(addValue) - value - } - incrStrValue := strconv.Uitoa64(incrValue) - old_value := entry.content - entry.content = []byte(incrStrValue) - entry.bytes = uint32(len(entry.content)) - entry.cas_unique += 1 - return Ok, &StorageEntry{entry.exptime, entry.flags, entry.bytes, entry.cas_unique, old_value}, entry - } else { - return IllegalParameter, nil, nil - } - } + if present && !entry.expired() { + if addValue, err := strconv.ParseUint(string(entry.content), 10, 64); err == nil { + var incrValue uint64 + if incr { + incrValue = uint64(addValue) + value + } else if value > addValue { + incrValue = 0 + } else { + incrValue = uint64(addValue) - value + } + incrStrValue := strconv.FormatUint(incrValue, 10) + old_value := entry.content + entry.content = []byte(incrStrValue) + entry.bytes = uint32(len(entry.content)) + entry.cas_unique += 1 + return Ok, &StorageEntry{entry.exptime, entry.flags, entry.bytes, entry.cas_unique, old_value}, entry + } else { + return IllegalParameter, nil, nil + } + } return KeyNotFound, nil, nil } @@ -169,8 +169,8 @@ var nullStorageEntry = &StorageEntry{} func (self *MapCacheStorage) Expire(key string, check bool) { self.rwLock.Lock() defer self.rwLock.Unlock() - entry, present := self.storageMap[key] + entry, present := self.storageMap[key] if present && (!check || entry.expired()) { - self.storageMap[key] = nullStorageEntry, false + delete(self.storageMap, key) } }