Skip to content

Commit

Permalink
Fix panic in lite-engine (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
shubham149 authored Dec 10, 2021
1 parent 36fdc7f commit 61270a0
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 19 deletions.
2 changes: 2 additions & 0 deletions cli/client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (c *HTTPClient) RetryPollStep(ctx context.Context, in *api.PollStepRequest,
Trace("RetryPollStep: step completed")
return step, pollError
}
time.Sleep(time.Millisecond * 10) // nolint:gomnd
}
}

Expand Down Expand Up @@ -130,6 +131,7 @@ func (c *HTTPClient) RetryHealth(ctx context.Context, timeout time.Duration) (he
Trace("RetryHealth: health check completed")
return healthResponse, pollError
}
time.Sleep(time.Millisecond * 100) // nolint:gomnd
}
}

Expand Down
24 changes: 20 additions & 4 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"sync"

"github.com/drone/runner-go/pipeline/runtime"
"github.com/harness/lite-engine/engine/docker"
Expand All @@ -16,6 +17,7 @@ import (
type Engine struct {
pipelineConfig *spec.PipelineConfig
docker *docker.Docker
mu sync.Mutex
}

func NewEnv(opts docker.Opts) (*Engine, error) {
Expand All @@ -30,8 +32,11 @@ func NewEnv(opts docker.Opts) (*Engine, error) {
}

func (e *Engine) Setup(ctx context.Context, pipelineConfig *spec.PipelineConfig) error {
e.mu.Lock()
e.pipelineConfig = pipelineConfig
for _, vol := range e.pipelineConfig.Volumes {
e.mu.Unlock()

for _, vol := range pipelineConfig.Volumes {
if vol != nil && vol.HostPath != nil {
if err := os.MkdirAll(vol.HostPath.Path, 0777); err != nil { // nolint:gomnd
return errors.Wrap(err,
Expand All @@ -44,16 +49,27 @@ func (e *Engine) Setup(ctx context.Context, pipelineConfig *spec.PipelineConfig)
}

func (e *Engine) Destroy(ctx context.Context) error {
return e.docker.Destroy(ctx, e.pipelineConfig)
e.mu.Lock()
cfg := e.pipelineConfig
e.mu.Unlock()

return e.docker.Destroy(ctx, cfg)
}

func (e *Engine) Run(ctx context.Context, step *spec.Step, output io.Writer) (*runtime.State, error) {
for k, v := range e.pipelineConfig.Envs {
e.mu.Lock()
cfg := e.pipelineConfig
e.mu.Unlock()

if step.Envs == nil {
step.Envs = make(map[string]string)
}
for k, v := range cfg.Envs {
step.Envs[k] = v
}

if step.Image != "" {
return e.docker.Run(ctx, e.pipelineConfig, step, output)
return e.docker.Run(ctx, cfg, step, output)
}

return exec.Run(ctx, step, output)
Expand Down
2 changes: 2 additions & 0 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/harness/lite-engine/pipeline/runtime"

"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
)

// Handler returns an http.Handler that exposes the service resources.
func Handler(config *config.Config, engine *engine.Engine, stepExecutor *runtime.StepExecutor) http.Handler {
r := chi.NewRouter()
r.Use(logger.Middleware)
r.Use(middleware.Recoverer)

// Setup stage endpoint
r.Mount("/setup", func() http.Handler {
Expand Down
12 changes: 7 additions & 5 deletions livelog/livelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ type Writer struct {

client logstream.Client // client

key string // Unique key to identify in storage
key string // Unique key to identify in storage
name string // Human readable name of the key

num int
now time.Time
Expand All @@ -48,10 +49,11 @@ type Writer struct {
}

// New returns a new writer
func New(client logstream.Client, key string, nudges []logstream.Nudge) *Writer {
func New(client logstream.Client, key, name string, nudges []logstream.Nudge) *Writer {
b := &Writer{
client: client,
key: key,
name: name,
now: time.Now(),
limit: defaultLimit,
interval: defaultInterval,
Expand Down Expand Up @@ -106,7 +108,7 @@ func (b *Writer) Write(p []byte) (n int, err error) {
Timestamp: time.Now(),
ElaspedTime: int64(time.Since(b.now).Seconds()),
}
logrus.WithField("key", b.key).Infoln(line.Message)
logrus.WithField("name", b.name).Infoln(line.Message)

for b.size+len(part) > b.limit {
// Keep streaming even after the limit, but only upload last `b.limit` data to the store
Expand All @@ -116,7 +118,7 @@ func (b *Writer) Write(p []byte) (n int, err error) {

hline, err := json.Marshal(b.history[0])
if err != nil {
logrus.WithError(err).WithField("key", b.key).Errorln("could not marshal log")
logrus.WithError(err).WithField("name", b.name).Errorln("could not marshal log")
}
b.size -= len(hline)
b.history = b.history[1:]
Expand Down Expand Up @@ -152,7 +154,7 @@ func (b *Writer) Open() error {
b.stop() // stop trying to stream if we could not open the stream
return err
}
logrus.WithField("key", b.key).Infoln("successfully opened log stream")
logrus.WithField("name", b.name).Infoln("successfully opened log stream")
b.opened = true
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion livelog/livelog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func TestLineWriterSingle(t *testing.T) {
client := new(mockClient)
w := New(client, "1", nil)
w := New(client, "1", "1", nil)
w.SetInterval(time.Duration(0))
w.num = 4
w.Write([]byte("foo\nbar\n")) // nolint:errcheck
Expand Down
13 changes: 8 additions & 5 deletions pipeline/runtime/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"errors"
"fmt"
"io"
"os"
"strings"

Expand Down Expand Up @@ -34,11 +35,14 @@ func getOutputVarCmd(outputVars []string, outputFile string) string {

// Fetches map of env variable and value from OutputFile.
// OutputFile stores all env variable and value
func fetchOutputVariables(outputFile string) (map[string]string, error) {
func fetchOutputVariables(outputFile string, out io.Writer) (map[string]string, error) {
log := logrus.New()
log.Out = out

outputs := make(map[string]string)
f, err := os.Open(outputFile)
if err != nil {
logrus.WithError(err).WithField("outputFile", outputFile).Errorln("Failed to open output file")
log.WithError(err).WithField("outputFile", outputFile).Errorln("failed to open output file")
return nil, err
}
defer f.Close()
Expand All @@ -48,14 +52,13 @@ func fetchOutputVariables(outputFile string) (map[string]string, error) {
line := s.Text()
sa := strings.Split(line, " ")
if len(sa) < 2 { // nolint:gomnd
logrus.WithField("variable", sa[0]).Warnln(
"output variable does not exist")
log.WithField("variable", sa[0]).Warnln("output variable does not exist")
} else {
outputs[sa[0]] = line[len(sa[0])+1:]
}
}
if err := s.Err(); err != nil {
logrus.WithError(err).Errorln("failed to create scanner from output file")
log.WithError(err).Errorln("failed to create scanner from output file")
return nil, err
}
return outputs, nil
Expand Down
4 changes: 2 additions & 2 deletions pipeline/runtime/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func executeRunStep(ctx context.Context, engine *engine.Engine, r *api.StartStep
step.Command = r.Run.Command
step.Entrypoint = r.Run.Entrypoint

if len(r.OutputVars) > 0 && len(step.Entrypoint) == 0 || len(step.Command) == 0 {
if len(r.OutputVars) > 0 && (len(step.Entrypoint) == 0 || len(step.Command) == 0) {
return nil, nil, fmt.Errorf("output variable should not be set for unset entrypoint or command")
}

Expand All @@ -36,7 +36,7 @@ func executeRunStep(ctx context.Context, engine *engine.Engine, r *api.StartStep

if len(r.OutputVars) > 0 {
if exited != nil && exited.Exited && exited.ExitCode == 0 {
outputs, err := fetchOutputVariables(outputFile) // nolint:govet
outputs, err := fetchOutputVariables(outputFile, out) // nolint:govet
if err != nil {
return exited, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pipeline/runtime/runtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func executeRunTestStep(ctx context.Context, engine *engine.Engine, r *api.Start

if len(r.OutputVars) > 0 {
if exited != nil && exited.Exited && exited.ExitCode == 0 {
outputs, err := fetchOutputVariables(outputFile) // nolint:govet
outputs, err := fetchOutputVariables(outputFile, out) // nolint:govet
if err != nil {
return exited, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pipeline/runtime/step_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (e *StepExecutor) executeStep(r *api.StartStepRequest) (*runtime.State, map

// Create a log stream for step logs
client := state.GetLogStreamClient()
wc := livelog.New(client, r.LogKey, getNudges())
wc := livelog.New(client, r.LogKey, r.Name, getNudges())
wr := logstream.NewReplacer(wc, secrets)
go wr.Open() // nolint:errcheck

Expand Down
3 changes: 3 additions & 0 deletions setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ func installDocker(instanceInfo InstanceInfo) {
Error("get docker install script failed")
return
}

cmd = exec.Command("sudo", "sh", "get-docker.sh")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
dockerInstallErr := cmd.Run()
if dockerInstallErr != nil {
logrus.
Expand Down

0 comments on commit 61270a0

Please sign in to comment.