diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index a4592e8a75f3f..fdb55b0ff9c22 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -899,6 +899,12 @@ func (m *MetaCache) expireShardLeaderCache(ctx context.Context) { } func (m *MetaCache) InitPolicyInfo(info []string, userRoles []string) { + defer func() { + err := getEnforcer().LoadPolicy() + if err != nil { + log.Error("failed to load policy after RefreshPolicyInfo", zap.Error(err)) + } + }() m.mu.Lock() defer m.mu.Unlock() m.unsafeInitPolicyInfo(info, userRoles) @@ -933,7 +939,15 @@ func (m *MetaCache) GetUserRole(user string) []string { return util.StringList(m.userToRoles[user]) } -func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error { +func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) { + defer func() { + if err == nil { + le := getEnforcer().LoadPolicy() + if le != nil { + log.Error("failed to load policy after RefreshPolicyInfo", zap.Error(le)) + } + } + }() if op.OpType != typeutil.CacheRefresh { m.mu.Lock() defer m.mu.Unlock() @@ -941,6 +955,7 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error { return errors.New("empty op key") } } + switch op.OpType { case typeutil.CacheGrantPrivilege: m.privilegeInfos[op.OpKey] = struct{}{} diff --git a/internal/proxy/meta_cache_adapter.go b/internal/proxy/meta_cache_adapter.go new file mode 100644 index 0000000000000..c72665066f72b --- /dev/null +++ b/internal/proxy/meta_cache_adapter.go @@ -0,0 +1,85 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "context" + "fmt" + "strings" + + "github.com/casbin/casbin/v2/model" + jsonadapter "github.com/casbin/json-adapter/v2" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +// MetaCacheCasbinAdapter is the implementation of `persist.Adapter` with Cache +// Since the usage shall be read-only, it implements only `LoadPolicy` for now. +type MetaCacheCasbinAdapter struct { + cacheSource func() Cache +} + +func NewMetaCacheCasbinAdapter(cacheSource func() Cache) *MetaCacheCasbinAdapter { + return &MetaCacheCasbinAdapter{ + cacheSource: cacheSource, + } +} + +// LoadPolicy loads all policy rules from the storage. +// Implementing `persist.Adapter`. +func (a *MetaCacheCasbinAdapter) LoadPolicy(model model.Model) error { + cache := a.cacheSource() + if cache == nil { + return merr.WrapErrServiceInternal("cache source return nil cache") + } + policyInfo := strings.Join(cache.GetPrivilegeInfo(context.Background()), ",") + + policy := fmt.Sprintf("[%s]", policyInfo) + log.Ctx(context.Background()).Info("LoddPolicy update policyinfo", zap.String("policyInfo", policy)) + byteSource := []byte(policy) + jAdapter := jsonadapter.NewAdapter(&byteSource) + return jAdapter.LoadPolicy(model) +} + +// SavePolicy saves all policy rules to the storage. +// Implementing `persist.Adapter`. +// MetaCacheCasbinAdapter is read-only, always returns error +func (a *MetaCacheCasbinAdapter) SavePolicy(model model.Model) error { + return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received SavePolicy call") +} + +// AddPolicy adds a policy rule to the storage. +// Implementing `persist.Adapter`. +// MetaCacheCasbinAdapter is read-only, always returns error +func (a *MetaCacheCasbinAdapter) AddPolicy(sec string, ptype string, rule []string) error { + return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received AddPolicy call") +} + +// RemovePolicy removes a policy rule from the storage. +// Implementing `persist.Adapter`. +// MetaCacheCasbinAdapter is read-only, always returns error +func (a *MetaCacheCasbinAdapter) RemovePolicy(sec string, ptype string, rule []string) error { + return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received RemovePolicy call") +} + +// RemoveFilteredPolicy removes policy rules that match the filter from the storage. +// This is part of the Auto-Save feature. +func (a *MetaCacheCasbinAdapter) RemoveFilteredPolicy(sec string, ptype string, fieldIndex int, fieldValues ...string) error { + return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received RemoveFilteredPolicy call") +} diff --git a/internal/proxy/meta_cache_adapter_test.go b/internal/proxy/meta_cache_adapter_test.go new file mode 100644 index 0000000000000..63c48351b3898 --- /dev/null +++ b/internal/proxy/meta_cache_adapter_test.go @@ -0,0 +1,76 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type MetaCacheCasbinAdapterSuite struct { + suite.Suite + + cache *MockCache + adapter *MetaCacheCasbinAdapter +} + +func (s *MetaCacheCasbinAdapterSuite) SetupTest() { + s.cache = NewMockCache(s.T()) + + s.adapter = NewMetaCacheCasbinAdapter(func() Cache { return s.cache }) +} + +func (s *MetaCacheCasbinAdapterSuite) TestLoadPolicy() { + s.Run("normal_load", func() { + s.cache.EXPECT().GetPrivilegeInfo(mock.Anything).Return([]string{}) + + m := getPolicyModel(ModelStr) + err := s.adapter.LoadPolicy(m) + s.NoError(err) + }) + + s.Run("source_return_nil", func() { + adapter := NewMetaCacheCasbinAdapter(func() Cache { return nil }) + + m := getPolicyModel(ModelStr) + err := adapter.LoadPolicy(m) + s.Error(err) + }) +} + +func (s *MetaCacheCasbinAdapterSuite) TestSavePolicy() { + m := getPolicyModel(ModelStr) + s.Error(s.adapter.SavePolicy(m)) +} + +func (s *MetaCacheCasbinAdapterSuite) TestAddPolicy() { + s.Error(s.adapter.AddPolicy("", "", []string{})) +} + +func (s *MetaCacheCasbinAdapterSuite) TestRemovePolicy() { + s.Error(s.adapter.RemovePolicy("", "", []string{})) +} + +func (s *MetaCacheCasbinAdapterSuite) TestRemoveFiltererPolicy() { + s.Error(s.adapter.RemoveFilteredPolicy("", "", 0)) +} + +func TestMetaCacheCasbinAdapter(t *testing.T) { + suite.Run(t, new(MetaCacheCasbinAdapterSuite)) +} diff --git a/internal/proxy/privilege_interceptor.go b/internal/proxy/privilege_interceptor.go index 81df2b27a99e7..acd386ded2fee 100644 --- a/internal/proxy/privilege_interceptor.go +++ b/internal/proxy/privilege_interceptor.go @@ -5,10 +5,10 @@ import ( "fmt" "reflect" "strings" + "sync" "github.com/casbin/casbin/v2" "github.com/casbin/casbin/v2/model" - jsonadapter "github.com/casbin/json-adapter/v2" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -43,6 +43,26 @@ m = r.sub == p.sub && globMatch(r.obj, p.obj) && globMatch(r.act, p.act) || r.su var templateModel = getPolicyModel(ModelStr) +var ( + enforcer *casbin.SyncedEnforcer + initOnce sync.Once +) + +func getEnforcer() *casbin.SyncedEnforcer { + initOnce.Do(func() { + e, err := casbin.NewSyncedEnforcer() + if err != nil { + log.Panic("failed to create casbin enforcer", zap.Error(err)) + } + casbinModel := getPolicyModel(ModelStr) + adapter := NewMetaCacheCasbinAdapter(func() Cache { return globalMetaCache }) + e.InitWithModelAndAdapter(casbinModel, adapter) + e.AddFunction("dbMatch", DBMatchFunc) + enforcer = e + }) + return enforcer +} + func getPolicyModel(modelString string) model.Model { m, err := model.NewModelFromString(modelString) if err != nil { @@ -66,6 +86,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context if !Params.CommonCfg.AuthorizationEnabled.GetAsBool() { return ctx, nil } + log := log.Ctx(ctx) log.Debug("PrivilegeInterceptor", zap.String("type", reflect.TypeOf(req).String())) privilegeExt, err := funcutil.GetPrivilegeExtObj(req) if err != nil { @@ -96,26 +117,14 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context objectNames := funcutil.GetObjectNames(req, objectNameIndexs) objectPrivilege := privilegeExt.ObjectPrivilege.String() dbName := GetCurDBNameFromContextOrDefault(ctx) - policyInfo := strings.Join(globalMetaCache.GetPrivilegeInfo(ctx), ",") - log := log.With(zap.String("username", username), zap.Strings("role_names", roleNames), + log = log.With(zap.String("username", username), zap.Strings("role_names", roleNames), zap.String("object_type", objectType), zap.String("object_privilege", objectPrivilege), zap.String("db_name", dbName), zap.Int32("object_index", objectNameIndex), zap.String("object_name", objectName), - zap.Int32("object_indexs", objectNameIndexs), zap.Strings("object_names", objectNames), - zap.String("policy_info", policyInfo)) - - policy := fmt.Sprintf("[%s]", policyInfo) - b := []byte(policy) - a := jsonadapter.NewAdapter(&b) - // the `templateModel` object isn't safe in the concurrent situation - casbinModel := templateModel.Copy() - e, err := casbin.NewEnforcer(casbinModel, a) - if err != nil { - log.Warn("NewEnforcer fail", zap.String("policy", policy), zap.Error(err)) - return ctx, err - } - e.AddFunction("dbMatch", DBMatchFunc) + zap.Int32("object_indexs", objectNameIndexs), zap.Strings("object_names", objectNames)) + + e := getEnforcer() for _, roleName := range roleNames { permitFunc := func(resName string) (bool, error) { object := funcutil.PolicyForResource(dbName, objectType, resName) @@ -158,7 +167,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context } } - log.Info("permission deny", zap.String("policy", policy), zap.Strings("roles", roleNames)) + log.Info("permission deny", zap.Strings("roles", roleNames)) return ctx, status.Error(codes.PermissionDenied, fmt.Sprintf("%s: permission deny", objectPrivilege)) }