Skip to content

Commit

Permalink
feat: use polly lib (#142)
Browse files Browse the repository at this point in the history
* use polly lib

* fix linter

* fix linter

* dix linter
  • Loading branch information
zreigz authored Mar 13, 2024
1 parent 038ee2c commit 9e43ac7
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 71 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
github.com/gin-gonic/gin v1.7.7
github.com/go-logr/logr v1.3.0
github.com/gofrs/flock v0.8.1
github.com/google/uuid v1.6.0
github.com/mitchellh/mapstructure v1.5.0
github.com/onsi/ginkgo/v2 v2.12.1
github.com/onsi/gomega v1.27.10
Expand All @@ -21,7 +20,7 @@ require (
github.com/pluralsh/console-client-go v0.1.0
github.com/pluralsh/controller-reconcile-helper v0.0.4
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34
github.com/pluralsh/polly v0.1.4
github.com/pluralsh/polly v0.1.6
github.com/samber/lo v1.39.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -60,6 +59,7 @@ require (
github.com/Masterminds/squirrel v1.5.3 // indirect
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/chigopher/pathlib v0.19.1 // indirect
Expand Down Expand Up @@ -101,6 +101,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/gosuri/uitable v0.0.4 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b h1:otBG+dV+YK+Soembj
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50=
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 h1:nvj0OLI3YqYXer/kZD8Ri1aaunCxIEsOst1BVJswV0o=
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down Expand Up @@ -579,16 +581,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pluralsh/console-client-go v0.0.95 h1:JxZ4FSGDo9Mxu1947i2ipCZquDQ+iWW6FGx3s4/onpA=
github.com/pluralsh/console-client-go v0.0.95/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
github.com/pluralsh/console-client-go v0.1.0 h1:Nvu1ch2Q5X0UqndUaCuTC6lL/yzm7ANjKB0xejsNS9I=
github.com/pluralsh/console-client-go v0.1.0/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
github.com/pluralsh/controller-reconcile-helper v0.0.4 h1:1o+7qYSyoeqKFjx+WgQTxDz4Q2VMpzprJIIKShxqG0E=
github.com/pluralsh/controller-reconcile-helper v0.0.4/go.mod h1:AfY0gtteD6veBjmB6jiRx/aR4yevEf6K0M13/pGan/s=
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw=
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34/go.mod h1:IagWXKFYu6NTHzcJx2dJyrIlZ1Sv2PH3fhOtplA9qOs=
github.com/pluralsh/polly v0.1.4 h1:Kz90peCgvsfF3ERt8cujr5TR9z4wUlqQE60Eg09ZItY=
github.com/pluralsh/polly v0.1.4/go.mod h1:Yo1/jcW+4xwhWG+ZJikZy4J4HJkMNPZ7sq5auL2c/tY=
github.com/pluralsh/polly v0.1.6 h1:hkZYwQ+r04SaxXJIPnOT39opImpDraiStMQQNMdeEzA=
github.com/pluralsh/polly v0.1.6/go.mod h1:Yo1/jcW+4xwhWG+ZJikZy4J4HJkMNPZ7sq5auL2c/tY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
Expand Down
19 changes: 0 additions & 19 deletions internal/controller/pipelinegate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,22 +317,3 @@ func genJobObjectMeta(gate *v1alpha1.PipelineGate) *batchv1.Job {
},
}
}

func gateUpdateAttributes(fragment *console.PipelineGateFragment) console.GateUpdateAttributes {
var jobRef *console.NamespacedName
if fragment.Status != nil && fragment.Status.JobRef != nil {
jobRef = &console.NamespacedName{
Name: fragment.Status.JobRef.Name,
Namespace: fragment.Status.JobRef.Namespace,
}
} else {
jobRef = &console.NamespacedName{}
}

return console.GateUpdateAttributes{
State: &fragment.State,
Status: &console.GateStatusAttributes{
JobRef: jobRef,
},
}
}
5 changes: 5 additions & 0 deletions pkg/controller/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package controller

const (
DefaultPageSize = int64(100)
)
4 changes: 2 additions & 2 deletions pkg/controller/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (cm *ControllerManager) Start() error {
go func() {
defer controller.Do.ShutdownQueue()
defer controller.Do.WipeCache()
_ = wait.PollImmediateInfinite(cm.Refresh, func() (done bool, err error) {
_ = wait.PollUntilContextTimeout(cm.ctx, cm.Refresh, time.Minute, true, func(ctx context.Context) (done bool, err error) {
return controller.Do.Poll(cm.ctx)
})
}()
Expand All @@ -94,7 +94,7 @@ func (cm *ControllerManager) Start() error {
}

go func() {
_ = wait.PollImmediateInfinite(cm.Refresh, func() (done bool, err error) {
_ = wait.PollUntilContextTimeout(cm.ctx, cm.Refresh, time.Minute, true, func(ctx context.Context) (done bool, err error) {
return false, cm.Socket.Join()
})
}()
Expand Down
35 changes: 25 additions & 10 deletions pkg/controller/pipelinegates/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/ping"
"github.com/pluralsh/deployment-operator/pkg/websocket"
"github.com/pluralsh/polly/algorithms"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
Expand Down Expand Up @@ -83,26 +85,39 @@ func (s *GateReconciler) ShutdownQueue() {
s.GateQueue.ShutDown()
}

func (s *GateReconciler) ListGates(ctx context.Context) *algorithms.Pager[*console.PipelineGateEdgeFragment] {
logger := log.FromContext(ctx)
logger.Info("create pipeline gate pager")
fetch := func(page *string, size int64) ([]*console.PipelineGateEdgeFragment, *algorithms.PageInfo, error) {
resp, err := s.ConsoleClient.GetClusterGates(page, &size)
if err != nil {
logger.Error(err, "failed to fetch gates")
return nil, nil, err
}
pageInfo := &algorithms.PageInfo{
HasNext: resp.PagedClusterGates.PageInfo.HasNextPage,
After: resp.PagedClusterGates.PageInfo.EndCursor,
PageSize: size,
}
return resp.PagedClusterGates.Edges, pageInfo, nil
}
return algorithms.NewPager[*console.PipelineGateEdgeFragment](controller.DefaultPageSize, fetch)
}

func (s *GateReconciler) Poll(ctx context.Context) (done bool, err error) {
logger := log.FromContext(ctx)
logger.V(1).Info("fetching gates for cluster")

var after *string
var pageSize int64
pageSize = 100
hasNextPage := true
pager := s.ListGates(ctx)

for hasNextPage {
resp, err := s.ConsoleClient.GetClusterGates(after, &pageSize)
for pager.HasNext() {
gates, err := pager.NextPage()
if err != nil {
logger.Error(err, "failed to fetch gates list")
return false, nil
}

hasNextPage = resp.PagedClusterGates.PageInfo.HasNextPage
after = resp.PagedClusterGates.PageInfo.EndCursor

for _, gate := range resp.PagedClusterGates.Edges {
for _, gate := range gates {
logger.V(2).Info("sending update for", "gate", gate.Node.ID)
s.GateQueue.Add(gate.Node.ID)
}
Expand Down
36 changes: 26 additions & 10 deletions pkg/controller/service/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

console "github.com/pluralsh/console-client-go"
clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/polly/algorithms"
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -176,13 +178,28 @@ func (s *ServiceReconciler) ShutdownQueue() {
s.SvcQueue.ShutDown()
}

func (s *ServiceReconciler) ListServices(ctx context.Context) *algorithms.Pager[*console.ServiceDeploymentEdgeFragment] {
logger := log.FromContext(ctx)
logger.Info("create service pager")
fetch := func(page *string, size int64) ([]*console.ServiceDeploymentEdgeFragment, *algorithms.PageInfo, error) {
resp, err := s.ConsoleClient.GetServices(page, &size)
if err != nil {
logger.Error(err, "failed to fetch service list from deployments service")
return nil, nil, err
}
pageInfo := &algorithms.PageInfo{
HasNext: resp.PagedClusterServices.PageInfo.HasNextPage,
After: resp.PagedClusterServices.PageInfo.EndCursor,
PageSize: size,
}
return resp.PagedClusterServices.Edges, pageInfo, nil
}
return algorithms.NewPager[*console.ServiceDeploymentEdgeFragment](controller.DefaultPageSize, fetch)
}

func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) {
logger := log.FromContext(ctx)
logger.Info("fetching services for cluster")
var after *string
var pageSize int64
pageSize = 100
hasNextPage := true

restore, err := s.isClusterRestore(ctx)
if err != nil {
Expand All @@ -194,16 +211,15 @@ func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) {
return false, nil
}

for hasNextPage {
resp, err := s.ConsoleClient.GetServices(after, &pageSize)
pager := s.ListServices(ctx)

for pager.HasNext() {
services, err := pager.NextPage()
if err != nil {
logger.Error(err, "failed to fetch service list from deployments service")
return false, nil
}

hasNextPage = resp.PagedClusterServices.PageInfo.HasNextPage
after = resp.PagedClusterServices.PageInfo.EndCursor
for _, svc := range resp.PagedClusterServices.Edges {
for _, svc := range services {
logger.Info("sending update for", "service", svc.Node.ID)
s.SvcQueue.Add(svc.Node.ID)
}
Expand Down
26 changes: 3 additions & 23 deletions pkg/manifests/template/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gofrs/flock"
"github.com/pkg/errors"
console "github.com/pluralsh/console-client-go"
"github.com/pluralsh/polly/algorithms"
"github.com/pluralsh/polly/fs"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
Expand Down Expand Up @@ -157,7 +158,7 @@ func (h *helm) values(svc *console.GetServiceDeploymentForAgent_ServiceDeploymen
if err != nil {
return currentMap, err
}
currentMap = merge(currentMap, nextMap)
currentMap = algorithms.Merge(currentMap, nextMap)
}
}

Expand All @@ -166,28 +167,7 @@ func (h *helm) values(svc *console.GetServiceDeploymentForAgent_ServiceDeploymen
return currentMap, nil
}

return merge(currentMap, overrides), nil
}

func merge(m1, m2 map[string]interface{}) map[string]interface{} {
// lifted from helm's merge code
out := make(map[string]interface{}, len(m1))
for k, v := range m1 {
out[k] = v
}

for k, v := range m2 {
if v, ok := v.(map[string]interface{}); ok {
if bv, ok := out[k]; ok {
if bv, ok := bv.(map[string]interface{}); ok {
out[k] = merge(bv, v)
continue
}
}
}
out[k] = v
}
return out
return algorithms.Merge(currentMap, overrides), nil
}

func (h *helm) valuesFile(svc *console.GetServiceDeploymentForAgent_ServiceDeployment, filename string) (map[string]interface{}, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/manifests/template/raw_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package template

import (
"github.com/samber/lo"
"path/filepath"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
console "github.com/pluralsh/console-client-go"
"github.com/samber/lo"
)

var _ = Describe("Raw template", func() {
Expand Down

0 comments on commit 9e43ac7

Please sign in to comment.