diff --git a/pb/api.proto b/pb/api.proto
index 0922bca1..9bde7fba 100644
--- a/pb/api.proto
+++ b/pb/api.proto
@@ -69,6 +69,7 @@ message Job {
   string sshPrivateKey = 21;
   bool sshClone = 22;
   string branch = 23;
+  repeated string archive = 24;
 }
 
 message Command {
diff --git a/server/api/api.go b/server/api/api.go
index a3bbf232..81bba5e1 100644
--- a/server/api/api.go
+++ b/server/api/api.go
@@ -109,8 +109,11 @@
 	cors := cors.New(corsOpts)
 	router.Use(cors.Handler)
 	router.Mount("/api/v1", r.apiRouter())
+
+	router.Mount("/archive/{id}/", build.HandleArchive(r.Jobs, r.Config))
+	router.Mount("/archive/{username}/{repo}/{branch}/{buildid}/{pull}/{platform}/{matrixid}", build.HandleArchiveRedirect(r.Jobs, r.Repos, r.Builds, r.Config))
 	router.Get("/ws", ws.UpstreamHandler(r.Config.Websocket.Addr))
 	router.Get("/badge/{token}", badge.HandleBadge(r.Builds))
 	router.Mount("/uploads", r.fileServer())
@@ -247,6 +249,7 @@ func (r Router) workersRouter() *chi.Mux {
 		router.Post("/auth", worker.HandleAuth(r.Workers, r.Config, r.WS.App))
 		router.Post("/cache", worker.HandleUploadCache(r.Config))
 		router.Get("/cache", worker.HandleDownloadCache(r.Config))
+		router.Post("/archive", worker.HandleUploadArchive(r.Config))
 	})
 
 	return router
