Skip to content

Commit

Permalink
adding duplicate checker for executors and coordinators (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsvihladremio authored Sep 3, 2024
1 parent 9d2a345 commit 2f91918
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 29 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## [3.2.2] - 2024-09-03

## Changed

* added note that the executor can be empty during the UI prompts

## Added

* remove host from executor list if it is present in the coordinator list
* remove a host from the list of nodes if it has been specified twice

## [3.2.1] - 2024-08-26

## Added
Expand Down
2 changes: 1 addition & 1 deletion cmd/local/queriesjson/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func parseLine(line string, i int) (QueriesRow, error) {
dat := make(map[string]interface{})
err := json.Unmarshal([]byte(line), &dat)
if err != nil {
return *new(QueriesRow), fmt.Errorf("queries.json line #%v: %v[...] - error: %v", i, strutils.LimitString(line, 50), err)
return *new(QueriesRow), fmt.Errorf("queries.json line #%v: %v[...] - error: %v", i, strutils.GetEndOfString(line, 50), err)
}
var row = new(QueriesRow)
if val, ok := dat["queryId"]; ok {
Expand Down
1 change: 0 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func startTicker() (stop func()) {
close(quit)
}
}

func RemoteCollect(collectionArgs collection.Args, sshArgs ssh.Args, kubeArgs kubernetes.KubeArgs, fallbackEnabled bool, hook shutdown.Hook) error {
patSet := collectionArgs.DremioPAT != ""
consoleprint.UpdateRuntime(
Expand Down
2 changes: 1 addition & 1 deletion cmd/root/collection/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func StartCapture(c HostCaptureConfiguration, localDDCPath, localDDCYamlPath str
StatusUX: "LOCAL-COLLECT",
Result: consoleprint.ResultFailure,
EndProcess: true,
Message: strutils.LimitString(strings.Join(allHostLog, " - "), 1024),
Message: strutils.GetEndOfString(strings.Join(allHostLog, " - "), 1024),
}
consoleprint.UpdateNodeState(nodeState)
simplelog.HostLog(host, fmt.Sprintf("%#v", nodeState))
Expand Down
57 changes: 56 additions & 1 deletion cmd/root/collection/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"io/fs"
"os"
"path/filepath"
"slices"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -84,6 +86,58 @@ type HostCaptureConfiguration struct {
CollectionMode string
}

func FilterCoordinators(coordinators []string) []string {
// use a map for the unique key property, so we handle duplicates in the list
filteredList := make(map[string]bool)
for _, e := range coordinators {
filteredList[e] = true
}
// convert back into a slice
// again we are dealing with not
// large enough sizes to warrant optimization
var result []string
for k := range filteredList {
result = append(result, k)
}
// sort for consistent behavior and testing
slices.Sort(result)
slices.Reverse(result)
return result
}

func FilterExecutors(executors []string, coordinators []string) []string {
// use a map for the unique key property, so we handle duplicates in the list
filteredList := make(map[string]bool)
for _, e := range executors {
// we're not going to bother optimizing this:
// the list will not be long enough to matter
var dupe bool
// if it's a coordinator we don't need it
for _, c := range coordinators {
if c == e {
dupe = true
simplelog.Warningf("found %v in coordinator and executor list, removing from executor list", e)
consoleprint.AddWarningToConsole(fmt.Sprintf("%v was listed as executor and coordinator, choosing coordinator", e))
break
}
}
if !dupe {
filteredList[e] = true
}
}
// convert back into a slice
// again we are dealing with not
// large enough sizes to warrant optimization
var result []string
for k := range filteredList {
result = append(result, k)
}
// sort for consistent behavior and testing
sort.Strings(result)
slices.Reverse(result)
return result
}

func Execute(c Collector, s CopyStrategy, collectionArgs Args, hook shutdown.Hook, clusterCollection ...func([]string)) error {
start := time.Now().UTC()
outputLoc := collectionArgs.OutputLoc
Expand Down Expand Up @@ -117,10 +171,11 @@ func Execute(c Collector, s CopyStrategy, collectionArgs Args, hook shutdown.Hoo
return err
}

executors, err := c.GetExecutors()
executorsRaw, err := c.GetExecutors()
if err != nil {
return err
}
executors := FilterExecutors(executorsRaw, coordinators)

totalNodes := len(executors) + len(coordinators)
if totalNodes == 0 {
Expand Down
61 changes: 61 additions & 0 deletions cmd/root/collection/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 Dremio Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// collection package provides the interface for collection implementation and the actual collection execution
package collection

import "testing"

func TestFilterCoordinators(t *testing.T) {
t.Log("testing filtering duplicates")
firstItem := "192.168.1.20"
secondItem := "192.168.1.120"
filtered := FilterCoordinators([]string{firstItem, firstItem, secondItem})
expectedItems := 2
if len(filtered) != expectedItems {
t.Errorf("expected %v but got %v items", expectedItems, len(filtered))
}
t.Log("items are sorted in desc order by default")
if filtered[0] != firstItem {
t.Errorf("expected %v but got %v", firstItem, filtered[1])
}
if filtered[1] != secondItem {
t.Errorf("expected %v but got %v", secondItem, filtered[0])
}
}

func TestFilterExecutors(t *testing.T) {
t.Log("testing filtering duplicates")
firstItem := "192.168.1.20"
secondItem := "192.168.1.120"
filtered := FilterExecutors([]string{firstItem, firstItem, secondItem}, []string{})
expectedItems := 2
if len(filtered) != expectedItems {
t.Errorf("expected %v but got %v items", expectedItems, len(filtered))
}
t.Log("items are sorted in desc order by default")
if filtered[0] != firstItem {
t.Errorf("expected %v but got %v", firstItem, filtered[1])
}
if filtered[1] != secondItem {
t.Errorf("expected %v but got %v", secondItem, filtered[0])
}

t.Logf("now verify filter out coordinators")
filtered = FilterExecutors([]string{firstItem, firstItem, secondItem}, []string{firstItem, secondItem})
expectedItems = 0
if len(filtered) != expectedItems {
t.Errorf("expected %v but got %v items", expectedItems, len(filtered))
}
}
25 changes: 24 additions & 1 deletion pkg/consoleprint/consoleprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"sync"
"time"

"github.com/dremio/dremio-diagnostic-collector/v3/pkg/strutils"
)

// NodeCaptureStats represents stats for a node capture.
Expand Down Expand Up @@ -54,6 +56,7 @@ type CollectionStats struct {
patSet bool
startTime int64
endTime int64
warnings []string
mu sync.RWMutex // Mutex to protect access
}

Expand Down Expand Up @@ -157,6 +160,16 @@ func UpdateCollectionMode(collectionMode string) {
c.collectionMode = collectionMode
}

// AddWarningToConsole adds a trimed string to the list of warnings
// lines after the first line are also trimmed
func AddWarningToConsole(warning string) {
c.mu.Lock()
defer c.mu.Unlock()
tokens := strings.Split(warning, "\n")
trimmed := tokens[0]
c.warnings = append(c.warnings, strutils.TruncateString(trimmed, 120))
}

var c *CollectionStats

func init() {
Expand Down Expand Up @@ -315,6 +328,13 @@ func PrintState() {
if c.ddcVersion != "" {
ddcVersion = c.ddcVersion
}
var warningsBuilder strings.Builder
for i, w := range c.warnings {
_, err := warningsBuilder.WriteString(fmt.Sprintf("%v. %v\n", i+1, w))
if err != nil {
fmt.Printf("unable to write string %v: (%v)", w, err)
}
}
fmt.Printf(
`=================================
== Dremio Diagnostic Collector ==
Expand All @@ -338,10 +358,13 @@ Collect Duration : elapsed %v seconds
Tarball : %v
Result : %v
-- Warnings --
%v
%v
`, time.Now().Format(time.RFC1123), strings.TrimSpace(ddcVersion), c.ddcYaml, c.logFile, c.collectionType, strings.Join(c.enabled, ","), strings.Join(c.disabled, ","), strings.ToUpper(c.collectionMode), c.collectionArgs, patMessage, autodetectEnabled, c.TransfersComplete, total,
durationElapsed, c.tarball, c.result, nodes.String())
durationElapsed, c.tarball, c.result, warningsBuilder.String(), nodes.String())
c.mu.Unlock()

}
32 changes: 16 additions & 16 deletions pkg/simplelog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,42 +243,42 @@ func newLogger(f io.Writer, cleanup func()) *Logger {
}

func (l *Logger) Debug(format string) {
trimmed := strutils.LimitString(format, msgMax)
trimmed := strutils.GetEndOfString(format, msgMax)
handleLogError(l.debugLogger.Output(2, trimmed), trimmed, "DEBUG")
}

func (l *Logger) Info(format string) {
trimmed := strutils.LimitString(format, msgMax)
trimmed := strutils.GetEndOfString(format, msgMax)
handleLogError(l.infoLogger.Output(2, trimmed), trimmed, "INFO")
}

func (l *Logger) Warning(format string) {
trimmed := strutils.LimitString(format, msgMax)
trimmed := strutils.GetEndOfString(format, msgMax)
handleLogError(l.warningLogger.Output(2, trimmed), trimmed, "WARNING")
}

func (l *Logger) Error(format string) {
trimmed := strutils.LimitString(format, msgMax)
trimmed := strutils.GetEndOfString(format, msgMax)
handleLogError(l.errorLogger.Output(2, trimmed), trimmed, "ERROR")
}

func (l *Logger) Debugf(format string, v ...interface{}) {
msg := strutils.LimitString(fmt.Sprintf(format, v...), msgMax)
msg := strutils.GetEndOfString(fmt.Sprintf(format, v...), msgMax)
handleLogError(l.debugLogger.Output(2, msg), msg, "DEBUGF")
}

func (l *Logger) Infof(format string, v ...interface{}) {
msg := strutils.LimitString(fmt.Sprintf(format, v...), msgMax)
msg := strutils.GetEndOfString(fmt.Sprintf(format, v...), msgMax)
handleLogError(l.infoLogger.Output(2, msg), msg, "INFOF")
}

func (l *Logger) Warningf(format string, v ...interface{}) {
msg := strutils.LimitString(fmt.Sprintf(format, v...), msgMax)
msg := strutils.GetEndOfString(fmt.Sprintf(format, v...), msgMax)
handleLogError(l.warningLogger.Output(2, msg), msg, "WARNINGF")
}

func (l *Logger) Errorf(format string, v ...interface{}) {
msg := strutils.LimitString(fmt.Sprintf(format, v...), msgMax)
msg := strutils.GetEndOfString(fmt.Sprintf(format, v...), msgMax)
handleLogError(l.errorLogger.Output(2, msg), msg, "ERRORF")
}

Expand All @@ -287,56 +287,56 @@ func (l *Logger) Errorf(format string, v ...interface{}) {
func Debug(format string) {
ddcLogMut.Lock()
defer ddcLogMut.Unlock()
trimmed := strutils.LimitString(format, msgMax)
trimmed := strutils.GetEndOfString(format, msgMax)
handleLogError(logger.debugLogger.Output(2, trimmed), trimmed, "DEBUG")
}

func Info(format string) {
ddcLogMut.Lock()
defer ddcLogMut.Unlock()
trimmed := strutils.LimitString(format, msgMax)
trimmed := strutils.GetEndOfString(format, msgMax)
handleLogError(logger.infoLogger.Output(2, trimmed), trimmed, "INFO")
}

func Warning(format string) {
ddcLogMut.Lock()
defer ddcLogMut.Unlock()
trimmed := strutils.LimitString(format, msgMax)
trimmed := strutils.GetEndOfString(format, msgMax)
handleLogError(logger.warningLogger.Output(2, trimmed), trimmed, "WARNING")
}

func Error(format string) {
ddcLogMut.Lock()
defer ddcLogMut.Unlock()
trimmed := strutils.LimitString(format, msgMax)
trimmed := strutils.GetEndOfString(format, msgMax)
handleLogError(logger.errorLogger.Output(2, trimmed), trimmed, "ERROR")
}

func Debugf(format string, v ...interface{}) {
ddcLogMut.Lock()
defer ddcLogMut.Unlock()
msg := strutils.LimitString(fmt.Sprintf(format, v...), msgMax)
msg := strutils.GetEndOfString(fmt.Sprintf(format, v...), msgMax)
handleLogError(logger.debugLogger.Output(2, msg), msg, "DEBUGF")
}

func Infof(format string, v ...interface{}) {
ddcLogMut.Lock()
defer ddcLogMut.Unlock()
msg := strutils.LimitString(fmt.Sprintf(format, v...), msgMax)
msg := strutils.GetEndOfString(fmt.Sprintf(format, v...), msgMax)
handleLogError(logger.infoLogger.Output(2, msg), msg, "INFOF")
}

func Warningf(format string, v ...interface{}) {
ddcLogMut.Lock()
defer ddcLogMut.Unlock()
msg := strutils.LimitString(fmt.Sprintf(format, v...), msgMax)
msg := strutils.GetEndOfString(fmt.Sprintf(format, v...), msgMax)
handleLogError(logger.warningLogger.Output(2, msg), msg, "WARNINGF")
}

func Errorf(format string, v ...interface{}) {
ddcLogMut.Lock()
defer ddcLogMut.Unlock()
msg := strutils.LimitString(fmt.Sprintf(format, v...), msgMax)
msg := strutils.GetEndOfString(fmt.Sprintf(format, v...), msgMax)
handleLogError(logger.errorLogger.Output(2, msg), msg, "ERRORF")
}

Expand Down
16 changes: 14 additions & 2 deletions pkg/strutils/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"unicode/utf8"
)

func LimitString(s string, maxLength int) string {
func GetEndOfString(s string, maxLength int) string {
max := 0
if maxLength > 0 {
max = maxLength
Expand All @@ -29,12 +29,24 @@ func LimitString(s string, maxLength int) string {
return s
}

// Truncate the string to the desired length
// get the end of the string up to desired length
runes := []rune(s)
truncatedRunes := runes[len(runes)-max:]
return string(truncatedRunes)
}

func TruncateString(s string, maxLength int) string {
max := 0
if maxLength > 0 {
max = maxLength
}
// Check if the string is already within the desired length
if utf8.RuneCountInString(s) <= max {
return s
}
return s[:max]
}

func GetLastLine(s string) string {
index := strings.LastIndex(s, "\n")
if index == -1 {
Expand Down
Loading

0 comments on commit 2f91918

Please sign in to comment.