diff --git a/cli/client/http.go b/cli/client/http.go index d95c3106..8483f433 100644 --- a/cli/client/http.go +++ b/cli/client/http.go @@ -103,6 +103,7 @@ func (c *HTTPClient) RetryPollStep(ctx context.Context, in *api.PollStepRequest, Trace("RetryPollStep: step completed") return step, pollError } + time.Sleep(time.Millisecond * 10) // nolint:gomnd } } @@ -130,6 +131,7 @@ func (c *HTTPClient) RetryHealth(ctx context.Context, timeout time.Duration) (he Trace("RetryHealth: health check completed") return healthResponse, pollError } + time.Sleep(time.Millisecond * 100) // nolint:gomnd } } diff --git a/engine/engine.go b/engine/engine.go index 26a915b9..5dc55fcd 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "sync" "github.com/drone/runner-go/pipeline/runtime" "github.com/harness/lite-engine/engine/docker" @@ -16,6 +17,7 @@ import ( type Engine struct { pipelineConfig *spec.PipelineConfig docker *docker.Docker + mu sync.Mutex } func NewEnv(opts docker.Opts) (*Engine, error) { @@ -30,8 +32,11 @@ func NewEnv(opts docker.Opts) (*Engine, error) { } func (e *Engine) Setup(ctx context.Context, pipelineConfig *spec.PipelineConfig) error { + e.mu.Lock() e.pipelineConfig = pipelineConfig - for _, vol := range e.pipelineConfig.Volumes { + e.mu.Unlock() + + for _, vol := range pipelineConfig.Volumes { if vol != nil && vol.HostPath != nil { if err := os.MkdirAll(vol.HostPath.Path, 0777); err != nil { // nolint:gomnd return errors.Wrap(err, @@ -44,16 +49,27 @@ func (e *Engine) Setup(ctx context.Context, pipelineConfig *spec.PipelineConfig) } func (e *Engine) Destroy(ctx context.Context) error { - return e.docker.Destroy(ctx, e.pipelineConfig) + e.mu.Lock() + cfg := e.pipelineConfig + e.mu.Unlock() + + return e.docker.Destroy(ctx, cfg) } func (e *Engine) Run(ctx context.Context, step *spec.Step, output io.Writer) (*runtime.State, error) { - for k, v := range e.pipelineConfig.Envs { + e.mu.Lock() + cfg := e.pipelineConfig + e.mu.Unlock() + + if step.Envs == nil { + step.Envs = make(map[string]string) + } + for k, v := range cfg.Envs { step.Envs[k] = v } if step.Image != "" { - return e.docker.Run(ctx, e.pipelineConfig, step, output) + return e.docker.Run(ctx, cfg, step, output) } return exec.Run(ctx, step, output) diff --git a/handler/handler.go b/handler/handler.go index 161d0f6d..7973a2e6 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -9,12 +9,14 @@ import ( "github.com/harness/lite-engine/pipeline/runtime" "github.com/go-chi/chi" + "github.com/go-chi/chi/middleware" ) // Handler returns an http.Handler that exposes the service resources. func Handler(config *config.Config, engine *engine.Engine, stepExecutor *runtime.StepExecutor) http.Handler { r := chi.NewRouter() r.Use(logger.Middleware) + r.Use(middleware.Recoverer) // Setup stage endpoint r.Mount("/setup", func() http.Handler { diff --git a/livelog/livelog.go b/livelog/livelog.go index 7a36ae29..579f89d5 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -27,7 +27,8 @@ type Writer struct { client logstream.Client // client - key string // Unique key to identify in storage + key string // Unique key to identify in storage + name string // Human readable name of the key num int now time.Time @@ -48,10 +49,11 @@ type Writer struct { } // New returns a new writer -func New(client logstream.Client, key string, nudges []logstream.Nudge) *Writer { +func New(client logstream.Client, key, name string, nudges []logstream.Nudge) *Writer { b := &Writer{ client: client, key: key, + name: name, now: time.Now(), limit: defaultLimit, interval: defaultInterval, @@ -106,7 +108,7 @@ func (b *Writer) Write(p []byte) (n int, err error) { Timestamp: time.Now(), ElaspedTime: int64(time.Since(b.now).Seconds()), } - logrus.WithField("key", b.key).Infoln(line.Message) + logrus.WithField("name", b.name).Infoln(line.Message) for b.size+len(part) > b.limit { // Keep streaming even after the limit, but only upload last `b.limit` data to the store @@ -116,7 +118,7 @@ func (b *Writer) Write(p []byte) (n int, err error) { hline, err := json.Marshal(b.history[0]) if err != nil { - logrus.WithError(err).WithField("key", b.key).Errorln("could not marshal log") + logrus.WithError(err).WithField("name", b.name).Errorln("could not marshal log") } b.size -= len(hline) b.history = b.history[1:] @@ -152,7 +154,7 @@ func (b *Writer) Open() error { b.stop() // stop trying to stream if we could not open the stream return err } - logrus.WithField("key", b.key).Infoln("successfully opened log stream") + logrus.WithField("name", b.name).Infoln("successfully opened log stream") b.opened = true return nil } diff --git a/livelog/livelog_test.go b/livelog/livelog_test.go index 757b8f19..6bf70a76 100644 --- a/livelog/livelog_test.go +++ b/livelog/livelog_test.go @@ -12,7 +12,7 @@ import ( func TestLineWriterSingle(t *testing.T) { client := new(mockClient) - w := New(client, "1", nil) + w := New(client, "1", "1", nil) w.SetInterval(time.Duration(0)) w.num = 4 w.Write([]byte("foo\nbar\n")) // nolint:errcheck diff --git a/pipeline/runtime/common.go b/pipeline/runtime/common.go index 059b6119..926e3aa5 100644 --- a/pipeline/runtime/common.go +++ b/pipeline/runtime/common.go @@ -4,6 +4,7 @@ import ( "bufio" "errors" "fmt" + "io" "os" "strings" @@ -34,11 +35,14 @@ func getOutputVarCmd(outputVars []string, outputFile string) string { // Fetches map of env variable and value from OutputFile. // OutputFile stores all env variable and value -func fetchOutputVariables(outputFile string) (map[string]string, error) { +func fetchOutputVariables(outputFile string, out io.Writer) (map[string]string, error) { + log := logrus.New() + log.Out = out + outputs := make(map[string]string) f, err := os.Open(outputFile) if err != nil { - logrus.WithError(err).WithField("outputFile", outputFile).Errorln("Failed to open output file") + log.WithError(err).WithField("outputFile", outputFile).Errorln("failed to open output file") return nil, err } defer f.Close() @@ -48,14 +52,13 @@ func fetchOutputVariables(outputFile string) (map[string]string, error) { line := s.Text() sa := strings.Split(line, " ") if len(sa) < 2 { // nolint:gomnd - logrus.WithField("variable", sa[0]).Warnln( - "output variable does not exist") + log.WithField("variable", sa[0]).Warnln("output variable does not exist") } else { outputs[sa[0]] = line[len(sa[0])+1:] } } if err := s.Err(); err != nil { - logrus.WithError(err).Errorln("failed to create scanner from output file") + log.WithError(err).Errorln("failed to create scanner from output file") return nil, err } return outputs, nil diff --git a/pipeline/runtime/run.go b/pipeline/runtime/run.go index 6035510d..a7f8ee35 100644 --- a/pipeline/runtime/run.go +++ b/pipeline/runtime/run.go @@ -20,7 +20,7 @@ func executeRunStep(ctx context.Context, engine *engine.Engine, r *api.StartStep step.Command = r.Run.Command step.Entrypoint = r.Run.Entrypoint - if len(r.OutputVars) > 0 && len(step.Entrypoint) == 0 || len(step.Command) == 0 { + if len(r.OutputVars) > 0 && (len(step.Entrypoint) == 0 || len(step.Command) == 0) { return nil, nil, fmt.Errorf("output variable should not be set for unset entrypoint or command") } @@ -36,7 +36,7 @@ func executeRunStep(ctx context.Context, engine *engine.Engine, r *api.StartStep if len(r.OutputVars) > 0 { if exited != nil && exited.Exited && exited.ExitCode == 0 { - outputs, err := fetchOutputVariables(outputFile) // nolint:govet + outputs, err := fetchOutputVariables(outputFile, out) // nolint:govet if err != nil { return exited, nil, err } diff --git a/pipeline/runtime/runtest.go b/pipeline/runtime/runtest.go index 53f2a2cb..2c6f7e79 100644 --- a/pipeline/runtime/runtest.go +++ b/pipeline/runtime/runtest.go @@ -51,7 +51,7 @@ func executeRunTestStep(ctx context.Context, engine *engine.Engine, r *api.Start if len(r.OutputVars) > 0 { if exited != nil && exited.Exited && exited.ExitCode == 0 { - outputs, err := fetchOutputVariables(outputFile) // nolint:govet + outputs, err := fetchOutputVariables(outputFile, out) // nolint:govet if err != nil { return exited, nil, err } diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index 85c5f1ed..4d8b8257 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -116,7 +116,7 @@ func (e *StepExecutor) executeStep(r *api.StartStepRequest) (*runtime.State, map // Create a log stream for step logs client := state.GetLogStreamClient() - wc := livelog.New(client, r.LogKey, getNudges()) + wc := livelog.New(client, r.LogKey, r.Name, getNudges()) wr := logstream.NewReplacer(wc, secrets) go wr.Open() // nolint:errcheck diff --git a/setup/setup.go b/setup/setup.go index 5c732a82..810dcf2b 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -140,7 +140,10 @@ func installDocker(instanceInfo InstanceInfo) { Error("get docker install script failed") return } + cmd = exec.Command("sudo", "sh", "get-docker.sh") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr dockerInstallErr := cmd.Run() if dockerInstallErr != nil { logrus.