Skip to content

Commit

Permalink
fix raw renderer and update error status
Browse files Browse the repository at this point in the history
  • Loading branch information
zreigz committed Oct 26, 2023
1 parent bd4ac24 commit 5bc0c6a
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 14 deletions.
74 changes: 62 additions & 12 deletions pkg/manifests/template/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package template
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand All @@ -11,9 +12,12 @@ import (
"github.com/osteele/liquid"
console "github.com/pluralsh/console-client-go"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/manifestreader"
"sigs.k8s.io/kustomize/kyaml/kio"
"sigs.k8s.io/kustomize/kyaml/kio/kioutil"
)

var (
Expand Down Expand Up @@ -54,6 +58,16 @@ func renderLiquid(input []byte, svc *console.ServiceDeploymentExtended) ([]byte,

func (r *raw) Render(svc *console.ServiceDeploymentExtended, utilFactory util.Factory) ([]*unstructured.Unstructured, error) {
res := make([]*unstructured.Unstructured, 0)
mapper, err := utilFactory.ToRESTMapper()
if err != nil {
return nil, err
}
readerOptions := ReaderOptions{
Mapper: mapper,
Namespace: svc.Namespace,
EnforceNamespace: true,
}

if err := filepath.Walk(r.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
Expand Down Expand Up @@ -81,22 +95,12 @@ func (r *raw) Render(svc *console.ServiceDeploymentExtended, utilFactory util.Fa

r := bytes.NewReader(rendered)

mapper, err := utilFactory.ToRESTMapper()
if err != nil {
return err
}

readerOptions := manifestreader.ReaderOptions{
Mapper: mapper,
Namespace: svc.Namespace,
EnforceNamespace: true,
}
mReader := &manifestreader.StreamManifestReader{
mReader := &StreamManifestReader{
ReaderName: "raw",
Reader: r,
ReaderOptions: readerOptions,
}
items, err := mReader.Read()
items, err := mReader.Read(res)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", rpath, err)
}
Expand All @@ -109,3 +113,49 @@ func (r *raw) Render(svc *console.ServiceDeploymentExtended, utilFactory util.Fa

return res, nil
}

// ReaderOptions defines the shared inputs for the different
// implementations of the ManifestReader interface.
type ReaderOptions struct {
Mapper meta.RESTMapper
Validate bool
Namespace string
EnforceNamespace bool
}

// StreamManifestReader reads manifest from the provided io.Reader
// and returns them as Info objects. The returned Infos will not have
// client or mapping set.
type StreamManifestReader struct {
ReaderName string
Reader io.Reader

ReaderOptions
}

// Read reads the manifests and returns them as Info objects.
func (r *StreamManifestReader) Read(objs []*unstructured.Unstructured) ([]*unstructured.Unstructured, error) {
nodes, err := (&kio.ByteReader{
Reader: r.Reader,
}).Read()
if err != nil {
return objs, err
}

for _, n := range nodes {
err = manifestreader.RemoveAnnotations(n, kioutil.IndexAnnotation)
if err != nil {
return objs, err
}
u, err := manifestreader.KyamlNodeToUnstructured(n)
if err != nil {
return objs, err
}
objs = append(objs, u)
}

objs = manifestreader.FilterLocalConfig(objs)

err = manifestreader.SetNamespaces(r.Mapper, objs, r.Namespace, r.EnforceNamespace)
return objs, err
}
6 changes: 5 additions & 1 deletion pkg/sync/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (engine *Engine) workerLoop() {
err := engine.processItem(item)
if err != nil {
log.Error(err, "process item")
id := item.(string)
if id != "" {
engine.UpdateErrorStatus(id, err)
}
}
time.Sleep(syncDelay)
}
Expand Down Expand Up @@ -129,7 +133,7 @@ func (engine *Engine) processItem(item interface{}) error {
InventoryPolicy: inventory.PolicyAdoptIfNoInventory,
})

return engine.UpdateStatus(id, svc.Name, svc.Namespace, ch, false)
return engine.UpdateApplyStatus(id, svc.Name, svc.Namespace, ch, false)
}

func (engine *Engine) splitObjects(id string, objs []*unstructured.Unstructured) (*unstructured.Unstructured, []*unstructured.Unstructured, error) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/sync/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,13 @@ func (engine *Engine) UpdatePruneStatus(id, name, namespace string, ch <-chan ev
return nil
}

func (engine *Engine) UpdateStatus(id, name, namespace string, ch <-chan event.Event, printStatus bool) error {
func (engine *Engine) UpdateErrorStatus(id string, err error) {
if err := engine.updateStatus(id, []*console.ComponentAttributes{}, errorAttributes("sync", err)); err != nil {
log.Error(err, "Failed to update service status, ignoring for now")
}
}

func (engine *Engine) UpdateApplyStatus(id, name, namespace string, ch <-chan event.Event, printStatus bool) error {
var statsCollector stats.Stats
var err error
components := []*console.ComponentAttributes{}
Expand Down

0 comments on commit 5bc0c6a

Please sign in to comment.