Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): use GetInfo for setup #509

Merged
merged 16 commits into from
Sep 25, 2023
3 changes: 1 addition & 2 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ func (w *Worker) exec(index int, config *library.Worker) error {
if err != nil {
logger.Errorf("unable to update worker: %v", err)
}

}()

// capture the configured build timeout
Expand Down Expand Up @@ -297,7 +296,7 @@ func (w *Worker) exec(index int, config *library.Worker) error {
}

// getWorkerStatusFromConfig is a helper function
// to determine the appropriate worker status
// to determine the appropriate worker status.
func (w *Worker) getWorkerStatusFromConfig(config *library.Worker) string {
switch rb := len(config.GetRunningBuildIDs()); {
case rb == 0:
Expand Down
65 changes: 39 additions & 26 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Target Brands, Inc. All rights reserved.
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

Expand All @@ -22,12 +22,10 @@ import (
// queue and execute Vela pipelines.
func (w *Worker) operate(ctx context.Context) error {
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
var err error

// create the errgroup for managing operator subprocesses
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#Group
executors, gctx := errgroup.WithContext(ctx)

// Define the database representation of the worker
// and register itself in the database
registryWorker := new(library.Worker)
Expand All @@ -43,13 +41,50 @@ func (w *Worker) operate(ctx context.Context) error {
token := <-w.RegisterToken

logrus.Trace("received register token")

logrus.Trace("setting up vela client")
// setup the vela client with the token
w.VelaClient, err = setupClient(w.Config.Server, token)
if err != nil {
return err
}

logrus.Trace("getting queue creds")

// fetching queue credentials using registration token
creds, _, err := w.VelaClient.Queue.GetInfo()
if err != nil {
logrus.Trace("error getting creds")

return err
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
}

// set queue address and public key using credentials received from server
w.Config.Queue.Address = creds.GetQueueAddress()
w.Config.Queue.PublicKey = creds.GetPublicKey()

// setup the queue
//
// https://pkg.go.dev/github.com/go-vela/server/queue?tab=doc#New
w.Queue, err = queue.New(w.Config.Queue)
if err != nil {
registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("status update response is nil")
}

if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}

return err
}

// spawn goroutine for phoning home
executors.Go(func() error {
for {
Expand All @@ -61,7 +96,6 @@ func (w *Worker) operate(ctx context.Context) error {
// check in attempt loop
for {
// register or update the worker
//nolint:contextcheck // ignore passing context
w.CheckedIn, token, err = w.checkIn(registryWorker)
// check in failed
if err != nil {
Expand Down Expand Up @@ -113,27 +147,6 @@ func (w *Worker) operate(ctx context.Context) error {
}
})

// setup the queue
//
// https://pkg.go.dev/github.com/go-vela/server/queue?tab=doc#New
//nolint:contextcheck // ignore passing context
w.Queue, err = queue.New(w.Config.Queue)
if err != nil {
registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)
if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("status update response is nil")
}
if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}
return err
}

// iterate till the configured build limit
for i := 0; i < w.Config.Build.Limit; i++ {
// evaluate and capture i at each iteration
Expand Down
10 changes: 4 additions & 6 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,10 @@ func run(c *cli.Context) error {
},
// queue configuration
Queue: &queue.Setup{
Driver: c.String("queue.driver"),
Address: c.String("queue.addr"),
Cluster: c.Bool("queue.cluster"),
Routes: c.StringSlice("queue.routes"),
Timeout: c.Duration("queue.pop.timeout"),
PublicKey: c.String("queue.public-key"),
Driver: c.String("queue.driver"),
Cluster: c.Bool("queue.cluster"),
Routes: c.StringSlice("queue.routes"),
Timeout: c.Duration("queue.pop.timeout"),
},
// server configuration
Server: &Server{
Expand Down
10 changes: 1 addition & 9 deletions cmd/vela-worker/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,10 @@ func (w *Worker) Validate() error {
return fmt.Errorf("no worker executor driver provided")
}

// verify the queue configuration
//
// https://godoc.org/github.com/go-vela/server/queue#Setup.Validate
err := w.Config.Queue.Validate()
if err != nil {
return err
}

// verify the runtime configuration
//
// https://godoc.org/github.com/go-vela/worker/runtime#Setup.Validate
err = w.Config.Runtime.Validate()
err := w.Config.Runtime.Validate()
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ services:
environment:
EXECUTOR_DRIVER: linux
QUEUE_DRIVER: redis
QUEUE_ADDR: 'redis://redis:6379'
QUEUE_PUBLIC_KEY: 'DXsJkoTSkHlG26d75LyHJG+KQsXPr8VKPpmH/78zmko='
VELA_BUILD_LIMIT: 1
VELA_BUILD_TIMEOUT: 30m
VELA_LOG_LEVEL: trace
VELA_RUNTIME_DRIVER: docker
VELA_RUNTIME_PRIVILEGED_IMAGES: 'target/vela-docker'
VELA_EXECUTOR_ENFORCE_TRUSTED_REPOS: 'true'
VELA_SERVER_ADDR: 'http://server:8080'
# comment the line below to use registration flow
# comment the VELA_SERVER_SECRET line below to use registration flow
VELA_SERVER_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
WORKER_ADDR: 'http://worker:8080'
WORKER_CHECK_IN: 2m
Expand Down Expand Up @@ -73,6 +71,7 @@ services:
VELA_LOG_LEVEL: trace
# comment the line below to use registration flow
VELA_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
QUEUE_PUBLIC_KEY: 'DXsJkoTSkHlG26d75LyHJG+KQsXPr8VKPpmH/78zmko='
VELA_SERVER_PRIVATE_KEY: 'F534FF2A080E45F38E05DC70752E6787'
VELA_USER_REFRESH_TOKEN_DURATION: 90m
VELA_USER_ACCESS_TOKEN_DURATION: 60m
Expand Down
44 changes: 22 additions & 22 deletions executor/linux/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,104 +1010,104 @@ func TestLinux_Secret_injectSecret(t *testing.T) {
name: "secret with matching push event ACL injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "push"},
Environment: map[string]string{"VELA_BUILD_EVENT": "push"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"push"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"FOO": "foo", "BUILD_EVENT": "push"},
Environment: map[string]string{"FOO": "foo", "VELA_BUILD_EVENT": "push"},
},
},
{
name: "secret with non-matching push event ACL not injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "push"},
Environment: map[string]string{"VELA_BUILD_EVENT": "push"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"deployment"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "push"},
Environment: map[string]string{"VELA_BUILD_EVENT": "push"},
},
},
{ // pull_request event checks
name: "secret with matching pull_request event ACL injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "pull_request"},
Environment: map[string]string{"VELA_BUILD_EVENT": "pull_request"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"pull_request"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"FOO": "foo", "BUILD_EVENT": "pull_request"},
Environment: map[string]string{"FOO": "foo", "VELA_BUILD_EVENT": "pull_request"},
},
},
{
name: "secret with non-matching pull_request event ACL not injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "pull_request"},
Environment: map[string]string{"VELA_BUILD_EVENT": "pull_request"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"deployment"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "pull_request"},
Environment: map[string]string{"VELA_BUILD_EVENT": "pull_request"},
},
},
{ // tag event checks
name: "secret with matching tag event ACL injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "tag"},
Environment: map[string]string{"VELA_BUILD_EVENT": "tag"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"tag"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"FOO": "foo", "BUILD_EVENT": "tag"},
Environment: map[string]string{"FOO": "foo", "VELA_BUILD_EVENT": "tag"},
},
},
{
name: "secret with non-matching tag event ACL not injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "tag"},
Environment: map[string]string{"VELA_BUILD_EVENT": "tag"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"deployment"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "tag"},
Environment: map[string]string{"VELA_BUILD_EVENT": "tag"},
},
},
{ // deployment event checks
name: "secret with matching deployment event ACL injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "deployment"},
Environment: map[string]string{"VELA_BUILD_EVENT": "deployment"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"deployment"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"FOO": "foo", "BUILD_EVENT": "deployment"},
Environment: map[string]string{"FOO": "foo", "VELA_BUILD_EVENT": "deployment"},
},
},
{
name: "secret with non-matching deployment event ACL not injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "deployment"},
Environment: map[string]string{"VELA_BUILD_EVENT": "deployment"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"tag"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "deployment"},
Environment: map[string]string{"VELA_BUILD_EVENT": "deployment"},
},
},

