Skip to content

Commit

Permalink
enhance: [cherry-pick] Avoid initializing casbin enforcer for each re…
Browse files Browse the repository at this point in the history
…quest (#29118)

Cherry-pick from master
pr: #29117 
See also #29113

This patch:
- Replace plain Enforcer with `casbin.SyncedEnforcer`
- Add implementation of persist.Adapter with `MetaCacheCasbinAdapter`
- Invoke enforcer.LoadPolicy when policy updated

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Dec 12, 2023
1 parent 0b532b4 commit d36bf21
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 19 deletions.
17 changes: 16 additions & 1 deletion internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,12 @@ func (m *MetaCache) expireShardLeaderCache(ctx context.Context) {
}

func (m *MetaCache) InitPolicyInfo(info []string, userRoles []string) {
defer func() {
err := getEnforcer().LoadPolicy()
if err != nil {
log.Error("failed to load policy after RefreshPolicyInfo", zap.Error(err))
}
}()
m.mu.Lock()
defer m.mu.Unlock()
m.unsafeInitPolicyInfo(info, userRoles)
Expand Down Expand Up @@ -933,14 +939,23 @@ func (m *MetaCache) GetUserRole(user string) []string {
return util.StringList(m.userToRoles[user])
}

func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) error {
func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) {
defer func() {
if err == nil {
le := getEnforcer().LoadPolicy()
if le != nil {
log.Error("failed to load policy after RefreshPolicyInfo", zap.Error(le))
}
}
}()
if op.OpType != typeutil.CacheRefresh {
m.mu.Lock()
defer m.mu.Unlock()
if op.OpKey == "" {
return errors.New("empty op key")
}
}

switch op.OpType {
case typeutil.CacheGrantPrivilege:
m.privilegeInfos[op.OpKey] = struct{}{}
Expand Down
85 changes: 85 additions & 0 deletions internal/proxy/meta_cache_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy

import (
"context"
"fmt"
"strings"

"github.com/casbin/casbin/v2/model"
jsonadapter "github.com/casbin/json-adapter/v2"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)

// MetaCacheCasbinAdapter is the implementation of `persist.Adapter` with Cache
// Since the usage shall be read-only, it implements only `LoadPolicy` for now.
type MetaCacheCasbinAdapter struct {
cacheSource func() Cache
}

func NewMetaCacheCasbinAdapter(cacheSource func() Cache) *MetaCacheCasbinAdapter {
return &MetaCacheCasbinAdapter{
cacheSource: cacheSource,
}
}

// LoadPolicy loads all policy rules from the storage.
// Implementing `persist.Adapter`.
func (a *MetaCacheCasbinAdapter) LoadPolicy(model model.Model) error {
cache := a.cacheSource()
if cache == nil {
return merr.WrapErrServiceInternal("cache source return nil cache")
}
policyInfo := strings.Join(cache.GetPrivilegeInfo(context.Background()), ",")

policy := fmt.Sprintf("[%s]", policyInfo)
log.Ctx(context.Background()).Info("LoddPolicy update policyinfo", zap.String("policyInfo", policy))
byteSource := []byte(policy)
jAdapter := jsonadapter.NewAdapter(&byteSource)
return jAdapter.LoadPolicy(model)
}

// SavePolicy saves all policy rules to the storage.
// Implementing `persist.Adapter`.
// MetaCacheCasbinAdapter is read-only, always returns error
func (a *MetaCacheCasbinAdapter) SavePolicy(model model.Model) error {
return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received SavePolicy call")
}

// AddPolicy adds a policy rule to the storage.
// Implementing `persist.Adapter`.
// MetaCacheCasbinAdapter is read-only, always returns error
func (a *MetaCacheCasbinAdapter) AddPolicy(sec string, ptype string, rule []string) error {
return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received AddPolicy call")
}

// RemovePolicy removes a policy rule from the storage.
// Implementing `persist.Adapter`.
// MetaCacheCasbinAdapter is read-only, always returns error
func (a *MetaCacheCasbinAdapter) RemovePolicy(sec string, ptype string, rule []string) error {
return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received RemovePolicy call")
}

// RemoveFilteredPolicy removes policy rules that match the filter from the storage.
// This is part of the Auto-Save feature.
func (a *MetaCacheCasbinAdapter) RemoveFilteredPolicy(sec string, ptype string, fieldIndex int, fieldValues ...string) error {
return merr.WrapErrServiceInternal("MetaCacheCasbinAdapter is read-only, but received RemoveFilteredPolicy call")
}
76 changes: 76 additions & 0 deletions internal/proxy/meta_cache_adapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy

import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)

type MetaCacheCasbinAdapterSuite struct {
suite.Suite

cache *MockCache
adapter *MetaCacheCasbinAdapter
}

func (s *MetaCacheCasbinAdapterSuite) SetupTest() {
s.cache = NewMockCache(s.T())

s.adapter = NewMetaCacheCasbinAdapter(func() Cache { return s.cache })
}

func (s *MetaCacheCasbinAdapterSuite) TestLoadPolicy() {
s.Run("normal_load", func() {
s.cache.EXPECT().GetPrivilegeInfo(mock.Anything).Return([]string{})

m := getPolicyModel(ModelStr)
err := s.adapter.LoadPolicy(m)
s.NoError(err)
})

s.Run("source_return_nil", func() {
adapter := NewMetaCacheCasbinAdapter(func() Cache { return nil })

m := getPolicyModel(ModelStr)
err := adapter.LoadPolicy(m)
s.Error(err)
})
}

func (s *MetaCacheCasbinAdapterSuite) TestSavePolicy() {
m := getPolicyModel(ModelStr)
s.Error(s.adapter.SavePolicy(m))
}

func (s *MetaCacheCasbinAdapterSuite) TestAddPolicy() {
s.Error(s.adapter.AddPolicy("", "", []string{}))
}

func (s *MetaCacheCasbinAdapterSuite) TestRemovePolicy() {
s.Error(s.adapter.RemovePolicy("", "", []string{}))
}

func (s *MetaCacheCasbinAdapterSuite) TestRemoveFiltererPolicy() {
s.Error(s.adapter.RemoveFilteredPolicy("", "", 0))
}

func TestMetaCacheCasbinAdapter(t *testing.T) {
suite.Run(t, new(MetaCacheCasbinAdapterSuite))
}
45 changes: 27 additions & 18 deletions internal/proxy/privilege_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"reflect"
"strings"
"sync"

"github.com/casbin/casbin/v2"
"github.com/casbin/casbin/v2/model"
jsonadapter "github.com/casbin/json-adapter/v2"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -43,6 +43,26 @@ m = r.sub == p.sub && globMatch(r.obj, p.obj) && globMatch(r.act, p.act) || r.su

var templateModel = getPolicyModel(ModelStr)

var (
enforcer *casbin.SyncedEnforcer
initOnce sync.Once
)

func getEnforcer() *casbin.SyncedEnforcer {
initOnce.Do(func() {
e, err := casbin.NewSyncedEnforcer()
if err != nil {
log.Panic("failed to create casbin enforcer", zap.Error(err))
}
casbinModel := getPolicyModel(ModelStr)
adapter := NewMetaCacheCasbinAdapter(func() Cache { return globalMetaCache })
e.InitWithModelAndAdapter(casbinModel, adapter)
e.AddFunction("dbMatch", DBMatchFunc)
enforcer = e
})
return enforcer
}

func getPolicyModel(modelString string) model.Model {
m, err := model.NewModelFromString(modelString)
if err != nil {
Expand All @@ -66,6 +86,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
if !Params.CommonCfg.AuthorizationEnabled.GetAsBool() {
return ctx, nil
}
log := log.Ctx(ctx)
log.Debug("PrivilegeInterceptor", zap.String("type", reflect.TypeOf(req).String()))
privilegeExt, err := funcutil.GetPrivilegeExtObj(req)
if err != nil {
Expand Down Expand Up @@ -96,26 +117,14 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
objectNames := funcutil.GetObjectNames(req, objectNameIndexs)
objectPrivilege := privilegeExt.ObjectPrivilege.String()
dbName := GetCurDBNameFromContextOrDefault(ctx)
policyInfo := strings.Join(globalMetaCache.GetPrivilegeInfo(ctx), ",")

log := log.With(zap.String("username", username), zap.Strings("role_names", roleNames),
log = log.With(zap.String("username", username), zap.Strings("role_names", roleNames),
zap.String("object_type", objectType), zap.String("object_privilege", objectPrivilege),
zap.String("db_name", dbName),
zap.Int32("object_index", objectNameIndex), zap.String("object_name", objectName),
zap.Int32("object_indexs", objectNameIndexs), zap.Strings("object_names", objectNames),
zap.String("policy_info", policyInfo))

policy := fmt.Sprintf("[%s]", policyInfo)
b := []byte(policy)
a := jsonadapter.NewAdapter(&b)
// the `templateModel` object isn't safe in the concurrent situation
casbinModel := templateModel.Copy()
e, err := casbin.NewEnforcer(casbinModel, a)
if err != nil {
log.Warn("NewEnforcer fail", zap.String("policy", policy), zap.Error(err))
return ctx, err
}
e.AddFunction("dbMatch", DBMatchFunc)
zap.Int32("object_indexs", objectNameIndexs), zap.Strings("object_names", objectNames))

e := getEnforcer()
for _, roleName := range roleNames {
permitFunc := func(resName string) (bool, error) {
object := funcutil.PolicyForResource(dbName, objectType, resName)
Expand Down Expand Up @@ -158,7 +167,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
}
}

log.Info("permission deny", zap.String("policy", policy), zap.Strings("roles", roleNames))
log.Info("permission deny", zap.Strings("roles", roleNames))
return ctx, status.Error(codes.PermissionDenied, fmt.Sprintf("%s: permission deny", objectPrivilege))
}

Expand Down

0 comments on commit d36bf21

Please sign in to comment.