Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: nested api/worker #567

Merged
merged 8 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@ import (
"context"
"encoding/json"
"net/http"
"strconv"
"sync"
"time"

"github.com/sirupsen/logrus"

api "github.com/go-vela/server/api/types"
"github.com/go-vela/types"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
"github.com/go-vela/worker/version"
"github.com/sirupsen/logrus"
)

// exec is a helper function to poll the queue
// and execute Vela pipelines for the Worker.
//
//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker
func (w *Worker) exec(index int, config *library.Worker) error {
func (w *Worker) exec(index int, config *api.Worker) error {
var err error

// setup the version
Expand Down Expand Up @@ -103,14 +103,14 @@ func (w *Worker) exec(index int, config *library.Worker) error {
"version": v.Semantic(),
})

// lock and append the build to the RunningBuildIDs list
w.RunningBuildIDsMutex.Lock()
// lock and append the build to the list
w.RunningBuildsMutex.Lock()

w.RunningBuildIDs = append(w.RunningBuildIDs, strconv.FormatInt(item.Build.GetID(), 10))
w.RunningBuilds = append(w.RunningBuilds, item.Build)

config.SetRunningBuildIDs(w.RunningBuildIDs)
config.SetRunningBuilds(w.RunningBuilds)

w.RunningBuildIDsMutex.Unlock()
w.RunningBuildsMutex.Unlock()

// set worker status
updateStatus := w.getWorkerStatusFromConfig(config)
Expand Down Expand Up @@ -208,18 +208,18 @@ func (w *Worker) exec(index int, config *library.Worker) error {

logger.Info("completed build")

// lock and remove the build from the RunningBuildIDs list
w.RunningBuildIDsMutex.Lock()
// lock and remove the build from the list
w.RunningBuildsMutex.Lock()

for i, v := range w.RunningBuildIDs {
if v == strconv.FormatInt(item.Build.GetID(), 10) {
w.RunningBuildIDs = append(w.RunningBuildIDs[:i], w.RunningBuildIDs[i+1:]...)
for i, v := range w.RunningBuilds {
if v.GetID() == item.Build.GetID() {
w.RunningBuilds = append(w.RunningBuilds[:i], w.RunningBuilds[i+1:]...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we upgrade to Go 1.22, we could use slices.Concat here.... just noting 😄

}
}

config.SetRunningBuildIDs(w.RunningBuildIDs)
config.SetRunningBuilds(w.RunningBuilds)

w.RunningBuildIDsMutex.Unlock()
w.RunningBuildsMutex.Unlock()

// set worker status
updateStatus := w.getWorkerStatusFromConfig(config)
Expand Down Expand Up @@ -303,8 +303,8 @@ func (w *Worker) exec(index int, config *library.Worker) error {

// getWorkerStatusFromConfig is a helper function
// to determine the appropriate worker status.
func (w *Worker) getWorkerStatusFromConfig(config *library.Worker) string {
switch rb := len(config.GetRunningBuildIDs()); {
func (w *Worker) getWorkerStatusFromConfig(config *api.Worker) string {
switch rb := len(config.GetRunningBuilds()); {
case rb == 0:
return constants.WorkerStatusIdle
case rb < w.Config.Build.Limit:
Expand Down
19 changes: 14 additions & 5 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"context"
"time"

"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"

"golang.org/x/sync/errgroup"

api "github.com/go-vela/server/api/types"
"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
)

// operate is a helper function to initiate all
Expand All @@ -27,7 +27,7 @@ func (w *Worker) operate(ctx context.Context) error {
executors, gctx := errgroup.WithContext(ctx)
// Define the database representation of the worker
// and register itself in the database
registryWorker := new(library.Worker)
registryWorker := new(api.Worker)
registryWorker.SetHostname(w.Config.API.Address.Hostname())
registryWorker.SetAddress(w.Config.API.Address.String())
registryWorker.SetRoutes(w.Config.Queue.Routes)
Expand Down Expand Up @@ -118,13 +118,16 @@ func (w *Worker) operate(ctx context.Context) error {

continue
}

w.QueueCheckedIn, err = w.queueCheckIn(gctx, registryWorker)

if err != nil {
// queue check in failed, retry
logrus.Errorf("unable to ping queue %v", err)
logrus.Info("retrying check-in...")

time.Sleep(5 * time.Second)

continue
}

Expand Down Expand Up @@ -166,19 +169,22 @@ func (w *Worker) operate(ctx context.Context) error {
if !w.CheckedIn {
time.Sleep(5 * time.Second)
logrus.Info("worker not checked in, skipping queue read")

continue
}
// do not pull from queue unless queue setup is done and connected
if !w.QueueCheckedIn {
time.Sleep(5 * time.Second)
logrus.Info("queue ping failed, skipping queue read")

continue
}
select {
case <-gctx.Done():
logrus.WithFields(logrus.Fields{
"id": id,
}).Info("completed looping on worker executor")

return nil
default:
logrus.WithFields(logrus.Fields{
Expand All @@ -197,16 +203,19 @@ func (w *Worker) operate(ctx context.Context) error {
logrus.Errorf("failing worker executor: %v", err)
registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("status update response is nil")
}

if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}

return err
}
}
Expand Down
13 changes: 7 additions & 6 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"fmt"
"net/http"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"

api "github.com/go-vela/server/api/types"
"github.com/go-vela/types/constants"
)

// checkIn is a helper function to phone home to the server.
func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
func (w *Worker) checkIn(config *api.Worker) (bool, string, error) {
// check to see if the worker already exists in the database
logrus.Infof("retrieving worker %s from the server", config.GetHostname())

Expand Down Expand Up @@ -48,7 +49,7 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
}

// register is a helper function to register the worker with the server.
func (w *Worker) register(config *library.Worker) (bool, string, error) {
func (w *Worker) register(config *api.Worker) (bool, string, error) {
logrus.Infof("worker %s not found, registering it with the server", config.GetHostname())

// status Idle will be set for worker upon first time registration
Expand All @@ -68,7 +69,7 @@ func (w *Worker) register(config *library.Worker) (bool, string, error) {
}

// queueCheckIn is a helper function to phone home to the redis.
func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *library.Worker) (bool, error) {
func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *api.Worker) (bool, error) {
pErr := w.Queue.Ping(ctx)
if pErr != nil {
logrus.Errorf("worker %s unable to contact the queue: %v", registryWorker.GetHostname(), pErr)
Expand All @@ -86,7 +87,7 @@ func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *library.Worke

// updateWorkerStatus is a helper function to update worker status
// logs the error if it can't update status.
func (w *Worker) updateWorkerStatus(config *library.Worker, status string) {
func (w *Worker) updateWorkerStatus(config *api.Worker, status string) {
config.SetStatus(status)
_, resp, logErr := w.VelaClient.Worker.Update(config.GetHostname(), config)

Expand Down
13 changes: 6 additions & 7 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ import (
"net/url"

"github.com/gin-gonic/gin"

"github.com/go-vela/server/queue"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"

"github.com/sirupsen/logrus"

"github.com/urfave/cli/v2"

_ "github.com/joho/godotenv/autoload"

"github.com/go-vela/server/queue"
"github.com/go-vela/types/library"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
)

// run executes the worker based
Expand Down Expand Up @@ -137,7 +136,7 @@ func run(c *cli.Context) error {

RegisterToken: make(chan string, 1),

RunningBuildIDs: make([]string, 0),
RunningBuilds: make([]*library.Build, 0),
}

// set the worker address if no flag was provided
Expand Down
21 changes: 11 additions & 10 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/server/queue"
"github.com/go-vela/types/library"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
)
Expand Down Expand Up @@ -61,15 +62,15 @@ type (
// Worker represents all configuration and
// system processes for the worker.
Worker struct {
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
QueueCheckedIn bool
RunningBuildIDs []string
RunningBuildIDsMutex sync.Mutex
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
RunningBuilds []*library.Build
QueueCheckedIn bool
RunningBuildsMutex sync.Mutex
}
)
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ require (
github.com/docker/docker v24.0.9+incompatible
github.com/docker/go-units v0.5.0
github.com/gin-gonic/gin v1.9.1
github.com/go-vela/sdk-go v0.23.2
github.com/go-vela/server v0.23.2
github.com/go-vela/sdk-go v0.23.3-0.20240319181130-4a7c245c93ae
github.com/go-vela/server v0.23.4-0.20240319161125-1809638e7e72
github.com/go-vela/types v0.23.2
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/go-cmp v0.6.0
Expand All @@ -33,7 +33,7 @@ require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/miniredis/v2 v2.31.1 // indirect
github.com/alicebob/miniredis/v2 v2.32.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buildkite/yaml v0.0.0-20230306222819-0e4e032d4835 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
Expand Down Expand Up @@ -79,7 +79,7 @@ require (
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/mitchellh/reflectwalk v1.0.1 // indirect
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect
Expand All @@ -102,8 +102,8 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
go.starlark.net v0.0.0-20240311180835-efac67204ba7 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.starlark.net v0.0.0-20240314022150-ee8ed142361c // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/mod v0.14.0 // indirect
Expand All @@ -115,7 +115,7 @@ require (
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.16.1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading
Loading