Skip to content

Commit

Permalink
enhance: purge client infos periodically (#31037) (#31092)
Browse files Browse the repository at this point in the history
#31007
pr: #31037 

---------

Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan authored Mar 8, 2024
1 parent 542b46f commit c37b779
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 5 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ proxy:
maxTaskNum: 1024 # max task number of proxy task queue
connectionCheckIntervalSeconds: 120 # the interval time(in seconds) for connection manager to scan inactive client info
connectionClientInfoTTLSeconds: 86400 # inactive client info TTL duration, in seconds
maxConnectionNum: 10000 # the maximum number of client infos the proxy should manage; avoids keeping too many client infos.
accessLog:
enable: false
# Log filename, set as "" to use stdout.
Expand Down
46 changes: 41 additions & 5 deletions internal/proxy/connection/manager.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package connection

import (
"container/heap"
"context"
"strconv"
"sync"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/log"
Expand All @@ -22,7 +23,6 @@ type connectionManager struct {
wg sync.WaitGroup

clientInfos *typeutil.ConcurrentMap[int64, clientInfo]
count atomic.Int64
}

func (s *connectionManager) init() {
Expand Down Expand Up @@ -52,19 +52,56 @@ func (s *connectionManager) checkLoop() {
return
case <-t.C:
s.removeLongInactiveClients()
// not sure if we should purge them periodically.
s.purgeIfNumOfClientsExceed()
t.Reset(paramtable.Get().ProxyCfg.ConnectionCheckIntervalSeconds.GetAsDuration(time.Second))
}
}
}

// purgeIfNumOfClientsExceed removes the least recently active client infos
// when the number of tracked clients is above ProxyCfg.MaxConnectionNum.
// It does nothing when the count is at or below the limit.
func (s *connectionManager) purgeIfNumOfClientsExceed() {
	excess := int64(s.clientInfos.Len()) - paramtable.Get().ProxyCfg.MaxConnectionNum.GetAsInt64()
	if excess < 1 {
		return
	}

	startedAt := time.Now()

	logger := log.With(
		zap.Int64("num", int64(s.clientInfos.Len())),
		zap.Int64("limit", paramtable.Get().ProxyCfg.MaxConnectionNum.GetAsInt64()))
	logger.Info("number of client infos exceed limit, ready to purge the oldest")

	// The queue keeps the newest item at its root, so popping whenever it
	// grows past `excess` leaves exactly the `excess` oldest entries behind.
	oldest := newPriorityQueueWithCap(int(excess + 1))
	collect := func(_ int64, info clientInfo) bool {
		heap.Push(&oldest, newQueryItem(info.identifier, info.lastActiveTime))
		if int64(oldest.Len()) > excess {
			// Discard the newest candidate seen so far.
			heap.Pop(&oldest)
		}
		return true
	}
	s.clientInfos.Range(collect)

	// Removal order is irrelevant, so walk the underlying slice directly.
	for i := range oldest {
		removed, ok := s.clientInfos.GetAndRemove(oldest[i].identifier)
		if !ok {
			continue
		}
		logger.Info("remove client info", removed.GetLogger()...)
	}

	logger.Info("purge client infos done",
		zap.Duration("cost", time.Since(startedAt)),
		zap.Int64("num after purge", int64(s.clientInfos.Len())))
}

// Register stores the client info under identifier, stamping it with the
// current time as its last-active time, and logs the registration.
func (s *connectionManager) Register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) {
	registered := clientInfo{
		identifier:     identifier,
		lastActiveTime: time.Now(),
		ClientInfo:     info,
	}

	s.count.Inc()
	s.clientInfos.Insert(identifier, registered)

	logger := log.Ctx(ctx)
	logger.Info("client register", registered.GetLogger()...)
}
Expand All @@ -74,7 +111,7 @@ func (s *connectionManager) KeepActive(identifier int64) {
}

