diff --git a/pkg/kosmosctl/floater/check.go b/pkg/kosmosctl/floater/check.go index 3e098b84a..61e15027f 100644 --- a/pkg/kosmosctl/floater/check.go +++ b/pkg/kosmosctl/floater/check.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strconv" + "sync" "github.com/olekukonko/tablewriter" "github.com/spf13/cobra" @@ -45,6 +46,11 @@ type CommandCheckOptions struct { SrcFloater *Floater DstFloater *Floater + + routinesMaxNum int + routineInfoChan chan routineInfo + waitGroup sync.WaitGroup + resultDataChan chan *PrintCheckData } type PrintCheckData struct { @@ -54,6 +60,12 @@ type PrintCheckData struct { TargetIP string } +type routineInfo struct { + IInfo *FloatInfo + JInfo *FloatInfo + routineIp string +} + func NewCmdCheck() *cobra.Command { o := &CommandCheckOptions{ Version: version.GetReleaseVersion().PatchRelease(), @@ -92,6 +104,11 @@ func NewCmdCheck() *cobra.Command { flags.StringVar(&o.Port, "port", "8889", "Port used by floater.") flags.IntVarP(&o.PodWaitTime, "pod-wait-time", "w", 30, "Time for wait pod(floater) launch.") flags.StringVar(&o.Protocol, "protocol", string(TCP), "Protocol for the network problem.") + flags.IntVarP(&o.routinesMaxNum, "routines-max-number", "", 5, "Number of goroutines to use.") + + o.routineInfoChan = make(chan routineInfo, o.routinesMaxNum) + o.waitGroup = sync.WaitGroup{} + o.resultDataChan = make(chan *PrintCheckData, o.routinesMaxNum) return cmd } @@ -197,63 +214,112 @@ func (o *CommandCheckOptions) Run() error { } func (o *CommandCheckOptions) RunRange(iPodInfos []*FloatInfo, jPodInfos []*FloatInfo) []*PrintCheckData { - var resultData []*PrintCheckData + var resultDatas []*PrintCheckData if len(iPodInfos) > 0 && len(jPodInfos) > 0 { - for _, iPodInfo := range iPodInfos { - for _, jPodInfo := range jPodInfos { - for _, ip := range jPodInfo.PodIPs { - var targetIP string - var err error - var cmdResult *command.Result - if o.DstFloater != nil { - targetIP, err = netmap.NetMap(ip, o.DstFloater.CIDRsMap) - } else { - targetIP = ip - } - if err != nil { - cmdResult = command.ParseError(err) - } else { - // ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized - cmdObj := &command.Ping{ - TargetIP: targetIP, - } - cmdResult = o.SrcFloater.CommandExec(iPodInfo, cmdObj) - } - resultData = append(resultData, &PrintCheckData{ - *cmdResult, - iPodInfo.NodeName, jPodInfo.NodeName, targetIP, - }) - } + go o.toRoutineInfoChan(iPodInfos, jPodInfos) + } + + for i := 0; i < o.routinesMaxNum; i++ { + o.waitGroup.Add(1) + go o.checkRange() + } + + go func() { + o.waitGroup.Wait() + close(o.resultDataChan) + }() + + for resultData := range o.resultDataChan { + resultDatas = append(resultDatas, resultData) + } + return resultDatas +} + +func (o *CommandCheckOptions) checkRange() { + defer o.waitGroup.Done() + for routineInfo := range o.routineInfoChan { + var targetIP string + var err error + var cmdResult *command.Result + if o.DstFloater != nil { + targetIP, err = netmap.NetMap(routineInfo.routineIp, o.DstFloater.CIDRsMap) + } else { + targetIP = routineInfo.routineIp + } + if err != nil { + cmdResult = command.ParseError(err) + } else { + // ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized + cmdObj := &command.Ping{ + TargetIP: targetIP, } + cmdResult = o.SrcFloater.CommandExec(routineInfo.IInfo, cmdObj) + } + resultData := &PrintCheckData{ + *cmdResult, + routineInfo.IInfo.NodeName, routineInfo.JInfo.NodeName, targetIP, } + o.resultDataChan <- resultData } +} - return resultData +func (o *CommandCheckOptions) toRoutineInfoChan(iInfos []*FloatInfo, jInfos []*FloatInfo) { + for _, iInfo := range iInfos { + for _, jInfo := range jInfos { + for _, ip := range jInfo.NodeIPs { + routineIInfo := iInfo + routineJInfo := jInfo + routineIp := ip + info := routineInfo{ + IInfo: routineIInfo, + JInfo: routineJInfo, + routineIp: routineIp, + } + o.routineInfoChan <- info + } + } + } + close(o.routineInfoChan) } func (o *CommandCheckOptions) RunNative(iNodeInfos []*FloatInfo, jNodeInfos []*FloatInfo) []*PrintCheckData { - var resultData []*PrintCheckData + var resultDatas []*PrintCheckData if len(iNodeInfos) > 0 && len(jNodeInfos) > 0 { - for _, iNodeInfo := range iNodeInfos { - for _, jNodeInfo := range jNodeInfos { - for _, ip := range jNodeInfo.NodeIPs { - // ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized - cmdObj := &command.Ping{ - TargetIP: ip, - } - cmdResult := o.SrcFloater.CommandExec(iNodeInfo, cmdObj) - resultData = append(resultData, &PrintCheckData{ - *cmdResult, - iNodeInfo.NodeName, jNodeInfo.NodeName, ip, - }) - } - } - } + go o.toRoutineInfoChan(iNodeInfos, jNodeInfos) } - return resultData + for i := 0; i < o.routinesMaxNum; i++ { + o.waitGroup.Add(1) + go o.checkNative() + } + + go func() { + o.waitGroup.Wait() + close(o.resultDataChan) + }() + + for resultData := range o.resultDataChan { + resultDatas = append(resultDatas, resultData) + } + return resultDatas +} + +func (o *CommandCheckOptions) checkNative() { + defer o.waitGroup.Done() + for routineInfo := range o.routineInfoChan { + // ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized + cmdObj := &command.Ping{ + TargetIP: routineInfo.routineIp, + } + cmdResult := o.SrcFloater.CommandExec(routineInfo.IInfo, cmdObj) + resultData := &PrintCheckData{ + *cmdResult, + routineInfo.IInfo.NodeName, routineInfo.JInfo.NodeName, routineInfo.routineIp, + } + o.resultDataChan <- resultData + } } func (o *CommandCheckOptions) PrintResult(resultData []*PrintCheckData) {