Skip to content

Commit

Permalink
feat:支持长轮询及通知机制 (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewshan authored Apr 6, 2023
1 parent 44417fd commit c41a0f7
Show file tree
Hide file tree
Showing 47 changed files with 1,318 additions and 446 deletions.
6 changes: 5 additions & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type GetServicesRequest api.GetServicesRequest
// InitCalleeServiceRequest is the request struct for InitCalleeService.
type InitCalleeServiceRequest api.InitCalleeServiceRequest

// WatchAllInstancesRequest is the request to watch instances
type WatchAllInstancesRequest api.WatchAllInstancesRequest

// ConsumerAPI 主调端API方法.
type ConsumerAPI interface {
api.SDKOwner
Expand All @@ -66,6 +69,8 @@ type ConsumerAPI interface {
GetServices(req *GetServicesRequest) (*model.ServicesResponse, error)
// InitCalleeService 初始化服务运行中需要的被调服务
InitCalleeService(req *InitCalleeServiceRequest) error
// WatchAllInstances 监听服务实例变更事件
WatchAllInstances(req *WatchAllInstancesRequest) (*model.WatchAllInstancesResponse, error)
// Destroy 销毁API,销毁后无法再进行调用
Destroy()
}
Expand All @@ -85,7 +90,6 @@ type ProviderAPI interface {
// RegisterInstance
// minimum supported version of polaris-server is v1.10.0
RegisterInstance(instance *InstanceRegisterRequest) (*model.InstanceRegisterResponse, error)
// Deprecated: Use RegisterInstance instead.
// Register
// 同步注册服务,服务注册成功后会填充instance中的InstanceID字段
// 用户可保持该instance对象用于反注册和心跳上报
Expand Down
18 changes: 12 additions & 6 deletions api/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ const (
EventInstance = model.EventInstance
)

// 网格类型
const (
MeshVirtualService = model.MeshVirtualService
MeshServiceEntry = model.MeshServiceEntry
MeshDestinationRule = model.MeshDestinationRule
MeshEnvoyFilter = model.MeshEnvoyFilter
MeshGateway = model.MeshGateway
// WatchModeLongPull watch model by long pulling, the invocation would be hang on until revision updated or timeout
WatchModeLongPull = model.WatchModeLongPull
// WatchModeNotify watch model by notify to listener
WatchModeNotify = model.WatchModeNotify
)

// GetOneInstanceRequest 获取单个服务的请求对象
Expand Down Expand Up @@ -120,6 +118,11 @@ type InitCalleeServiceRequest struct {
model.InitCalleeServiceRequest
}

// WatchAllInstancesRequest .
type WatchAllInstancesRequest struct {
model.WatchAllInstancesRequest
}

// ConsumerAPI 主调端API方法
type ConsumerAPI interface {
SDKOwner
Expand All @@ -135,12 +138,15 @@ type ConsumerAPI interface {
UpdateServiceCallResult(req *ServiceCallResult) error
// Destroy 销毁API,销毁后无法再进行调用
Destroy()
// Deprecated: please use WatchAllInstances instead
// WatchService 订阅服务消息
WatchService(req *WatchServiceRequest) (*model.WatchServiceResponse, error)
// GetServices 根据业务同步获取批量服务
GetServices(req *GetServicesRequest) (*model.ServicesResponse, error)
// InitCalleeService 初始化服务运行中需要的被调服务
InitCalleeService(req *InitCalleeServiceRequest) error
// WatchAllInstances 监听服务实例变更事件
WatchAllInstances(req *WatchAllInstancesRequest) (*model.WatchAllInstancesResponse, error)
}

var (
Expand Down
11 changes: 11 additions & 0 deletions api/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ func (c *consumerAPI) InitCalleeService(req *InitCalleeServiceRequest) error {
return c.context.GetEngine().InitCalleeService(&req.InitCalleeServiceRequest)
}

// WatchAllInstances 监听服务实例变更事件
func (c *consumerAPI) WatchAllInstances(req *WatchAllInstancesRequest) (*model.WatchAllInstancesResponse, error) {
if err := checkAvailable(c); err != nil {
return nil, err
}
if err := req.Validate(); err != nil {
return nil, err
}
return c.context.GetEngine().WatchAllInstances(&req.WatchAllInstancesRequest)
}

// SDKContext 获取SDK上下文
func (c *consumerAPI) SDKContext() SDKContext {
return c.context
Expand Down
23 changes: 23 additions & 0 deletions api/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func SetDetectLogger(logger Logger) {
log.SetDetectLogger(logger)
}

// SetCacheLogger 设置缓存日志对象
func SetCacheLogger(logger Logger) {
log.SetCacheLogger(logger)
}

// GetDetectLogger 获取探测日志对象
func GetDetectLogger() Logger {
return log.GetDetectLogger()
Expand All @@ -101,6 +106,11 @@ func GetStatReportLogger() Logger {
return log.GetStatReportLogger()
}

// GetCacheLogger 获取缓存日志对象
func GetCacheLogger() Logger {
return log.GetCacheLogger()
}

// ConfigLoggers 全局配置日志对象
func ConfigLoggers(logDir string, logLevel int) error {
var err error
Expand All @@ -119,6 +129,9 @@ func ConfigLoggers(logDir string, logLevel int) error {
if err = ConfigNetworkLogger(logDir, logLevel); err != nil {
return fmt.Errorf("fail to ConfigNetworkLogger: %v", err)
}
if err = ConfigCacheLogger(logDir, logLevel); err != nil {
return fmt.Errorf("fail to ConfigCacheLogger: %v", err)
}
return nil
}

Expand Down Expand Up @@ -152,6 +165,12 @@ func ConfigNetworkLogger(logDir string, logLevel int) error {
return log.ConfigNetworkLogger(log.DefaultLogger, option)
}

// ConfigCacheLogger 配置缓存更新日志对象
func ConfigCacheLogger(logDir string, logLevel int) error {
option := log.CreateDefaultLoggerOptions(filepath.Join(logDir, log.DefaultCacheLogRotationPath), logLevel)
return log.ConfigNetworkLogger(log.DefaultLogger, option)
}

// SetLoggersLevel 设置所有日志级别
func SetLoggersLevel(loglevel int) error {
var err error
Expand All @@ -175,6 +194,10 @@ func SetLoggersLevel(loglevel int) error {
if nil != logErr {
err = multierror.Append(err, multierror.Prefix(err, "fail to set network logLevel"))
}
logErr = log.GetCacheLogger().SetLogLevel(loglevel)
if nil != logErr {
err = multierror.Append(err, multierror.Prefix(err, "fail to set cache logLevel"))
}
return err
}

Expand Down
5 changes: 5 additions & 0 deletions api_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (c *consumerAPI) InitCalleeService(req *InitCalleeServiceRequest) error {
return c.rawAPI.InitCalleeService((*api.InitCalleeServiceRequest)(req))
}

// WatchAllInstances 监听服务实例变更事件
func (c *consumerAPI) WatchAllInstances(req *WatchAllInstancesRequest) (*model.WatchAllInstancesResponse, error) {
return c.rawAPI.WatchAllInstances((*api.WatchAllInstancesRequest)(req))
}

// Destroy 销毁API,销毁后无法再进行调用
func (c *consumerAPI) Destroy() {
c.rawAPI.Destroy()
Expand Down
11 changes: 9 additions & 2 deletions examples/quickstart/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
global:
system:
discoverCluster:
namespace: Polaris
service: polaris.discover
healthCheckCluster:
namespace: Polaris
service: polaris.healthcheck
serverConnector:
addresses:
- 127.0.0.1:8091
- 127.0.0.1:8090
statReporter:
enable: true
enable: false
chain:
- prometheus
# - pushgateway
Expand Down
2 changes: 1 addition & 1 deletion examples/quickstart/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
)

func initArgs() {
flag.StringVar(&namespace, "namespace", "default", "namespace")
flag.StringVar(&namespace, "namespace", "Production", "namespace")
flag.StringVar(&service, "service", "DiscoverEchoServer", "service")
// 当北极星开启鉴权时,需要配置此参数完成相关的权限检查
flag.StringVar(&token, "token", "", "token")
Expand Down
11 changes: 9 additions & 2 deletions examples/quickstart/provider/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
global:
system:
discoverCluster:
namespace: Polaris
service: polaris.discover
healthCheckCluster:
namespace: Polaris
service: polaris.healthcheck
serverConnector:
addresses:
- 127.0.0.1:8091
- 81.71.45.120:8090
statReporter:
enable: true
enable: false
chain:
- prometheus
# - pushgateway
Expand Down
135 changes: 135 additions & 0 deletions examples/watch/longpull/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package main

import (
"flag"
"fmt"
"log"
"sync"
"sync/atomic"
"time"

"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/api"
)

var (
namespace string
service string
waitIndex uint64
waitTime time.Duration
)

func initArgs() {
flag.StringVar(&namespace, "namespace", "", "namespace")
flag.StringVar(&service, "service", "", "service")
flag.Uint64Var(&waitIndex, "waitIndex", 0, "waitIndex")
flag.DurationVar(&waitTime, "waitTime", 10*time.Second, "waitTime")
}

func registerInstance(svcName string, host string, port int32, provider polaris.ProviderAPI) string {
log.Printf("start to invoke register operation")
registerRequest := &polaris.InstanceRegisterRequest{}
registerRequest.Service = svcName
registerRequest.Namespace = namespace
registerRequest.Host = host
registerRequest.Port = int(port)
resp, err := provider.Register(registerRequest)
if err != nil {
log.Fatalf("fail to register instance to service %s, err is %v", svcName, err)
}
log.Printf("register response: service %s, instanceId %s", svcName, resp.InstanceID)
return resp.InstanceID
}

func deregisterService(svcName string, instanceId string, provider polaris.ProviderAPI) {
log.Printf("start to invoke deregister operation")
deregisterRequest := &polaris.InstanceDeRegisterRequest{}
deregisterRequest.InstanceID = instanceId
if err := provider.Deregister(deregisterRequest); err != nil {
log.Fatalf("fail to deregister instance to service %s, err is %v", svcName, err)
}
log.Printf("deregister successfully to service %s, id=%s", svcName, instanceId)
}

const svcCount = 10

var port int32 = 1000

func main() {
initArgs()
flag.Parse()
if len(namespace) == 0 || len(service) == 0 {
log.Print("namespace and service are required")
return
}
consumer, err := polaris.NewConsumerAPI()
if err != nil {
log.Fatalf("fail to create consumerAPI, err is %v", err)
}
defer consumer.Destroy()

var index uint64 = waitIndex

provider := polaris.NewProviderAPIByContext(consumer.SDKContext())
for i := 0; i < svcCount; i++ {
go func(svcName string) {
time.Sleep(5 * time.Second)
instId1 := registerInstance(svcName, "127.0.0.1", atomic.AddInt32(&port, 1), provider)
instId2 := registerInstance(svcName, "127.0.0.1", atomic.AddInt32(&port, 1), provider)
instId3 := registerInstance(svcName, "127.0.0.1", atomic.AddInt32(&port, 1), provider)
time.Sleep(10 * time.Second)
deregisterService(svcName, instId1, provider)
time.Sleep(10 * time.Second)
deregisterService(svcName, instId2, provider)
time.Sleep(10 * time.Second)
deregisterService(svcName, instId3, provider)
}(fmt.Sprintf("%s-%d", service, i))
}
wg := &sync.WaitGroup{}
wg.Add(svcCount)
for j := 0; j < svcCount; j++ {
go func(svcName string) {
defer wg.Done()
for i := 0; i < 10; i++ {
req := &polaris.WatchAllInstancesRequest{}
req.Service = svcName
req.Namespace = namespace
req.WaitTime = waitTime
req.WaitIndex = index
req.WatchMode = api.WatchModeLongPull
resp, err := consumer.WatchAllInstances(req)
if err != nil {
log.Fatalf("fail to watch all instances, svc %s, err: %s", svcName, err)
}
instanceResp := resp.InstancesResponse()
index = instanceResp.HashValue
log.Printf("svc %s, instances count is %d, next watch index %d", svcName, len(instanceResp.Instances), index)
for i, instance := range instanceResp.Instances {
log.Printf("svc %s, instance %d is %s:%d", svcName, i, instance.GetHost(), instance.GetPort())
}

log.Printf("svc %s, watch id is %d\n", svcName, resp.WatchId())
resp.CancelWatch()
}
}(fmt.Sprintf("%s-%d", service, j))
}
log.Printf("start to wait finish")
wg.Wait()
}
6 changes: 6 additions & 0 deletions examples/watch/longpull/polaris.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
global:
serverConnector:
addresses:
- 9.134.5.52:8091
statReporter:
enable: false
Loading

0 comments on commit c41a0f7

Please sign in to comment.