Skip to content

Commit

Permalink
support for older kubectl (#258)
Browse files Browse the repository at this point in the history
* warns in the logs and the console if there is an old version of k8s but it will adapt the arguments around kubectl cp to not use retries.
  • Loading branch information
rsvihladremio authored Sep 9, 2024
1 parent 5a83b07 commit 3e1b384
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 14 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## [3.2.4] - 2024-09-09

* added support for using older kubectl clients since the kubectl cp interface is stable, by checking client version we can safely check if retries are supported and only add them if the are

## [3.2.3] - 2024-09-06

### Added
Expand Down Expand Up @@ -768,6 +772,7 @@ someone has added the PAT which is always available

- able to capture logs, configuration and diagnostic data from Dremio clusters deployed on Kubernetes and on-prem

[3.2.4]: https://github.com/dremio/dremio-diagnostic-collector/compare/v3.2.3...v3.2.4
[3.2.3]: https://github.com/dremio/dremio-diagnostic-collector/compare/v3.2.2...v3.2.3
[3.2.2]: https://github.com/dremio/dremio-diagnostic-collector/compare/v3.2.1...v3.2.2
[3.2.1]: https://github.com/dremio/dremio-diagnostic-collector/compare/v3.2.0...v3.2.1
Expand Down
86 changes: 72 additions & 14 deletions cmd/root/kubectl/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package kubectl

import (
"context"
"encoding/json"
"fmt"
"os/exec"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -50,23 +52,68 @@ func NewKubectlK8sActions(hook shutdown.CancelHook, namespace, k8sContext string
}
k8sContext = strings.TrimSpace(k8sContextRaw)
}
retriesEnabled, err := CanRetryTransfers(kubectl)
if err != nil {
return &CliK8sActions{}, fmt.Errorf("unable to run kubectl version so disabling kubectl: %v", err)
}
return &CliK8sActions{
cli: cliInstance,
kubectlPath: kubectl,
namespace: namespace,
k8sContext: k8sContext,
pidHosts: make(map[string]string),
cli: cliInstance,
kubectlPath: kubectl,
namespace: namespace,
k8sContext: k8sContext,
pidHosts: make(map[string]string),
retriesEnabled: retriesEnabled,
}, nil
}

// CliK8sActions provides a way to collect and copy files using kubectl
type CliK8sActions struct {
cli cli.CmdExecutor
kubectlPath string
namespace string
k8sContext string
pidHosts map[string]string
m sync.Mutex
cli cli.CmdExecutor
kubectlPath string
namespace string
k8sContext string
pidHosts map[string]string
m sync.Mutex
retriesEnabled bool
}

func CanRetryTransfers(kubectlPath string) (bool, error) {
kubectlExec := exec.Command(kubectlPath, "version", "-o", "json")
out, err := kubectlExec.Output()
if err != nil {
return false, err
}
var results k8sVersion
err = json.Unmarshal([]byte(out), &results)
if err != nil {
return false, err
}

if results.ClientVersion.Major == "1" {
parsed, err := strconv.ParseInt(results.ClientVersion.Minor, 10, 32)
if err != nil {
return false, err
}
// retries flag starts showing up in 1.23.0
if parsed > 22 {
return true, nil
}
msg := fmt.Sprintf("kubectl version %v no retries available, consider upgrading", results.ClientVersion.GitVersion)
consoleprint.AddWarningToConsole(msg)
simplelog.Warning(msg)
return false, nil
}
return false, nil
}

type k8sVersion struct {
ClientVersion clientVersion `json:"clientVersion"`
}

type clientVersion struct {
Major string `json:"major"`
Minor string `json:"minor"`
GitVersion string `json:"gitVersion"`
}

