Skip to content

Commit

Permalink
use per collector per pod
Browse files Browse the repository at this point in the history
  • Loading branch information
DexterYan committed Nov 5, 2024
1 parent c983127 commit f2e0a23
Showing 1 changed file with 44 additions and 49 deletions.
93 changes: 44 additions & 49 deletions pkg/supportbundle/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,45 +352,43 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot
}

var eg errgroup.Group
for _, pod := range pods.Items {
eg.Go(func() error {
// TODO: set timeout waiting
if err := waitForPodRunning(ctx, clientset, &pod); err != nil {
return err
}

results := map[string][]byte{}
for _, collectorSpec := range hostCollectors {
collector, ok := collect.GetHostCollector(collectorSpec, bundlePath)
if !ok {
opts.ProgressChan <- "Host collector not found"
continue
}
for _, collectorSpec := range hostCollectors {
collector, ok := collect.GetHostCollector(collectorSpec, bundlePath)
if !ok {
opts.ProgressChan <- "Host collector not found"
continue
}

// Start a span for tracing
_, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title())
span.SetAttributes(attribute.String("type", "Collect"))
// Start a span for tracing
_, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title())
span.SetAttributes(attribute.String("type", "Collect"))

isExcluded, _ := collector.IsExcluded()
if isExcluded {
msg := fmt.Sprintf("[%s] Excluding host collector", collector.Title())
opts.CollectorProgressCallback(opts.ProgressChan, msg)
span.SetAttributes(attribute.Bool(constants.EXCLUDED, true))
span.End()
continue
}
isExcluded, _ := collector.IsExcluded()
if isExcluded {
msg := fmt.Sprintf("[%s] Excluding host collector", collector.Title())
opts.CollectorProgressCallback(opts.ProgressChan, msg)
span.SetAttributes(attribute.Bool(constants.EXCLUDED, true))
span.End()
continue
}

// Send progress event: starting the collector
msg := fmt.Sprintf("[%s] Running host collector...", collector.Title())
opts.CollectorProgressCallback(opts.ProgressChan, msg)
// Send progress event: starting the collector
msg := fmt.Sprintf("[%s] Running host collector...", collector.Title())
opts.CollectorProgressCallback(opts.ProgressChan, msg)

// convert host collectors into a HostCollector spec
spec := createHostCollectorsSpec([]*troubleshootv1beta2.HostCollect{collectorSpec})
specJSON, err := json.Marshal(spec)
if err != nil {
// convert host collectors into a HostCollector spec
spec := createHostCollectorsSpec([]*troubleshootv1beta2.HostCollect{collectorSpec})
specJSON, err := json.Marshal(spec)
if err != nil {
return nil, err
}
klog.V(2).Infof("HostCollector spec: %s", specJSON)
for _, pod := range pods.Items {
eg.Go(func() error {
if err := waitForPodRunning(ctx, clientset, &pod); err != nil {
return err
}
klog.V(2).Infof("HostCollector spec: %s", specJSON)

stdout, _, err := getExecOutputs(ctx, opts.KubernetesRestConfig, clientset, pod, specJSON)
if err != nil {
Expand All @@ -410,26 +408,23 @@ func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshoot
opts.CollectorProgressCallback(opts.ProgressChan, msg)

// Aggregate the results
mu.Lock()
for file, data := range result {
results[file] = []byte(data)
if nodeLogs[pod.Spec.NodeName] == nil {
nodeLogs[pod.Spec.NodeName] = make(map[string][]byte)
}
nodeLogs[pod.Spec.NodeName][file] = []byte(data)
}
mu.Unlock()
return nil
})
}

span.End()
// time.Sleep(1 * time.Second)
}

// // wait for log stream to catch up
// time.Sleep(1 * time.Second)

mu.Lock()
nodeLogs[pod.Spec.NodeName] = results
mu.Unlock()
return nil
})
}
err = eg.Wait()
if err != nil {
return nil, err
err = eg.Wait()
if err != nil {
return nil, err
}
span.End()
}

klog.V(2).Infof("All remote host collectors completed")
Expand Down

0 comments on commit f2e0a23

Please sign in to comment.