Skip to content

Commit

Permalink
feat: add namespace agent (#157)
Browse files Browse the repository at this point in the history
* add namespace agent

* fix imports
  • Loading branch information
zreigz authored Apr 5, 2024
1 parent af0e979 commit aee91d2
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 9 deletions.
18 changes: 12 additions & 6 deletions cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import (
"os"
"time"

"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/namespaces"
"github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates"
"github.com/pluralsh/deployment-operator/pkg/controller/restore"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
"github.com/samber/lo"
"golang.org/x/net/context"
"k8s.io/client-go/rest"

"github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates"
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/restore"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
)

func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient ctrclient.Client) (*controller.ControllerManager, *service.ServiceReconciler, *pipelinegates.GateReconciler) {
Expand Down Expand Up @@ -63,6 +62,13 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient
Queue: rr.RestoreQueue,
})

ns := namespaces.NewNamespaceReconciler(mgr.GetClient(), k8sClient, r)
mgr.AddController(&controller.Controller{
Name: "Managed Namespace Controller",
Do: ns,
Queue: ns.NamespaceQueue,
})

if err := mgr.Start(); err != nil {
setupLog.Error(err, "unable to start controller manager")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/osteele/liquid v1.3.2
github.com/pkg/errors v0.9.1
github.com/pluralsh/console-client-go v0.1.8
github.com/pluralsh/console-client-go v0.1.11
github.com/pluralsh/controller-reconcile-helper v0.0.4
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34
github.com/pluralsh/polly v0.1.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,8 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pluralsh/console-client-go v0.1.8 h1:/MLVzacjCuqbb8bGsZornvya7ubMt2jeNM0dxPJduQU=
github.com/pluralsh/console-client-go v0.1.8/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
github.com/pluralsh/console-client-go v0.1.11 h1:QqbLOtEBQtfj/7gg7mDCwYLX3F7wCAiol9T5zCVAKq4=
github.com/pluralsh/console-client-go v0.1.11/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
github.com/pluralsh/controller-reconcile-helper v0.0.4 h1:1o+7qYSyoeqKFjx+WgQTxDz4Q2VMpzprJIIKShxqG0E=
github.com/pluralsh/controller-reconcile-helper v0.0.4/go.mod h1:AfY0gtteD6veBjmB6jiRx/aR4yevEf6K0M13/pGan/s=
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw=
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,6 @@ type Client interface {
GetClusterGates(after *string, first *int64) (*console.PagedClusterGates, error)
UpdateGate(id string, attributes console.GateUpdateAttributes) error
UpsertConstraints(constraints []*console.PolicyConstraintAttributes) (*console.UpsertPolicyConstraints, error)
GetNamespace(id string) (*console.ManagedNamespaceFragment, error)
ListNamespaces(after *string, first *int64) (*console.ListClusterNamespaces_ClusterManagedNamespaces, error)
}
27 changes: 27 additions & 0 deletions pkg/client/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package client

import (
"fmt"

console "github.com/pluralsh/console-client-go"
)

func (c *client) GetNamespace(id string) (*console.ManagedNamespaceFragment, error) {
restore, err := c.consoleClient.GetNamespace(c.ctx, id)
if err != nil {
return nil, err
}

return restore.ManagedNamespace, nil
}

func (c *client) ListNamespaces(after *string, first *int64) (*console.ListClusterNamespaces_ClusterManagedNamespaces, error) {
resp, err := c.consoleClient.ListClusterNamespaces(c.ctx, after, first, nil, nil)
if err != nil {
return nil, err
}
if resp.ClusterManagedNamespaces == nil {
return nil, fmt.Errorf("the response from ListNamespaces is nil")
}
return resp.ClusterManagedNamespaces, nil
}
164 changes: 164 additions & 0 deletions pkg/controller/namespaces/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package namespaces

import (
"context"
"fmt"
"reflect"
"time"

console "github.com/pluralsh/console-client-go"
clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/websocket"
"github.com/pluralsh/polly/algorithms"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type NamespaceReconciler struct {
ConsoleClient client.Client
K8sClient ctrlclient.Client
NamespaceQueue workqueue.RateLimitingInterface
NamespaceCache *client.Cache[console.ManagedNamespaceFragment]
}

func NewNamespaceReconciler(consoleClient client.Client, k8sClient ctrlclient.Client, refresh time.Duration) *NamespaceReconciler {
return &NamespaceReconciler{
ConsoleClient: consoleClient,
K8sClient: k8sClient,
NamespaceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
NamespaceCache: client.NewCache[console.ManagedNamespaceFragment](refresh, func(id string) (*console.ManagedNamespaceFragment, error) {
return consoleClient.GetNamespace(id)
}),
}
}

func (n *NamespaceReconciler) GetPublisher() (string, websocket.Publisher) {
return "namespace.event", &socketPublisher{
restoreQueue: n.NamespaceQueue,
restoreCache: n.NamespaceCache,
}
}

func (n *NamespaceReconciler) WipeCache() {
n.NamespaceCache.Wipe()
}

func (n *NamespaceReconciler) ShutdownQueue() {
n.NamespaceQueue.ShutDown()
}

func (n *NamespaceReconciler) ListNamespaces(ctx context.Context) *algorithms.Pager[*console.ManagedNamespaceEdgeFragment] {
logger := log.FromContext(ctx)
logger.Info("create namespace pager")
fetch := func(page *string, size int64) ([]*console.ManagedNamespaceEdgeFragment, *algorithms.PageInfo, error) {
resp, err := n.ConsoleClient.ListNamespaces(page, &size)
if err != nil {
logger.Error(err, "failed to fetch namespaces")
return nil, nil, err
}
pageInfo := &algorithms.PageInfo{
HasNext: resp.PageInfo.HasNextPage,
After: resp.PageInfo.EndCursor,
PageSize: size,
}
return resp.Edges, pageInfo, nil
}
return algorithms.NewPager[*console.ManagedNamespaceEdgeFragment](controller.DefaultPageSize, fetch)
}

func (n *NamespaceReconciler) Poll(ctx context.Context) (done bool, err error) {
logger := log.FromContext(ctx)
logger.Info("fetching namespaces")
pager := n.ListNamespaces(ctx)

for pager.HasNext() {
namespaces, err := pager.NextPage()
if err != nil {
logger.Error(err, "failed to fetch namespace list")
return false, nil
}
for _, namespace := range namespaces {
logger.Info("sending update for", "namespace", namespace.Node.ID)
n.NamespaceQueue.Add(namespace.Node.ID)
}
}

return false, nil
}

func (n *NamespaceReconciler) Reconcile(ctx context.Context, id string) (reconcile.Result, error) {
logger := log.FromContext(ctx)
logger.Info("attempting to sync namespace", "id", id)
namespace, err := n.NamespaceCache.Get(id)
if err != nil {
if clienterrors.IsNotFound(err) {
logger.Info("namespace already deleted", "id", id)
return reconcile.Result{}, nil
}
logger.Error(err, fmt.Sprintf("failed to fetch namespace: %s, ignoring for now", id))
return reconcile.Result{}, err
}
logger.Info("upsert namespace", "name", namespace.Name)
if err = n.UpsertNamespace(ctx, namespace); err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}

func (n *NamespaceReconciler) UpsertNamespace(ctx context.Context, fragment *console.ManagedNamespaceFragment) error {
var labels map[string]string
var annotations map[string]string

if fragment.Labels != nil {
labels = convertMap(fragment.Labels)
}
if fragment.Annotations != nil {
annotations = convertMap(fragment.Annotations)
}
existing := &v1.Namespace{}
err := n.K8sClient.Get(ctx, ctrlclient.ObjectKey{Name: fragment.Name}, existing)
if err != nil {
if apierrors.IsNotFound(err) {
if err := n.K8sClient.Create(ctx, &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: fragment.Name,
Labels: labels,
Annotations: annotations,
},
}); err != nil {
return err
}
return nil
}
return err
}

if !reflect.DeepEqual(labels, existing.Labels) || !reflect.DeepEqual(annotations, existing.Annotations) {
existing.Labels = labels
existing.Annotations = annotations
if err := n.K8sClient.Update(ctx, existing); err != nil {
return err
}
}

return nil
}

func convertMap(in map[string]interface{}) map[string]string {
res := make(map[string]string)
for k, v := range in {
value, ok := v.(string)
if ok {
res[k] = value
}
}
return res
}
17 changes: 17 additions & 0 deletions pkg/controller/namespaces/socket_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package namespaces

import (
console "github.com/pluralsh/console-client-go"
"github.com/pluralsh/deployment-operator/pkg/client"
"k8s.io/client-go/util/workqueue"
)

type socketPublisher struct {
restoreQueue workqueue.RateLimitingInterface
restoreCache *client.Cache[console.ManagedNamespaceFragment]
}

func (sp *socketPublisher) Publish(id string) {
sp.restoreCache.Expire(id)
sp.restoreQueue.Add(id)
}

0 comments on commit aee91d2

Please sign in to comment.