Expand All @@ -1116,39 +1116,39 @@ func TestLinux_Secret_injectSecret(t *testing.T) {
name: "secret with matching event ACL and non-matching image ACL not injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "push"},
Environment: map[string]string{"VELA_BUILD_EVENT": "push"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"push"}, Images: &[]string{"centos"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "push"},
Environment: map[string]string{"VELA_BUILD_EVENT": "push"},
},
},
{
name: "secret with non-matching event ACL and matching image ACL not injected",
step: &pipeline.Container{
Image: "centos:latest",
Environment: map[string]string{"BUILD_EVENT": "push"},
Environment: map[string]string{"VELA_BUILD_EVENT": "push"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"pull_request"}, Images: &[]string{"centos"}}},
want: &pipeline.Container{
Image: "centos:latest",
Environment: map[string]string{"BUILD_EVENT": "push"},
Environment: map[string]string{"VELA_BUILD_EVENT": "push"},
},
},
{
name: "secret with matching event ACL and matching image ACL injected",
step: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"BUILD_EVENT": "push"},
Environment: map[string]string{"VELA_BUILD_EVENT": "push"},
Secrets: pipeline.StepSecretSlice{{Source: "FOO", Target: "FOO"}},
},
msec: map[string]*library.Secret{"FOO": {Name: &v, Value: &v, Events: &[]string{"push"}, Images: &[]string{"alpine"}}},
want: &pipeline.Container{
Image: "alpine:latest",
Environment: map[string]string{"FOO": "foo", "BUILD_EVENT": "push"},
Environment: map[string]string{"FOO": "foo", "VELA_BUILD_EVENT": "push"},
},
},
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ require (
github.com/docker/docker v20.10.25+incompatible
github.com/docker/go-units v0.5.0
github.com/gin-gonic/gin v1.9.1
github.com/go-vela/sdk-go v0.20.2-0.20230824133536-0b0212b996f5
github.com/go-vela/server v0.20.1-0.20230831135557-46337cfce67d
github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d
github.com/go-vela/sdk-go v0.20.2-0.20230925160153-e6031fe704d7
github.com/go-vela/server v0.20.1-0.20230925142408-ab85e96f27c4
github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/google/go-cmp v0.5.9
github.com/joho/godotenv v1.5.1
Expand Down
14 changes: 7 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91
github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js=
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-vela/sdk-go v0.20.2-0.20230824133536-0b0212b996f5 h1:PfGWo9Yzv6xfHYSYYArlsrWfs3cvOlBopCjHyHi/SPs=
github.com/go-vela/sdk-go v0.20.2-0.20230824133536-0b0212b996f5/go.mod h1:j7DFviaUd+XArpFr9KoHLWwUXRyFYAiZvPPF42gktoA=
github.com/go-vela/server v0.20.1-0.20230831135557-46337cfce67d h1:xgrejztRM+VM2uOLNGZubkIk11wbsg0qTcyAHO5Kpbo=
github.com/go-vela/server v0.20.1-0.20230831135557-46337cfce67d/go.mod h1:h4i/+HUpfh+kViIpf2016fJXZxVFz6B9EECA28qp/fg=
github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d h1:ag6trc3Ev+7hzifeWy0M9rHHjrO9nFCYgW8dlKdZ4j4=
github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d/go.mod h1:AXO4oQSygOBQ02fPapsKjQHkx2aQO3zTu7clpvVbXBY=
github.com/go-vela/sdk-go v0.20.2-0.20230925160153-e6031fe704d7 h1:VdWX61yJZ0+NFQvzrskISSF3hZvCbpCX54O7gK6mz2U=
github.com/go-vela/sdk-go v0.20.2-0.20230925160153-e6031fe704d7/go.mod h1:l2ZvSlZICJiKqDxkeGvXXmWebUsFcacvEt4PVMLqV5Y=
github.com/go-vela/server v0.20.1-0.20230925142408-ab85e96f27c4 h1:SZBRTSE2RWutnzz1PN+n05YwoVI4gIFJc/xEb2KUklc=
github.com/go-vela/server v0.20.1-0.20230925142408-ab85e96f27c4/go.mod h1:Ey05oMaWSYmMnajDCBtBYvCbx34N9o3PBf+pGJlp6hE=
github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d h1:PoGQfHM1Lq3cCttrQ9s5Cp9bHc1xXNWNYLGArPvHqRo=
github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d/go.mod h1:fVmUP4y7Cw8cG6CBWTdjIdgXNNrpVo25yoE9NmNBdOg=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand Down Expand Up @@ -323,7 +323,7 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH
github.com/redis/go-redis/v9 v9.1.0 h1:137FnGdk+EQdCbye1FW+qOEcY5S+SpY9T0NiuqvtfMY=
github.com/redis/go-redis/v9 v9.1.0/go.mod h1:urWj3He21Dj5k4TK1y59xH8Uj6ATueP8AH1cY3lZl4c=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
Expand Down