diff --git a/pkg/supportbundle/collect.go b/pkg/supportbundle/collect.go index d77989398..cb3f508a2 100644 --- a/pkg/supportbundle/collect.go +++ b/pkg/supportbundle/collect.go @@ -8,6 +8,8 @@ import ( "io" "os" "reflect" + "strings" + "sync" "time" "github.com/pkg/errors" @@ -20,44 +22,60 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "golang.org/x/sync/errgroup" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + kuberneteserrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" +) + +const ( + selectorLabelKey = "ds-selector-label" + selectorLabelValue = "remote-host-collector" + defaultTimeout = 30 ) func runHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, additionalRedactors *troubleshootv1beta2.Redactor, bundlePath string, opts SupportBundleCreateOpts) (collect.CollectorResult, error) { - collectSpecs := append([]*troubleshootv1beta2.HostCollect{}, hostCollectors...) - collectedData := make(map[string][]byte) - if opts.RunHostCollectorsInPod { - if err := checkRemoteCollectorRBAC(ctx, opts.KubernetesRestConfig, "Remote Host Collectors", opts.Namespace); err != nil { - if rbacErr, ok := err.(*RBACPermissionError); ok { - for _, forbiddenErr := range rbacErr.Forbidden { - opts.ProgressChan <- forbiddenErr - } + var err error + var collectResult map[string][]byte - if !opts.CollectWithoutPermissions { - return nil, collect.ErrInsufficientPermissionsToRun - } - } else { - return nil, err - } - } - if err := collectRemoteHost(ctx, collectSpecs, bundlePath, opts, collectedData); err != nil { - return nil, err + if opts.RunHostCollectorsInPod { + collectResult, err = runRemoteHostCollectors(ctx, hostCollectors, bundlePath, opts) + if err != nil { + return collectResult, err } } else { - if err := collectHost(ctx, collectSpecs, bundlePath, opts, collectedData); err != nil { - return nil, err - } + collectResult = runLocalHostCollectors(ctx, hostCollectors, bundlePath, opts) + } + + // redact result if any + globalRedactors := []*troubleshootv1beta2.Redact{} + if additionalRedactors != nil { + globalRedactors = additionalRedactors.Spec.Redactors } if opts.Redact { - globalRedactors := getGlobalRedactors(additionalRedactors) - if err := redactResults(ctx, bundlePath, collectedData, globalRedactors); err != nil { - return collectedData, err + _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, "Host collectors") + span.SetAttributes(attribute.String("type", "Redactors")) + err := collect.RedactResult(bundlePath, collectResult, globalRedactors) + if err != nil { + err = errors.Wrap(err, "failed to redact host collector results") + span.SetStatus(codes.Error, err.Error()) + return collectResult, err } + span.End() } - return collectedData, nil + return collectResult, nil } func runCollectors(ctx context.Context, collectors []*troubleshootv1beta2.Collect, additionalRedactors *troubleshootv1beta2.Redactor, bundlePath string, opts SupportBundleCreateOpts) (collect.CollectorResult, error) { @@ -207,147 +225,391 @@ func getAnalysisFile(analyzeResults []*analyze.AnalyzeResult) (io.Reader, error) return bytes.NewBuffer(analysis), nil } -// collectRemoteHost runs remote host collectors sequentially -func collectRemoteHost(ctx context.Context, collectSpecs []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts, collectedData map[string][]byte) error { - opts.KubernetesRestConfig.QPS = constants.DEFAULT_CLIENT_QPS - opts.KubernetesRestConfig.Burst = constants.DEFAULT_CLIENT_BURST - opts.KubernetesRestConfig.UserAgent = fmt.Sprintf("%s/%s", constants.DEFAULT_CLIENT_USER_AGENT, version.Version()) +func runLocalHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) map[string][]byte { + collectSpecs := make([]*troubleshootv1beta2.HostCollect, 0) + collectSpecs = append(collectSpecs, hostCollectors...) - if err := saveNodeList(opts, bundlePath); err != nil { - return err - } + allCollectedData := make(map[string][]byte) - // Run remote collectors sequentially - for _, spec := range collectSpecs { - collector, ok := collect.GetHostCollector(spec, bundlePath) - if !ok { - opts.ProgressChan <- "Host collector not found" - continue + var collectors []collect.HostCollector + for _, desiredCollector := range collectSpecs { + collector, ok := collect.GetHostCollector(desiredCollector, bundlePath) + if ok { + collectors = append(collectors, collector) } + } - // Start a span for tracing + for _, collector := range collectors { + // TODO: Add context to host collectors _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title()) span.SetAttributes(attribute.String("type", reflect.TypeOf(collector).String())) isExcluded, _ := collector.IsExcluded() if isExcluded { - msg := fmt.Sprintf("[%s] Excluding host collector", collector.Title()) - opts.CollectorProgressCallback(opts.ProgressChan, msg) + opts.ProgressChan <- fmt.Sprintf("[%s] Excluding host collector", collector.Title()) span.SetAttributes(attribute.Bool(constants.EXCLUDED, true)) span.End() continue } - // Send progress event: starting the collector - msg := fmt.Sprintf("[%s] Running host collector...", collector.Title()) - opts.CollectorProgressCallback(opts.ProgressChan, msg) - - // Parameters for remote collection - params := &collect.RemoteCollectParams{ - ProgressChan: opts.ProgressChan, - HostCollector: spec, - BundlePath: bundlePath, - ClientConfig: opts.KubernetesRestConfig, - Image: "replicated/troubleshoot:latest", - PullPolicy: "IfNotPresent", - Timeout: time.Duration(60 * time.Second), - LabelSelector: "", - NamePrefix: "host-remote", - Namespace: "default", - Title: collector.Title(), - } - - // Perform the collection - result, err := collect.RemoteHostCollect(ctx, *params) + opts.ProgressChan <- fmt.Sprintf("[%s] Running host collector...", collector.Title()) + result, err := collector.Collect(opts.ProgressChan) if err != nil { span.SetStatus(codes.Error, err.Error()) - msg = fmt.Sprintf("[%s] Error: %v", collector.Title(), err) - opts.CollectorProgressCallback(opts.ProgressChan, msg) - return errors.Wrap(err, "failed to run remote host collector") + opts.ProgressChan <- errors.Errorf("failed to run host collector: %s: %v", collector.Title(), err) } - - // Send progress event: completed successfully - msg = fmt.Sprintf("[%s] Completed host collector", collector.Title()) - opts.CollectorProgressCallback(opts.ProgressChan, msg) - - // Aggregate the results + span.End() for k, v := range result { - collectedData[k] = v + allCollectedData[k] = v } + } - span.End() + return allCollectedData +} + +// getExecOutputs executes `collect -` with collector data passed to stdin and returns stdout, stderr and error +func getExecOutputs( + ctx context.Context, clientConfig *rest.Config, client *kubernetes.Clientset, pod corev1.Pod, collectorData []byte, +) ([]byte, []byte, error) { + container := pod.Spec.Containers[0].Name + + req := client.CoreV1().RESTClient().Post().Resource("pods").Name(pod.Name).Namespace(pod.Namespace).SubResource("exec") + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + return nil, nil, err } - return nil + + parameterCodec := runtime.NewParameterCodec(scheme) + req.VersionedParams(&corev1.PodExecOptions{ + Command: []string{"/troubleshoot/collect", "-", "--chroot", "/host", "--format", "raw"}, + Container: container, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + }, parameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(clientConfig, "POST", req.URL()) + if err != nil { + return nil, nil, err + } + + stdout := new(bytes.Buffer) + stderr := new(bytes.Buffer) + + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: bytes.NewBuffer(collectorData), + Stdout: stdout, + Stderr: stderr, + Tty: false, + }) + + if err != nil { + return stdout.Bytes(), stderr.Bytes(), err + } + return stdout.Bytes(), stderr.Bytes(), nil } -// collectHost runs host collectors sequentially -func collectHost(ctx context.Context, collectSpecs []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts, collectedData map[string][]byte) error { - // Run local collectors sequentially - for _, spec := range collectSpecs { - collector, ok := collect.GetHostCollector(spec, bundlePath) +func runRemoteHostCollectors(ctx context.Context, hostCollectors []*troubleshootv1beta2.HostCollect, bundlePath string, opts SupportBundleCreateOpts) (map[string][]byte, error) { + output := collect.NewResult() + + clientset, err := kubernetes.NewForConfig(opts.KubernetesRestConfig) + if err != nil { + return nil, err + } + + // TODO: rbac check + + // create remote pod for each node + labels := map[string]string{ + "troubleshoot.sh/remote-collector": "true", + } + + var mu sync.Mutex + nodeLogs := make(map[string]map[string][]byte) + + ds, err := createHostCollectorDS(ctx, clientset, labels, "default") + if err != nil { + return nil, err + } + + // wait for at least one pod to be scheduled + err = waitForDS(ctx, clientset, ds) + if err != nil { + return nil, err + } + + klog.V(2).Infof("Created Remote Host Collector Daemonset %s", ds.Name) + pods, err := clientset.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: selectorLabelKey + "=" + ds.Name, + // use the default logs collector timeout for now + TimeoutSeconds: ptr.To(int64(defaultTimeout)), + Limit: 0, + }) + if err != nil { + return nil, err + } + + var eg errgroup.Group + + if err := saveNodeList(output, opts, bundlePath); err != nil { + return nil, err + } + + for _, collectorSpec := range hostCollectors { + collector, ok := collect.GetHostCollector(collectorSpec, bundlePath) if !ok { opts.ProgressChan <- "Host collector not found" continue } - // Send progress event: starting the collector - opts.ProgressChan <- fmt.Sprintf("[%s] Running host collector...", collector.Title()) - // Start a span for tracing _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, collector.Title()) - span.SetAttributes(attribute.String("type", reflect.TypeOf(collector).String())) + span.SetAttributes(attribute.String("type", "Collect")) isExcluded, _ := collector.IsExcluded() if isExcluded { - opts.ProgressChan <- fmt.Sprintf("[%s] Excluding host collector", collector.Title()) + msg := fmt.Sprintf("[%s] Excluding host collector", collector.Title()) + opts.CollectorProgressCallback(opts.ProgressChan, msg) span.SetAttributes(attribute.Bool(constants.EXCLUDED, true)) span.End() continue } - // Run local collector sequentially - result, err := collector.Collect(opts.ProgressChan) + // Send progress event: starting the collector + msg := fmt.Sprintf("[%s] Running host collector...", collector.Title()) + opts.CollectorProgressCallback(opts.ProgressChan, msg) + + // convert host collectors into a HostCollector spec + spec := createHostCollectorsSpec([]*troubleshootv1beta2.HostCollect{collectorSpec}) + specJSON, err := json.Marshal(spec) if err != nil { - span.SetStatus(codes.Error, err.Error()) - opts.ProgressChan <- fmt.Sprintf("[%s] Error: %v", collector.Title(), err) + return nil, err } + klog.V(2).Infof("HostCollector spec: %s", specJSON) + for _, pod := range pods.Items { + eg.Go(func() error { + if err := waitForPodRunning(ctx, clientset, &pod); err != nil { + return err + } - // Send progress event: completed successfully - opts.ProgressChan <- fmt.Sprintf("[%s] Completed host collector", collector.Title()) + stdout, _, err := getExecOutputs(ctx, opts.KubernetesRestConfig, clientset, pod, specJSON) + if err != nil { + // span.SetStatus(codes.Error, err.Error()) + msg := fmt.Sprintf("[%s] Error: %v", collector.Title(), err) + opts.CollectorProgressCallback(opts.ProgressChan, msg) + return errors.Wrap(err, "failed to run remote host collector") + } - // Aggregate the results - for k, v := range result { - collectedData[k] = v + result := map[string]string{} + if err := json.Unmarshal(stdout, &result); err != nil { + return err + } + + // Send progress event: completed successfully + msg = fmt.Sprintf("[%s] Completed host collector", collector.Title()) + opts.CollectorProgressCallback(opts.ProgressChan, msg) + + // Aggregate the results + mu.Lock() + for file, data := range result { + if nodeLogs[pod.Spec.NodeName] == nil { + nodeLogs[pod.Spec.NodeName] = make(map[string][]byte) + } + nodeLogs[pod.Spec.NodeName][file] = []byte(data) + } + mu.Unlock() + return nil + }) } + err = eg.Wait() + if err != nil { + return nil, err + } span.End() } - return nil + + klog.V(2).Infof("All remote host collectors completed") + + defer func() { + // TODO: + // delete the config map + // delete the remote pods + // check if the daemonset still exists + if ds == nil || ds.Name == "" { + return + } + + if err := clientset.AppsV1().DaemonSets(ds.Namespace).Delete(ctx, ds.Name, metav1.DeleteOptions{}); err != nil { + if kuberneteserrors.IsNotFound(err) { + klog.Errorf("Remote host collector daemonset %s not found", ds.Name) + } else { + klog.Errorf("Failed to delete remote host collector daemonset %s: %v", ds.Name, err) + } + return + } + }() + + for node, logs := range nodeLogs { + for file, data := range logs { + // trim host-collectors/ prefix + file = strings.TrimPrefix(file, "host-collectors/") + err := output.SaveResult(bundlePath, fmt.Sprintf("host-collectors/%s/%s", node, file), bytes.NewBuffer(data)) + if err != nil { + // TODO: error handling + return nil, err + } + } + } + + return output, nil +} + +func createHostCollectorsSpec(hostCollectors []*troubleshootv1beta2.HostCollect) *troubleshootv1beta2.HostCollector { + return &troubleshootv1beta2.HostCollector{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "troubleshoot.sh/v1beta2", + Kind: "HostCollector", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "remoteHostCollector", + }, + Spec: troubleshootv1beta2.HostCollectorSpec{ + Collectors: hostCollectors, + }, + } } -func redactResults(ctx context.Context, bundlePath string, collectedData collect.CollectorResult, redactors []*troubleshootv1beta2.Redact) error { - _, span := otel.Tracer(constants.LIB_TRACER_NAME).Start(ctx, "Host collectors") - defer span.End() +func createHostCollectorDS(ctx context.Context, clientset kubernetes.Interface, labels map[string]string, ns string) (*appsv1.DaemonSet, error) { + dsName := names.SimpleNameGenerator.GenerateName("remote-host-collector" + "-") + imageName := "replicated/troubleshoot:latest" + imagePullPolicy := corev1.PullIfNotPresent + + labels[selectorLabelKey] = dsName + + podSpec := corev1.PodSpec{ + HostNetwork: true, + HostPID: true, + HostIPC: true, + Containers: []corev1.Container{ + { + Image: imageName, + ImagePullPolicy: imagePullPolicy, + Name: "remote-collector", + Command: []string{"tail", "-f", "/dev/null"}, + SecurityContext: &corev1.SecurityContext{ + Privileged: ptr.To(true), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "host-root", + MountPath: "/host", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "host-root", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/", + }, + }, + }, + }, + } + + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: dsName, + Namespace: ns, + Labels: labels, + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: selectorLabelKey, + Operator: "In", + Values: []string{dsName}, + }, + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: podSpec, + }, + }, + } + + createdDS, err := clientset.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{}) - err := collect.RedactResult(bundlePath, collectedData, redactors) if err != nil { - span.SetStatus(codes.Error, err.Error()) - return errors.Wrap(err, "failed to redact host collector results") + return nil, errors.Wrap(err, "failed to create Remote Host Collector Pod") } - return nil + + return createdDS, nil } -// getGlobalRedactors returns the global redactors from the support bundle spec -func getGlobalRedactors(additionalRedactors *troubleshootv1beta2.Redactor) []*troubleshootv1beta2.Redact { - if additionalRedactors != nil { - return additionalRedactors.Spec.Redactors +func waitForPodRunning(ctx context.Context, clientset kubernetes.Interface, pod *corev1.Pod) error { + timeoutCh := time.After(defaultTimeout * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timeoutCh: + return fmt.Errorf("timed out waiting for pod %s to be running", pod.Name) + case <-ticker.C: + currentPod, err := clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get pod %s: %w", pod.Name, err) + } + + // Check container status + for _, containerStatus := range currentPod.Status.ContainerStatuses { + if containerStatus.Name == "remote-collector" { + if containerStatus.State.Running != nil && containerStatus.State.Terminated == nil && containerStatus.Ready { + return nil + } + } + } + } } - return []*troubleshootv1beta2.Redact{} } -func saveNodeList(opts SupportBundleCreateOpts, bundlePath string) error { - result := make(collect.CollectorResult) +func waitForDS(ctx context.Context, clientset kubernetes.Interface, ds *appsv1.DaemonSet) error { + timeoutCh := time.After(defaultTimeout * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timeoutCh: + return fmt.Errorf("timed out waiting for DaemonSet %s to be ready", ds.Name) + case <-ticker.C: + currentDS, err := clientset.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get DaemonSet %s: %w", ds.Name, err) + } + + if currentDS.Status.NumberReady > 0 && currentDS.Status.DesiredNumberScheduled == currentDS.Status.NumberReady { + return nil + } + } + } +} +func saveNodeList(result collect.CollectorResult, opts SupportBundleCreateOpts, bundlePath string) error { clientset, err := kubernetes.NewForConfig(opts.KubernetesRestConfig) if err != nil { return errors.Wrap(err, "failed to create kubernetes clientset to run host collectors in pod") diff --git a/test/e2e/support-bundle/host_remote_collector_e2e_test.go b/test/e2e/support-bundle/host_remote_collector_e2e_test.go index f8538e254..a4a2346a1 100644 --- a/test/e2e/support-bundle/host_remote_collector_e2e_test.go +++ b/test/e2e/support-bundle/host_remote_collector_e2e_test.go @@ -16,12 +16,14 @@ func TestHostRemoteCollector(t *testing.T) { feature := features.New("Host OS Remote Collector Test"). Assess("run support bundle command successfully", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { var out bytes.Buffer + var errOut bytes.Buffer supportbundleName := "host-os-remote-collector" cmd := exec.CommandContext(ctx, sbBinary(), "spec/remoteHostCollectors.yaml", "--interactive=false", fmt.Sprintf("-o=%s", supportbundleName)) cmd.Stdout = &out + cmd.Stderr = &errOut err := cmd.Run() if err != nil { - t.Fatalf("Failed to run the binary: %v", err) + t.Fatalf("Failed to run the binary: %v\n%s", err, errOut.String()) } defer func() {