-
Notifications
You must be signed in to change notification settings - Fork 136
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: add initial `testkube devbox` command * feat: clean up development tool a bit * fix: GZip the binaries before sending * fix: small issues with devbox, add dashboard link, add README * feat: parallelize devbox better * chore: add links for CRD Sync workflows and templates * chore: adjust devbox messages * chore: reduce size of binaries for devbox * fix: reduce Init Process size from 35MB to 5MB * chore: round time in devbox * fix: generate properly slug for devbox environment * chore: add option to open dashboard * fix: delete debug * chore: avoid logs for canceled operation * chore: increase timeout for build bucket * fix: restarting pod * fix: restarting pod * chore: clean messages and add transfer size * feat: add basic binary storage to avoid transferring too much data in devbox * feat: make BinaryPatch more stable * fix: make binary patch more stable * chore: adjust a bit binary patch constants * chore: reuse buffer better * fix: corner cases where binary patch was stuck * fix: avoid unnecessary container for agent in devbox
- Loading branch information
Showing
24 changed files
with
3,725 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
// Copyright 2024 Testkube. | ||
// | ||
// Licensed as a Testkube Pro file under the Testkube Community | ||
// License (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt | ||
|
||
package main | ||
|
||
import ( | ||
"bytes" | ||
"compress/gzip" | ||
"crypto/sha256" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"os" | ||
"os/signal" | ||
"path/filepath" | ||
"strings" | ||
"sync" | ||
"syscall" | ||
|
||
"github.com/dustin/go-humanize" | ||
|
||
"github.com/kubeshop/testkube/cmd/tcl/kubectl-testkube/devbox/devutils" | ||
) | ||
|
||
var ( | ||
locks = make(map[string]*sync.RWMutex) | ||
locksMu sync.Mutex | ||
hashCache = make(map[string]string) | ||
) | ||
|
||
func getLock(filePath string) *sync.RWMutex { | ||
locksMu.Lock() | ||
defer locksMu.Unlock() | ||
if locks[filePath] == nil { | ||
locks[filePath] = new(sync.RWMutex) | ||
} | ||
return locks[filePath] | ||
} | ||
|
||
func rebuildHash(filePath string) { | ||
hashCache[filePath] = "" | ||
f, err := os.Open(filePath) | ||
if err != nil { | ||
return | ||
} | ||
defer f.Close() | ||
|
||
h := sha256.New() | ||
if _, err := io.Copy(h, f); err == nil { | ||
hashCache[filePath] = fmt.Sprintf("%x", h.Sum(nil)) | ||
} | ||
} | ||
|
||
func getHash(filePath string) string { | ||
if hashCache[filePath] == "" { | ||
rebuildHash(filePath) | ||
} | ||
return hashCache[filePath] | ||
} | ||
|
||
func main() { | ||
storagePath := "/storage" | ||
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusOK) | ||
}) | ||
|
||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { | ||
filePath := filepath.Clean(strings.TrimPrefix(r.URL.Path, "/")) | ||
if filePath == "" { | ||
w.WriteHeader(http.StatusNotFound) | ||
return | ||
} | ||
localPath := filepath.Join(storagePath, filePath) | ||
if r.Method == http.MethodGet { | ||
getLock(filePath).RLock() | ||
defer getLock(filePath).RUnlock() | ||
|
||
file, err := os.Open(localPath) | ||
if err != nil { | ||
w.WriteHeader(http.StatusNotFound) | ||
return | ||
} | ||
stat, err := file.Stat() | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
return | ||
} | ||
w.Header().Set("Content-Type", "application/octet-stream") | ||
w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size())) | ||
w.WriteHeader(http.StatusOK) | ||
io.Copy(w, file) | ||
return | ||
} else if r.Method == http.MethodPost { | ||
getLock(filePath).Lock() | ||
defer getLock(filePath).Unlock() | ||
|
||
body, err := io.ReadAll(r.Body) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed reading body", err) | ||
return | ||
} | ||
if r.ContentLength != int64(len(body)) { | ||
w.WriteHeader(http.StatusBadRequest) | ||
return | ||
} | ||
if r.Header.Get("Content-Encoding") == "gzip" { | ||
gz, err := gzip.NewReader(bytes.NewBuffer(body)) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed reading body into gzip", err) | ||
return | ||
} | ||
body, err = io.ReadAll(gz) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed reading back data from gzip stream", err) | ||
return | ||
} | ||
} | ||
|
||
err = os.WriteFile(localPath, body, 0666) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed to write file", err) | ||
return | ||
} | ||
|
||
h := sha256.New() | ||
if _, err := io.Copy(h, bytes.NewBuffer(body)); err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed to build hash", err) | ||
} | ||
hashCache[filePath] = fmt.Sprintf("%x", h.Sum(nil)) | ||
|
||
fmt.Println("saved file", filePath, humanize.Bytes(uint64(len(body)))) | ||
w.WriteHeader(http.StatusOK) | ||
return | ||
} else if r.Method == http.MethodPatch { | ||
getLock(filePath).Lock() | ||
defer getLock(filePath).Unlock() | ||
|
||
body, err := io.ReadAll(r.Body) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed reading body", err) | ||
return | ||
} | ||
if r.ContentLength != int64(len(body)) { | ||
w.WriteHeader(http.StatusBadRequest) | ||
return | ||
} | ||
if r.Header.Get("Content-Encoding") == "gzip" { | ||
gz, err := gzip.NewReader(bytes.NewBuffer(body)) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed reading body into gzip", err) | ||
return | ||
} | ||
body, err = io.ReadAll(gz) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed reading back data from gzip stream", err) | ||
return | ||
} | ||
} | ||
|
||
// Verify if patch can be applied | ||
if r.Header.Get("X-Prev-Hash") != getHash(filePath) { | ||
w.WriteHeader(http.StatusConflict) | ||
return | ||
} | ||
|
||
// Apply patch | ||
prevFile, err := os.ReadFile(localPath) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed reading existing file", err) | ||
return | ||
} | ||
patch := devutils.NewBinaryPatchFromBytes(body) | ||
file := patch.Apply(prevFile) | ||
|
||
h := sha256.New() | ||
if _, err := io.Copy(h, bytes.NewBuffer(file)); err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed to build hash", err) | ||
return | ||
} | ||
|
||
// Validate hash | ||
nextHash := fmt.Sprintf("%x", h.Sum(nil)) | ||
if r.Header.Get("X-Hash") != nextHash { | ||
w.WriteHeader(http.StatusBadRequest) | ||
fmt.Println("after applying patch result has different hash than expected", err) | ||
return | ||
} | ||
fmt.Println("Expected hash", r.Header.Get("X-Hash"), "got", nextHash) | ||
err = os.WriteFile(localPath, file, 0666) | ||
if err != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
fmt.Println("failed to write file", err) | ||
return | ||
} | ||
hashCache[filePath] = nextHash | ||
w.WriteHeader(http.StatusOK) | ||
return | ||
} | ||
w.WriteHeader(http.StatusMethodNotAllowed) | ||
}) | ||
|
||
stopSignal := make(chan os.Signal, 1) | ||
signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM) | ||
go func() { | ||
<-stopSignal | ||
os.Exit(0) | ||
}() | ||
|
||
fmt.Println("Starting server...") | ||
|
||
panic(http.ListenAndServe(":8080", nil)) | ||
} |
Oops, something went wrong.