Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supports inmemory cache expiration for twindb #1293

Open
wants to merge 1 commit into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion grid-client/deployer/tf_plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func NewTFPluginClient(
}

if !cfg.rmbInMemCache {
peerOpts = append(peerOpts, peer.WithTwinCache(10*60*60)) // in seconds that's 10 hours
peerOpts = append(peerOpts, peer.WithTmpCacheExpiration(10*60*60)) // in seconds that's 10 hours
}
rmbClient, err := peer.NewRpcClient(ctx, tfPluginClient.mnemonicOrSeed, manager, peerOpts...)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions rmb-sdk-go/peer/examples/peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func app() error {
relayCallback,
peer.WithRelay("wss://relay.dev.grid.tf"),
peer.WithSession("test-client"),
peer.WithInMemoryExpiration(6*60*60), // six hours
)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rmb-sdk-go/peer/examples/peer_pingmany/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
handler,
peer.WithKeyType(peer.KeyTypeSr25519),
peer.WithSession("rmb-playground999"),
peer.WithTwinCache(10*60*60), // in seconds that's 10 hours
peer.WithInMemoryExpiration(10*60*60), // in seconds that's 10 hours
)

if err != nil {
Expand Down
15 changes: 12 additions & 3 deletions rmb-sdk-go/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,24 @@ func WithEncoder(encoder encoder.Encoder) PeerOpt {
}

// WithTwinCache cache twin information for this ttl number of seconds
// by default twins are cached in memory forever
func WithTwinCache(ttl uint64) PeerOpt {
// if ttl == 0, twins are cached forever
func WithTmpCacheExpiration(ttl uint64) PeerOpt {
return func(pc *peerCfg) {
pc.cacheFactory = func(inner TwinDB, chainURL string) (TwinDB, error) {
return newTmpCache(ttl, inner, chainURL)
}
}
}

// if ttl == 0 twins are cached forever
func WithInMemoryExpiration(ttl uint64) PeerOpt {
return func(pc *peerCfg) {
pc.cacheFactory = func(inner TwinDB, chainURL string) (TwinDB, error) {
return newInMemoryCache(inner, ttl), nil
}
}
}

// Peer exposes the functionality to talk directly to an rmb relay
type Peer struct {
source *types.Address
Expand Down Expand Up @@ -158,7 +167,7 @@ func NewPeer(
enableEncryption: true,
keyType: KeyTypeSr25519,
cacheFactory: func(inner TwinDB, _ string) (TwinDB, error) {
return newInMemoryCache(inner), nil
return newInMemoryCache(inner, 0), nil
},
}

Expand Down
42 changes: 22 additions & 20 deletions rmb-sdk-go/peer/twindb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Twin struct {
PublicKey []byte
Relay *string
E2EKey []byte
Timestamp uint64
}

type twinDB struct {
Expand Down Expand Up @@ -71,32 +72,43 @@ func (t *twinDB) GetByPk(pk []byte) (uint32, error) {
return t.subConn.GetTwinByPubKey(pk)
}

// if ttl == 0, then the data will stay forever
type inMemoryCache struct {
cache map[uint32]Twin
inner TwinDB
m sync.RWMutex
ttl uint64
}

func newInMemoryCache(inner TwinDB) TwinDB {
func newInMemoryCache(inner TwinDB, ttl uint64) TwinDB {
return &inMemoryCache{
cache: make(map[uint32]Twin),
inner: inner,
ttl: ttl,
}
}

func (twin *Twin) isExpired(ttl uint64) bool {
age := uint64(time.Now().Unix()) - twin.Timestamp
if ttl != 0 && age > ttl {
log.Trace().Uint64("age", age).Msg("twin cache hit but expired")
return true
}
return false
}

func (m *inMemoryCache) Get(id uint32) (twin Twin, err error) {
m.m.RLock()
twin, ok := m.cache[id]
m.m.RUnlock()
if ok {
if ok && !twin.isExpired(m.ttl) {
return twin, nil
}

twin, err = m.inner.Get(id)
if err != nil {
return Twin{}, errors.Wrapf(err, "could not get twin with id %d", id)
}

twin.Timestamp = uint64(time.Now().Unix())
m.m.Lock()
m.cache[id] = twin
m.m.Unlock()
Expand All @@ -108,11 +120,6 @@ func (m *inMemoryCache) GetByPk(pk []byte) (uint32, error) {
return m.inner.GetByPk(pk)
}

type cachedTwin struct {
Twin
Timestamp uint64
}

type tmpCache struct {
base string
ttl uint64
Expand All @@ -136,7 +143,7 @@ func newTmpCache(ttl uint64, inner TwinDB, chainURL string) (TwinDB, error) {
}, nil
}

func (r *tmpCache) get(path string) (twin cachedTwin, err error) {
func (r *tmpCache) get(path string) (twin Twin, err error) {
data, err := os.ReadFile(path)

if os.IsNotExist(err) {
Expand All @@ -151,10 +158,7 @@ func (r *tmpCache) get(path string) (twin cachedTwin, err error) {
// crash on file corruption
return twin, errNoCache
}

age := uint64(time.Now().Unix()) - twin.Timestamp
if age > r.ttl {
log.Trace().Uint64("age", age).Msg("twin cache hit but expired")
if twin.isExpired(r.ttl) {
return twin, errNoCache
}

Expand All @@ -163,10 +167,7 @@ func (r *tmpCache) get(path string) (twin cachedTwin, err error) {
}

func (r *tmpCache) set(path string, twin Twin) error {
data, err := json.Marshal(cachedTwin{
Twin: twin,
Timestamp: uint64(time.Now().Unix()),
})
data, err := json.Marshal(twin)

if err != nil {
return err
Expand All @@ -178,13 +179,14 @@ func (r *tmpCache) set(path string, twin Twin) error {
func (r *tmpCache) Get(id uint32) (twin Twin, err error) {
path := filepath.Join(r.base, fmt.Sprint(id))

cached, err := r.get(path)
twin, err = r.get(path)
if err == errNoCache {
twin, err = r.inner.Get(id)
if err != nil {
return twin, err
}
// set cache
twin.Timestamp = uint64(time.Now().Unix())
if err := r.set(path, twin); err != nil {
log.Error().Err(err).Msg("failed to warm up cache")
}
Expand All @@ -193,7 +195,7 @@ func (r *tmpCache) Get(id uint32) (twin Twin, err error) {
return twin, err
}

return cached.Twin, nil
return twin, nil
}

func (r *tmpCache) GetByPk(pk []byte) (uint32, error) {
Expand Down
Loading