diff --git a/server/api/build/archive.go b/server/api/build/archive.go
new file mode 100644
index 00000000..5fe96ca9
--- /dev/null
+++ b/server/api/build/archive.go
@@ -0,0 +1,112 @@
+package build
+
+import (
+	"math"
+	"net/http"
+	"os"
+	"path"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/bleenco/abstruse/server/api/render"
+	"github.com/bleenco/abstruse/server/config"
+	"github.com/bleenco/abstruse/server/core"
+	"github.com/go-chi/chi"
+)
+
+// HandleArchive returns an http.HandlerFunc that serves archived job files from the data directory.
+func HandleArchive(jobs core.JobStore, config *config.Config) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		id, err := strconv.Atoi(chi.URLParam(r, "id"))
+		if err != nil {
+			render.BadRequestError(w, err.Error())
+			return
+		}
+
+		// strip the "/archive/{id}" prefix from the request path.
+		file := path.Join(config.DataDir, "archive", strconv.Itoa(id), r.URL.Path[len(strconv.Itoa(id))+9:])
+		f, err := os.Open(file)
+		if err != nil {
+			render.NotFoundError(w, err.Error())
+			return
+		}
+		defer f.Close()
+		http.ServeContent(w, r, "", time.Now(), f)
+	}
+}
+
+// HandleArchiveRedirect returns an http.HandlerFunc that resolves a build reference to the matching job and redirects to its archive URL.
+func HandleArchiveRedirect(jobs core.JobStore, repos core.RepositoryStore, builds core.BuildStore, config *config.Config) http.HandlerFunc {
+	// {username}/{repo}/{branch}/{buildid}/{pull}/{platform}/{matrixid}
+	return func(w http.ResponseWriter, r *http.Request) {
+		buildid, err := strconv.Atoi(chi.URLParam(r, "buildid"))
+		if err != nil && chi.URLParam(r, "buildid") != "latest" {
+			render.BadRequestError(w, err.Error())
+			return
+		}
+		matrixid, err := strconv.Atoi(chi.URLParam(r, "matrixid"))
+		if err != nil {
+			render.BadRequestError(w, err.Error())
+			return
+		}
+		username := chi.URLParam(r, "username")
+		repo := chi.URLParam(r, "repo")
+		branch := chi.URLParam(r, "branch")
+		//platform := chi.URLParam(r, "platform")
+		pull := chi.URLParam(r, "pull")
+		repository, err := repos.FindArchive(username, repo)
+		if err != nil {
+			render.BadRequestError(w, err.Error())
+			return
+		}
+		builds, err := builds.List(core.BuildFilter{
+			RepositoryID: int(repository.ID),
+			UserID:       repository.UserID, // TODO:?
+			Limit:        math.MaxInt,
+		})
+		if err != nil {
+			render.BadRequestError(w, err.Error())
+			return
+		}
+		var j *core.Job
+		found := false
+		for i := range builds {
+			if builds[i].ID != uint(buildid) && chi.URLParam(r, "buildid") != "latest" {
+				continue
+			}
+			if strconv.Itoa(builds[i].PR) != pull {
+				// TODO:
+				continue
+			}
+			if strings.ReplaceAll(builds[i].Branch, "/", "-") != branch {
+				continue
+			}
+			if matrixid >= len(builds[i].Jobs) {
+				render.BadRequestError(w, "matrixid is bigger than len(builds[i].Jobs)")
+				return
+			}
+			// Waiting for #563 to get merged
+			//for k := range builds[i].Jobs {
+			//	if platform == "0" || platform == "" || platform == builds[i].Jobs[k].Platform {
+			//
+			//	}
+			//}
+			j = builds[i].Jobs[matrixid]
+			found = true
+			break
+		}
+		if !found {
+			render.BadRequestError(w, "Unable to find requested job.")
+			return
+		}
+		// render.JSON(w, 200, j)
+		u := strings.Split(r.RequestURI, "/")
+		if len(u) > 9 {
+			w.Header().Set("Location", "/archive/"+strconv.Itoa(int(j.ID))+"/"+strings.Join(u[9:], "/"))
+		} else {
+			w.Header().Set("Location", "/archive/"+strconv.Itoa(int(j.ID))+"/")
+		}
+		w.WriteHeader(302)
+		return
+	}
+}
diff --git a/server/api/worker/upload_archive.go b/server/api/worker/upload_archive.go
new file mode 100644
index 00000000..72974319
--- /dev/null
+++ b/server/api/worker/upload_archive.go
@@ -0,0 +1,59 @@
+package worker
+
+import (
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+
+	"github.com/bleenco/abstruse/server/api/render"
+	"github.com/bleenco/abstruse/server/config"
+	"github.com/mholt/archiver/v3"
+)
+
+// HandleUploadArchive returns an http.HandlerFunc that writes a JSON encoded
+// result about uploading an archive to the http response body.
+func HandleUploadArchive(config *config.Config) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		r.ParseMultipartForm(1000 << 20)
+
+		src, handler, err := r.FormFile("file")
+		if err != nil {
+			render.InternalServerError(w, err.Error())
+			return
+		}
+		defer src.Close()
+		os.MkdirAll(filepath.Join(config.DataDir, "archive"), 0750)
+		filePath := filepath.Join(config.DataDir, "archive", handler.Filename)
+		dst, err := os.Create(filePath)
+		if err != nil {
+			render.InternalServerError(w, err.Error())
+			return
+		}
+		defer dst.Close()
+
+		if _, err := io.Copy(dst, src); err != nil {
+			render.InternalServerError(w, err.Error())
+			return
+		}
+
+		if err = treatArchive(filePath, config.DataDir); err != nil {
+			render.InternalServerError(w, err.Error())
+			return
+		}
+		render.JSON(w, http.StatusOK, render.BoolResponse{Status: true})
+	}
+}
+
+func treatArchive(filePath, datadir string) error {
+	idSplit := strings.Split(strings.ReplaceAll(filePath, "/", "."), ".")
+	// the job id is the uploaded file's name without its extension.
+	targetDir := path.Join(datadir, "archive", idSplit[len(idSplit)-2])
+	os.MkdirAll(targetDir, 0750)
+	log.Println(filePath, "/", targetDir)
+	defer os.RemoveAll(filePath)
+	return archiver.Unarchive(filePath, targetDir)
+}
diff --git a/server/core/job.go b/server/core/job.go
index d6f5c593..030de221 100644
--- a/server/core/job.go
+++ b/server/core/job.go
@@ -16,6 +16,7 @@ type (
 		Log       string `gorm:"size:16777216" json:"-"`
 		Stage     string `json:"stage"`
 		Cache     string `json:"cache"`
+		Archive   string `json:"archive"`
 		Build     *Build `gorm:"preload:false" json:"build,omitempty"`
 		BuildID   uint   `json:"buildID"`
 		Timestamp
@@ -29,6 +30,9 @@ (
 		// FindUser returns job by id and user id.
 		FindUser(uint, uint) (*Job, error)
 
+		// FindBuild returns the jobs for a specific build id.
+		FindBuild(uint) ([]Job, error)
+
 		// List returns jobs based bu from and to dates.
 		List(time.Time, time.Time) ([]*Job, error)
diff --git a/server/core/repo.go b/server/core/repo.go
index 343c5717..9331f0c2 100644
--- a/server/core/repo.go
+++ b/server/core/repo.go
@@ -62,6 +62,9 @@ type (
 		// FindToken returns repository by token.
 		FindToken(string) (*Repository, error)
 
+		// FindArchive returns repository by username and repo name.
+		FindArchive(string, string) (Repository, error)
+
 		// List returns list of repositories from the datastore.
 		List(RepositoryFilter) ([]Repository, int, error)
diff --git a/server/parser/parser.go b/server/parser/parser.go
index d7aaaa62..0ffa9caf 100644
--- a/server/parser/parser.go
+++ b/server/parser/parser.go
@@ -31,6 +31,7 @@ type RepoConfig struct {
 	AfterDeploy []string `yaml:"after_deploy"`
 	AfterScript []string `yaml:"after_script"`
 	Cache       []string `yaml:"cache"`
+	Archive     []string `yaml:"archive"`
 }
 
 // MatrixConfig defines structure for matrix job config in .abstruse.yml file.
@@ -54,15 +55,17 @@ type JobConfig struct {
 	Title    string           `json:"title"`
 	Commands *api.CommandList `json:"commands"`
 	Cache    []string         `json:"cache"`
+	Archive  []string         `json:"archive"`
 }
 
 // ConfigParser defines repository configuration parser.
 type ConfigParser struct {
-	Raw    string
-	Branch string
-	Parsed RepoConfig
-	Env    []string
-	Mount  []string
+	Raw     string
+	Branch  string
+	Parsed  RepoConfig
+	Env     []string
+	Mount   []string
+	Archive []string
 }
 
 // NewConfigParser returns new config parser instance.
@@ -94,6 +97,7 @@ func (c *ConfigParser) Parse() ([]*JobConfig, error) {
 	if len(c.Parsed.Matrix) > 0 {
 		for _, item := range c.Parsed.Matrix {
 			job := &JobConfig{}
+			job.Archive = c.Parsed.Archive
 
 			// set image
 			if item.Image != "" {
@@ -123,7 +127,6 @@ func (c *ConfigParser) Parse() ([]*JobConfig, error) {
 			}
 			job.Commands = c.generateCommands()
 			job.Cache = c.Parsed.Cache
-
 			jobs = append(jobs, job)
 		}
 	} else {
@@ -135,6 +138,7 @@ func (c *ConfigParser) Parse() ([]*JobConfig, error) {
 			Title:    strings.Join(c.Parsed.Script, " "),
 			Commands: c.generateCommands(),
 			Cache:    c.Parsed.Cache,
+			Archive:  c.Parsed.Archive,
 		}
 		if job.Image == "" {
 			return jobs, fmt.Errorf("image not specified")
@@ -152,6 +156,7 @@ func (c *ConfigParser) Parse() ([]*JobConfig, error) {
 			Title:    strings.Join(c.Parsed.Deploy, " "),
 			Commands: c.generateDeployCommands(),
 			Cache:    c.Parsed.Cache,
+			Archive:  c.Parsed.Archive,
 		}
 		if job.Image == "" {
 			return jobs, fmt.Errorf("image not specified")
diff --git a/server/scheduler/scheduler.go b/server/scheduler/scheduler.go
index f2eb1065..3217e9dd 100644
--- a/server/scheduler/scheduler.go
+++ b/server/scheduler/scheduler.go
@@ -330,6 +330,7 @@ func (s *scheduler) startJob(job *core.Job, worker *core.Worker) {
 		Action:        pb.Job_JobStart,
 		WorkerId:      worker.ID,
 		Cache:         strings.Split(job.Cache, ","),
+		Archive:       strings.Split(job.Archive, ","),
 		Mount:         strings.Split(job.Mount, ","),
 		SshPrivateKey: job.Build.Repository.SSHPrivateKey,
 		SshClone:      job.Build.Repository.UseSSH,
diff --git a/server/store/build/build.go b/server/store/build/build.go
index 30f5e27f..eefa42e5 100644
--- a/server/store/build/build.go
+++ b/server/store/build/build.go
@@ -206,6 +206,7 @@ func (s buildStore) GenerateBuild(repo *core.Repository, base *core.GitHook) ([]
 			BuildID: build.ID,
 			Mount:   strings.Join(mnts, ","),
 			Cache:   strings.Join(j.Cache, ","),
+			Archive: strings.Join(j.Archive, ","),
 		}
 		if err := s.jobs.Create(job); err != nil {
 			return nil, 0, err
@@ -332,6 +333,7 @@ func (s buildStore) TriggerBuild(opts core.TriggerBuildOpts) ([]*core.Job, error
 			Stage:   j.Stage,
 			BuildID: build.ID,
 			Cache:   strings.Join(j.Cache, ","),
+			Archive: strings.Join(j.Archive, ","),
 		}
 		if err := s.jobs.Create(job); err != nil {
 			return nil, err
diff --git a/server/store/job/job.go b/server/store/job/job.go
index d8a8fe73..835effaa 100644
--- a/server/store/job/job.go
+++ b/server/store/job/job.go
@@ -26,6 +26,15 @@ func (s jobStore) Find(id uint) (*core.Job, error) {
 	return &job, err
 }
 
+func (s jobStore) FindBuild(id uint) ([]core.Job, error) {
+	var jobs []core.Job
+	err := s.db.Model(&jobs).Where("build_id = ?", id).
+		Preload("Build.Repository.Provider").
+		Preload("Build.Repository.EnvVariables").
+		Find(&jobs).Error
+	return jobs, err
+}
+
 func (s jobStore) FindUser(id, userID uint) (*core.Job, error) {
 	var job core.Job
 	err := s.db.Model(&job).Where("id = ?", id).
diff --git a/server/store/repo/repo.go b/server/store/repo/repo.go
index 3628d6dc..05f9f390 100644
--- a/server/store/repo/repo.go
+++ b/server/store/repo/repo.go
@@ -58,6 +58,12 @@ func (s repositoryStore) FindToken(token string) (*core.Repository, error) {
 	return &repo, err
 }
 
+func (s repositoryStore) FindArchive(username string, reponame string) (core.Repository, error) {
+	var repo core.Repository
+	err := s.db.Where("namespace = ? AND name = ?", username, reponame).First(&repo).Error
+	return repo, err
+}
+
 func (s repositoryStore) List(filters core.RepositoryFilter) ([]core.Repository, int, error) {
 	var repos []core.Repository
 	var count int
diff --git a/web/abstruse/ngsw-config.json b/web/abstruse/ngsw-config.json
index 05063faa..99730c1f 100644
--- a/web/abstruse/ngsw-config.json
+++ b/web/abstruse/ngsw-config.json
@@ -4,7 +4,7 @@
   "dataGroups": [
     {
      "name": "api",
-      "urls": ["/api", "/badge", "/uploads"],
+      "urls": ["/api", "/badge", "/uploads", "/archive"],
      "cacheConfig": {
        "maxSize": 0,
        "maxAge": "0u",
diff --git a/web/abstruse/proxy.conf.json b/web/abstruse/proxy.conf.json
index bfa2f026..7bfe2a52 100644
--- a/web/abstruse/proxy.conf.json
+++ b/web/abstruse/proxy.conf.json
@@ -12,6 +12,10 @@
     "target": "http://localhost",
     "secure": false
   },
+  "/archive": {
+    "target": "http://localhost",
+    "secure": false
+  },
   "/uploads": {
     "target": "http://localhost",
     "secure": false
diff --git a/worker/archive/archive.go b/worker/archive/archive.go
new file mode 100644
index 00000000..241d566f
--- /dev/null
+++ b/worker/archive/archive.go
@@ -0,0 +1,95 @@
+package archive
+
+import (
+	"archive/tar"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strings"
+
+	api "github.com/bleenco/abstruse/pb"
+	"github.com/bleenco/abstruse/pkg/fs"
+	gzip "github.com/klauspost/pgzip"
+)
+
+// SaveArchive packs the job's archive paths into a <jobID>.tgz file inside dir.
+func SaveArchive(job *api.Job, dir string) (string, error) {
+	fileName := fmt.Sprintf("%d.tgz", job.GetId())
+	out := filepath.Join(dir, fileName)
+
+	if fs.Exists(out) {
+		if err := os.RemoveAll(out); err != nil {
+			return out, err
+		}
+	}
+
+	return out, createArchive(job.GetArchive(), out)
+}
+
+func createArchive(folders []string, outPath string) error {
+	out, err := os.Create(outPath)
+	if err != nil {
+		return err
+	}
+	defer out.Close()
+
+	gw, err := gzip.NewWriterLevel(out, gzip.BestSpeed)
+	if err != nil {
+		return err
+	}
+	defer gw.Close()
+
+	tw := tar.NewWriter(gw)
+	defer tw.Close()
+
+	for _, folder := range folders {
+		folder = filepath.Join(filepath.Dir(outPath), folder)
+		if err := filepath.Walk(folder, func(path string, info os.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
+
+			if info.IsDir() {
+				return nil
+			}
+
+			link := ""
+			if info.Mode()&os.ModeSymlink != 0 {
+				link, err = os.Readlink(path)
+				if err != nil {
+					return err
+				}
+			}
+
+			header, err := tar.FileInfoHeader(info, link)
+			if err != nil {
+				return err
+			}
+
+			header.Name = strings.TrimPrefix(path, fmt.Sprintf("%s/", filepath.Dir(outPath)))
+
+			if err := tw.WriteHeader(header); err != nil {
+				return err
+			}
+
+			switch header.Typeflag {
+			case tar.TypeLink, tar.TypeSymlink, tar.TypeChar, tar.TypeBlock, tar.TypeDir, tar.TypeFifo:
+			default:
+				file, err := os.Open(path)
+				if err != nil {
+					return err
+				}
+				defer file.Close()
+
+				if _, err := io.Copy(tw, file); err != nil {
+					return err
+				}
+			}
+
+			return nil
+		}); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/worker/archive/upload.go b/worker/archive/upload.go
new file mode 100644
index 00000000..21d8155f
--- /dev/null
+++ b/worker/archive/upload.go
@@ -0,0 +1,92 @@
+package archive
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"mime/multipart"
+	"os"
+	"path/filepath"
+
+	"github.com/bleenco/abstruse/internal/auth"
+	"github.com/bleenco/abstruse/pkg/lib"
+	"github.com/bleenco/abstruse/server/api/render"
+	"github.com/bleenco/abstruse/worker/config"
+	"github.com/bleenco/abstruse/worker/http"
+)
+
+// UploadArchive uploads the saved archive file to the abstruse server.
+func UploadArchive(config *config.Config, filePath string) error {
+	type response struct {
+		Status bool `json:"status"`
+	}
+
+	file, err := os.Open(filePath)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	body := new(bytes.Buffer)
+	writer := multipart.NewWriter(body)
+	part, err := writer.CreateFormFile("file", filepath.Base(filePath))
+	if err != nil {
+		return err
+	}
+
+	if _, err := io.Copy(part, file); err != nil {
+		return err
+	}
+
+	if err := writer.Close(); err != nil {
+		return err
+	}
+
+	req := &http.Request{
+		Method: "POST",
+		Path:   "/api/v1/workers/archive",
+		Body:   body,
+		Header: map[string][]string{
+			"Content-Type": {writer.FormDataContentType()},
+		},
+	}
+
+	token, err := auth.JWT.CreateWorkerJWT(auth.WorkerClaims{
+		ID:   config.ID,
+		Addr: config.GRPC.Addr,
+	})
+	if err != nil {
+		return err
+	}
+
+	client, err := http.NewClient(config.Server.Addr, token)
+	if err != nil {
+		return err
+	}
+
+	resp, err := client.Req(context.Background(), req, nil)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.Status == 200 {
+		var r response
+		if err := lib.DecodeJSON(resp.Body, &r); err != nil {
+			return err
+		}
+
+		if r.Status {
+			return nil
+		}
+
+		return fmt.Errorf("unknown status")
+	}
+
+	var r render.Error
+	if err := lib.DecodeJSON(resp.Body, &r); err != nil {
+		return err
+	}
+
+	return fmt.Errorf("error uploading archive to abstruse server: %s", r.Message)
+}
diff --git a/worker/docker/docker.go b/worker/docker/docker.go
index a1742139..54974f6c 100644
--- a/worker/docker/docker.go
+++ b/worker/docker/docker.go
@@ -10,6 +10,7 @@ import (
 	api "github.com/bleenco/abstruse/pb"
 	"github.com/bleenco/abstruse/pkg/fs"
 	"github.com/bleenco/abstruse/pkg/lib"
+	"github.com/bleenco/abstruse/worker/archive"
 	"github.com/bleenco/abstruse/worker/cache"
 	"github.com/bleenco/abstruse/worker/config"
 	"github.com/docker/docker/api/types"
@@ -175,6 +176,32 @@ func RunContainer(name, image string, job *api.Job, config *config.Config, env [
 				logch <- []byte(red(err.Error()))
 			}
 		}
+	}
+	// save archive.
+	if len(job.GetArchive()) > 0 {
+		logch <- []byte(yellow("\r==> Saving files to archive... "))
")) + archiveFile, err := archive.SaveArchive(job, dir) + + if err != nil { + logch <- []byte(yellow(fmt.Sprintf("%s\r\n", err.Error()))) + } else { + info, err := os.Stat(archiveFile) + if err != nil { + return err + } + + logch <- []byte(yellow("done\r\n")) + logch <- []byte(yellow(fmt.Sprintf("\r==> Uploading archive (%s) to abstruse server... ", humanize.Bytes(uint64(info.Size()))))) + if err := archive.UploadArchive(config, archiveFile); err != nil { + logch <- []byte(yellow(fmt.Sprintf("%s\r\n", err.Error()))) + } else { + logch <- []byte(yellow("done\r\n")) + } + + os.RemoveAll(archiveFile) + } + } + if exitCode == 0 { return nil } return fmt.Errorf("errored: %d", exitCode)