func (c *CliK8sActions) cleanLocal(rawDest string) string {
Expand Down Expand Up @@ -122,9 +169,17 @@ func (c *CliK8sActions) CopyFromHost(hostString string, source, destination stri
if err != nil {
return "", fmt.Errorf("unable to get container name: %v", err)
}
return c.cli.Execute(false, c.kubectlPath, "cp", "-n", c.namespace, "--context", c.k8sContext, "-c", container, "--retries", "50", fmt.Sprintf("%v:%v", hostString, source), c.cleanLocal(destination))
args := []string{c.kubectlPath, "cp", "-n", c.namespace, "--context", c.k8sContext, "-c", container}
args = c.addRetries(args)
args = append(args, fmt.Sprintf("%v:%v", hostString, source), c.cleanLocal(destination))
return c.cli.Execute(false, args...)
}
func (c *CliK8sActions) addRetries(args []string) []string {
if c.retriesEnabled {
args = append(args, "--retries", "50")
}
return args
}

func (c *CliK8sActions) CopyToHost(hostString string, source, destination string) (out string, err error) {
if strings.HasPrefix(source, `C:`) {
// Fix problem seen in https://github.com/kubernetes/kubernetes/issues/77310
Expand All @@ -135,7 +190,10 @@ func (c *CliK8sActions) CopyToHost(hostString string, source, destination string
if err != nil {
return "", fmt.Errorf("unable to get container name: %v", err)
}
return c.cli.Execute(false, c.kubectlPath, "cp", "-n", c.namespace, "--context", c.k8sContext, "-c", container, "--retries", "50", c.cleanLocal(source), fmt.Sprintf("%v:%v", hostString, destination))
args := []string{c.kubectlPath, "cp", "-n", c.namespace, "--context", c.k8sContext, "-c", container}
args = c.addRetries(args)
args = append(args, c.cleanLocal(source), fmt.Sprintf("%v:%v", hostString, destination))
return c.cli.Execute(false, args...)
}

func (c *CliK8sActions) GetCoordinators() (podName []string, err error) {
Expand Down
64 changes: 64 additions & 0 deletions cmd/root/kubectl/kubectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ package kubectl

import (
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"testing"

"github.com/dremio/dremio-diagnostic-collector/v3/pkg/tests"
Expand Down Expand Up @@ -166,3 +171,62 @@ func TestKubectCopyFromWindowsHost(t *testing.T) {
t.Errorf("\nexpected call\n%v\nbut got\n%v", expectedCall, calls[1])
}
}

func TestKubectlCtrlVersion(t *testing.T) {
if testing.Short() {
t.Skip("skipping testing in short mode")
}
tmpDir := t.TempDir()
goos := runtime.GOOS
goarch := runtime.GOARCH
oldVersion := fmt.Sprintf("https://dl.k8s.io/release/v1.22.0/bin/%v/%v/kubectl", goos, goarch)
newVersion := fmt.Sprintf("https://dl.k8s.io/release/v1.23.0/bin/%v/%v/kubectl", goos, goarch)
downloadFile := func(url string, outFile string) error {
out, err := os.Create(outFile)
if err != nil {
return err
}
defer out.Close()

// Get the data
resp, err := http.Get(url) //nolint
if err != nil {
return err
}
defer resp.Body.Close()

// Write the body to file
_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}
if err := os.Chmod(outFile, 0700); err != nil {
return err
}
return nil
}
oldExec := path.Join(tmpDir, "kubectlOld")
if err := downloadFile(oldVersion, oldExec); err != nil {
t.Fatalf("unable to download file %v %v: ", oldExec, err)
}

newExec := path.Join(tmpDir, "kubectlNew")
if err := downloadFile(newVersion, newExec); err != nil {
t.Fatalf("unable to download file %v %v: ", newExec, err)
}

result, err := CanRetryTransfers(oldExec)
if err != nil {
t.Errorf("unable to execute file %v %v: ", oldExec, err)
}
if result {
t.Error("failed should not be able to retry on old exec")
}
result, err = CanRetryTransfers(newExec)
if err != nil {
t.Errorf("unable to execute file %v %v: ", newExec, err)
}
if !result {
t.Error("failed should be able to retry on new exec")
}
}

0 comments on commit 3e1b384

Please sign in to comment.