diff --git a/agollo.go b/agollo.go index 5726cee..3bc70c2 100644 --- a/agollo.go +++ b/agollo.go @@ -132,6 +132,12 @@ func (a *agollo) initNamespace(namespace string) error { } func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Configurations, err error) { + // 判断relaod的通知id是否大于缓存通知id,防止无谓的刷新缓存, + savedNotificationID, ok := a.notificationMap.Load(namespace) + if ok && savedNotificationID.(int) >= notificationID { + conf = a.getNameSpace(namespace) + return + } a.notificationMap.Store(namespace, notificationID) var configServerURL string @@ -153,20 +159,27 @@ func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Con ReleaseKey(cachedReleaseKey.(string)), ) - // 服务端未找到namespace时返回404 - if status == http.StatusNotFound { + switch status { + case http.StatusOK: // 正常响应 + a.cache.Store(namespace, config.Configurations) // 覆盖旧缓存 + a.releaseKeyMap.Store(namespace, config.ReleaseKey) // 存储最新的release_key + conf = config.Configurations + + // 备份配置 + if err = a.backup(); err != nil { + return + } + case http.StatusNotModified: // 服务端未修改配置情况下返回304 + conf = a.getNameSpace(namespace) + case http.StatusNotFound: // 服务端未找到namespace时返回404 // fix #23 当需要加载的namespace还未创建时,置为以下状态 // 1. notificationMap添加namespace以保证在服务器端的轮训列表中 // 2. 且notificationID(0) > 默认值(-1), 服务端新建完后发送改变事件 a.notificationMap.Store(namespace, 0) a.cache.Store(namespace, Configurations{}) - return - } - - if err != nil || status != http.StatusOK { - a.log("ConfigServerUrl", configServerURL, - "Namespace", namespace, - "Action", "ReloadNameSpace", + default: // error || 其他未知错误情况 + a.log("ConfigServerUrl", configServerURL, "Namespace", namespace, + "Action", "ReloadNameSpace", "ServerResponseStatus", status, "Error", err) conf = Configurations{} @@ -178,20 +191,7 @@ func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Con err = nil } } - a.cache.Store(namespace, conf) - a.releaseKeyMap.Store(namespace, cachedReleaseKey.(string)) - - return - } - - conf = config.Configurations - a.cache.Store(namespace, config.Configurations) // 覆盖旧缓存 - a.releaseKeyMap.Store(namespace, config.ReleaseKey) // 存储最新的release_key - - // 备份配置 - if err = a.backup(); err != nil { - return } return @@ -476,13 +476,9 @@ func (a *agollo) longPoll() { // 更新namespace newValue, err := a.reloadNamespace(notification.NamespaceName, notification.NotificationID) - if err == nil { - if isSendChange { - // 发送到监听channel - a.sendWatchCh(notification.NamespaceName, - oldValue, - newValue) - } + if err == nil && isSendChange { + // 发送到监听channel + a.sendWatchCh(notification.NamespaceName, oldValue, newValue) } else { a.sendErrorsCh(configServerURL, notifications, notification.NamespaceName, err) } @@ -490,6 +486,9 @@ func (a *agollo) longPoll() { } } +// apollo有个蛋疼的地方,通过noncache接口可以获取到配置,releasekey信息 +// 但是无法获取到namespace的notificationID +// 导致监听配置时,无法判断是否需要通知或者更新 func (a *agollo) isSendChange(namespace string) bool { v, ok := a.notificationMap.Load(namespace) return ok && v.(int) > defaultNotificationID diff --git a/agollo_test.go b/agollo_test.go new file mode 100644 index 0000000..6bfe488 --- /dev/null +++ b/agollo_test.go @@ -0,0 +1,267 @@ +package agollo + +import ( + "fmt" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type mockApolloClient struct { + notifications func(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) + getConfigsFromNonCache func(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (int, *Config, error) + getConfigsFromCache func(configServerURL, appID, cluster, namespace string) (Configurations, error) + getConfigServers func(metaServerURL, appID string) (int, []ConfigServer, error) +} + +func (c *mockApolloClient) Apply(opts ...ApolloClientOption) { + +} + +func (c *mockApolloClient) Notifications(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) { + if c.notifications == nil { + return 404, nil, nil + } + return c.notifications(configServerURL, appID, clusterName, notifications) +} + +func (c *mockApolloClient) GetConfigsFromNonCache(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (int, *Config, error) { + if c.getConfigsFromNonCache == nil { + return 404, nil, nil + } + return c.getConfigsFromNonCache(configServerURL, appID, cluster, namespace, opts...) +} + +func (c *mockApolloClient) GetConfigsFromCache(configServerURL, appID, cluster, namespace string) (Configurations, error) { + if c.getConfigsFromCache == nil { + return nil, nil + } + return c.getConfigsFromCache(configServerURL, appID, cluster, namespace) +} + +func (c *mockApolloClient) GetConfigServers(metaServerURL, appID string) (int, []ConfigServer, error) { + if c.getConfigServers == nil { + return 404, nil, nil + } + return c.getConfigServers(metaServerURL, appID) +} + +func TestAgollo(t *testing.T) { + configServerURL := "http://localhost:8080" + appid := "test" + cluster := "default" + newConfigs := func() map[string]*Config { + return map[string]*Config{ + "application": &Config{ + AppID: appid, + Cluster: cluster, + NamespaceName: "application", + Configurations: map[string]interface{}{ + "timeout": "100", + }, + ReleaseKey: "111", + }, + "test.json": &Config{ + AppID: appid, + Cluster: cluster, + NamespaceName: "test.json", + Configurations: map[string]interface{}{ + "content": `{"name":"foo","age":18}`, + }, + ReleaseKey: "121", + }, + } + } + + newClient := func(configs map[string]*Config) ApolloClient { + var lock sync.RWMutex + var once sync.Once + return &mockApolloClient{ + notifications: func(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) { + lock.RLock() + rk, _ := strconv.Atoi(configs["application"].ReleaseKey) + lock.RUnlock() + + once.Do(func() { + rk++ + + lock.Lock() + configs["application"].ReleaseKey = fmt.Sprint(rk) + lock.Unlock() + }) + return 200, []Notification{ + Notification{ + NamespaceName: "application", + NotificationID: rk, + }, + }, nil + }, + getConfigsFromCache: func(configServerURL, appID, cluster, namespace string) (Configurations, error) { + return nil, nil + }, + getConfigsFromNonCache: func(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (i int, config *Config, err error) { + var options NotificationsOptions + for _, opt := range opts { + opt(&options) + } + + lock.RLock() + config, ok := configs[namespace] + lock.RUnlock() + if !ok { + return 404, nil, nil + } + + if config.ReleaseKey == options.ReleaseKey { + return 304, nil, nil + } + + return 200, config, nil + }, + getConfigServers: func(metaServerURL, appID string) (i int, servers []ConfigServer, err error) { + return 200, []ConfigServer{ + ConfigServer{HomePageURL: metaServerURL}, + }, nil + }, + } + } + + newBadClient := func(configs map[string]*Config) ApolloClient { + return &mockApolloClient{ + notifications: func(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) { + return 500, nil, nil + }, + getConfigsFromCache: func(configServerURL, appID, cluster, namespace string) (Configurations, error) { + return nil, nil + }, + getConfigsFromNonCache: func(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (i int, config *Config, err error) { + return 500, nil, nil + }, + getConfigServers: func(metaServerURL, appID string) (i int, servers []ConfigServer, err error) { + return 500, nil, nil + }, + } + } + + var tests = []struct { + Name string + NewAgollo func(configs map[string]*Config) Agollo + Test func(a Agollo, configs map[string]*Config) + }{ + { + Name: "测试:预加载的namespace应该正常可获取,非预加载的namespace无法获取配置", + NewAgollo: func(configs map[string]*Config) Agollo { + a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), PreloadNamespaces("test.json")) + assert.Nil(t, err) + assert.NotNil(t, a) + return a + }, + Test: func(a Agollo, configs map[string]*Config) { + for namespace, config := range configs { + for key, expected := range config.Configurations { + if namespace == "test.json" { + actual := a.Get(key, WithNamespace(namespace)) + assert.Equal(t, expected, actual) + } else { + actual := a.Get(key, WithNamespace(namespace)) + assert.Empty(t, actual) + } + } + } + }, + }, + { + Name: "测试:自动获取非预加载namespace时,正常读取配置配置项", + NewAgollo: func(configs map[string]*Config) Agollo { + a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), AutoFetchOnCacheMiss()) + assert.Nil(t, err) + assert.NotNil(t, a) + return a + }, + Test: func(a Agollo, configs map[string]*Config) { + for namespace, config := range configs { + for key, expected := range config.Configurations { + actual := a.Get(key, WithNamespace(namespace)) + assert.Equal(t, expected, actual) + } + } + + // 测试无WithNamespace配置项时读取application的配置 + key := "timeout" + expected := configs["application"].Configurations[key] + actual := a.Get(key) + assert.Equal(t, expected, actual) + }, + }, + { + Name: "测试:初始化后 start 监听配置的情况", + NewAgollo: func(configs map[string]*Config) Agollo { + a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), AutoFetchOnCacheMiss()) + assert.Nil(t, err) + assert.NotNil(t, a) + return a + }, + Test: func(a Agollo, configs map[string]*Config) { + a.Start() + defer a.Stop() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < 5; i++ { + for namespace, config := range configs { + for key, expected := range config.Configurations { + actual := a.Get(key, WithNamespace(namespace)) + assert.Equal(t, expected, actual) + } + } + time.Sleep(time.Second) + } + }() + + wg.Wait() + }, + }, + { + Name: "测试:容灾配置项", + NewAgollo: func(configs map[string]*Config) Agollo { + a, err := New(configServerURL, appid, WithApolloClient(newBadClient(configs)), AutoFetchOnCacheMiss(), FailTolerantOnBackupExists()) + assert.Nil(t, err) + assert.NotNil(t, a) + return a + }, + Test: func(a Agollo, configs map[string]*Config) { + a.Start() + defer a.Stop() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < 5; i++ { + for namespace, config := range configs { + for key, expected := range config.Configurations { + actual := a.Get(key, WithNamespace(namespace)) + assert.Equal(t, expected, actual) + } + } + time.Sleep(time.Second) + } + }() + + wg.Wait() + }, + }, + } + + for _, test := range tests { + configs := newConfigs() + test.Test(test.NewAgollo(configs), configs) + } +} diff --git a/apollo_client.go b/apollo_client.go index 3b484d5..ea25df7 100644 --- a/apollo_client.go +++ b/apollo_client.go @@ -19,6 +19,8 @@ var ( // https://github.com/ctripcorp/apollo/wiki/%E5%85%B6%E5%AE%83%E8%AF%AD%E8%A8%80%E5%AE%A2%E6%88%B7%E7%AB%AF%E6%8E%A5%E5%85%A5%E6%8C%87%E5%8D%97 type ApolloClient interface { + Apply(opts ...ApolloClientOption) + Notifications(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) // 该接口会直接从数据库中获取配置,可以配合配置推送通知实现实时更新配置。 @@ -107,24 +109,15 @@ func WithAccessKey(accessKey string) ApolloClientOption { } func NewApolloClient(opts ...ApolloClientOption) ApolloClient { - c := &apolloClient{} - for _, opt := range opts { - opt(c) - } - - if c.Doer == nil { - c.Doer = &http.Client{ + c := &apolloClient{ + IP: getLocalIP(), + ConfigType: defaultConfigType, + Doer: &http.Client{ Timeout: defaultClientTimeout, // Notifications由于服务端会hold住请求60秒,所以请确保客户端访问服务端的超时时间要大于60秒。 - } - } - - if c.IP == "" { - c.IP = getLocalIP() + }, } - if c.ConfigType == "" { - c.ConfigType = defaultConfigType - } + c.Apply(opts...) return c } @@ -162,6 +155,12 @@ func (c *apolloClient) httpHeader(appID, uri string) map[string]string { return headers } +func (c *apolloClient) Apply(opts ...ApolloClientOption) { + for _, opt := range opts { + opt(c) + } +} + func (c *apolloClient) Notifications(configServerURL, appID, cluster string, notifications []Notification) (status int, result []Notification, err error) { configServerURL = normalizeURL(configServerURL) requestURI := fmt.Sprintf("/notifications/v2?appId=%s&cluster=%s¬ifications=%s", diff --git a/go.mod b/go.mod index ac6f492..33b1f71 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/magiconair/properties v1.8.1 github.com/pelletier/go-toml v1.4.0 // indirect github.com/spf13/viper v1.4.0 + github.com/stretchr/testify v1.6.1 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77 gopkg.in/go-playground/assert.v1 v1.2.1 ) diff --git a/options.go b/options.go index d47a52a..7dcba0a 100644 --- a/options.go +++ b/options.go @@ -29,7 +29,7 @@ type Options struct { Balancer Balancer // ConfigServer负载均衡 EnableSLB bool // 启用ConfigServer负载均衡 RefreshIntervalInSecond time.Duration // ConfigServer刷新间隔 - AccessKey string + ClientOptions []ApolloClientOption } func newOptions(configServerURL, appID string, opts ...Option) (Options, error) { @@ -37,7 +37,7 @@ func newOptions(configServerURL, appID string, opts ...Option) (Options, error) AppID: appID, Cluster: defaultCluster, DefaultNamespace: defaultNamespace, - // ApolloClient: NewApolloClient(), + ApolloClient: NewApolloClient(), Logger: NewLogger(), AutoFetchOnCacheMiss: defaultAutoFetchOnCacheMiss, LongPollerInterval: defaultLongPollInterval, @@ -49,7 +49,7 @@ func newOptions(configServerURL, appID string, opts ...Option) (Options, error) opt(&options) } - options.ApolloClient = NewApolloClient(WithAccessKey(options.AccessKey)) + options.ApolloClient.Apply(options.ClientOptions...) if options.Balancer == nil { var b Balancer @@ -173,7 +173,13 @@ func ConfigServerRefreshIntervalInSecond(refreshIntervalInSecond time.Duration) func AccessKey(accessKey string) Option { return func(o *Options) { - o.AccessKey = accessKey + o.ClientOptions = append(o.ClientOptions, WithAccessKey(accessKey)) + } +} + +func WithClientOptinons(opts ...ApolloClientOption) Option { + return func(o *Options) { + o.ClientOptions = append(o.ClientOptions, opts...) } }