Skip to content

Commit

Permalink
[fix] 1. 传入的client会被带有accesskey的新client覆盖的问题 2. start后会将配置覆盖为空的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
liuxingwang authored and liuxingwang committed Jun 13, 2020
1 parent 67e4268 commit 6e3e180
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 48 deletions.
57 changes: 28 additions & 29 deletions agollo.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ func (a *agollo) initNamespace(namespace string) error {
}

func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Configurations, err error) {
// 判断relaod的通知id是否大于缓存通知id,防止无谓的刷新缓存,
savedNotificationID, ok := a.notificationMap.Load(namespace)
if ok && savedNotificationID.(int) >= notificationID {
conf = a.getNameSpace(namespace)
return
}
a.notificationMap.Store(namespace, notificationID)

var configServerURL string
Expand All @@ -153,20 +159,27 @@ func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Con
ReleaseKey(cachedReleaseKey.(string)),
)

// 服务端未找到namespace时返回404
if status == http.StatusNotFound {
switch status {
case http.StatusOK: // 正常响应
a.cache.Store(namespace, config.Configurations) // 覆盖旧缓存
a.releaseKeyMap.Store(namespace, config.ReleaseKey) // 存储最新的release_key
conf = config.Configurations

// 备份配置
if err = a.backup(); err != nil {
return
}
case http.StatusNotModified: // 服务端未修改配置情况下返回304
conf = a.getNameSpace(namespace)
case http.StatusNotFound: // 服务端未找到namespace时返回404
// fix #23 当需要加载的namespace还未创建时,置为以下状态
// 1. notificationMap添加namespace以保证在服务器端的轮训列表中
// 2. 且notificationID(0) > 默认值(-1), 服务端新建完后发送改变事件
a.notificationMap.Store(namespace, 0)
a.cache.Store(namespace, Configurations{})
return
}

if err != nil || status != http.StatusOK {
a.log("ConfigServerUrl", configServerURL,
"Namespace", namespace,
"Action", "ReloadNameSpace",
default: // error || 其他未知错误情况
a.log("ConfigServerUrl", configServerURL, "Namespace", namespace,
"Action", "ReloadNameSpace", "ServerResponseStatus", status,
"Error", err)

conf = Configurations{}
Expand All @@ -178,20 +191,7 @@ func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Con
err = nil
}
}

a.cache.Store(namespace, conf)
a.releaseKeyMap.Store(namespace, cachedReleaseKey.(string))

return
}

conf = config.Configurations
a.cache.Store(namespace, config.Configurations) // 覆盖旧缓存
a.releaseKeyMap.Store(namespace, config.ReleaseKey) // 存储最新的release_key

// 备份配置
if err = a.backup(); err != nil {
return
}

return
Expand Down Expand Up @@ -476,20 +476,19 @@ func (a *agollo) longPoll() {
// 更新namespace
newValue, err := a.reloadNamespace(notification.NamespaceName, notification.NotificationID)

if err == nil {
if isSendChange {
// 发送到监听channel
a.sendWatchCh(notification.NamespaceName,
oldValue,
newValue)
}
if err == nil && isSendChange {
// 发送到监听channel
a.sendWatchCh(notification.NamespaceName, oldValue, newValue)
} else {
a.sendErrorsCh(configServerURL, notifications, notification.NamespaceName, err)
}
}
}
}

