Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Return io error other than NotExist refreshing config #38924

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package config

import (
"fmt"
"log"
"strings"

"github.com/cockroachdb/errors"
"github.com/spf13/cast"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -44,7 +46,10 @@ func Init(opts ...Option) (*Manager, error) {
sourceManager := NewManager()
if o.FileInfo != nil {
s := NewFileSource(o.FileInfo)
sourceManager.AddSource(s)
err := sourceManager.AddSource(s)
if err != nil {
log.Fatal("failed to add FileSource config", zap.Error(err))
}
}
if o.EnvKeyFormatter != nil {
sourceManager.AddSource(NewEnvSource(o.EnvKeyFormatter))
Expand Down
17 changes: 13 additions & 4 deletions pkg/config/file_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,38 @@ func (fs *FileSource) loadFromFile() error {
configFiles = fs.files
fs.RUnlock()

notExistsNum := 0
for _, configFile := range configFiles {
if _, err := os.Stat(configFile); err != nil {
continue
if os.IsNotExist(err) {
notExistsNum++
continue
}
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure the err thrown here is properly handled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to discussion offline, this error shall be handled by caller

}

ext := filepath.Ext(configFile)
if len(ext) == 0 || (ext[1:] != "yaml" && ext[1:] != "yml") {
return fmt.Errorf("Unsupported Config Type: " + ext)
return fmt.Errorf("Unsupported Config Type: %s", ext)
}

data, err := os.ReadFile(configFile)
if err != nil {
return errors.Wrap(err, "Read config failed: "+configFile)
return errors.Wrapf(err, "Read config failed: %s", configFile)
}

var config map[string]interface{}
err = yaml.Unmarshal(data, &config)
if err != nil {
return errors.Wrap(err, "unmarshal yaml file "+configFile+" failed")
return errors.Wrapf(err, "unmarshal yaml file %s failed", configFile)
}

flattenAndMergeMap("", config, newConfig)
}
// not allow all config files missing, return error for this case
if notExistsNum == len(configFiles) {
return errors.Newf("all config files not exists, files: %v", configFiles)
}

return fs.update(newConfig)
}
Expand Down
51 changes: 31 additions & 20 deletions pkg/config/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func TestOnEvent(t *testing.T) {

dir, _ := os.MkdirTemp("", "milvus")
yamlFile := path.Join(dir, "milvus.yaml")
os.WriteFile(yamlFile, []byte("a.b: \"\""), 0o600)
mgr, _ := Init(WithEnvSource(formatKey),
WithFilesSource(&FileInfo{
Files: []string{yamlFile},
Expand All @@ -147,31 +148,41 @@ func TestOnEvent(t *testing.T) {
RefreshInterval: 10 * time.Millisecond,
}))
os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600)
time.Sleep(time.Second)
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "aaa")
assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "aaa"
}, time.Second*5, time.Second)

ctx := context.Background()
client.KV.Put(ctx, "test/config/a/b", "bbb")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "bbb")

assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "bbb"
}, time.Second*5, time.Second)

client.KV.Put(ctx, "test/config/a/b", "ccc")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ccc")
assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "ccc"
}, time.Second*5, time.Second)

os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600)
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ccc")
assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "ccc"
}, time.Second*5, time.Second)

client.KV.Delete(ctx, "test/config/a/b")
time.Sleep(time.Second)
value, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, value, "ddd")
assert.Eventually(t, func() bool {
value, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
return value == "ddd"
}, time.Second*5, time.Second)
}

func TestDeadlock(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/config/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ func (r *refresher) refreshPeriodically(name string) {
case <-ticker.C:
err := r.fetchFunc()
if err != nil {
log.Error("can not pull configs", zap.Error(err))
r.stop()
log.WithRateGroup("refresher", 1, 60).RatedWarn(60, "can not pull configs", zap.Error(err))
}
case <-r.intervalDone:
log.Info("stop refreshing configurations", zap.String("source", name))
Expand Down
Loading