From b9dd2b1fc858a833a3d33fa7d60b8b8ea227f86b Mon Sep 17 00:00:00 2001 From: Juliano Martinez Date: Fri, 5 Jul 2024 20:00:46 +0200 Subject: [PATCH] draft: Juliano/cleanup (#14) * small code cleanup * improve a bit error handling * adds extra tests * allocate toUpdate based on the size of the current acls * adds extra tests for ACL SAVE and LOAD * update * adds missing mock --- pkg/aclmanager/aclmanager.go | 205 ++++++++++++++++++------------ pkg/aclmanager/aclmanager_test.go | 168 ++++++++++++++++++++++++ 2 files changed, 291 insertions(+), 82 deletions(-) diff --git a/pkg/aclmanager/aclmanager.go b/pkg/aclmanager/aclmanager.go index 86c44ca..b21c3f3 100644 --- a/pkg/aclmanager/aclmanager.go +++ b/pkg/aclmanager/aclmanager.go @@ -27,7 +27,7 @@ var ( filterUser = regexp.MustCompile(`^user\s+`) ) -// AclManager containers the struct for bedel to manager the state of aclmanager acls +// AclManager contains the struct for managing the state of ACLs type AclManager struct { Addr string Username string @@ -45,6 +45,7 @@ func New(addr string, username string, password string, aclfile bool) *AclManage Username: username, Password: password, }) + slog.Info("Created new AclManager", "addr", addr, "username", username) return &AclManager{ Addr: addr, Username: username, @@ -56,245 +57,285 @@ func New(addr string, username string, password string, aclfile bool) *AclManage } // findNodes returns a list of nodes in the cluster based on the redis info replication command -func (a *AclManager) findNodes(ctx context.Context) (err error) { - slog.Debug("Finding nodes") +func (a *AclManager) findNodes(ctx context.Context) error { + slog.Debug("Entering findNodes") + defer slog.Debug("Exiting findNodes") + replicationInfo, err := a.RedisClient.Info(ctx, "replication").Result() if err != nil { - return err + slog.Error("Failed to get replication info", "error", err) + return fmt.Errorf("findNodes: failed to get replication info: %w", err) } a.primary.Store(role.MatchString(replicationInfo)) var masterHost, masterPort string - var nodes []string + nodes := make([]string, 0) scanner := bufio.NewScanner(strings.NewReader(replicationInfo)) for scanner.Scan() { line := scanner.Text() - slog.Debug("Parsing line looking for masterHost", "content", line) + slog.Debug("Parsing line for masterHost", "line", line) if matches := primaryHostRegex.FindStringSubmatch(line); matches != nil { masterHost = matches[1] } - slog.Debug("Parsing line looking for Follower", "content", line) + slog.Debug("Parsing line for masterPort", "line", line) if matches := primaryPortRegex.FindStringSubmatch(line); matches != nil { masterPort = matches[1] - nodes = append(nodes, fmt.Sprintf("%s:%s", masterHost, masterPort)) - a.nodes[fmt.Sprintf("%s:%s", masterHost, masterPort)] = Primary + node := fmt.Sprintf("%s:%s", masterHost, masterPort) + nodes = append(nodes, node) + a.nodes[node] = Primary } + slog.Debug("Parsing line for follower", "line", line) if matches := followerRegex.FindStringSubmatch(line); matches != nil { ip := matches[followerRegex.SubexpIndex("ip")] port := matches[followerRegex.SubexpIndex("port")] - nodes = append(nodes, fmt.Sprintf("%s:%s", ip, port)) - a.nodes[fmt.Sprintf("%s:%s", ip, port)] = Follower + node := fmt.Sprintf("%s:%s", ip, port) + nodes = append(nodes, node) + a.nodes[node] = Follower } } if err := scanner.Err(); err != nil { - return err + slog.Error("Scanner error", "error", err) + return fmt.Errorf("findNodes: scanner error: %w", err) } for _, node := range nodes { if _, ok := a.nodes[node]; !ok { delete(a.nodes, node) + slog.Debug("Deleted node", "node", node) } } - return err + return nil } -// CurrentFunction check if the current node is the Primary node -func (a *AclManager) CurrentFunction(ctx context.Context) (function int, err error) { - slog.Debug("Check node current function") - err = a.findNodes(ctx) +// CurrentFunction checks if the current node is the Primary node +func (a *AclManager) CurrentFunction(ctx context.Context) (int, error) { + slog.Debug("Entering CurrentFunction") + defer slog.Debug("Exiting CurrentFunction") + + err := a.findNodes(ctx) if err != nil { - return Unknown, err + slog.Error("Failed to find nodes", "error", err) + return Unknown, fmt.Errorf("CurrentFunction: %w", err) } if a.primary.Load() { - return Primary, err + slog.Info("Current node is Primary") + return Primary, nil } - - return Follower, err + slog.Info("Current node is Follower") + return Follower, nil } -func (a *AclManager) Primary(ctx context.Context) (primary *AclManager, err error) { - err = a.findNodes(ctx) +func (a *AclManager) Primary(ctx context.Context) (*AclManager, error) { + slog.Debug("Entering Primary") + defer slog.Debug("Exiting Primary") + + err := a.findNodes(ctx) if err != nil { - return nil, err + slog.Error("Failed to find nodes", "error", err) + return nil, fmt.Errorf("Primary: %w", err) } for address, function := range a.nodes { if function == Primary { - return New(address, a.Username, a.Password, a.aclFile), err + slog.Info("Found Primary node", "address", address) + return New(address, a.Username, a.Password, a.aclFile), nil } } - - return nil, err + slog.Warn("Primary node not found") + return nil, nil } // Close closes the redis client func (a *AclManager) Close() error { + slog.Debug("Closing Redis client") return a.RedisClient.Close() } // listAcls returns a list of acls in the cluster based on the redis acl list command -func listAcls(ctx context.Context, client *redis.Client) (acls []string, err error) { +func listAcls(ctx context.Context, client *redis.Client) ([]string, error) { + slog.Debug("Entering listAcls") + defer slog.Debug("Exiting listAcls") + result, err := client.Do(ctx, "ACL", "LIST").Result() if err != nil { - return nil, err + slog.Error("Failed to list ACLs", "error", err) + return nil, fmt.Errorf("listAcls: %w", err) } aclList, ok := result.([]interface{}) if !ok { - return nil, fmt.Errorf("unexpected result format: %v", result) + err := fmt.Errorf("unexpected result format: %v", result) + slog.Error("Unexpected result format", "result", result) + return nil, fmt.Errorf("listAcls: %w", err) } if len(aclList) == 0 { + slog.Info("No ACLs found") return nil, nil // Return nil if no ACLs are found } - acls = make([]string, len(aclList)) + acls := make([]string, len(aclList)) for i, acl := range aclList { aclStr, ok := acl.(string) if !ok { - return nil, fmt.Errorf("unexpected type for ACL: %v", acl) + err := fmt.Errorf("unexpected type for ACL: %v", acl) + slog.Error("Unexpected type for ACL", "acl", acl) + return nil, fmt.Errorf("listAcls: %w", err) } acls[i] = aclStr } - + slog.Info("Listed ACLs", "count", len(acls)) return acls, nil } -// saveAclFile call the redis command ACL SAVE to save the acls to the aclFile +// saveAclFile calls the redis command ACL SAVE to save the acls to the aclFile func saveAclFile(ctx context.Context, client *redis.Client) error { - slog.Debug("Saving acls to aclFile") + slog.Debug("Entering saveAclFile") + defer slog.Debug("Exiting saveAclFile") + if err := client.Do(ctx, "ACL", "SAVE").Err(); err != nil { - return fmt.Errorf("error saving acls to aclFile: %v", err) + slog.Error("Failed to save ACLs to aclFile", "error", err) + return fmt.Errorf("saveAclFile: %w", err) } - + slog.Info("Saved ACLs to aclFile") return nil } -// loadAclFile call the redis command ACL LOAD to load the acls from the aclFile +// loadAclFile calls the redis command ACL LOAD to load the acls from the aclFile func loadAclFile(ctx context.Context, client *redis.Client) error { - slog.Debug("Loading acls from aclFile") + slog.Debug("Entering loadAclFile") + defer slog.Debug("Exiting loadAclFile") + if err := client.Do(ctx, "ACL", "LOAD").Err(); err != nil { - return fmt.Errorf("error loading acls from aclFile: %v", err) + slog.Error("Failed to load ACLs from aclFile", "error", err) + return fmt.Errorf("loadAclFile: %w", err) } - + slog.Info("Loaded ACLs from aclFile") return nil } // SyncAcls connects to master node and syncs the acls to the current node -func (a *AclManager) SyncAcls(ctx context.Context, primary *AclManager) (updated []string, deleted []string, err error) { - slog.Debug("Syncing acls") +func (a *AclManager) SyncAcls(ctx context.Context, primary *AclManager) ([]string, []string, error) { + slog.Debug("Entering SyncAcls") + defer slog.Debug("Exiting SyncAcls") + if primary == nil { - return updated, deleted, fmt.Errorf("no primary found") + err := fmt.Errorf("no primary found") + slog.Error("No primary found", "error", err) + return nil, nil, err } sourceAcls, err := listAcls(ctx, primary.RedisClient) if err != nil { - return nil, nil, fmt.Errorf("error listing source acls: %v", err) + slog.Error("Failed to list source ACLs", "error", err) + return nil, nil, fmt.Errorf("SyncAcls: error listing source acls: %w", err) } if a.aclFile { - err = saveAclFile(ctx, primary.RedisClient) - if err != nil { - return nil, nil, fmt.Errorf("error saving primary acls to aclFile: %v", err) + if err = saveAclFile(ctx, primary.RedisClient); err != nil { + slog.Error("Failed to save primary ACLs to aclFile", "error", err) + return nil, nil, fmt.Errorf("SyncAcls: error saving primary acls to aclFile: %w", err) } } destinationAcls, err := listAcls(ctx, a.RedisClient) if err != nil { - return nil, nil, fmt.Errorf("error listing current acls: %v", err) + slog.Error("Failed to list current ACLs", "error", err) + return nil, nil, fmt.Errorf("SyncAcls: error listing current acls: %w", err) } - // Map to keep track of ACLs to add - toUpdate := make(map[string]string) + toUpdate := make(map[string]string, len(sourceAcls)) for _, acl := range sourceAcls { - username := strings.Split(acl, " ")[1] + username := strings.Fields(acl)[1] toUpdate[username] = acl } - // Delete ACLs not in source and remove from the toUpdate map if present in destination + var updated, deleted []string + for _, acl := range destinationAcls { - username := strings.Split(acl, " ")[1] + username := strings.Fields(acl)[1] if currentAcl, found := toUpdate[username]; found { if currentAcl == acl { - // If found in source, don't need to add, so remove from map delete(toUpdate, username) slog.Debug("ACL already in sync", "username", username) } continue } - // If not found in source, delete from destination slog.Debug("Deleting ACL", "username", username) - if err := a.RedisClient.Do(context.Background(), "ACL", "DELUSER", username).Err(); err != nil { - return nil, nil, fmt.Errorf("error deleting acl: %v", err) + if err := a.RedisClient.Do(ctx, "ACL", "DELUSER", username).Err(); err != nil { + slog.Error("Failed to delete ACL", "username", username, "error", err) + return nil, nil, fmt.Errorf("SyncAcls: error deleting acl: %w", err) } deleted = append(deleted, username) } - // Add remaining ACLs from source for username, acl := range toUpdate { slog.Debug("Syncing ACL", "username", username, "line", acl) command := strings.Split(filterUser.ReplaceAllString(acl, "ACL SETUSER "), " ") - commandInterfce := make([]interface{}, len(command)) + commandInterface := make([]interface{}, len(command)) for i, s := range command { - commandInterfce[i] = s + commandInterface[i] = s } - if err := a.RedisClient.Do(context.Background(), commandInterfce...).Err(); err != nil { - return nil, nil, fmt.Errorf("error setting acl: %v", err) + if err := a.RedisClient.Do(ctx, commandInterface...).Err(); err != nil { + slog.Error("Failed to set ACL", "username", username, "error", err) + return nil, nil, fmt.Errorf("SyncAcls: error setting acl: %w", err) } updated = append(updated, username) } if a.aclFile { - err = saveAclFile(ctx, a.RedisClient) - if err != nil { - return nil, nil, fmt.Errorf("error saving acls to aclFile: %v", err) + if err = saveAclFile(ctx, a.RedisClient); err != nil { + slog.Error("Failed to save ACLs to aclFile", "error", err) + return nil, nil, fmt.Errorf("SyncAcls: error saving acls to aclFile: %w", err) } - err = loadAclFile(ctx, a.RedisClient) - if err != nil { - return nil, nil, fmt.Errorf("error loading synced acls from aclFile: %v", err) + if err = loadAclFile(ctx, a.RedisClient); err != nil { + slog.Error("Failed to load synced ACLs from aclFile", "error", err) + return nil, nil, fmt.Errorf("SyncAcls: error loading synced acls from aclFile: %w", err) } } + slog.Info("Synced ACLs", "added", updated, "deleted", deleted) return updated, deleted, nil } // Loop loops through the sync interval and syncs the acls -func (a *AclManager) Loop(ctx context.Context) (err error) { +func (a *AclManager) Loop(ctx context.Context) error { + slog.Debug("Entering Loop") + defer slog.Debug("Exiting Loop") + ticker := time.NewTicker(viper.GetDuration("syncInterval") * time.Second) defer ticker.Stop() - var primary *AclManager for { select { case <-ctx.Done(): - return err + slog.Info("Context done, exiting Loop") + return nil case <-ticker.C: - function, e := a.CurrentFunction(ctx) + function, err := a.CurrentFunction(ctx) if err != nil { - slog.Warn("unable to check if it's a Primary", "message", err) - err = fmt.Errorf("unable to check if it's a Primary: %w", err) + slog.Warn("Unable to check if it's a Primary", "error", err) + continue } if function == Follower { - primary, err = a.Primary(ctx) + primary, err := a.Primary(ctx) if err != nil { - slog.Warn("unable to find Primary", "message", e) + slog.Warn("Unable to find Primary", "error", err) continue } - var added, deleted []string - added, deleted, err = a.SyncAcls(ctx, primary) + added, deleted, err := a.SyncAcls(ctx, primary) if err != nil { - slog.Warn("unable to sync acls from Primary", "message", err) - err = fmt.Errorf("unable to sync acls from Primary: %w", err) + slog.Warn("Unable to sync ACLs from Primary", "error", err) continue } - slog.Info("Synced acls from Primary", "added", added, "deleted", deleted) + slog.Info("Synced ACLs from Primary", "added", added, "deleted", deleted) } } } diff --git a/pkg/aclmanager/aclmanager_test.go b/pkg/aclmanager/aclmanager_test.go index 9a49497..0b9058d 100644 --- a/pkg/aclmanager/aclmanager_test.go +++ b/pkg/aclmanager/aclmanager_test.go @@ -797,3 +797,171 @@ func TestLoadAclFile(t *testing.T) { }) } } + +func TestFindNodes_LargeCluster(t *testing.T) { + mockResp := generateLargeClusterOutput(1000) // Generates a mock output for 1000 nodes + redisClient, mock := redismock.NewClientMock() + mock.ExpectInfo("replication").SetVal(mockResp) + + aclManager := AclManager{RedisClient: redisClient, nodes: make(map[string]int)} + ctx := context.Background() + + err := aclManager.findNodes(ctx) + assert.NoError(t, err) + assert.Equal(t, 1000, len(aclManager.nodes)) +} + +func TestLoop_ShortInterval(t *testing.T) { + viper.Set("syncInterval", 1) // Set a very short sync interval for testing + redisClient, mock := redismock.NewClientMock() + + aclManager := &AclManager{ + Addr: "localhost:6379", + Password: "password", + Username: "username", + RedisClient: redisClient, + nodes: make(map[string]int), + } + + mock.ExpectInfo("replication").SetVal(followerOutput) + mock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user default on nopass ~* &* +@all", + }) + + // Set up a cancellable context to control the loop + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Run Loop in a separate goroutine + done := make(chan error, 1) + go func() { + done <- aclManager.Loop(ctx) + }() + + time.Sleep(time.Second * 5) // Run the loop for a few seconds + + // Cancel the context to stop the loop + cancel() + + // Check for errors + err := <-done + assert.NoError(t, err) +} + +func generateLargeClusterOutput(nodeCount int) string { + var sb strings.Builder + sb.WriteString("# Replication\nrole:master\nconnected_slaves:" + fmt.Sprint(nodeCount) + "\n") + for i := 0; i < nodeCount; i++ { + sb.WriteString(fmt.Sprintf("slave%d:ip=172.21.0.%d,port=6379,state=online,offset=322,lag=0\n", i, i+3)) + } + return sb.String() +} + +func TestSyncAcls_ACLFileEnabled(t *testing.T) { + primaryClient, primaryMock := redismock.NewClientMock() + followerClient, followerMock := redismock.NewClientMock() + + aclManagerPrimary := &AclManager{RedisClient: primaryClient, nodes: make(map[string]int), aclFile: true} + aclManagerFollower := &AclManager{RedisClient: followerClient, nodes: make(map[string]int), aclFile: true} + + primaryMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl2", + }) + followerMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl3", + }) + followerMock.ExpectDo("ACL", "DELUSER", "acl3").SetVal("OK") + followerMock.ExpectDo("ACL", "SETUSER", "acl2").SetVal("OK") + + primaryMock.ExpectDo("ACL", "SAVE").SetVal("OK") + followerMock.ExpectDo("ACL", "SAVE").SetVal("OK") + followerMock.ExpectDo("ACL", "LOAD").SetVal("OK") + + updated, deleted, err := aclManagerFollower.SyncAcls(context.Background(), aclManagerPrimary) + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"acl2"}, updated) + assert.ElementsMatch(t, []string{"acl3"}, deleted) +} + +func TestSyncAcls_SaveACLFileError(t *testing.T) { + primaryClient, primaryMock := redismock.NewClientMock() + followerClient, followerMock := redismock.NewClientMock() + + aclManagerPrimary := &AclManager{RedisClient: primaryClient, nodes: make(map[string]int), aclFile: true} + aclManagerFollower := &AclManager{RedisClient: followerClient, nodes: make(map[string]int), aclFile: true} + + primaryMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl2", + }) + followerMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl3", + }) + + primaryMock.ExpectDo("ACL", "SAVE").SetErr(fmt.Errorf("failed to save ACL on primary")) + + _, _, err := aclManagerFollower.SyncAcls(context.Background(), aclManagerPrimary) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to save ACL on primary") +} + +func TestSyncAcls_LoadACLFileError(t *testing.T) { + primaryClient, primaryMock := redismock.NewClientMock() + followerClient, followerMock := redismock.NewClientMock() + + aclManagerPrimary := &AclManager{RedisClient: primaryClient, nodes: make(map[string]int), aclFile: true} + aclManagerFollower := &AclManager{RedisClient: followerClient, nodes: make(map[string]int), aclFile: true} + + primaryMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl2", + }) + followerMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl2", + }) + + primaryMock.ExpectDo("ACL", "SAVE").SetVal("OK") + followerMock.ExpectDo("ACL", "SAVE").SetVal("OK") + followerMock.ExpectDo("ACL", "LOAD").SetErr(fmt.Errorf("failed to load ACL on follower")) + + _, _, err := aclManagerFollower.SyncAcls(context.Background(), aclManagerPrimary) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to load ACL on follower") +} + +func TestSyncAcls_ACLFileSync(t *testing.T) { + primaryClient, primaryMock := redismock.NewClientMock() + followerClient, followerMock := redismock.NewClientMock() + + aclManagerPrimary := &AclManager{RedisClient: primaryClient, nodes: make(map[string]int), aclFile: true} + aclManagerFollower := &AclManager{RedisClient: followerClient, nodes: make(map[string]int), aclFile: true} + + primaryMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl2", + }) + followerMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl2", "user acl3", + }) + followerMock.ExpectDo("ACL", "DELUSER", "acl3").SetVal("OK") + + primaryMock.ExpectDo("ACL", "SAVE").SetVal("OK") + followerMock.ExpectDo("ACL", "SAVE").SetVal("OK") + followerMock.ExpectDo("ACL", "LOAD").SetVal("OK") + + _, _, err := aclManagerFollower.SyncAcls(context.Background(), aclManagerPrimary) + assert.NoError(t, err) + + primaryMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl2", + }) + followerMock.ExpectDo("ACL", "LIST").SetVal([]interface{}{ + "user acl1", "user acl2", + }) + + primaryMock.ExpectDo("ACL", "SAVE").SetVal("OK") + followerMock.ExpectDo("ACL", "SAVE").SetVal("OK") + followerMock.ExpectDo("ACL", "LOAD").SetVal("OK") + + updated, deleted, err := aclManagerFollower.SyncAcls(context.Background(), aclManagerPrimary) + assert.NoError(t, err) + assert.Empty(t, updated) + assert.Empty(t, deleted) +}