// apollo有个蛋疼的地方,通过noncache接口可以获取到配置,releasekey信息
// 但是无法获取到namespace的notificationID
// 导致监听配置时,无法判断是否需要通知或者更新
func (a *agollo) isSendChange(namespace string) bool {
v, ok := a.notificationMap.Load(namespace)
return ok && v.(int) > defaultNotificationID
Expand Down
267 changes: 267 additions & 0 deletions agollo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package agollo

import (
"fmt"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type mockApolloClient struct {
notifications func(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error)
getConfigsFromNonCache func(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (int, *Config, error)
getConfigsFromCache func(configServerURL, appID, cluster, namespace string) (Configurations, error)
getConfigServers func(metaServerURL, appID string) (int, []ConfigServer, error)
}

func (c *mockApolloClient) Apply(opts ...ApolloClientOption) {

}

func (c *mockApolloClient) Notifications(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) {
if c.notifications == nil {
return 404, nil, nil
}
return c.notifications(configServerURL, appID, clusterName, notifications)
}

func (c *mockApolloClient) GetConfigsFromNonCache(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (int, *Config, error) {
if c.getConfigsFromNonCache == nil {
return 404, nil, nil
}
return c.getConfigsFromNonCache(configServerURL, appID, cluster, namespace, opts...)
}

func (c *mockApolloClient) GetConfigsFromCache(configServerURL, appID, cluster, namespace string) (Configurations, error) {
if c.getConfigsFromCache == nil {
return nil, nil
}
return c.getConfigsFromCache(configServerURL, appID, cluster, namespace)
}

func (c *mockApolloClient) GetConfigServers(metaServerURL, appID string) (int, []ConfigServer, error) {
if c.getConfigServers == nil {
return 404, nil, nil
}
return c.getConfigServers(metaServerURL, appID)
}

func TestAgollo(t *testing.T) {
configServerURL := "http://localhost:8080"
appid := "test"
cluster := "default"
newConfigs := func() map[string]*Config {
return map[string]*Config{
"application": &Config{
AppID: appid,
Cluster: cluster,
NamespaceName: "application",
Configurations: map[string]interface{}{
"timeout": "100",
},
ReleaseKey: "111",
},
"test.json": &Config{
AppID: appid,
Cluster: cluster,
NamespaceName: "test.json",
Configurations: map[string]interface{}{
"content": `{"name":"foo","age":18}`,
},
ReleaseKey: "121",
},
}
}

newClient := func(configs map[string]*Config) ApolloClient {
var lock sync.RWMutex
var once sync.Once
return &mockApolloClient{
notifications: func(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) {
lock.RLock()
rk, _ := strconv.Atoi(configs["application"].ReleaseKey)
lock.RUnlock()

once.Do(func() {
rk++

lock.Lock()
configs["application"].ReleaseKey = fmt.Sprint(rk)
lock.Unlock()
})
return 200, []Notification{
Notification{
NamespaceName: "application",
NotificationID: rk,
},
}, nil
},
getConfigsFromCache: func(configServerURL, appID, cluster, namespace string) (Configurations, error) {
return nil, nil
},
getConfigsFromNonCache: func(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (i int, config *Config, err error) {
var options NotificationsOptions
for _, opt := range opts {
opt(&options)
}

lock.RLock()
config, ok := configs[namespace]
lock.RUnlock()
if !ok {
return 404, nil, nil
}

if config.ReleaseKey == options.ReleaseKey {
return 304, nil, nil
}

return 200, config, nil
},
getConfigServers: func(metaServerURL, appID string) (i int, servers []ConfigServer, err error) {
return 200, []ConfigServer{
ConfigServer{HomePageURL: metaServerURL},
}, nil
},
}
}

newBadClient := func(configs map[string]*Config) ApolloClient {
return &mockApolloClient{
notifications: func(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) {
return 500, nil, nil
},
getConfigsFromCache: func(configServerURL, appID, cluster, namespace string) (Configurations, error) {
return nil, nil
},
getConfigsFromNonCache: func(configServerURL, appID, cluster, namespace string, opts ...NotificationsOption) (i int, config *Config, err error) {
return 500, nil, nil
},
getConfigServers: func(metaServerURL, appID string) (i int, servers []ConfigServer, err error) {
return 500, nil, nil
},
}
}

var tests = []struct {
Name string
NewAgollo func(configs map[string]*Config) Agollo
Test func(a Agollo, configs map[string]*Config)
}{
{
Name: "测试:预加载的namespace应该正常可获取,非预加载的namespace无法获取配置",
NewAgollo: func(configs map[string]*Config) Agollo {
a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), PreloadNamespaces("test.json"))
assert.Nil(t, err)
assert.NotNil(t, a)
return a
},
Test: func(a Agollo, configs map[string]*Config) {
for namespace, config := range configs {
for key, expected := range config.Configurations {
if namespace == "test.json" {
actual := a.Get(key, WithNamespace(namespace))
assert.Equal(t, expected, actual)
} else {
actual := a.Get(key, WithNamespace(namespace))
assert.Empty(t, actual)
}
}
}
},
},
{
Name: "测试:自动获取非预加载namespace时,正常读取配置配置项",
NewAgollo: func(configs map[string]*Config) Agollo {
a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), AutoFetchOnCacheMiss())
assert.Nil(t, err)
assert.NotNil(t, a)
return a
},
Test: func(a Agollo, configs map[string]*Config) {
for namespace, config := range configs {
for key, expected := range config.Configurations {
actual := a.Get(key, WithNamespace(namespace))
assert.Equal(t, expected, actual)
}
}

// 测试无WithNamespace配置项时读取application的配置
key := "timeout"
expected := configs["application"].Configurations[key]
actual := a.Get(key)
assert.Equal(t, expected, actual)
},
},
{
Name: "测试:初始化后 start 监听配置的情况",
NewAgollo: func(configs map[string]*Config) Agollo {
a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), AutoFetchOnCacheMiss())
assert.Nil(t, err)
assert.NotNil(t, a)
return a
},
Test: func(a Agollo, configs map[string]*Config) {
a.Start()
defer a.Stop()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < 5; i++ {
for namespace, config := range configs {
for key, expected := range config.Configurations {
actual := a.Get(key, WithNamespace(namespace))
assert.Equal(t, expected, actual)
}
}
time.Sleep(time.Second)
}
}()

wg.Wait()
},
},
{
Name: "测试:容灾配置项",
NewAgollo: func(configs map[string]*Config) Agollo {
a, err := New(configServerURL, appid, WithApolloClient(newBadClient(configs)), AutoFetchOnCacheMiss(), FailTolerantOnBackupExists())
assert.Nil(t, err)
assert.NotNil(t, a)
return a
},
Test: func(a Agollo, configs map[string]*Config) {
a.Start()
defer a.Stop()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < 5; i++ {
for namespace, config := range configs {
for key, expected := range config.Configurations {
actual := a.Get(key, WithNamespace(namespace))
assert.Equal(t, expected, actual)
}
}
time.Sleep(time.Second)
}
}()

wg.Wait()
},
},
}

for _, test := range tests {
configs := newConfigs()
test.Test(test.NewAgollo(configs), configs)
}
}
Loading

0 comments on commit 6e3e180

Please sign in to comment.