diff --git a/pkg/config/config.go b/pkg/config/config.go index 387692378483f..3ef5e5254f8a7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -18,10 +18,12 @@ package config import ( "fmt" + "log" "strings" "github.com/cockroachdb/errors" "github.com/spf13/cast" + "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -44,7 +46,10 @@ func Init(opts ...Option) (*Manager, error) { sourceManager := NewManager() if o.FileInfo != nil { s := NewFileSource(o.FileInfo) - sourceManager.AddSource(s) + err := sourceManager.AddSource(s) + if err != nil { + log.Fatal("failed to add FileSource config", zap.Error(err)) + } } if o.EnvKeyFormatter != nil { sourceManager.AddSource(NewEnvSource(o.EnvKeyFormatter)) diff --git a/pkg/config/file_source.go b/pkg/config/file_source.go index 6b158fcd2fdfa..b4055997f7979 100644 --- a/pkg/config/file_source.go +++ b/pkg/config/file_source.go @@ -123,29 +123,38 @@ func (fs *FileSource) loadFromFile() error { configFiles = fs.files fs.RUnlock() + notExistsNum := 0 for _, configFile := range configFiles { if _, err := os.Stat(configFile); err != nil { - continue + if os.IsNotExist(err) { + notExistsNum++ + continue + } + return err } ext := filepath.Ext(configFile) if len(ext) == 0 || (ext[1:] != "yaml" && ext[1:] != "yml") { - return fmt.Errorf("Unsupported Config Type: " + ext) + return fmt.Errorf("Unsupported Config Type: %s", ext) } data, err := os.ReadFile(configFile) if err != nil { - return errors.Wrap(err, "Read config failed: "+configFile) + return errors.Wrapf(err, "Read config failed: %s", configFile) } var config map[string]interface{} err = yaml.Unmarshal(data, &config) if err != nil { - return errors.Wrap(err, "unmarshal yaml file "+configFile+" failed") + return errors.Wrapf(err, "unmarshal yaml file %s failed", configFile) } flattenAndMergeMap("", config, newConfig) } + // not allow all config files missing, return error for this case + if notExistsNum == len(configFiles) { + return errors.Newf("all config files not exists, files: %v", configFiles) + } return fs.update(newConfig) } diff --git a/pkg/config/manager_test.go b/pkg/config/manager_test.go index 0fccc876af509..c2afac823314c 100644 --- a/pkg/config/manager_test.go +++ b/pkg/config/manager_test.go @@ -136,6 +136,7 @@ func TestOnEvent(t *testing.T) { dir, _ := os.MkdirTemp("", "milvus") yamlFile := path.Join(dir, "milvus.yaml") + os.WriteFile(yamlFile, []byte("a.b: \"\""), 0o600) mgr, _ := Init(WithEnvSource(formatKey), WithFilesSource(&FileInfo{ Files: []string{yamlFile}, @@ -147,31 +148,41 @@ func TestOnEvent(t *testing.T) { RefreshInterval: 10 * time.Millisecond, })) os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) - time.Sleep(time.Second) - value, err := mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "aaa") + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "aaa" + }, time.Second*5, time.Second) + ctx := context.Background() client.KV.Put(ctx, "test/config/a/b", "bbb") - time.Sleep(time.Second) - value, err = mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "bbb") + + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "bbb" + }, time.Second*5, time.Second) + client.KV.Put(ctx, "test/config/a/b", "ccc") - time.Sleep(time.Second) - value, err = mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "ccc") + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "ccc" + }, time.Second*5, time.Second) + os.WriteFile(yamlFile, []byte("a.b: ddd"), 0o600) - time.Sleep(time.Second) - value, err = mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "ccc") + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "ccc" + }, time.Second*5, time.Second) + client.KV.Delete(ctx, "test/config/a/b") - time.Sleep(time.Second) - value, err = mgr.GetConfig("a.b") - assert.NoError(t, err) - assert.Equal(t, value, "ddd") + assert.Eventually(t, func() bool { + value, err := mgr.GetConfig("a.b") + assert.NoError(t, err) + return value == "ddd" + }, time.Second*5, time.Second) } func TestDeadlock(t *testing.T) { @@ -206,6 +217,7 @@ func TestCachedConfig(t *testing.T) { dir, _ := os.MkdirTemp("", "milvus") yamlFile := path.Join(dir, "milvus.yaml") + os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) mgr, _ := Init(WithEnvSource(formatKey), WithFilesSource(&FileInfo{ Files: []string{yamlFile}, @@ -218,7 +230,6 @@ func TestCachedConfig(t *testing.T) { })) // test get cached value from file { - os.WriteFile(yamlFile, []byte("a.b: aaa"), 0o600) time.Sleep(time.Second) _, exist := mgr.GetCachedValue("a.b") assert.False(t, exist) diff --git a/pkg/config/refresher.go b/pkg/config/refresher.go index 64ea2b2714ed1..472c3537bcbbd 100644 --- a/pkg/config/refresher.go +++ b/pkg/config/refresher.go @@ -72,8 +72,7 @@ func (r *refresher) refreshPeriodically(name string) { case <-ticker.C: err := r.fetchFunc() if err != nil { - log.Error("can not pull configs", zap.Error(err)) - r.stop() + log.WithRateGroup("refresher", 1, 60).RatedWarn(60, "can not pull configs", zap.Error(err)) } case <-r.intervalDone: log.Info("stop refreshing configurations", zap.String("source", name))