From c37b7792f404325deb64c244c39e150d3d7b0cdd Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Fri, 8 Mar 2024 10:17:01 +0800 Subject: [PATCH] enhance: purge client infos periodically (#31037) (#31092) https://github.com/milvus-io/milvus/issues/31007 pr: #31037 --------- Signed-off-by: longjiquan --- configs/milvus.yaml | 1 + internal/proxy/connection/manager.go | 46 +++++++++++++-- internal/proxy/connection/manager_test.go | 21 +++++++ internal/proxy/connection/priority_queue.go | 56 +++++++++++++++++++ .../proxy/connection/priority_queue_test.go | 23 ++++++++ pkg/util/paramtable/component_param.go | 10 ++++ 6 files changed, 152 insertions(+), 5 deletions(-) create mode 100644 internal/proxy/connection/priority_queue.go create mode 100644 internal/proxy/connection/priority_queue_test.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e4f43dc3c68e1..5555533141f9e 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -209,6 +209,7 @@ proxy: maxTaskNum: 1024 # max task number of proxy task queue connectionCheckIntervalSeconds: 120 # the interval time(in seconds) for connection manager to scan inactive client info connectionClientInfoTTLSeconds: 86400 # inactive client info TTL duration, in seconds + maxConnectionNum: 10000 # the max client info numbers that proxy should manage, avoid too many client infos. accessLog: enable: false # Log filename, set as "" to use stdout. diff --git a/internal/proxy/connection/manager.go b/internal/proxy/connection/manager.go index 03e5a9f0af33f..500ec9aa16a57 100644 --- a/internal/proxy/connection/manager.go +++ b/internal/proxy/connection/manager.go @@ -1,12 +1,13 @@ package connection import ( + "container/heap" "context" "strconv" "sync" "time" - "go.uber.org/atomic" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/log" @@ -22,7 +23,6 @@ type connectionManager struct { wg sync.WaitGroup clientInfos *typeutil.ConcurrentMap[int64, clientInfo] - count atomic.Int64 } func (s *connectionManager) init() { @@ -52,11 +52,49 @@ func (s *connectionManager) checkLoop() { return case <-t.C: s.removeLongInactiveClients() + // not sure if we should purge them periodically. + s.purgeIfNumOfClientsExceed() t.Reset(paramtable.Get().ProxyCfg.ConnectionCheckIntervalSeconds.GetAsDuration(time.Second)) } } } +func (s *connectionManager) purgeIfNumOfClientsExceed() { + diffNum := int64(s.clientInfos.Len()) - paramtable.Get().ProxyCfg.MaxConnectionNum.GetAsInt64() + if diffNum <= 0 { + return + } + + begin := time.Now() + + log := log.With( + zap.Int64("num", int64(s.clientInfos.Len())), + zap.Int64("limit", paramtable.Get().ProxyCfg.MaxConnectionNum.GetAsInt64())) + + log.Info("number of client infos exceed limit, ready to purge the oldest") + q := newPriorityQueueWithCap(int(diffNum + 1)) + s.clientInfos.Range(func(identifier int64, info clientInfo) bool { + heap.Push(&q, newQueryItem(info.identifier, info.lastActiveTime)) + if int64(q.Len()) > diffNum { + // pop the newest. + heap.Pop(&q) + } + return true + }) + + // time order doesn't matter here. + for _, item := range q { + info, exist := s.clientInfos.GetAndRemove(item.identifier) + if exist { + log.Info("remove client info", info.GetLogger()...) + } + } + + log.Info("purge client infos done", + zap.Duration("cost", time.Since(begin)), + zap.Int64("num after purge", int64(s.clientInfos.Len()))) +} + func (s *connectionManager) Register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) { cli := clientInfo{ ClientInfo: info, @@ -64,7 +102,6 @@ func (s *connectionManager) Register(ctx context.Context, identifier int64, info lastActiveTime: time.Now(), } - s.count.Inc() s.clientInfos.Insert(identifier, cli) log.Ctx(ctx).Info("client register", cli.GetLogger()...) } @@ -74,7 +111,7 @@ func (s *connectionManager) KeepActive(identifier int64) { } func (s *connectionManager) List() []*commonpb.ClientInfo { - clients := make([]*commonpb.ClientInfo, 0, s.count.Load()) + clients := make([]*commonpb.ClientInfo, 0, s.clientInfos.Len()) s.clientInfos.Range(func(identifier int64, info clientInfo) bool { if info.ClientInfo != nil { @@ -120,7 +157,6 @@ func (s *connectionManager) removeLongInactiveClients() { if time.Since(info.lastActiveTime) > ttl { log.Info("client deregister", info.GetLogger()...) s.clientInfos.Remove(candidate) - s.count.Dec() } return true }) diff --git a/internal/proxy/connection/manager_test.go b/internal/proxy/connection/manager_test.go index d85d9206a332c..98af4476ffd78 100644 --- a/internal/proxy/connection/manager_test.go +++ b/internal/proxy/connection/manager_test.go @@ -44,3 +44,24 @@ func TestConnectionManager(t *testing.T) { return len(s.List()) == 0 }, time.Second*5, time.Second) } + +func TestConnectionManager_Purge(t *testing.T) { + paramtable.Init() + + pt := paramtable.Get() + pt.Save(pt.ProxyCfg.ConnectionCheckIntervalSeconds.Key, "2") + pt.Save(pt.ProxyCfg.MaxConnectionNum.Key, "2") + defer pt.Reset(pt.ProxyCfg.ConnectionCheckIntervalSeconds.Key) + defer pt.Reset(pt.ProxyCfg.MaxConnectionNum.Key) + s := newConnectionManager() + defer s.Stop() + + repeat := 10 + for i := 0; i < repeat; i++ { + s.Register(context.TODO(), int64(i), &commonpb.ClientInfo{}) + } + + assert.Eventually(t, func() bool { + return s.clientInfos.Len() <= 2 + }, time.Second*5, time.Second) +} diff --git a/internal/proxy/connection/priority_queue.go b/internal/proxy/connection/priority_queue.go new file mode 100644 index 0000000000000..3ce31bbb61b50 --- /dev/null +++ b/internal/proxy/connection/priority_queue.go @@ -0,0 +1,56 @@ +package connection + +import ( + "container/heap" + "time" +) + +type queueItem struct { + identifier int64 + lastActiveTime time.Time +} + +func newQueryItem(identifier int64, lastActiveTime time.Time) *queueItem { + return &queueItem{ + identifier: identifier, + lastActiveTime: lastActiveTime, + } +} + +type priorityQueue []*queueItem + +func (pq priorityQueue) Len() int { + return len(pq) +} + +func (pq priorityQueue) Less(i, j int) bool { + // we should purge the oldest, so the newest should be on the root. + return pq[i].lastActiveTime.After(pq[j].lastActiveTime) +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] +} + +func (pq *priorityQueue) Push(x interface{}) { + item := x.(*queueItem) + *pq = append(*pq, item) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + *pq = old[:n-1] + return item +} + +func newPriorityQueueWithCap(cap int) priorityQueue { + q := make(priorityQueue, 0, cap) + heap.Init(&q) + return q +} + +func newPriorityQueue() priorityQueue { + return newPriorityQueueWithCap(0) +} diff --git a/internal/proxy/connection/priority_queue_test.go b/internal/proxy/connection/priority_queue_test.go new file mode 100644 index 0000000000000..ce665f89de0c4 --- /dev/null +++ b/internal/proxy/connection/priority_queue_test.go @@ -0,0 +1,23 @@ +package connection + +import ( + "container/heap" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_priorityQueue(t *testing.T) { + q := newPriorityQueue() + repeat := 10 + for i := 0; i < repeat; i++ { + heap.Push(&q, newQueryItem(int64(i), time.Now())) + } + counter := repeat - 1 + for q.Len() > 0 { + item := heap.Pop(&q).(*queueItem) + assert.Equal(t, int64(counter), item.identifier) + counter-- + } +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f950c74e8e308..f8ea4acef98df 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -975,6 +975,7 @@ type proxyConfig struct { // connection manager ConnectionCheckIntervalSeconds ParamItem `refreshable:"true"` ConnectionClientInfoTTLSeconds ParamItem `refreshable:"true"` + MaxConnectionNum ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` } @@ -1296,6 +1297,15 @@ please adjust in embedded Milvus: false`, Export: true, } p.ConnectionClientInfoTTLSeconds.Init(base.mgr) + + p.MaxConnectionNum = ParamItem{ + Key: "proxy.maxConnectionNum", + Version: "2.3.11", + Doc: "the max client info numbers that proxy should manage, avoid too many client infos", + DefaultValue: "10000", + Export: true, + } + p.MaxConnectionNum.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////