func (s *connectionManager) List() []*commonpb.ClientInfo {
clients := make([]*commonpb.ClientInfo, 0, s.count.Load())
clients := make([]*commonpb.ClientInfo, 0, s.clientInfos.Len())

s.clientInfos.Range(func(identifier int64, info clientInfo) bool {
if info.ClientInfo != nil {
Expand Down Expand Up @@ -120,7 +157,6 @@ func (s *connectionManager) removeLongInactiveClients() {
if time.Since(info.lastActiveTime) > ttl {
log.Info("client deregister", info.GetLogger()...)
s.clientInfos.Remove(candidate)
s.count.Dec()
}
return true
})
Expand Down
21 changes: 21 additions & 0 deletions internal/proxy/connection/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,24 @@ func TestConnectionManager(t *testing.T) {
return len(s.List()) == 0
}, time.Second*5, time.Second)
}

// TestConnectionManager_Purge registers more clients than MaxConnectionNum
// allows and expects the manager to end up holding at most that many.
func TestConnectionManager_Purge(t *testing.T) {
	paramtable.Init()

	params := paramtable.Get()
	// Shorten the check interval and lower the limit so the purge is observable quickly.
	params.Save(params.ProxyCfg.ConnectionCheckIntervalSeconds.Key, "2")
	params.Save(params.ProxyCfg.MaxConnectionNum.Key, "2")
	defer params.Reset(params.ProxyCfg.ConnectionCheckIntervalSeconds.Key)
	defer params.Reset(params.ProxyCfg.MaxConnectionNum.Key)

	manager := newConnectionManager()
	defer manager.Stop()

	const clients = 10
	for id := 0; id < clients; id++ {
		manager.Register(context.TODO(), int64(id), &commonpb.ClientInfo{})
	}

	withinLimit := func() bool {
		return manager.clientInfos.Len() <= 2
	}
	assert.Eventually(t, withinLimit, time.Second*5, time.Second)
}
56 changes: 56 additions & 0 deletions internal/proxy/connection/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package connection

import (
"container/heap"
"time"
)

// queueItem is a heap entry pairing a client identifier with the time that
// client was last active.
type queueItem struct {
	identifier     int64
	lastActiveTime time.Time
}

// newQueryItem returns a queueItem for the given identifier and last-active time.
func newQueryItem(identifier int64, lastActiveTime time.Time) *queueItem {
	return &queueItem{
		identifier:     identifier,
		lastActiveTime: lastActiveTime,
	}
}

// priorityQueue implements heap.Interface over queueItems. It is ordered so
// that the most recently active item is at the root: heap.Pop returns the
// newest item, which lets callers keep only the oldest ones.
type priorityQueue []*queueItem

// Len returns the number of items in the queue.
func (pq priorityQueue) Len() int {
	return len(pq)
}

// Less reports whether item i was active more recently than item j.
func (pq priorityQueue) Less(i, j int) bool {
	// The oldest items are the ones to purge, so the newest goes to the root.
	return pq[i].lastActiveTime.After(pq[j].lastActiveTime)
}

// Swap exchanges the items at positions i and j.
func (pq priorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

// Push appends x, which must be a *queueItem. Use heap.Push rather than
// calling this directly so the heap ordering is maintained.
func (pq *priorityQueue) Push(x any) {
	item := x.(*queueItem)
	*pq = append(*pq, item)
}

// Pop removes and returns the last element of the slice. Use heap.Pop rather
// than calling this directly so the heap ordering is maintained.
func (pq *priorityQueue) Pop() any {
	old := *pq
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot so the backing array does not keep the popped
	// item reachable after the caller is done with it.
	old[n-1] = nil
	*pq = old[:n-1]
	return item
}

// newPriorityQueueWithCap returns an empty priorityQueue whose backing array
// is pre-allocated for capacity items. A negative capacity is treated as 0.
func newPriorityQueueWithCap(capacity int) priorityQueue {
	if capacity < 0 {
		capacity = 0
	}
	q := make(priorityQueue, 0, capacity)
	heap.Init(&q)
	return q
}

// newPriorityQueue returns an empty priorityQueue with no pre-allocated capacity.
func newPriorityQueue() priorityQueue {
	return newPriorityQueueWithCap(0)
}
23 changes: 23 additions & 0 deletions internal/proxy/connection/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package connection

import (
"container/heap"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// Test_priorityQueue checks that items are popped from newest to oldest.
func Test_priorityQueue(t *testing.T) {
	q := newPriorityQueue()
	repeat := 10
	// Derive strictly increasing timestamps from a single base instead of
	// calling time.Now() per item: on a coarse clock two consecutive readings
	// can be equal, which would leave the pop order of those items unspecified.
	base := time.Now()
	for i := 0; i < repeat; i++ {
		heap.Push(&q, newQueryItem(int64(i), base.Add(time.Duration(i)*time.Second)))
	}
	counter := repeat - 1
	for q.Len() > 0 {
		item := heap.Pop(&q).(*queueItem)
		assert.Equal(t, int64(counter), item.identifier)
		counter--
	}
}
10 changes: 10 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,7 @@ type proxyConfig struct {
// connection manager
ConnectionCheckIntervalSeconds ParamItem `refreshable:"true"`
ConnectionClientInfoTTLSeconds ParamItem `refreshable:"true"`
MaxConnectionNum ParamItem `refreshable:"true"`

GracefulStopTimeout ParamItem `refreshable:"true"`
}
Expand Down Expand Up @@ -1296,6 +1297,15 @@ please adjust in embedded Milvus: false`,
Export: true,
}
p.ConnectionClientInfoTTLSeconds.Init(base.mgr)

p.MaxConnectionNum = ParamItem{
Key: "proxy.maxConnectionNum",
Version: "2.3.11",
Doc: "the max client info numbers that proxy should manage, avoid too many client infos",
DefaultValue: "10000",
Export: true,
}
p.MaxConnectionNum.Init(base.mgr)
}

// /////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit c37b779

Please sign in to comment.