From 17d254b70973d50edc6860768889830f75b87fa1 Mon Sep 17 00:00:00 2001 From: Marcin Maciaszczyk Date: Thu, 5 Dec 2024 11:58:39 +0100 Subject: [PATCH] remove exec, logs and portforward code --- pkg/kubernetes/exec/exec.go | 91 ------------ pkg/kubernetes/logs/logs.go | 119 ---------------- pkg/kubernetes/logs/logsforobject.go | 158 --------------------- pkg/kubernetes/portforward/helpers.go | 161 ---------------------- pkg/kubernetes/portforward/portforward.go | 92 ------------- pkg/kubernetes/utils/utils.go | 63 --------- 6 files changed, 684 deletions(-) delete mode 100644 pkg/kubernetes/exec/exec.go delete mode 100644 pkg/kubernetes/logs/logs.go delete mode 100644 pkg/kubernetes/logs/logsforobject.go delete mode 100644 pkg/kubernetes/portforward/helpers.go delete mode 100644 pkg/kubernetes/portforward/portforward.go delete mode 100644 pkg/kubernetes/utils/utils.go diff --git a/pkg/kubernetes/exec/exec.go b/pkg/kubernetes/exec/exec.go deleted file mode 100644 index d0b5de625..000000000 --- a/pkg/kubernetes/exec/exec.go +++ /dev/null @@ -1,91 +0,0 @@ -package exec - -import ( - "context" - "fmt" - "io" - "net/url" - "os" - - "github.com/pluralsh/plural-cli/pkg/kubernetes" - "github.com/pluralsh/plural-cli/pkg/kubernetes/utils" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/client-go/tools/remotecommand" - "k8s.io/kubectl/pkg/cmd/util/podcmd" - "k8s.io/kubectl/pkg/scheme" - "k8s.io/kubectl/pkg/util/term" -) - -func Exec(namespace, resource string, commands []string) error { - obj, pod, err := utils.GetPodWithObject(namespace, resource) - if err != nil { - return err - } - if meta.IsListType(obj) { - return fmt.Errorf("cannot exec into multiple objects at a time") - } - if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { - return fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase) - } - - container, err := podcmd.FindOrDefaultContainerByName(pod, "", true, os.Stderr) - if err != nil { - return err - } - - t := setupTTY() - sizeQueue := t.MonitorSize(t.GetSize()) - - kube, err := kubernetes.Kubernetes() - if err != nil { - return err - } - - fn := func() error { - req := kube.GetRestClient().Post(). - Resource("pods"). - Name(pod.Name). - Namespace(pod.Namespace). - SubResource("exec") - req.VersionedParams(&corev1.PodExecOptions{ - Container: container.Name, - Command: commands, - Stdin: true, - Stdout: true, - TTY: t.Raw, - }, scheme.ParameterCodec) - - return execute("POST", req.URL(), os.Stdin, os.Stdout, os.Stderr, t.Raw, sizeQueue) - } - - return t.Safe(fn) -} - -func execute(method string, url *url.URL, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - config, err := kubernetes.KubeConfig() - if err != nil { - return err - } - exec, err := remotecommand.NewSPDYExecutor(config, method, url) - if err != nil { - return err - } - return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - Tty: tty, - TerminalSizeQueue: terminalSizeQueue, - }) -} - -func setupTTY() term.TTY { - t := term.TTY{ - Out: os.Stdout, - In: os.Stdin, - Raw: true, - } - - return t -} diff --git a/pkg/kubernetes/logs/logs.go b/pkg/kubernetes/logs/logs.go deleted file mode 100644 index 388db01d8..000000000 --- a/pkg/kubernetes/logs/logs.go +++ /dev/null @@ -1,119 +0,0 @@ -package logs - -import ( - "bufio" - "context" - "fmt" - "io" - "os" - "sync" - "time" - - corev1 "k8s.io/api/core/v1" - "k8s.io/cli-runtime/pkg/genericclioptions" - "k8s.io/client-go/rest" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - "k8s.io/kubectl/pkg/scheme" -) - -const ( - defaultPodLogsTimeout = 20 * time.Second -) - -func Logs(namespace, resource string, tailLines int64) error { - matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag().WithDiscoveryBurst(300).WithDiscoveryQPS(50.0)) - f := cmdutil.NewFactory(matchVersionKubeConfigFlags) - - builder := f.NewBuilder(). - WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...). - NamespaceParam(namespace).DefaultNamespace(). - SingleResourceType() - builder.ResourceNames("pods", resource) - infos, err := builder.Do().Infos() - if err != nil { - return err - } - if len(infos) != 1 { - return fmt.Errorf("expected a resource") - } - object := infos[0].Object - - options, err := logOptions(tailLines) - if err != nil { - return err - } - requests, err := logsForObject(object, options, defaultPodLogsTimeout, false) - if err != nil { - return err - } - if len(requests) > 1 { - return parallelConsumeRequest(requests) - } - - return sequentialConsumeRequest(requests) -} - -func parallelConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error { - reader, writer := io.Pipe() - wg := &sync.WaitGroup{} - wg.Add(len(requests)) - for objRef, request := range requests { - go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) { - defer wg.Done() - if err := defaultConsumeRequest(request, os.Stdout); err != nil { - fmt.Fprintf(writer, "error: %v\n", err) - } - - }(objRef, request) - } - - go func() { - wg.Wait() - writer.Close() - }() - - _, err := io.Copy(os.Stdout, reader) - return err -} - -func sequentialConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error { - for _, request := range requests { - if err := defaultConsumeRequest(request, os.Stdout); err != nil { - return err - } - } - - return nil -} - -func logOptions(tailLines int64) (*corev1.PodLogOptions, error) { - logOptions := &corev1.PodLogOptions{ - Follow: true, - TailLines: &tailLines, - } - - return logOptions, nil -} - -func defaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error { - readCloser, err := request.Stream(context.TODO()) - if err != nil { - return err - } - defer readCloser.Close() - - r := bufio.NewReader(readCloser) - for { - bytes, err := r.ReadBytes('\n') - if _, err := out.Write(bytes); err != nil { - return err - } - - if err != nil { - if err != io.EOF { - return err - } - return nil - } - } -} diff --git a/pkg/kubernetes/logs/logsforobject.go b/pkg/kubernetes/logs/logsforobject.go deleted file mode 100644 index 3575af04e..000000000 --- a/pkg/kubernetes/logs/logsforobject.go +++ /dev/null @@ -1,158 +0,0 @@ -package logs - -import ( - "errors" - "fmt" - "os" - "sort" - "time" - - "github.com/pluralsh/plural-cli/pkg/kubernetes" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/reference" - "k8s.io/kubectl/pkg/cmd/util/podcmd" - "k8s.io/kubectl/pkg/polymorphichelpers" - "k8s.io/kubectl/pkg/scheme" - "k8s.io/kubectl/pkg/util/podutils" -) - -func logsForObject(object, options runtime.Object, timeout time.Duration, allContainers bool) (map[corev1.ObjectReference]rest.ResponseWrapper, error) { - config, err := kubernetes.KubeConfig() - if err != nil { - return nil, err - } - - clientset, err := corev1client.NewForConfig(config) - if err != nil { - return nil, err - } - return logsForObjectWithClient(clientset, object, options, timeout, allContainers) -} - -// this is split for easy test-ability -func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, options runtime.Object, timeout time.Duration, allContainers bool) (map[corev1.ObjectReference]rest.ResponseWrapper, error) { - opts, ok := options.(*corev1.PodLogOptions) - if !ok { - return nil, errors.New("provided options object is not a PodLogOptions") - } - - switch t := object.(type) { - case *corev1.PodList: - ret := make(map[corev1.ObjectReference]rest.ResponseWrapper) - for i := range t.Items { - currRet, err := logsForObjectWithClient(clientset, &t.Items[i], options, timeout, allContainers) - if err != nil { - return nil, err - } - for k, v := range currRet { - ret[k] = v - } - } - return ret, nil - - case *corev1.Pod: - return logsForPod(t, opts, clientset, timeout, allContainers) - } - - namespace, selector, err := polymorphichelpers.SelectorsForObject(object) - if err != nil { - return nil, fmt.Errorf("cannot get the logs from %T: %w", object, err) - } - - sortBy := func(pods []*corev1.Pod) sort.Interface { return podutils.ByLogging(pods) } - pod, numPods, err := polymorphichelpers.GetFirstPod(clientset, namespace, selector.String(), timeout, sortBy) - if err != nil { - return nil, err - } - if numPods > 1 { - fmt.Fprintf(os.Stderr, "Found %v pods, using pod/%v\n", numPods, pod.Name) - } - - return logsForObjectWithClient(clientset, pod, options, timeout, allContainers) -} - -func logsForPod(t *corev1.Pod, opts *corev1.PodLogOptions, clientset corev1client.CoreV1Interface, timeout time.Duration, allContainers bool) (map[corev1.ObjectReference]rest.ResponseWrapper, error) { - // if allContainers is true, then we're going to locate all containers and then iterate through them. At that point, "allContainers" is false - if !allContainers { - currOpts := new(corev1.PodLogOptions) - if opts != nil { - opts.DeepCopyInto(currOpts) - } - // in case the "kubectl.kubernetes.io/default-container" annotation is present, we preset the opts.Containers to default to selected - // container. This gives users ability to preselect the most interesting container in pod. - if annotations := t.GetAnnotations(); annotations != nil && currOpts.Container == "" { - var defaultContainer string - if len(annotations[podcmd.DefaultContainerAnnotationName]) > 0 { - defaultContainer = annotations[podcmd.DefaultContainerAnnotationName] - } - if len(defaultContainer) > 0 { - if exists, _ := podcmd.FindContainerByName(t, defaultContainer); exists == nil { - fmt.Fprintf(os.Stderr, "Default container name %q not found in pod %s\n", defaultContainer, t.Name) - } else { - currOpts.Container = defaultContainer - } - } - } - - if currOpts.Container == "" { - // Default to the first container name(aligning behavior with `kubectl exec'). - currOpts.Container = t.Spec.Containers[0].Name - if len(t.Spec.Containers) > 1 || len(t.Spec.InitContainers) > 0 || len(t.Spec.EphemeralContainers) > 0 { - fmt.Fprintf(os.Stderr, "Defaulted container %q out of: %s\n", currOpts.Container, podcmd.AllContainerNames(t)) - } - } - - container, fieldPath := podcmd.FindContainerByName(t, currOpts.Container) - if container == nil { - return nil, fmt.Errorf("container %s is not valid for pod %s", currOpts.Container, t.Name) - } - ref, err := reference.GetPartialReference(scheme.Scheme, t, fieldPath) - if err != nil { - return nil, fmt.Errorf("Unable to construct reference to '%#v': %w", t, err) - } - - ret := make(map[corev1.ObjectReference]rest.ResponseWrapper, 1) - ret[*ref] = clientset.Pods(t.Namespace).GetLogs(t.Name, currOpts) - return ret, nil - } - - ret := make(map[corev1.ObjectReference]rest.ResponseWrapper) - for _, c := range t.Spec.InitContainers { - currOpts := opts.DeepCopy() - currOpts.Container = c.Name - currRet, err := logsForObjectWithClient(clientset, t, currOpts, timeout, false) - if err != nil { - return nil, err - } - for k, v := range currRet { - ret[k] = v - } - } - for _, c := range t.Spec.Containers { - currOpts := opts.DeepCopy() - currOpts.Container = c.Name - currRet, err := logsForObjectWithClient(clientset, t, currOpts, timeout, false) - if err != nil { - return nil, err - } - for k, v := range currRet { - ret[k] = v - } - } - for _, c := range t.Spec.EphemeralContainers { - currOpts := opts.DeepCopy() - currOpts.Container = c.Name - currRet, err := logsForObjectWithClient(clientset, t, currOpts, timeout, false) - if err != nil { - return nil, err - } - for k, v := range currRet { - ret[k] = v - } - } - - return ret, nil -} diff --git a/pkg/kubernetes/portforward/helpers.go b/pkg/kubernetes/portforward/helpers.go deleted file mode 100644 index 792a08dca..000000000 --- a/pkg/kubernetes/portforward/helpers.go +++ /dev/null @@ -1,161 +0,0 @@ -package portforward - -import ( - "fmt" - "math" - "strconv" - "strings" - - "github.com/pluralsh/polly/containers" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubectl/pkg/util" -) - -// splitPort splits port string which is in form of [LOCAL PORT]:REMOTE PORT -// and returns local and remote ports separately -func splitPort(port string) (local, remote string) { - parts := strings.Split(port, ":") - if len(parts) == 2 { - return parts[0], parts[1] - } - - return parts[0], parts[0] -} - -// Translates service port to target port -// It rewrites ports as needed if the Service port declares targetPort. -// It returns an error when a named targetPort can't find a match in the pod, or the Service did not declare -// the port. -func translateServicePortToTargetPort(ports []string, svc corev1.Service, pod corev1.Pod) ([]string, error) { - var translated []string - for _, port := range ports { - localPort, remotePort := splitPort(port) - - portnum, err := strconv.Atoi(remotePort) - if err != nil { - svcPort, err := util.LookupServicePortNumberByName(svc, remotePort) - if err != nil { - return nil, err - } - portnum = int(svcPort) - - if localPort == remotePort { - localPort = strconv.Itoa(portnum) - } - } - if portnum > 0 && portnum <= math.MaxInt32 { - containerPort, err := util.LookupContainerPortNumberByServicePort(svc, pod, int32(portnum)) - if err != nil { - // can't resolve a named port, or Service did not declare this port, return an error - return nil, err - } - - // convert the resolved target port back to a string - remotePort = strconv.Itoa(int(containerPort)) - - if localPort != remotePort { - translated = append(translated, fmt.Sprintf("%s:%s", localPort, remotePort)) - } else { - translated = append(translated, remotePort) - } - } else { - return nil, fmt.Errorf("Incorrect conversion between integer types") - } - - } - return translated, nil -} - -// convertPodNamedPortToNumber converts named ports into port numbers -// It returns an error when a named port can't be found in the pod containers -func convertPodNamedPortToNumber(ports []string, pod corev1.Pod) ([]string, error) { - var converted []string - for _, port := range ports { - localPort, remotePort := splitPort(port) - - containerPortStr := remotePort - _, err := strconv.Atoi(remotePort) - if err != nil { - containerPort, err := util.LookupContainerPortNumberByName(pod, remotePort) - if err != nil { - return nil, err - } - - containerPortStr = strconv.Itoa(int(containerPort)) - } - - if localPort != remotePort { - converted = append(converted, fmt.Sprintf("%s:%s", localPort, containerPortStr)) - } else { - converted = append(converted, containerPortStr) - } - } - - return converted, nil -} - -func checkUDPPorts(udpOnlyPorts containers.Set[int], ports []string, obj metav1.Object) error { - for _, port := range ports { - _, remotePort := splitPort(port) - portNum, err := strconv.Atoi(remotePort) - if err != nil { - switch v := obj.(type) { - case *corev1.Service: - svcPort, err := util.LookupServicePortNumberByName(*v, remotePort) - if err != nil { - return err - } - portNum = int(svcPort) - - case *corev1.Pod: - ctPort, err := util.LookupContainerPortNumberByName(*v, remotePort) - if err != nil { - return err - } - portNum = int(ctPort) - - default: - return fmt.Errorf("unknown object: %v", obj) - } - } - if udpOnlyPorts.Has(portNum) { - return fmt.Errorf("UDP protocol is not supported for %s", remotePort) - } - } - return nil -} - -// checkUDPPortInService returns an error if remote port in Service is a UDP port -func checkUDPPortInService(ports []string, svc *corev1.Service) error { - udpPorts := containers.NewSet[int]() - tcpPorts := containers.NewSet[int]() - for _, port := range svc.Spec.Ports { - portNum := int(port.Port) - switch port.Protocol { - case corev1.ProtocolUDP: - udpPorts.Add(portNum) - case corev1.ProtocolTCP: - tcpPorts.Add(portNum) - } - } - return checkUDPPorts(udpPorts.Difference(tcpPorts), ports, svc) -} - -// checkUDPPortInPod returns an error if remote port in Pod is a UDP port -func checkUDPPortInPod(ports []string, pod *corev1.Pod) error { - udpPorts := containers.NewSet[int]() - tcpPorts := containers.NewSet[int]() - for _, ct := range pod.Spec.Containers { - for _, ctPort := range ct.Ports { - portNum := int(ctPort.ContainerPort) - switch ctPort.Protocol { - case corev1.ProtocolUDP: - udpPorts.Add(portNum) - case corev1.ProtocolTCP: - tcpPorts.Add(portNum) - } - } - } - return checkUDPPorts(udpPorts.Difference(tcpPorts), ports, pod) -} diff --git a/pkg/kubernetes/portforward/portforward.go b/pkg/kubernetes/portforward/portforward.go deleted file mode 100644 index 5e722f777..000000000 --- a/pkg/kubernetes/portforward/portforward.go +++ /dev/null @@ -1,92 +0,0 @@ -package portforward - -import ( - "context" - "fmt" - "net/http" - "net/url" - "os" - - "github.com/pluralsh/plural-cli/pkg/kubernetes" - "github.com/pluralsh/plural-cli/pkg/kubernetes/utils" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/transport/spdy" -) - -func PortForward(namespace, resource string, ports []string, stopChan, readyChan chan struct{}) error { - obj, forwardablePod, err := utils.GetPodWithObject(namespace, resource) - if err != nil { - return err - } - podName := forwardablePod.Name - if len(podName) == 0 { - return fmt.Errorf("pod name or resource type/name must be specified") - } - - var podPorts []string - // handle service port mapping to target port if needed - switch t := obj.(type) { - case *corev1.Service: - err = checkUDPPortInService(ports, t) - if err != nil { - return err - } - podPorts, err = translateServicePortToTargetPort(ports, *t, *forwardablePod) - if err != nil { - return err - } - default: - err = checkUDPPortInPod(ports, forwardablePod) - if err != nil { - return err - } - podPorts, err = convertPodNamedPortToNumber(ports, *forwardablePod) - if err != nil { - return err - } - } - if len(podPorts) < 1 { - return fmt.Errorf("at least 1 PORT is required for port-forward") - } - kube, err := kubernetes.Kubernetes() - if err != nil { - return err - } - - pod, err := kube.GetClient().CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return err - } - if pod.Status.Phase != corev1.PodRunning { - return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase) - } - - req := kube.GetRestClient().Post(). - Resource("pods"). - Namespace(namespace). - Name(pod.Name). - SubResource("portforward") - - return forwardPorts(http.MethodPost, req.URL(), podPorts, stopChan, readyChan) -} - -func forwardPorts(method string, url *url.URL, ports []string, stopChan, readyChan chan struct{}) error { - - clientConfig, err := kubernetes.KubeConfig() - if err != nil { - return err - } - - transport, upgrader, err := spdy.RoundTripperFor(clientConfig) - if err != nil { - return err - } - dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url) - fw, err := portforward.New(dialer, ports, stopChan, readyChan, os.Stdout, os.Stderr) - if err != nil { - return err - } - return fw.ForwardPorts() -} diff --git a/pkg/kubernetes/utils/utils.go b/pkg/kubernetes/utils/utils.go deleted file mode 100644 index 94ef17a3e..000000000 --- a/pkg/kubernetes/utils/utils.go +++ /dev/null @@ -1,63 +0,0 @@ -package utils - -import ( - "fmt" - "sort" - "time" - - "github.com/pluralsh/plural-cli/pkg/kubernetes" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/cli-runtime/pkg/genericclioptions" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - "k8s.io/kubectl/pkg/polymorphichelpers" - "k8s.io/kubectl/pkg/scheme" - "k8s.io/kubectl/pkg/util/podutils" -) - -const ( - // Amount of time to wait until at least one pod is running - defaultPodWaitTimeout = 60 * time.Second -) - -func GetPodWithObject(namespace, resource string) (runtime.Object, *corev1.Pod, error) { - matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag().WithDiscoveryBurst(300).WithDiscoveryQPS(50.0)) - f := cmdutil.NewFactory(matchVersionKubeConfigFlags) - - builder := f.NewBuilder(). - WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...). - NamespaceParam(namespace).DefaultNamespace(). - SingleResourceType() - builder.ResourceNames("pods", resource) - obj, err := builder.Do().Object() - if err != nil { - return nil, nil, err - } - - return attachablePodForObject(obj, defaultPodWaitTimeout) - -} - -func attachablePodForObject(object runtime.Object, timeout time.Duration) (runtime.Object, *corev1.Pod, error) { - if t, ok := object.(*corev1.Pod); ok { - return object, t, nil - } - - clientConfig, err := kubernetes.KubeConfig() - if err != nil { - return nil, nil, err - } - clientset, err := corev1client.NewForConfig(clientConfig) - if err != nil { - return nil, nil, err - } - - namespace, selector, err := polymorphichelpers.SelectorsForObject(object) - if err != nil { - return nil, nil, fmt.Errorf("cannot attach to %T: %w", object, err) - } - sortBy := func(pods []*corev1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) } - pod, _, err := polymorphichelpers.GetFirstPod(clientset, namespace, selector.String(), timeout, sortBy) - return object, pod, err -}