diff --git a/go.mod b/go.mod index 13b5d0a9..662c4010 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ replace ( require ( github.com/Masterminds/sprig/v3 v3.2.3 + github.com/alitto/pond v1.8.3 github.com/argoproj/gitops-engine v0.7.1-0.20230906152414-b0fffe419a0f github.com/go-logr/logr v1.2.4 github.com/orcaman/concurrent-map/v2 v2.0.1 diff --git a/go.sum b/go.sum index 4d575d1a..1f91131d 100644 --- a/go.sum +++ b/go.sum @@ -83,6 +83,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs= +github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= @@ -541,12 +543,6 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= -github.com/pluralsh/console-client-go v0.0.5 h1:+L7I3QLMWNBiuZlWe/YJfUMMZGnpKhEAlbs9sL+hiSc= -github.com/pluralsh/console-client-go v0.0.5/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU= -github.com/pluralsh/console-client-go v0.0.8 h1:BwWOt1ggBX/fxzY2+01dk8sBTz1jqT57o2y1Iz9Zxzk= -github.com/pluralsh/console-client-go v0.0.8/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU= -github.com/pluralsh/console-client-go v0.0.11 h1:2fchZE6qlSQmHTeuH54hAzJJpgKpx2Kbl8HhJNugbns= -github.com/pluralsh/console-client-go v0.0.11/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU= github.com/pluralsh/console-client-go v0.0.14 h1:vpvC6SR7A0MIrpeyR78hM6IreOLKgg+moRIEjyUnKZo= github.com/pluralsh/console-client-go v0.0.14/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU= github.com/pluralsh/polly v0.1.4 h1:Kz90peCgvsfF3ERt8cujr5TR9z4wUlqQE60Eg09ZItY= diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index aca80c88..8595bfb5 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -4,13 +4,13 @@ import ( "fmt" "time" + "github.com/alitto/pond" "github.com/argoproj/gitops-engine/pkg/cache" "github.com/argoproj/gitops-engine/pkg/engine" "github.com/pluralsh/deployment-operator/pkg/client" "github.com/pluralsh/deployment-operator/pkg/manifests" deploysync "github.com/pluralsh/deployment-operator/pkg/sync" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" @@ -26,7 +26,6 @@ type Agent struct { discoveryClient *discovery.DiscoveryClient engine *deploysync.Engine deathChan chan interface{} - svcQueue workqueue.RateLimitingInterface cleanup engine.StopFunc refresh time.Duration } @@ -74,7 +73,6 @@ func New(clientConfig clientcmd.ClientConfig, refresh time.Duration, consoleUrl, consoleClient: consoleClient, engine: engine, deathChan: deathChan, - svcQueue: svcQueue, cleanup: cleanup, refresh: refresh, }, nil @@ -82,38 +80,37 @@ func New(clientConfig clientcmd.ClientConfig, refresh time.Duration, consoleUrl, func (agent *Agent) Run() { defer agent.cleanup() - defer agent.svcQueue.ShutDown() defer agent.engine.WipeCache() - go func() { - for { - go agent.engine.ControlLoop() - failure := <-agent.deathChan - fmt.Printf("recovered from panic %v\n", failure) - } - }() + panicHandler := func(p interface{}) { + fmt.Printf("Task panicked: %v", p) + } - wait.PollInfinite(agent.refresh, func() (done bool, err error) { + for { log.Info("fetching services for cluster") svcs, err := agent.consoleClient.GetServices() if err != nil { - log.Error(err, "failed to fetch service list from deployments service") - return false, nil + log.Error(err, "failed to fetch service list from deployments service %v", err) + time.Sleep(agent.refresh) + continue } - + pool := pond.New(20, 100, pond.MinWorkers(20), pond.PanicHandler(panicHandler)) for _, svc := range svcs { - log.Info("sending update for", "service", svc.ID) - agent.svcQueue.Add(svc.ID) + log.Info("sending update for", "service", svc.ID, "namespace", svc.Namespace, "name", svc.Name) + pool.TrySubmit(func() { + if err := agent.engine.ProcessItem(svc.ID); err != nil { + log.Error(err, "found unprocessable error") + } + }) } - + pool.StopAndWait() info, err := agent.discoveryClient.ServerVersion() if err != nil { log.Error(err, "failed to fetch cluster version") - return false, nil } v := fmt.Sprintf("%s.%s", info.Major, info.Minor) if err := agent.consoleClient.Ping(v); err != nil { log.Error(err, "failed to ping cluster after scheduling syncs") } - return false, nil - }) + time.Sleep(agent.refresh) + } } diff --git a/pkg/sync/loop.go b/pkg/sync/loop.go index dc0308ee..2ff7be6c 100644 --- a/pkg/sync/loop.go +++ b/pkg/sync/loop.go @@ -3,48 +3,15 @@ package sync import ( "context" "fmt" - "runtime/debug" "github.com/argoproj/gitops-engine/pkg/sync" "github.com/argoproj/gitops-engine/pkg/sync/common" "github.com/samber/lo" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/util/wait" ) -func (engine *Engine) ControlLoop() { - if engine.deathChan != nil { - defer func() { - if r := recover(); r != nil { - engine.deathChan <- r - fmt.Printf("panic: %s\n", string(debug.Stack())) - } - }() - } - - engine.RegisterHandlers() - - wait.PollInfinite(syncDelay, func() (done bool, err error) { - log.Info("Polling for new service updates") - - item, shutdown := engine.svcQueue.Get() - if shutdown { - return true, nil - } - - if err := engine.processItem(item); err != nil { - log.Error(err, "found unprocessable error") - } - - engine.syncing = "" - - return false, nil - }) -} - -func (engine *Engine) processItem(item interface{}) error { - defer engine.svcQueue.Done(item) +func (engine *Engine) ProcessItem(item interface{}) error { id := item.(string) if id == "" { @@ -52,7 +19,6 @@ func (engine *Engine) processItem(item interface{}) error { } log.Info("attempting to sync service", "id", id) - engine.syncing = id svc, err := engine.svcCache.Get(id) if err != nil { fmt.Printf("failed to fetch service from cache: %s, ignoring for now", err) @@ -74,7 +40,7 @@ func (engine *Engine) processItem(item interface{}) error { if manErr != nil { if err := engine.updateStatus(svc.ID, results, errorAttributes("manifests", manErr)); err != nil { - log.Error(err, "Failed to update service status, ignoring for now") + log.Error(err, "Failed to update service status, ignoring for now", "namespace", svc.Namespace, "name", svc.Name) } log.Error(manErr, "failed to parse manifests") return manErr @@ -87,7 +53,7 @@ func (engine *Engine) processItem(item interface{}) error { diff, err := engine.diff(manifests, svc.Namespace, svc.ID) checkModifications := sync.WithResourceModificationChecker(true, diff) if err != nil { - log.Error(err, "could not build diff list, ignoring for now") + log.Error(err, "could not build diff list for service, ignoring for now", "namespace", svc.Namespace, "name", svc.Name) checkModifications = sync.WithResourceModificationChecker(false, nil) } @@ -113,7 +79,7 @@ func (engine *Engine) processItem(item interface{}) error { } if err := engine.updateStatus(svc.ID, results, errorAttributes("sync", err)); err != nil { - log.Error(err, "Failed to update service status, ignoring for now") + log.Error(err, "Failed to update service status, ignoring for now", "namespace", svc.Namespace, "name", svc.Name) } return nil