diff --git a/go.mod b/go.mod index c60da3a2..8fe6e036 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/gin-gonic/gin v1.7.7 github.com/go-logr/logr v1.3.0 github.com/gofrs/flock v0.8.1 - github.com/google/uuid v1.6.0 github.com/mitchellh/mapstructure v1.5.0 github.com/onsi/ginkgo/v2 v2.12.1 github.com/onsi/gomega v1.27.10 @@ -21,7 +20,7 @@ require ( github.com/pluralsh/console-client-go v0.1.0 github.com/pluralsh/controller-reconcile-helper v0.0.4 github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 - github.com/pluralsh/polly v0.1.4 + github.com/pluralsh/polly v0.1.6 github.com/samber/lo v1.39.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 @@ -60,6 +59,7 @@ require ( github.com/Masterminds/squirrel v1.5.3 // indirect github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/chigopher/pathlib v0.19.1 // indirect @@ -101,6 +101,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/gosuri/uitable v0.0.4 // indirect diff --git a/go.sum b/go.sum index 2a6e2ec6..d459ea5c 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b h1:otBG+dV+YK+Soembj github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 h1:nvj0OLI3YqYXer/kZD8Ri1aaunCxIEsOst1BVJswV0o= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -579,16 +581,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= -github.com/pluralsh/console-client-go v0.0.95 h1:JxZ4FSGDo9Mxu1947i2ipCZquDQ+iWW6FGx3s4/onpA= -github.com/pluralsh/console-client-go v0.0.95/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo= github.com/pluralsh/console-client-go v0.1.0 h1:Nvu1ch2Q5X0UqndUaCuTC6lL/yzm7ANjKB0xejsNS9I= github.com/pluralsh/console-client-go v0.1.0/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo= github.com/pluralsh/controller-reconcile-helper v0.0.4 h1:1o+7qYSyoeqKFjx+WgQTxDz4Q2VMpzprJIIKShxqG0E= github.com/pluralsh/controller-reconcile-helper v0.0.4/go.mod h1:AfY0gtteD6veBjmB6jiRx/aR4yevEf6K0M13/pGan/s= github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw= github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34/go.mod h1:IagWXKFYu6NTHzcJx2dJyrIlZ1Sv2PH3fhOtplA9qOs= -github.com/pluralsh/polly v0.1.4 h1:Kz90peCgvsfF3ERt8cujr5TR9z4wUlqQE60Eg09ZItY= -github.com/pluralsh/polly v0.1.4/go.mod h1:Yo1/jcW+4xwhWG+ZJikZy4J4HJkMNPZ7sq5auL2c/tY= +github.com/pluralsh/polly v0.1.6 h1:hkZYwQ+r04SaxXJIPnOT39opImpDraiStMQQNMdeEzA= +github.com/pluralsh/polly v0.1.6/go.mod h1:Yo1/jcW+4xwhWG+ZJikZy4J4HJkMNPZ7sq5auL2c/tY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= diff --git a/internal/controller/pipelinegate_controller.go b/internal/controller/pipelinegate_controller.go index 17e8667c..9cb02dcc 100644 --- a/internal/controller/pipelinegate_controller.go +++ b/internal/controller/pipelinegate_controller.go @@ -317,22 +317,3 @@ func genJobObjectMeta(gate *v1alpha1.PipelineGate) *batchv1.Job { }, } } - -func gateUpdateAttributes(fragment *console.PipelineGateFragment) console.GateUpdateAttributes { - var jobRef *console.NamespacedName - if fragment.Status != nil && fragment.Status.JobRef != nil { - jobRef = &console.NamespacedName{ - Name: fragment.Status.JobRef.Name, - Namespace: fragment.Status.JobRef.Namespace, - } - } else { - jobRef = &console.NamespacedName{} - } - - return console.GateUpdateAttributes{ - State: &fragment.State, - Status: &console.GateStatusAttributes{ - JobRef: jobRef, - }, - } -} diff --git a/pkg/controller/consts.go b/pkg/controller/consts.go new file mode 100644 index 00000000..b88a04ba --- /dev/null +++ b/pkg/controller/consts.go @@ -0,0 +1,5 @@ +package controller + +const ( + DefaultPageSize = int64(100) +) diff --git a/pkg/controller/controller_manager.go b/pkg/controller/controller_manager.go index 42ef8704..d8db98ba 100644 --- a/pkg/controller/controller_manager.go +++ b/pkg/controller/controller_manager.go @@ -83,7 +83,7 @@ func (cm *ControllerManager) Start() error { go func() { defer controller.Do.ShutdownQueue() defer controller.Do.WipeCache() - _ = wait.PollImmediateInfinite(cm.Refresh, func() (done bool, err error) { + _ = wait.PollUntilContextTimeout(cm.ctx, cm.Refresh, time.Minute, true, func(ctx context.Context) (done bool, err error) { return controller.Do.Poll(cm.ctx) }) }() @@ -94,7 +94,7 @@ func (cm *ControllerManager) Start() error { } go func() { - _ = wait.PollImmediateInfinite(cm.Refresh, func() (done bool, err error) { + _ = wait.PollUntilContextTimeout(cm.ctx, cm.Refresh, time.Minute, true, func(ctx context.Context) (done bool, err error) { return false, cm.Socket.Join() }) }() diff --git a/pkg/controller/pipelinegates/reconciler.go b/pkg/controller/pipelinegates/reconciler.go index d04b16a1..c94663ac 100644 --- a/pkg/controller/pipelinegates/reconciler.go +++ b/pkg/controller/pipelinegates/reconciler.go @@ -9,8 +9,10 @@ import ( "github.com/pluralsh/deployment-operator/api/v1alpha1" "github.com/pluralsh/deployment-operator/internal/utils" "github.com/pluralsh/deployment-operator/pkg/client" + "github.com/pluralsh/deployment-operator/pkg/controller" "github.com/pluralsh/deployment-operator/pkg/ping" "github.com/pluralsh/deployment-operator/pkg/websocket" + "github.com/pluralsh/polly/algorithms" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" @@ -83,26 +85,39 @@ func (s *GateReconciler) ShutdownQueue() { s.GateQueue.ShutDown() } +func (s *GateReconciler) ListGates(ctx context.Context) *algorithms.Pager[*console.PipelineGateEdgeFragment] { + logger := log.FromContext(ctx) + logger.Info("create pipeline gate pager") + fetch := func(page *string, size int64) ([]*console.PipelineGateEdgeFragment, *algorithms.PageInfo, error) { + resp, err := s.ConsoleClient.GetClusterGates(page, &size) + if err != nil { + logger.Error(err, "failed to fetch gates") + return nil, nil, err + } + pageInfo := &algorithms.PageInfo{ + HasNext: resp.PagedClusterGates.PageInfo.HasNextPage, + After: resp.PagedClusterGates.PageInfo.EndCursor, + PageSize: size, + } + return resp.PagedClusterGates.Edges, pageInfo, nil + } + return algorithms.NewPager[*console.PipelineGateEdgeFragment](controller.DefaultPageSize, fetch) +} + func (s *GateReconciler) Poll(ctx context.Context) (done bool, err error) { logger := log.FromContext(ctx) logger.V(1).Info("fetching gates for cluster") - var after *string - var pageSize int64 - pageSize = 100 - hasNextPage := true + pager := s.ListGates(ctx) - for hasNextPage { - resp, err := s.ConsoleClient.GetClusterGates(after, &pageSize) + for pager.HasNext() { + gates, err := pager.NextPage() if err != nil { logger.Error(err, "failed to fetch gates list") return false, nil } - hasNextPage = resp.PagedClusterGates.PageInfo.HasNextPage - after = resp.PagedClusterGates.PageInfo.EndCursor - - for _, gate := range resp.PagedClusterGates.Edges { + for _, gate := range gates { logger.V(2).Info("sending update for", "gate", gate.Node.ID) s.GateQueue.Add(gate.Node.ID) } diff --git a/pkg/controller/service/reconciler.go b/pkg/controller/service/reconciler.go index 748aeaf0..924edbb4 100644 --- a/pkg/controller/service/reconciler.go +++ b/pkg/controller/service/reconciler.go @@ -8,6 +8,8 @@ import ( console "github.com/pluralsh/console-client-go" clienterrors "github.com/pluralsh/deployment-operator/internal/errors" + "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/polly/algorithms" "github.com/samber/lo" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -176,13 +178,28 @@ func (s *ServiceReconciler) ShutdownQueue() { s.SvcQueue.ShutDown() } +func (s *ServiceReconciler) ListServices(ctx context.Context) *algorithms.Pager[*console.ServiceDeploymentEdgeFragment] { + logger := log.FromContext(ctx) + logger.Info("create service pager") + fetch := func(page *string, size int64) ([]*console.ServiceDeploymentEdgeFragment, *algorithms.PageInfo, error) { + resp, err := s.ConsoleClient.GetServices(page, &size) + if err != nil { + logger.Error(err, "failed to fetch service list from deployments service") + return nil, nil, err + } + pageInfo := &algorithms.PageInfo{ + HasNext: resp.PagedClusterServices.PageInfo.HasNextPage, + After: resp.PagedClusterServices.PageInfo.EndCursor, + PageSize: size, + } + return resp.PagedClusterServices.Edges, pageInfo, nil + } + return algorithms.NewPager[*console.ServiceDeploymentEdgeFragment](controller.DefaultPageSize, fetch) +} + func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) { logger := log.FromContext(ctx) logger.Info("fetching services for cluster") - var after *string - var pageSize int64 - pageSize = 100 - hasNextPage := true restore, err := s.isClusterRestore(ctx) if err != nil { @@ -194,16 +211,15 @@ func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) { return false, nil } - for hasNextPage { - resp, err := s.ConsoleClient.GetServices(after, &pageSize) + pager := s.ListServices(ctx) + + for pager.HasNext() { + services, err := pager.NextPage() if err != nil { logger.Error(err, "failed to fetch service list from deployments service") return false, nil } - - hasNextPage = resp.PagedClusterServices.PageInfo.HasNextPage - after = resp.PagedClusterServices.PageInfo.EndCursor - for _, svc := range resp.PagedClusterServices.Edges { + for _, svc := range services { logger.Info("sending update for", "service", svc.Node.ID) s.SvcQueue.Add(svc.Node.ID) } diff --git a/pkg/manifests/template/helm.go b/pkg/manifests/template/helm.go index 1fb8798f..3ffcdae6 100644 --- a/pkg/manifests/template/helm.go +++ b/pkg/manifests/template/helm.go @@ -15,6 +15,7 @@ import ( "github.com/gofrs/flock" "github.com/pkg/errors" console "github.com/pluralsh/console-client-go" + "github.com/pluralsh/polly/algorithms" "github.com/pluralsh/polly/fs" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/chart" @@ -157,7 +158,7 @@ func (h *helm) values(svc *console.GetServiceDeploymentForAgent_ServiceDeploymen if err != nil { return currentMap, err } - currentMap = merge(currentMap, nextMap) + currentMap = algorithms.Merge(currentMap, nextMap) } } @@ -166,28 +167,7 @@ func (h *helm) values(svc *console.GetServiceDeploymentForAgent_ServiceDeploymen return currentMap, nil } - return merge(currentMap, overrides), nil -} - -func merge(m1, m2 map[string]interface{}) map[string]interface{} { - // lifted from helm's merge code - out := make(map[string]interface{}, len(m1)) - for k, v := range m1 { - out[k] = v - } - - for k, v := range m2 { - if v, ok := v.(map[string]interface{}); ok { - if bv, ok := out[k]; ok { - if bv, ok := bv.(map[string]interface{}); ok { - out[k] = merge(bv, v) - continue - } - } - } - out[k] = v - } - return out + return algorithms.Merge(currentMap, overrides), nil } func (h *helm) valuesFile(svc *console.GetServiceDeploymentForAgent_ServiceDeployment, filename string) (map[string]interface{}, error) { diff --git a/pkg/manifests/template/raw_test.go b/pkg/manifests/template/raw_test.go index 1c7b3e86..023afcb5 100644 --- a/pkg/manifests/template/raw_test.go +++ b/pkg/manifests/template/raw_test.go @@ -1,12 +1,12 @@ package template import ( - "github.com/samber/lo" "path/filepath" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" console "github.com/pluralsh/console-client-go" + "github.com/samber/lo" ) var _ = Describe("Raw template", func() {