diff --git a/.gitignore b/.gitignore index dc78265d..84021b67 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ - # Binaries for programs and plugins *.exe *.exe~ diff --git a/Makefile b/Makefile index a90f4be4..9b447c28 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,3 @@ - -# Image URL to use all building/pushing image targets BIN := ${PWD}/bin export PATH := $(BIN):$(PATH) @@ -14,7 +12,8 @@ KIND_CLUSTER ?= kind CI_MODE_ENABLED := "" NO_KIND_CLEANUP := "" -IMG ?= ghcr.io/kube-logging/telemetry-controller:0.0.11 +# Image URL to use all building/pushing image targets +IMG ?= controller:local # ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary. ENVTEST_K8S_VERSION = 1.28.0 @@ -80,8 +79,12 @@ fmt: ## Run go fmt against code. vet: ## Run go vet against code. go vet ./... +.PHONY: tidy +tidy: ## Tidy Go modules + find . -iname "go.mod" -not -path "./.devcontainer/*" | xargs -L1 sh -c 'cd $$(dirname $$0); go mod tidy' + .PHONY: test -test: manifests generate fmt vet envtest ## Run tests. +test: manifests generate fmt vet envtest ## Run verifications and tests. KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -v ./... -coverprofile cover.out GOLANGCI_LINT = $(shell pwd)/bin/golangci-lint @@ -93,13 +96,41 @@ golangci-lint: } .PHONY: lint -lint: golangci-lint ## Run golangci-lint linter & yamllint +lint: golangci-lint ## Run golangci-lint $(GOLANGCI_LINT) run .PHONY: lint-fix -lint-fix: golangci-lint ## Run golangci-lint linter and perform fixes +lint-fix: golangci-lint ## Run golangci-lint and perform fixes $(GOLANGCI_LINT) run --fix +.PHONY: run-delve +run-delve: generate fmt vet manifests + go build -o bin/manager cmd/main.go + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./bin/manager + +.PHONY: e2e-test +e2e-test: ## Run e2e tests + cd e2e && export CI_MODE=$(CI_MODE_ENABLED) NO_KIND_CLEANUP=$(NO_KIND_CLEANUP) && $(TIMEOUT_CMD) --foreground 15m ./e2e_test.sh || (echo "E2E test failed"; exit 1) + +.PHONY: e2e-test-ci +e2e-test-ci: CI_MODE_ENABLED=1 +e2e-test-ci: NO_KIND_CLEANUP=1 +e2e-test-ci: IMG="controller:latest" ## Run e2e tests, telemetry collector runs inside k8s +e2e-test-ci: docker-build e2e-test + +.PHONY: check-diff +check-diff: generate + git diff --exit-code + +.PHONY: license-check +license-check: ${LICENSEI} .licensei.cache ## Run license check + ${LICENSEI} check + ${LICENSEI} header + +.PHONY: license-cache +license-cache: ${LICENSEI} ## Generate license cache + ${LICENSEI} cache + ##@ Build .PHONY: build @@ -107,7 +138,7 @@ build: manifests generate fmt vet ## Build manager binary. go build -o bin/manager cmd/main.go .PHONY: run -run: manifests generate fmt vet ## Run a controller from your host. +run: manifests generate fmt vet ## Run the controller from your host. go run ./cmd/main.go # If you wish to build the manager image targeting other platforms you can use the --platform flag. @@ -208,38 +239,6 @@ envtest: $(ENVTEST) crddir/github.com/open-telemetry/opentelemetry-operator ## D $(ENVTEST): $(LOCALBIN) test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest -.PHONY: run-delve -run-delve: generate fmt vet manifests - go build -o bin/manager cmd/main.go - dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./bin/manager - -.PHONY: tidy -tidy: ## Tidy Go modules - find . -iname "go.mod" -not -path "./.devcontainer/*" | xargs -L1 sh -c 'cd $$(dirname $$0); go mod tidy' - -.PHONY: e2e-test -e2e-test: ## Run e2e tests - cd e2e && export CI_MODE=$(CI_MODE_ENABLED) NO_KIND_CLEANUP=$(NO_KIND_CLEANUP) && $(TIMEOUT_CMD) --foreground 15m ./e2e_test.sh || (echo "E2E test failed"; exit 1) - -.PHONY: e2e-test-ci -e2e-test-ci: CI_MODE_ENABLED=1 -e2e-test-ci: NO_KIND_CLEANUP=1 -e2e-test-ci: IMG="controller:latest" ## Run e2e tests, telemetry collector runs inside k8s -e2e-test-ci: docker-build e2e-test - -.PHONY: check-diff -check-diff: generate - git diff --exit-code - -.PHONY: license-check -license-check: ${LICENSEI} .licensei.cache ## Run license check - ${LICENSEI} check - ${LICENSEI} header - -.PHONY: license-cache -license-cache: ${LICENSEI} ## Generate license cache - ${LICENSEI} cache - stern: | ${BIN} GOBIN=${BIN} go install github.com/stern/stern@latest diff --git a/PROJECT b/PROJECT index b6f49913..7c6b210e 100644 --- a/PROJECT +++ b/PROJECT @@ -11,7 +11,6 @@ repo: github.com/kube-logging/telemetry-controller resources: - api: crdVersion: v1 - namespaced: true controller: true domain: kube-logging.dev group: telemetry @@ -28,7 +27,6 @@ resources: version: v1alpha1 - api: crdVersion: v1 - namespaced: true domain: kube-logging.dev group: telemetry kind: Tenant @@ -42,4 +40,11 @@ resources: kind: Output path: github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1 version: v1alpha1 +- api: + crdVersion: v1 + domain: kube-logging.dev + group: telemetry + kind: Bridge + path: github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1 + version: v1alpha1 version: "3" diff --git a/README.md b/README.md index 16fb15e8..34f90d04 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ Telemetry-controller can be configured using Custom Resources to set up an opini To get started with the Telemetry Controller, complete the following steps. Alternatively, see our [Telemetry Controller overview and quickstart blog post](https://axoflow.com/reinvent-kubernetes-logging-with-telemetry-controller/). ### Prerequisites + - go version v1.22+ - docker version 24+ - kubectl version v1.26+ @@ -32,6 +33,7 @@ minikube start --container-runtime=containerd ### Deployment steps for users Deploy latest telemetry-controller: + ```sh # Install telemetry-controller, and opentelemetry-operator as a sub-chart helm upgrade --install --wait --create-namespace --namespace telemetry-controller-system telemetry-controller oci://ghcr.io/kube-logging/helm-charts/telemetry-controller @@ -82,25 +84,30 @@ Open the UI at `localhost:5080`, navigate to the `Ingestion/OTEL Collector` tab, ![Openobserve auth](docs/assets/openobserve-auth.png) Paste this token to the example manifests: + ```sh sed -i '' -e "s/\/INSERT YOUR COPIED TOKEN HERE/" docs/examples/simple-demo/one_tenant_two_subscriptions.yaml ``` + ```sh # Deploy the pipeline definition kubectl apply -f docs/examples/simple-demo/one_tenant_two_subscriptions.yaml ``` -**Create a workload, which will generate logs for the pipeline:** +Create a workload, which will generate logs for the pipeline: + ```sh helm install --wait --create-namespace --namespace example-tenant-ns --generate-name oci://ghcr.io/kube-logging/helm-charts/log-generator ``` -**Open the Openobserve UI and inspect the generated log messages** +Open the Openobserve UI and inspect the generated log messages: Set up portforwarding for Openobserve UI + ```sh kubectl -n openobserve port-forward svc/openobserve 5080:5080 ``` + ![Openobserve logs](docs/assets/openobserve-logs.png) ### Sending logs to logging-operator (example) @@ -147,14 +154,13 @@ Apply the provided example resource for telemetry-controller: [telemetry-control kubectl apply -f telemetry-controller.yaml ``` - ## Contributing If you find this project useful, help us: - Support the development of this project and star this repo! :star: - Help new users with issues they may encounter :muscle: -- Send a pull request with your new features and bug fixes :rocket: +- Send a pull request with your new features and bug fixes :rocket: Please read the [Organisation's Code of Conduct](https://github.com/kube-logging/.github/blob/main/CODE_OF_CONDUCT.md)! @@ -162,16 +168,4 @@ Please read the [Organisation's Code of Conduct](https://github.com/kube-logging ## License -Copyright © 2024 Kube logging authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +The project is licensed under the [Apache 2.0 License](LICENSE). diff --git a/api/telemetry/v1alpha1/bridge_types.go b/api/telemetry/v1alpha1/bridge_types.go new file mode 100644 index 00000000..76c8d824 --- /dev/null +++ b/api/telemetry/v1alpha1/bridge_types.go @@ -0,0 +1,62 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// BridgeSpec defines the desired state of Bridge +type BridgeSpec struct { + SourceTenant string `json:"sourceTenant,omitempty"` + TargetTenant string `json:"targetTenant,omitempty"` + // The OTTL condition which must be satisfied in order to forward telemetry + // from the source tenant to the target tenant + OTTL string `json:"ottl,omitempty"` +} + +// BridgeStatus defines the observed state of Bridge +type BridgeStatus struct { + State State `json:"state,omitempty"` +} + +//+kubebuilder:object:root=true +//+kubebuilder:resource:scope=Cluster,categories=telemetry-all +//+kubebuilder:subresource:status +//+kubebuilder:printcolumn:name="Source Tenant",type=string,JSONPath=`.spec.sourceTenant` +//+kubebuilder:printcolumn:name="Target Tenant",type=string,JSONPath=`.spec.targetTenant` +//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state` + +// Bridge is the Schema for the Bridges API +type Bridge struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec BridgeSpec `json:"spec,omitempty"` + Status BridgeStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// BridgeList contains a list of Bridge +type BridgeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Bridge `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Bridge{}, &BridgeList{}) +} diff --git a/api/telemetry/v1alpha1/common.go b/api/telemetry/v1alpha1/common.go index 24b27c65..2f902624 100644 --- a/api/telemetry/v1alpha1/common.go +++ b/api/telemetry/v1alpha1/common.go @@ -14,6 +14,13 @@ package v1alpha1 +type State string + +const ( + StateReady State = "ready" + StateFailed State = "failed" +) + type NamespacedName struct { Namespace string `json:"namespace"` Name string `json:"name"` diff --git a/api/telemetry/v1alpha1/output_types.go b/api/telemetry/v1alpha1/output_types.go index aca62b48..8b2224db 100644 --- a/api/telemetry/v1alpha1/output_types.go +++ b/api/telemetry/v1alpha1/output_types.go @@ -22,14 +22,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! -// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. - // OutputSpec defines the desired state of Output type OutputSpec struct { - // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster - // Important: Run "make" to regenerate code after modifying this file - OTLPGRPC *OTLPGRPC `json:"otlp,omitempty"` Fluentforward *Fluentforward `json:"fluentforward,omitempty"` OTLPHTTP *OTLPHTTP `json:"otlphttp,omitempty"` diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index 7cf7669b..80078fc1 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -29,12 +29,14 @@ type SubscriptionSpec struct { type SubscriptionStatus struct { Tenant string `json:"tenant,omitempty"` Outputs []NamespacedName `json:"outputs,omitempty"` + State State `json:"state,omitempty"` } // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:printcolumn:name="Tenant",type=string,JSONPath=`.status.tenant` // +kubebuilder:printcolumn:name="Outputs",type=string,JSONPath=`.status.outputs` +// +kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state` // +kubebuilder:resource:categories=telemetry-all // Subscription is the Schema for the subscriptions API diff --git a/api/telemetry/v1alpha1/tenant_types.go b/api/telemetry/v1alpha1/tenant_types.go index d8eec6ab..8fa5280c 100644 --- a/api/telemetry/v1alpha1/tenant_types.go +++ b/api/telemetry/v1alpha1/tenant_types.go @@ -18,22 +18,44 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// Statement represents a single statement in a Transform processor +// ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor +type Statement struct { + // +kubebuilder:validation:Enum:=resource;scope;span;spanevent;metric;datapoint;log + Context string `json:"context,omitempty"` + Conditions []string `json:"conditions,omitempty"` + Statements []string `json:"statements,omitempty"` +} + +// Transform represents the Transform processor, which modifies telemetry based on its configuration +type Transform struct { + // Name of the Transform processor + Name string `json:"name,omitempty"` + + // +kubebuilder:validation:Enum:=ignore;silent;propagate + + // ErrorMode specifies how errors are handled while processing a statement + // vaid options are: ignore, silent, propagate; (default: propagate) + ErrorMode string `json:"errorMode,omitempty"` + + TraceStatements []Statement `json:"traceStatements,omitempty"` + MetricStatements []Statement `json:"metricStatements,omitempty"` + LogStatements []Statement `json:"logStatements,omitempty"` +} + // TenantSpec defines the desired state of Tenant type TenantSpec struct { SubscriptionNamespaceSelectors []metav1.LabelSelector `json:"subscriptionNamespaceSelectors,omitempty"` LogSourceNamespaceSelectors []metav1.LabelSelector `json:"logSourceNamespaceSelectors,omitempty"` + Transform Transform `json:"transform,omitempty"` } -const ( - StateReady = "ready" - StateFailed = "failed" -) - // TenantStatus defines the observed state of Tenant type TenantStatus struct { Subscriptions []NamespacedName `json:"subscriptions,omitempty"` LogSourceNamespaces []string `json:"logSourceNamespaces,omitempty"` - State string `json:"state,omitempty"` + ConnectedBridges []string `json:"connectedBridges,omitempty"` + State State `json:"state,omitempty"` } //+kubebuilder:object:root=true @@ -41,6 +63,7 @@ type TenantStatus struct { //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Subscriptions",type=string,JSONPath=`.status.subscriptions` //+kubebuilder:printcolumn:name="Logsource namespaces",type=string,JSONPath=`.status.logSourceNamespaces` +//+kubebuilder:printcolumn:name="Connected bridges",type=string,JSONPath=`.status.connectedBridges` //+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state` // Tenant is the Schema for the tenants API diff --git a/api/telemetry/v1alpha1/zz_generated.deepcopy.go b/api/telemetry/v1alpha1/zz_generated.deepcopy.go index baa7a051..e7665846 100644 --- a/api/telemetry/v1alpha1/zz_generated.deepcopy.go +++ b/api/telemetry/v1alpha1/zz_generated.deepcopy.go @@ -97,6 +97,95 @@ func (in *BearerAuthConfig) DeepCopy() *BearerAuthConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Bridge) DeepCopyInto(out *Bridge) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Bridge. +func (in *Bridge) DeepCopy() *Bridge { + if in == nil { + return nil + } + out := new(Bridge) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Bridge) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BridgeList) DeepCopyInto(out *BridgeList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Bridge, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BridgeList. +func (in *BridgeList) DeepCopy() *BridgeList { + if in == nil { + return nil + } + out := new(BridgeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *BridgeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BridgeSpec) DeepCopyInto(out *BridgeSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BridgeSpec. +func (in *BridgeSpec) DeepCopy() *BridgeSpec { + if in == nil { + return nil + } + out := new(BridgeSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BridgeStatus) DeepCopyInto(out *BridgeStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BridgeStatus. +func (in *BridgeStatus) DeepCopy() *BridgeStatus { + if in == nil { + return nil + } + out := new(BridgeStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Collector) DeepCopyInto(out *Collector) { *out = *in @@ -555,6 +644,31 @@ func (in *QueueSettings) DeepCopy() *QueueSettings { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Statement) DeepCopyInto(out *Statement) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Statements != nil { + in, out := &in.Statements, &out.Statements + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Statement. +func (in *Statement) DeepCopy() *Statement { + if in == nil { + return nil + } + out := new(Statement) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Subscription) DeepCopyInto(out *Subscription) { *out = *in @@ -777,6 +891,7 @@ func (in *TenantSpec) DeepCopyInto(out *TenantSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + in.Transform.DeepCopyInto(&out.Transform) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TenantSpec. @@ -802,6 +917,11 @@ func (in *TenantStatus) DeepCopyInto(out *TenantStatus) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.ConnectedBridges != nil { + in, out := &in.ConnectedBridges, &out.ConnectedBridges + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TenantStatus. @@ -828,3 +948,39 @@ func (in *TimeoutSettings) DeepCopy() *TimeoutSettings { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Transform) DeepCopyInto(out *Transform) { + *out = *in + if in.TraceStatements != nil { + in, out := &in.TraceStatements, &out.TraceStatements + *out = make([]Statement, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.MetricStatements != nil { + in, out := &in.MetricStatements, &out.MetricStatements + *out = make([]Statement, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.LogStatements != nil { + in, out := &in.LogStatements, &out.LogStatements + *out = make([]Statement, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Transform. +func (in *Transform) DeepCopy() *Transform { + if in == nil { + return nil + } + out := new(Transform) + in.DeepCopyInto(out) + return out +} diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml new file mode 100644 index 00000000..7bd5356b --- /dev/null +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml @@ -0,0 +1,74 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: bridges.telemetry.kube-logging.dev +spec: + group: telemetry.kube-logging.dev + names: + categories: + - telemetry-all + kind: Bridge + listKind: BridgeList + plural: bridges + singular: bridge + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.sourceTenant + name: Source Tenant + type: string + - jsonPath: .spec.targetTenant + name: Target Tenant + type: string + - jsonPath: .status.state + name: State + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: Bridge is the Schema for the Bridges API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: BridgeSpec defines the desired state of Bridge + properties: + ottl: + description: |- + The OTTL condition which must be satisfied in order to forward telemetry + from the source tenant to the target tenant + type: string + sourceTenant: + type: string + targetTenant: + type: string + type: object + status: + description: BridgeStatus defines the observed state of Bridge + properties: + state: + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml index 046ec237..0d904f2f 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml @@ -23,6 +23,9 @@ spec: - jsonPath: .status.outputs name: Outputs type: string + - jsonPath: .status.state + name: State + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -80,6 +83,8 @@ spec: - namespace type: object type: array + state: + type: string tenant: type: string type: object diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml index 3316d03f..012f703d 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml @@ -23,6 +23,9 @@ spec: - jsonPath: .status.logSourceNamespaces name: Logsource namespaces type: string + - jsonPath: .status.connectedBridges + name: Connected bridges + type: string - jsonPath: .status.state name: State type: string @@ -153,10 +156,109 @@ spec: type: object x-kubernetes-map-type: atomic type: array + transform: + description: Transform represents the Transform processor, which modifies + telemetry based on its configuration + properties: + errorMode: + description: |- + ErrorMode specifies how errors are handled while processing a statement + vaid options are: ignore, silent, propagate; (default: propagate) + enum: + - ignore + - silent + - propagate + type: string + logStatements: + items: + description: |- + Statement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + metricStatements: + items: + description: |- + Statement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + name: + description: Name of the Transform processor + type: string + traceStatements: + items: + description: |- + Statement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + type: object type: object status: description: TenantStatus defines the observed state of Tenant properties: + connectedBridges: + items: + type: string + type: array logSourceNamespaces: items: type: string diff --git a/charts/telemetry-controller/templates/deployment.yaml b/charts/telemetry-controller/templates/deployment.yaml index de807604..1e9891e5 100644 --- a/charts/telemetry-controller/templates/deployment.yaml +++ b/charts/telemetry-controller/templates/deployment.yaml @@ -1,7 +1,7 @@ apiVersion: apps/v1 kind: Deployment metadata: - labels: + labels: {{ include "telemetry-controller.labels" . | indent 4 }} name: '{{ include "telemetry-controller.fullname" . }}' namespace: '{{ include "telemetry-controller.namespace" . }}' diff --git a/charts/telemetry-controller/templates/rbac.yaml b/charts/telemetry-controller/templates/rbac.yaml index 39e7dba0..c1464609 100644 --- a/charts/telemetry-controller/templates/rbac.yaml +++ b/charts/telemetry-controller/templates/rbac.yaml @@ -118,6 +118,7 @@ rules: - outputs - subscriptions - tenants + - bridges verbs: - create - delete @@ -139,6 +140,7 @@ rules: - outputs/status - subscriptions/status - tenants/status + - bridges/status verbs: - get - patch diff --git a/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml b/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml new file mode 100644 index 00000000..7bd5356b --- /dev/null +++ b/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml @@ -0,0 +1,74 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: bridges.telemetry.kube-logging.dev +spec: + group: telemetry.kube-logging.dev + names: + categories: + - telemetry-all + kind: Bridge + listKind: BridgeList + plural: bridges + singular: bridge + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.sourceTenant + name: Source Tenant + type: string + - jsonPath: .spec.targetTenant + name: Target Tenant + type: string + - jsonPath: .status.state + name: State + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: Bridge is the Schema for the Bridges API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: BridgeSpec defines the desired state of Bridge + properties: + ottl: + description: |- + The OTTL condition which must be satisfied in order to forward telemetry + from the source tenant to the target tenant + type: string + sourceTenant: + type: string + targetTenant: + type: string + type: object + status: + description: BridgeStatus defines the observed state of Bridge + properties: + state: + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml index 046ec237..0d904f2f 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml @@ -23,6 +23,9 @@ spec: - jsonPath: .status.outputs name: Outputs type: string + - jsonPath: .status.state + name: State + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -80,6 +83,8 @@ spec: - namespace type: object type: array + state: + type: string tenant: type: string type: object diff --git a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml index 3316d03f..012f703d 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml @@ -23,6 +23,9 @@ spec: - jsonPath: .status.logSourceNamespaces name: Logsource namespaces type: string + - jsonPath: .status.connectedBridges + name: Connected bridges + type: string - jsonPath: .status.state name: State type: string @@ -153,10 +156,109 @@ spec: type: object x-kubernetes-map-type: atomic type: array + transform: + description: Transform represents the Transform processor, which modifies + telemetry based on its configuration + properties: + errorMode: + description: |- + ErrorMode specifies how errors are handled while processing a statement + vaid options are: ignore, silent, propagate; (default: propagate) + enum: + - ignore + - silent + - propagate + type: string + logStatements: + items: + description: |- + Statement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + metricStatements: + items: + description: |- + Statement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + name: + description: Name of the Transform processor + type: string + traceStatements: + items: + description: |- + Statement represents a single statement in a Transform processor + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor + properties: + conditions: + items: + type: string + type: array + context: + enum: + - resource + - scope + - span + - spanevent + - metric + - datapoint + - log + type: string + statements: + items: + type: string + type: array + type: object + type: array + type: object type: object status: description: TenantStatus defines the observed state of Tenant properties: + connectedBridges: + items: + type: string + type: array logSourceNamespaces: items: type: string diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 42cacb5b..cab75664 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -6,23 +6,26 @@ resources: - bases/telemetry.kube-logging.dev_subscriptions.yaml - bases/telemetry.kube-logging.dev_tenants.yaml - bases/telemetry.kube-logging.dev_outputs.yaml +- bases/telemetry.kube-logging.dev_bridges.yaml #+kubebuilder:scaffold:crdkustomizeresource patches: # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix. # patches here are for enabling the conversion webhook for each CRD -#- path: patches/webhook_in_collectors.yaml +#- path: patches/webhook_in_telemetry_collectors.yaml #- path: patches/webhook_in_telemetry_subscriptions.yaml #- path: patches/webhook_in_telemetry_tenants.yaml #- path: patches/webhook_in_telemetry_outputs.yaml +#- path: patches/webhook_in_telemetry_bridges.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. # patches here are for enabling the CA injection for each CRD -#- path: patches/cainjection_in_collectors.yaml +#- path: patches/cainjection_in_telemetry_collectors.yaml #- path: patches/cainjection_in_telemetry_subscriptions.yaml #- path: patches/cainjection_in_telemetry_tenants.yaml #- path: patches/cainjection_in_telemetry_outputs.yaml +#- path: patches/cainjection_in_telemetry_bridges.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # [WEBHOOK] To enable webhook, uncomment the following section diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 9055e3ad..42b62124 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -86,6 +86,7 @@ rules: - apiGroups: - telemetry.kube-logging.dev resources: + - bridges - collectors - outputs - subscriptions @@ -101,12 +102,7 @@ rules: - apiGroups: - telemetry.kube-logging.dev resources: - - collectors/finalizers - verbs: - - update -- apiGroups: - - telemetry.kube-logging.dev - resources: + - bridges/status - collectors/status - outputs/status - subscriptions/status @@ -115,3 +111,9 @@ rules: - get - patch - update +- apiGroups: + - telemetry.kube-logging.dev + resources: + - collectors/finalizers + verbs: + - update diff --git a/config/rbac/telemetry_bridge_editor_role.yaml b/config/rbac/telemetry_bridge_editor_role.yaml new file mode 100644 index 00000000..0419ef17 --- /dev/null +++ b/config/rbac/telemetry_bridge_editor_role.yaml @@ -0,0 +1,31 @@ +# permissions for end users to edit tenants. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: bridge-editor-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: telemetry-controller + app.kubernetes.io/part-of: telemetry-controller + app.kubernetes.io/managed-by: kustomize + name: bridge-editor-role +rules: + - apiGroups: + - telemetry.kube-logging.dev + resources: + - bridges + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - telemetry.kube-logging.dev + resources: + - bridges/status + verbs: + - get diff --git a/config/rbac/telemetry_bridge_viewer_role.yaml b/config/rbac/telemetry_bridge_viewer_role.yaml new file mode 100644 index 00000000..e4250c72 --- /dev/null +++ b/config/rbac/telemetry_bridge_viewer_role.yaml @@ -0,0 +1,27 @@ +# permissions for end users to view tenants. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: bridge-viewer-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: telemetry-controller + app.kubernetes.io/part-of: telemetry-controller + app.kubernetes.io/managed-by: kustomize + name: bridge-viewer-role +rules: + - apiGroups: + - telemetry.kube-logging.dev + resources: + - bridges + verbs: + - get + - list + - watch + - apiGroups: + - telemetry.kube-logging.dev + resources: + - bridges/status + verbs: + - get diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index fc2a569a..2985513d 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -4,4 +4,5 @@ resources: - telemetry_v1alpha1_subscription.yaml - telemetry_v1alpha1_tenant.yaml - telemetry_v1alpha1_output.yaml +- telemetry_v1alpha1_bridge.yaml #+kubebuilder:scaffold:manifestskustomizesamples diff --git a/config/samples/telemetry_v1alpha1_bridge.yaml b/config/samples/telemetry_v1alpha1_bridge.yaml new file mode 100644 index 00000000..9bf42a8b --- /dev/null +++ b/config/samples/telemetry_v1alpha1_bridge.yaml @@ -0,0 +1,14 @@ +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Bridge +metadata: + labels: + app.kubernetes.io/name: bridge + app.kubernetes.io/instance: bridge-sample + app.kubernetes.io/part-of: telemetry-controller + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: telemetry-controller + name: bridge-sample +spec: + sourceTenant: shared + destinationTenant: tenantA + ottl: 'route() where attributes["parsed"]["method"] == "GET"' diff --git a/config/samples/telemetry_v1alpha1_collector.yaml b/config/samples/telemetry_v1alpha1_collector.yaml index f2e6d524..26f2374b 100644 --- a/config/samples/telemetry_v1alpha1_collector.yaml +++ b/config/samples/telemetry_v1alpha1_collector.yaml @@ -9,6 +9,7 @@ metadata: app.kubernetes.io/created-by: telemetry-controller name: collector-sample spec: + controlNamespace: collector tenantSelector: matchLabels: tenant: tenantA diff --git a/config/samples/telemetry_v1alpha1_tenant.yaml b/config/samples/telemetry_v1alpha1_tenant.yaml index de0a3083..ab1050a0 100644 --- a/config/samples/telemetry_v1alpha1_tenant.yaml +++ b/config/samples/telemetry_v1alpha1_tenant.yaml @@ -13,3 +13,6 @@ spec: subscriptionNamespaceSelector: matchLabels: nsTenant: tenantA + logSourceNamespaceSelector: + matchLabels: + nsTenant: tenantA diff --git a/docs/demos/openobserve/demo.yaml b/docs/demos/openobserve/demo.yaml index 162073dd..23cb08ec 100644 --- a/docs/demos/openobserve/demo.yaml +++ b/docs/demos/openobserve/demo.yaml @@ -65,7 +65,7 @@ spec: otlp: endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 headers: - # echo -n username:org_pwd | base64 + # echo -n username:org_pwd | base64 Authorization: "Basic cm9vdEBleGFtcGxlLmNvbTpkREN6Z213eVVkMTlmVzZs" organization: default stream-name: default diff --git a/docs/examples/simple-demo-with-secretref/receiver.yaml b/docs/examples/simple-demo-with-secretref/receiver.yaml index 6aa25748..075ab8f8 100644 --- a/docs/examples/simple-demo-with-secretref/receiver.yaml +++ b/docs/examples/simple-demo-with-secretref/receiver.yaml @@ -16,13 +16,13 @@ spec: config: extensions: basicauth: - htpasswd: + htpasswd: inline: | user:pass receivers: otlp: protocols: - grpc: + grpc: auth: authenticator: basicauth tls: @@ -38,7 +38,7 @@ spec: level: "debug" extensions: [basicauth] pipelines: - logs: + logs: processors: [] receivers: [otlp] exporters: [debug] diff --git a/docs/examples/tenant-to-tenant-routing/README.md b/docs/examples/tenant-to-tenant-routing/README.md new file mode 100644 index 00000000..6b4a1860 --- /dev/null +++ b/docs/examples/tenant-to-tenant-routing/README.md @@ -0,0 +1,107 @@ +# Tenant-to-Tenant Log Routing + +This guide demonstrates how to implement cross-tenant log routing using the `Telemetry Controller` in a Kubernetes cluster. You'll learn how to route specific logs from a shared tenant to other tenants based on HTTP verbs. + +## Prerequisites + +- `kubectl` installed and configured +- One of the following: + - `kind` + - `minikube` with containerd runtime +- `helm` + +## Architecture Overview + +This setup implements the following architecture: + +- Three distinct tenants: `shared`, `database`, and `web` +- Each tenant collects logs from its corresponding namespace +- The logs from the shared tenant will get routed following these rules: + - HTTP `GET` requests → `database` tenant + - HTTP `PUT` requests → `web` tenant +- All tenants forward logs to a central `OpenObserve` instance + +## Step-by-Step Installation + +### 1. Create a local cluster + +You can create a Kubernetes cluster using either `kind` or `minikube`. + +```bash +kind create cluster +# OR +minikube start --container-runtime=containerd +``` + +### 2. Set Up namespaces + +Create the required namespaces: + +```bash +kubectl apply -f namespaces.yaml +``` + +### 3. Deploy OpenObserve + +Deploy OpenObserve for log aggregation: + +```bash +# Deploy the core OpenObserve components +kubectl apply -f https://raw.githubusercontent.com/zinclabs/openobserve/main/deploy/k8s/statefulset.yaml + +# Deploy the OpenObserve service +kubectl apply -f openobserve-svc.yaml +``` + +### 4. Create resources + +**Before proceeding, make sure to update the authentication tokens in the `pipeline.yaml` file.** + +_NOTE: You can check out this [example](../../../README.md#example-setup) on how to retrieve your token._ + +Apply the pipeline configuration: + +```bash +kubectl apply -f pipeline.yaml +``` + +This creates: + +- A Collector that will collect logs. +- Three tenant configurations (`shared`, `database`, `web`) +- Tenant-specific log collection rules via the `transform-processor` +- Routing achieved via the [Bridge](../../../config/crd/bases/telemetry.kube-logging.dev_bridges.yaml) custom-resource, based on HTTP verbs. + +### 5. Deploy the log generator + +Deploy the log generator in the shared namespace using Helm: + +```bash +helm upgrade --install --wait log-generator oci://ghcr.io/kube-logging/helm-charts/log-generator -n shared +``` + +### 6. Access the Logs + +Set up port-forwarding to access OpenObserve: + +```bash +kubectl port-forward -n openobserve svc/openobserve 8080:5080 > /dev/null & +``` + +### 7. Verification + +Access the logs by navigating to the OpenObersve UI: + + + +Every log message that had the `GET` verb in them was routed to the `database` tenant, while the ones that had `PUT` got routed to `web`. + +## Cleanup + +Simply teardown the cluster: + +```bash +kind delete cluster +# OR +minikube delete +``` diff --git a/docs/examples/tenant-to-tenant-routing/namespaces.yaml b/docs/examples/tenant-to-tenant-routing/namespaces.yaml new file mode 100644 index 00000000..e73dbe06 --- /dev/null +++ b/docs/examples/tenant-to-tenant-routing/namespaces.yaml @@ -0,0 +1,30 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: collector +--- +apiVersion: v1 +kind: Namespace +metadata: + name: openobserve +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + tenant: shared + name: shared +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + tenant: database + name: database +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + tenant: web + name: web diff --git a/docs/examples/tenant-to-tenant-routing/openobserve-svc.yaml b/docs/examples/tenant-to-tenant-routing/openobserve-svc.yaml new file mode 100644 index 00000000..076385a8 --- /dev/null +++ b/docs/examples/tenant-to-tenant-routing/openobserve-svc.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: openobserve-otlp-grpc + namespace: openobserve +spec: + clusterIP: None + selector: + app: openobserve + ports: + - name: otlp-grpc + port: 5081 + targetPort: 5081 diff --git a/docs/examples/tenant-to-tenant-routing/pipeline.yaml b/docs/examples/tenant-to-tenant-routing/pipeline.yaml new file mode 100644 index 00000000..e89a36b0 --- /dev/null +++ b/docs/examples/tenant-to-tenant-routing/pipeline.yaml @@ -0,0 +1,155 @@ +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Collector +metadata: + name: cluster +spec: + controlNamespace: collector + tenantSelector: + matchLabels: + collector: cluster +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Tenant +metadata: + labels: + collector: cluster + name: shared +spec: + transform: + name: parse-nginx + logStatements: + - context: log + statements: + - set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))")) + logSourceNamespaceSelectors: + - matchLabels: + tenant: shared + subscriptionNamespaceSelectors: + - matchLabels: + tenant: shared +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Subscription +metadata: + name: shared + namespace: shared +spec: + ottl: route() + outputs: + - name: openobserve-shared + namespace: shared +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Output +metadata: + name: openobserve-shared + namespace: shared +spec: + otlp: + endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 + headers: + Authorization: "Basic " + organization: default + stream-name: shared + tls: + insecure: true +--- +# A tenant that consumes logs from the shared tenant using a bridge +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Tenant +metadata: + labels: + collector: cluster + name: database +spec: + logSourceNamespaceSelectors: + - matchLabels: + tenant: database + subscriptionNamespaceSelectors: + - matchLabels: + tenant: database +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Bridge +metadata: + name: shared-database +spec: + sourceTenant: shared + targetTenant: database + ottl: 'route() where attributes["parsed"]["method"] == "GET"' +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Subscription +metadata: + name: database + namespace: database +spec: + ottl: route() + outputs: + - name: openobserve-database + namespace: database +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Output +metadata: + name: openobserve-database + namespace: database +spec: + otlp: + endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 + headers: + Authorization: "Basic " + organization: default + stream-name: db + tls: + insecure: true +--- +# Another tenant that consumes logs from the shared tenant using a bridge +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Tenant +metadata: + labels: + collector: cluster + name: web +spec: + logSourceNamespaceSelectors: + - matchLabels: + tenant: web + subscriptionNamespaceSelectors: + - matchLabels: + tenant: web +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Bridge +metadata: + name: shared-web +spec: + sourceTenant: shared + targetTenant: web + ottl: 'route() where attributes["parsed"]["method"] == "PUT"' +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Subscription +metadata: + name: web + namespace: web +spec: + ottl: route() + outputs: + - name: openobserve-web + namespace: web +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Output +metadata: + name: openobserve-web + namespace: web +spec: + otlp: + endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 + headers: + Authorization: "Basic " + organization: default + stream-name: web + tls: + insecure: true diff --git a/e2e/e2e_test.sh b/e2e/e2e_test.sh index bfc00d4a..5476eee5 100755 --- a/e2e/e2e_test.sh +++ b/e2e/e2e_test.sh @@ -54,7 +54,6 @@ do true; done echo "E2E (helm) test: PASSED" - # Check if cluster should be removed, ctx restored if [[ -z "${NO_KIND_CLEANUP}" ]]; then kind delete cluster --name "${KIND_CLUSTER_NAME}" diff --git a/e2e/testdata/one_tenant_two_subscriptions/receiver.yaml b/e2e/testdata/one_tenant_two_subscriptions/receiver.yaml index b2e78d17..5a374cbe 100644 --- a/e2e/testdata/one_tenant_two_subscriptions/receiver.yaml +++ b/e2e/testdata/one_tenant_two_subscriptions/receiver.yaml @@ -15,6 +15,6 @@ spec: verbosity: detailed service: pipelines: - logs: + logs: receivers: [otlp] exporters: [debug] diff --git a/go.mod b/go.mod index 43b99106..3c915362 100644 --- a/go.mod +++ b/go.mod @@ -7,101 +7,86 @@ toolchain go1.22.3 require ( emperror.dev/errors v0.8.1 github.com/cisco-open/operator-tools v0.34.0 + github.com/google/go-cmp v0.6.0 github.com/onsi/ginkgo/v2 v2.17.3 github.com/onsi/gomega v1.33.1 github.com/open-telemetry/opentelemetry-operator v0.103.0 + github.com/prometheus/client_golang v1.19.1 github.com/siliconbrain/go-mapseqs v0.2.0 github.com/siliconbrain/go-seqs v0.12.0 + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/config/configauth v0.103.0 + go.opentelemetry.io/collector/config/configcompression v1.7.0 + go.opentelemetry.io/collector/config/configopaque v1.16.0 + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 + k8s.io/api v0.30.2 k8s.io/apimachinery v0.30.2 k8s.io/client-go v0.30.2 sigs.k8s.io/controller-runtime v0.18.4 + sigs.k8s.io/yaml v1.4.0 ) require ( - github.com/go-viper/mapstructure/v2 v2.1.0 - go.opentelemetry.io/collector/config/configauth v0.103.0 - go.opentelemetry.io/collector/config/configopaque v1.16.0 -) - -require ( - github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/briandowns/spinner v1.23.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cisco-open/k8s-objectmatcher v1.9.0 // indirect github.com/cppforlife/go-patch v0.2.0 // indirect - github.com/evanphx/json-patch v5.9.0+incompatible // indirect - github.com/fatih/color v1.16.0 // indirect - github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-task/slim-sprig/v3 v3.0.0 // indirect - github.com/iancoleman/orderedmap v0.3.0 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect - github.com/nxadm/tail v1.4.8 // indirect - github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/rogpeppe/go-internal v1.12.0 // indirect - github.com/spf13/cast v1.6.0 // indirect - github.com/tg123/go-htpasswd v1.2.2 // indirect - github.com/wayneashleyberry/terminal-dimensions v1.1.0 // indirect - go.opentelemetry.io/collector/client v1.16.0 // indirect - go.opentelemetry.io/collector/component v0.110.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.110.0 // indirect - go.opentelemetry.io/collector/extension v0.110.0 // indirect - go.opentelemetry.io/collector/extension/auth v0.110.0 // indirect - go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect - go.opentelemetry.io/collector/pdata v1.16.0 // indirect - go.opentelemetry.io/collector/pipeline v0.110.0 // indirect - go.opentelemetry.io/otel v1.30.0 // indirect - go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect - go.opentelemetry.io/otel/metric v1.30.0 // indirect - go.opentelemetry.io/otel/sdk v1.30.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect - go.opentelemetry.io/otel/trace v1.30.0 // indirect - golang.org/x/crypto v0.26.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect - google.golang.org/grpc v1.66.2 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect - -) - -require ( - github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.12.0 // indirect + github.com/evanphx/json-patch v5.9.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect + github.com/fatih/color v1.16.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect - github.com/google/go-cmp v0.6.0 github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/iancoleman/orderedmap v0.3.0 // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.110.0 - github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.110.0 + github.com/nxadm/tail v1.4.8 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.19.1 + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.54.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect + github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/stretchr/testify v1.9.0 - go.opentelemetry.io/collector/config/configcompression v1.7.0 + github.com/wayneashleyberry/terminal-dimensions v1.1.0 // indirect + go.opentelemetry.io/collector/component v0.110.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.110.0 // indirect + go.opentelemetry.io/collector/extension v0.110.0 // indirect + go.opentelemetry.io/collector/extension/auth v0.110.0 // indirect + go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect + go.opentelemetry.io/collector/pdata v1.16.0 // indirect + go.opentelemetry.io/collector/pipeline v0.110.0 // indirect + go.opentelemetry.io/otel v1.30.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect + go.opentelemetry.io/otel/metric v1.30.0 // indirect + go.opentelemetry.io/otel/sdk v1.30.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect + go.opentelemetry.io/otel/trace v1.30.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.25.0 // indirect @@ -110,15 +95,16 @@ require ( golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.22.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect + google.golang.org/grpc v1.66.2 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - k8s.io/api v0.30.2 + gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.30.2 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240322212309-b815d8309940 // indirect k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - sigs.k8s.io/yaml v1.4.0 ) diff --git a/go.sum b/go.sum index 1805f1da..97484ae4 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ emperror.dev/errors v0.8.1 h1:UavXZ5cSX/4u9iyvH6aDcuGkVjeexUGJ7Ij7G4VfQT0= emperror.dev/errors v0.8.1/go.mod h1:YcRvLPh626Ubn2xqtoprejnA5nFha+TJ+2vew48kWuE= -github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 h1:KeNholpO2xKjgaaSyd+DyQRrsQjhbSeS7qe4nEw8aQw= -github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962/go.mod h1:kC29dT1vFpj7py2OvG1khBdQpo3kInWP+6QipLbdngo= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/briandowns/spinner v1.23.0 h1:alDF2guRWqa/FOZZYWjlMIx2L6H0wyewPxo/CH4Pt2A= @@ -48,8 +46,6 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg= github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= -github.com/go-viper/mapstructure/v2 v2.1.0 h1:gHnMa2Y/pIxElCH2GlZZ1lZSsn6XMtufpGyP1XxdC/w= -github.com/go-viper/mapstructure/v2 v2.1.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -78,12 +74,6 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= -github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= -github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= -github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= -github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= -github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -95,10 +85,6 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= -github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= -github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= -github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -114,10 +100,6 @@ github.com/onsi/ginkgo/v2 v2.17.3 h1:oJcvKpIb7/8uLpDDtnQuf18xVnwKp8DTD7DQ6gTd/MU github.com/onsi/ginkgo/v2 v2.17.3/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= -github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.110.0 h1:OzdaJriUgOT2iyFG4f9pjQdgruYO2P77fw5Z4tjm5iQ= -github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.110.0/go.mod h1:y7jCFpCEEN6HRyeu6tJQtZZkw6q/ZwBa6JTpm//T9NE= -github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.110.0 h1:QfCtczND32VwLrw80qbLcMIPeCMYdsPX6a/MGek0cbc= -github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.110.0/go.mod h1:l1SQz/z61b8I+EQOOK0ZET8qHyTidBbaFWKEnwtTOB4= github.com/open-telemetry/opentelemetry-operator v0.103.0 h1:L0REMuJSMZjqCw7p7fWMn19XkiIULMr3NnHdPLryMQs= github.com/open-telemetry/opentelemetry-operator v0.103.0/go.mod h1:kf5B7DLm4m88avApWmHhBjn66fQfSABM2cuQfHqAR+Y= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= @@ -149,14 +131,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tg123/go-htpasswd v1.2.2 h1:tmNccDsQ+wYsoRfiONzIhDm5OkVHQzN3w4FOBAlN6BY= -github.com/tg123/go-htpasswd v1.2.2/go.mod h1:FcIrK0J+6zptgVwK1JDlqyajW/1B4PtuJ/FLWl7nx8A= github.com/wayneashleyberry/terminal-dimensions v1.1.0 h1:EB7cIzBdsOzAgmhTUtTTQXBByuPheP/Zv1zL2BRPY6g= github.com/wayneashleyberry/terminal-dimensions v1.1.0/go.mod h1:2lc/0eWCObmhRczn2SdGSQtgBooLUzIotkkEGXqghyg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/collector/client v1.16.0 h1:3m7HzWR41+4M8r9q7UvEDjMBTFMrik/BdvAuwCs0cV4= -go.opentelemetry.io/collector/client v1.16.0/go.mod h1:N01Alc/CouREl18hmxdaQXGfID7dBZrrbsBar7WHkZ0= go.opentelemetry.io/collector/component v0.110.0 h1:z7uSY/1dcK+vTY2z3v0XxeCoi2wqgHTow/ds3Gozuz4= go.opentelemetry.io/collector/component v0.110.0/go.mod h1:W99gZdfGtQ5Zg6Bhrwrcl/uZcCG+2qBnZ1z2JO5WCW0= go.opentelemetry.io/collector/config/configauth v0.103.0 h1:tv2Ilj0X9T8ZsDd4mB8Sl+nXQ8CG8MJVQ1Lo4mmE0Pk= @@ -167,10 +145,6 @@ go.opentelemetry.io/collector/config/configopaque v1.16.0 h1:83cVlPL151kHWrHLUAk go.opentelemetry.io/collector/config/configopaque v1.16.0/go.mod h1:6zlLIyOoRpJJ+0bEKrlZOZon3rOp5Jrz9fMdR4twOS4= go.opentelemetry.io/collector/config/configtelemetry v0.110.0 h1:V8Y/Xv7TJpnNGHLrheRKrMydcKBxWYAZ+dj71Kllyos= go.opentelemetry.io/collector/config/configtelemetry v0.110.0/go.mod h1:R0MBUxjSMVMIhljuDHWIygzzJWQyZHXXWIgQNxcFwhc= -go.opentelemetry.io/collector/confmap v1.16.0 h1:0bWw/XSosX6xoE1sGsaD3glzRtSxanrF4sgib3jAYr4= -go.opentelemetry.io/collector/confmap v1.16.0/go.mod h1:GrIZ12P/9DPOuTpe2PIS51a0P/ZM6iKtByVee1Uf3+k= -go.opentelemetry.io/collector/consumer v0.110.0 h1:CnB83KSFQxhFAbNJwTM0blahg16xa6CyUnIIA5qPMbA= -go.opentelemetry.io/collector/consumer v0.110.0/go.mod h1:WlzhfwDfwKkEa5XFdN5x9+jjp9ZF5EUSmtOgVe69zm0= go.opentelemetry.io/collector/extension v0.110.0 h1:AYFk57W25f7xOo3I6pV0rWNWVtOLZsW+lzFCctnvCkU= go.opentelemetry.io/collector/extension v0.110.0/go.mod h1:zD/pw9o83SFyn/DCbBdBcH0eUPyGtYgpMSAOqotFYRc= go.opentelemetry.io/collector/extension/auth v0.110.0 h1:9SHC2sF/KR99LciHABDXRIsXLiujzIjTJpWHO0V8Bqg= @@ -204,8 +178,6 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index f448c205..83075aff 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -40,10 +40,7 @@ import ( "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" ) -const ( - tenantReferenceField = ".status.tenant" - requeueDelayOnFailedTenant = 20 * time.Second -) +const requeueDelayOnFailedTenant = 20 * time.Second // CollectorReconciler reconciles a Collector object type CollectorReconciler struct { @@ -118,9 +115,16 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, } } + bridges, err := r.getBridges(ctx, client.ListOptions{}) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing bridges") + return OtelColConfigInput{}, err + } + otelConfigInput := OtelColConfigInput{ Tenants: tenants, Subscriptions: subscriptions, + Bridges: bridges, OutputsWithSecretData: outputs, TenantSubscriptionMap: tenantSubscriptionMap, SubscriptionOutputMap: subscriptionOutputMap, @@ -156,8 +160,8 @@ func (r *CollectorReconciler) populateSecretForOutput(ctx context.Context, queri return nil } -// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;outputs;,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;outputs/status;,verbs=get;update;patch +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;outputs;bridges;,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;outputs/status;bridges/status;,verbs=get;update;patch // +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/finalizers,verbs=update // +kubebuilder:rbac:groups="",resources=secrets;nodes;namespaces;endpoints;nodes/proxy,verbs=get;list;watch // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings;roles;rolebindings,verbs=get;list;watch;create;update;patch;delete @@ -320,6 +324,22 @@ func (r *CollectorReconciler) SetupWithManager(mgr ctrl.Manager) error { return })). + Watches(&v1alpha1.Bridge{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + logger := log.FromContext(ctx) + + collectors := &v1alpha1.CollectorList{} + err := r.List(ctx, collectors) + if err != nil { + logger.Error(errors.WithStack(err), "failed listing collectors for mapping requests, unable to send requests") + return + } + + for _, collector := range collectors.Items { + requests = addCollectorRequest(requests, collector.Name) + } + + return + })). Watches(&v1alpha1.Output{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { logger := log.FromContext(ctx) @@ -491,6 +511,15 @@ func (r *CollectorReconciler) getTenantsMatchingSelectors(ctx context.Context, l return tenantsForSelector.Items, nil } +func (r *CollectorReconciler) getBridges(ctx context.Context, listOpts client.ListOptions) ([]v1alpha1.Bridge, error) { + var bridges v1alpha1.BridgeList + if err := r.Client.List(ctx, &bridges, &listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + return bridges.Items, nil +} + func normalizeStringSlice(inputList []string) []string { allKeys := make(map[string]bool) uniqueList := []string{} diff --git a/internal/controller/telemetry/envtest_testdata/config.yaml b/internal/controller/telemetry/envtest_testdata/config.yaml index 360fe4fb..db91d50a 100644 --- a/internal/controller/telemetry/envtest_testdata/config.yaml +++ b/internal/controller/telemetry/envtest_testdata/config.yaml @@ -138,4 +138,4 @@ service: logs/tenant_tenant-2_subscription_subscription-example-2: receivers: [routing/tenant_tenant-2_subscriptions] processors: [attributes/subscription_subscription-example-2] - exporters: [otlp/collector_otlp-test-output-2] \ No newline at end of file + exporters: [otlp/collector_otlp-test-output-2] diff --git a/internal/controller/telemetry/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen.go index bf6d7fe9..a05659a2 100644 --- a/internal/controller/telemetry/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen.go @@ -44,6 +44,7 @@ type OtelColConfigInput struct { // These must only include resources that are selected by the collector, tenant labelselectors, and listed outputs in the subscriptions Tenants []v1alpha1.Tenant Subscriptions map[v1alpha1.NamespacedName]v1alpha1.Subscription + Bridges []v1alpha1.Bridge OutputsWithSecretData []OutputWithSecretData MemoryLimiter v1alpha1.MemoryLimiter @@ -203,12 +204,24 @@ type ResourceProcessorAction struct { FromContext string `json:"from_context,omitempty"` } +type ErrorMode string + +const ( + ErrorModeIgnore ErrorMode = "ignore" + ErrorModeSilent ErrorMode = "silent" + ErrorModePropagate ErrorMode = "propagate" +) + type TransformProcessor struct { - LogStatements []Statement `json:"log_statements,omitempty"` + ErrorMode ErrorMode `json:"error_mode,omitempty"` + TraceStatements []Statement `json:"trace_statements,omitempty"` + MetricStatements []Statement `json:"metric_statements,omitempty"` + LogStatements []Statement `json:"log_statements,omitempty"` } type Statement struct { Context string `json:"context"` + Conditions []string `json:"conditions"` Statements []string `json:"statements"` } @@ -433,6 +446,20 @@ func (cfgInput *OtelColConfigInput) generateRoutingConnectorForSubscriptionsOutp return rc } +func (cfgInput *OtelColConfigInput) generateRoutingConnectorForBridge(bridge v1alpha1.Bridge) RoutingConnector { + rcName := fmt.Sprintf("routing/bridge_%s", bridge.Name) + rc := newRoutingConnector(rcName) + + tableItem := RoutingConnectorTableItem{ + Statement: bridge.Spec.OTTL, + Pipelines: []string{fmt.Sprintf("logs/tenant_%s", bridge.Spec.TargetTenant)}, + } + + rc.AddRoutingConnectorTableElem(tableItem) + + return rc +} + func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { connectors := make(map[string]any) @@ -449,6 +476,11 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { connectors[rc.Name] = rc } + for _, bridge := range cfgInput.Bridges { + rc := cfgInput.generateRoutingConnectorForBridge(bridge) + connectors[rc.Name] = rc + } + return connectors } @@ -557,6 +589,11 @@ func (cfgInput *OtelColConfigInput) generateProcessors() map[string]any { for _, tenant := range cfgInput.Tenants { processors[fmt.Sprintf("attributes/tenant_%s", tenant.Name)] = generateTenantAttributeProcessor(tenant) + + // Add a transform processor if the tenant has one + if tenant.Spec.Transform.Name != "" { + processors[fmt.Sprintf("transform/%s", tenant.Spec.Transform.Name)] = generateTransformProcessorForTenant(tenant) + } } for _, subscription := range cfgInput.Subscriptions { @@ -645,6 +682,27 @@ func generateSubscriptionAttributeProcessor(subscription v1alpha1.Subscription) return processor } +func generateTransformProcessorForTenant(tenant v1alpha1.Tenant) TransformProcessor { + return TransformProcessor{ + ErrorMode: ErrorMode(tenant.Spec.Transform.ErrorMode), + TraceStatements: convertAPIStatements(tenant.Spec.Transform.TraceStatements), + MetricStatements: convertAPIStatements(tenant.Spec.Transform.MetricStatements), + LogStatements: convertAPIStatements(tenant.Spec.Transform.LogStatements), + } +} + +func convertAPIStatements(APIStatements []v1alpha1.Statement) []Statement { + statements := make([]Statement, len(APIStatements)) + for i, statement := range APIStatements { + statements[i] = Statement{ + Context: statement.Context, + Conditions: statement.Conditions, + Statements: statement.Statements, + } + } + return statements +} + func generateRootPipeline(tenantName string) *otelv1beta1.Pipeline { tenantCountConnectorName := "count/tenant_metrics" receiverName := fmt.Sprintf("filelog/%s", tenantName) @@ -672,6 +730,9 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b tenantRoutingName := fmt.Sprintf("routing/tenant_%s_subscriptions", tenant) namedPipelines[tenantRootPipeline] = generateRootPipeline(tenant) + cfgInput.addBridgeConnectorToTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Bridges) + cfgInput.addTransformProcessorToTenantPipeline(tenant, namedPipelines[tenantRootPipeline]) + // Generate pipelines for the subscriptions for the tenant for _, subscription := range cfgInput.TenantSubscriptionMap[tenant] { tenantSubscriptionPipelineName := fmt.Sprintf("%s_subscription_%s_%s", tenantRootPipeline, subscription.Namespace, subscription.Name) @@ -735,6 +796,40 @@ func generateMetricsPipelines() map[string]*otelv1beta1.Pipeline { return metricsPipelines } +func (cfgInput *OtelColConfigInput) addTransformProcessorToTenantPipeline(tenantName string, pipeline *otelv1beta1.Pipeline) { + for _, tenant := range cfgInput.Tenants { + if tenant.Name == tenantName && tenant.Spec.Transform.Name != "" { + pipeline.Processors = append(pipeline.Processors, fmt.Sprintf("transform/%s", tenant.Spec.Transform.Name)) + } + } +} + +func checkBridgeConnectorForTenant(tenantName string, bridge v1alpha1.Bridge) (needsReceiver bool, needsExporter bool, bridgeName string) { + if bridge.Spec.SourceTenant == tenantName { + needsExporter = true + } + if bridge.Spec.TargetTenant == tenantName { + needsReceiver = true + } + bridgeName = bridge.Name + + return +} + +func (cfgInput *OtelColConfigInput) addBridgeConnectorToTenantPipeline(tenantName string, pipeline *otelv1beta1.Pipeline, bridges []v1alpha1.Bridge) { + for _, bridge := range bridges { + needsReceiver, needsExporter, bridgeName := checkBridgeConnectorForTenant(tenantName, bridge) + + if needsReceiver { + pipeline.Receivers = append(pipeline.Receivers, fmt.Sprintf("routing/bridge_%s", bridgeName)) + } + + if needsExporter { + pipeline.Exporters = append(pipeline.Exporters, fmt.Sprintf("routing/bridge_%s", bridgeName)) + } + } +} + func (cfgInput *OtelColConfigInput) generateDefaultKubernetesProcessor() map[string]any { type Source struct { Name string `json:"name,omitempty"` diff --git a/internal/controller/telemetry/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen_test.go index e82b3a0f..2a7c2ab8 100644 --- a/internal/controller/telemetry/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen_test.go @@ -21,15 +21,14 @@ import ( "testing" "time" - "sigs.k8s.io/yaml" - "github.com/google/go-cmp/cmp" + otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/siliconbrain/go-mapseqs/mapseqs" "github.com/siliconbrain/go-seqs/seqs" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" ) @@ -199,7 +198,7 @@ func TestOtelColConfComplex(t *testing.T) { }, Authentication: &v1alpha1.OutputAuth{ BearerAuth: &v1alpha1.BearerAuthConfig{ - SecretRef: &v1.SecretReference{ + SecretRef: &corev1.SecretReference{ Name: "bearer-test-secret", Namespace: "collector", }, @@ -460,3 +459,215 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te }) } } + +func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { + tests := []struct { + name string + cfgInput OtelColConfigInput + expectedPipelines map[string]*otelv1beta1.Pipeline + }{ + { + name: "Single tenant with no subscriptions", + cfgInput: OtelColConfigInput{ + TenantSubscriptionMap: map[string][]v1alpha1.NamespacedName{ + "tenant1": { + { + Namespace: "ns1", + Name: "sub1", + }, + }, + }, + SubscriptionOutputMap: map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{ + { + Namespace: "ns1", + Name: "sub1", + }: {}, + }, + Bridges: nil, + OutputsWithSecretData: nil, + Debug: false, + }, + expectedPipelines: map[string]*otelv1beta1.Pipeline{ + "logs/tenant_tenant1": generateRootPipeline("tenant1"), + "logs/tenant_tenant1_subscription_ns1_sub1": generatePipeline( + []string{"routing/tenant_tenant1_subscriptions"}, + []string{"attributes/subscription_sub1"}, + []string{"routing/subscription_ns1_sub1_outputs"}, + ), + "metrics/output": generatePipeline( + []string{"count/output_metrics"}, + []string{"deltatocumulative", "attributes/metricattributes"}, + []string{"prometheus/message_metrics_exporter"}, + ), + "metrics/tenant": generatePipeline( + []string{"count/tenant_metrics"}, + []string{"deltatocumulative", "attributes/metricattributes"}, + []string{"prometheus/message_metrics_exporter"}, + ), + }, + }, + { + name: "Two tenants each with a subscription with a bridge", + cfgInput: OtelColConfigInput{ + Tenants: []v1alpha1.Tenant{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tenant1", + }, + Spec: v1alpha1.TenantSpec{ + Transform: v1alpha1.Transform{ + Name: "transform1", + LogStatements: []v1alpha1.Statement{ + { + Statements: []string{`set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))"))`}, + }, + }, + }, + }, + Status: v1alpha1.TenantStatus{ + LogSourceNamespaces: []string{"ns1"}, + Subscriptions: []v1alpha1.NamespacedName{ + { + Namespace: "ns1", + Name: "sub1", + }, + }, + State: v1alpha1.StateReady, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tenant2", + }, + Spec: v1alpha1.TenantSpec{}, + Status: v1alpha1.TenantStatus{ + LogSourceNamespaces: []string{"ns2"}, + Subscriptions: []v1alpha1.NamespacedName{ + { + Namespace: "ns2", + Name: "sub2", + }, + }, + State: v1alpha1.StateReady, + }, + }, + }, + Subscriptions: map[v1alpha1.NamespacedName]v1alpha1.Subscription{ + { + Namespace: "ns1", + Name: "sub1", + }: { + ObjectMeta: metav1.ObjectMeta{ + Name: "sub1", + Namespace: "ns1", + }, + Spec: v1alpha1.SubscriptionSpec{ + OTTL: "route()", + Outputs: []v1alpha1.NamespacedName{}, + }, + }, + { + Namespace: "ns2", + Name: "sub2", + }: { + ObjectMeta: metav1.ObjectMeta{ + Name: "sub2", + Namespace: "ns2", + }, + Spec: v1alpha1.SubscriptionSpec{ + OTTL: "route()", + Outputs: []v1alpha1.NamespacedName{}, + }, + }, + }, + TenantSubscriptionMap: map[string][]v1alpha1.NamespacedName{ + "tenant1": { + { + Namespace: "ns1", + Name: "sub1", + }, + }, + "tenant2": { + { + Namespace: "ns2", + Name: "sub2", + }, + }, + }, + SubscriptionOutputMap: map[v1alpha1.NamespacedName][]v1alpha1.NamespacedName{ + { + Namespace: "ns1", + Name: "sub1", + }: { + { + Namespace: "ns1", + Name: "output1", + }, + }, + { + Namespace: "ns2", + Name: "sub2", + }: { + { + Namespace: "ns2", + Name: "output2", + }, + }, + }, + Bridges: []v1alpha1.Bridge{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bridge1", + }, + Spec: v1alpha1.BridgeSpec{ + SourceTenant: "tenant1", + TargetTenant: "tenant2", + OTTL: "route()", + }, + }, + }, + OutputsWithSecretData: nil, + Debug: false, + }, + expectedPipelines: map[string]*otelv1beta1.Pipeline{ + "logs/tenant_tenant1": generatePipeline( + []string{"filelog/tenant1"}, + []string{"k8sattributes", "attributes/tenant_tenant1", "transform/transform1"}, + []string{"routing/tenant_tenant1_subscriptions", "count/tenant_metrics", "routing/bridge_bridge1"}, + ), + "logs/tenant_tenant1_subscription_ns1_sub1": generatePipeline( + []string{"routing/tenant_tenant1_subscriptions"}, + []string{"attributes/subscription_sub1"}, + []string{"routing/subscription_ns1_sub1_outputs"}, + ), + "logs/tenant_tenant2": generatePipeline( + []string{"filelog/tenant2", "routing/bridge_bridge1"}, + []string{"k8sattributes", "attributes/tenant_tenant2"}, + []string{"routing/tenant_tenant2_subscriptions", "count/tenant_metrics"}, + ), + "logs/tenant_tenant2_subscription_ns2_sub2": generatePipeline( + []string{"routing/tenant_tenant2_subscriptions"}, + []string{"attributes/subscription_sub2"}, + []string{"routing/subscription_ns2_sub2_outputs"}, + ), + "metrics/output": generatePipeline( + []string{"count/output_metrics"}, + []string{"deltatocumulative", "attributes/metricattributes"}, + []string{"prometheus/message_metrics_exporter"}, + ), + "metrics/tenant": generatePipeline( + []string{"count/tenant_metrics"}, + []string{"deltatocumulative", "attributes/metricattributes"}, + []string{"prometheus/message_metrics_exporter"}, + ), + }, + }, + } + + for _, tt := range tests { + ttp := tt + t.Run(ttp.name, func(t *testing.T) { + assert.Equal(t, ttp.cfgInput.generateNamedPipelines(), ttp.expectedPipelines) + }) + } +} diff --git a/internal/controller/telemetry/route_controller.go b/internal/controller/telemetry/route_controller.go index d11f6417..ab2c2648 100644 --- a/internal/controller/telemetry/route_controller.go +++ b/internal/controller/telemetry/route_controller.go @@ -36,14 +36,23 @@ import ( "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" ) -// CollectorReconciler reconciles a Collector object +const ( + subscriptionTenantReferenceField = ".status.tenant" + bridgeSourceTenantReferenceField = ".spec.sourceTenant" + bridgeTargetTenantReferenceField = ".spec.targetTenant" + tenantNameField = ".metadata.name" +) + +// RouteReconciler is responsible for reconciling Tenant resources +// It also watches for changes to Subscriptions, Outputs, and Namespaces +// to trigger the appropriate reconciliation logic when related resources change. type RouteReconciler struct { client.Client Scheme *runtime.Scheme } -// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;outputs;,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;outputs/status;,verbs=get;update;patch +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors;tenants;subscriptions;outputs;bridges;,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/status;tenants/status;subscriptions/status;outputs/status;bridges/status;,verbs=get;update;patch // +kubebuilder:rbac:groups=telemetry.kube-logging.dev,resources=collectors/finalizers,verbs=update // +kubebuilder:rbac:groups="",resources=nodes;namespaces;endpoints;nodes/proxy,verbs=get;list;watch // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings;roles;rolebindings,verbs=get;list;watch;create;update;patch;delete @@ -55,7 +64,6 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl logger := log.FromContext(ctx) tenant := &v1alpha1.Tenant{} - logger.Info(fmt.Sprintf("getting tenant: %q", req.NamespacedName.Name)) if err := r.Get(ctx, req.NamespacedName, tenant); client.IgnoreNotFound(err) != nil { @@ -64,13 +72,11 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, err } - logger.Info(fmt.Sprintf("reconciling tenant: %q", tenant.Name)) - originalTenantStatus := tenant.Status + logger.Info(fmt.Sprintf("reconciling tenant: %q", tenant.Name)) subscriptionsForTenant, updateList, err := r.getSubscriptionsForTenant(ctx, tenant) if err != nil { - tenant.Status.State = v1alpha1.StateFailed logger.Error(errors.WithStack(err), "failed to get subscriptions for tenant", "tenant", tenant.Name) if updateErr := r.Status().Update(ctx, tenant); updateErr != nil { @@ -82,17 +88,13 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // add all newly updated subscriptions here subscriptionsForTenant = append(subscriptionsForTenant, r.updateSubscriptionsForTenant(ctx, tenant.Name, updateList)...) - subscriptionsToDisown := r.getSubscriptionsReferencingTenantButNotSelected(ctx, tenant, subscriptionsForTenant) - r.disownSubscriptions(ctx, subscriptionsToDisown) subscriptionNames := getSubscriptionNamesFromSubscription(subscriptionsForTenant) - cmp := func(a, b v1alpha1.NamespacedName) int { return strings.Compare(a.String(), b.String()) } - slices.SortFunc(subscriptionNames, cmp) tenant.Status.Subscriptions = subscriptionNames @@ -108,6 +110,13 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } } + if len(validOutputs) == 0 { + subscription.Status.State = v1alpha1.StateFailed + logger.Error(errors.WithStack(errors.New("no valid outputs for subscription")), "no valid outputs for subscription", "subscription", subscription.NamespacedName().String()) + } else { + subscription.Status.State = v1alpha1.StateReady + } + subscription.Status.Outputs = validOutputs if !reflect.DeepEqual(originalSubscriptionStatus, subscription.Status) { @@ -118,6 +127,34 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } } + bridgesForTenant, err := r.getBridgesForTenant(ctx, tenant.Name) + if err != nil { + tenant.Status.State = v1alpha1.StateFailed + logger.Error(errors.WithStack(err), "failed to get bridges for tenant", "tenant", tenant.Name) + if updateErr := r.Status().Update(ctx, tenant); updateErr != nil { + logger.Error(errors.WithStack(updateErr), "failed update tenant status", "tenant", tenant.Name) + return ctrl.Result{}, err + } + return ctrl.Result{}, err + } + tenant.Status.ConnectedBridges = getBridgeNamesFromBridges(bridgesForTenant) + + for _, bridge := range bridgesForTenant { + originalBridgeStatus := bridge.Status.DeepCopy() + if err := r.checkBridgeConnections(ctx, &bridge); err != nil { + bridge.Status.State = v1alpha1.StateFailed + logger.Error(errors.WithStack(err), "failed bridge connection verification", "bridge", bridge.Name) + } + + bridge.Status.State = v1alpha1.StateReady + if !reflect.DeepEqual(originalBridgeStatus, bridge.Status) { + if updateErr := r.Status().Update(ctx, &bridge); updateErr != nil { + logger.Error(errors.WithStack(updateErr), "failed update bridge status", "bridge", bridge.Name) + return ctrl.Result{}, err + } + } + } + logsourceNamespacesForTenant, err := r.getLogsourceNamespaceNamesForTenant(ctx, tenant) if err != nil { tenant.Status.State = v1alpha1.StateFailed @@ -128,12 +165,10 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } return ctrl.Result{}, err } - slices.Sort(logsourceNamespacesForTenant) tenant.Status.LogSourceNamespaces = logsourceNamespacesForTenant tenant.Status.State = v1alpha1.StateReady - if !reflect.DeepEqual(originalTenantStatus, tenant.Status) { logger.Info("tenant status changed") if err := r.Status().Update(ctx, tenant); err != nil { @@ -148,21 +183,50 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // SetupWithManager sets up the controller with the Manager. func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Subscription{}, tenantReferenceField, func(rawObj client.Object) []string { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Subscription{}, subscriptionTenantReferenceField, func(rawObj client.Object) []string { subscription := rawObj.(*v1alpha1.Subscription) if subscription.Status.Tenant == "" { return nil } + return []string{subscription.Status.Tenant} }); err != nil { return err } + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Bridge{}, bridgeSourceTenantReferenceField, func(rawObj client.Object) []string { + bridge := rawObj.(*v1alpha1.Bridge) + if bridge.Spec.SourceTenant == "" { + return nil + } + + return []string{bridge.Spec.SourceTenant} + }); err != nil { + return err + } + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Bridge{}, bridgeTargetTenantReferenceField, func(rawObj client.Object) []string { + bridge := rawObj.(*v1alpha1.Bridge) + if bridge.Spec.TargetTenant == "" { + return nil + } + + return []string{bridge.Spec.TargetTenant} + }); err != nil { + return err + } + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Tenant{}, tenantNameField, func(rawObj client.Object) []string { + tenant := rawObj.(*v1alpha1.Tenant) + return []string{tenant.Name} + }); err != nil { + return err + } + addTenantRequest := func(requests []reconcile.Request, tenant string) []reconcile.Request { requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Name: tenant, }, }) + return requests } @@ -172,8 +236,7 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { logger := log.FromContext(ctx) tenants := &v1alpha1.TenantList{} - err := r.List(ctx, tenants) - if err != nil { + if err := r.List(ctx, tenants); err != nil { logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") return } @@ -188,8 +251,22 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { logger := log.FromContext(ctx) tenants := &v1alpha1.TenantList{} - err := r.List(ctx, tenants) - if err != nil { + if err := r.List(ctx, tenants); err != nil { + logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") + return + } + + for _, tenant := range tenants.Items { + requests = addTenantRequest(requests, tenant.Name) + } + + return + })). + Watches(&v1alpha1.Bridge{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + logger := log.FromContext(ctx) + + tenants := &v1alpha1.TenantList{} + if err := r.List(ctx, tenants); err != nil { logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") return } @@ -204,8 +281,7 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { logger := log.FromContext(ctx) tenants := &v1alpha1.TenantList{} - err := r.List(ctx, tenants) - if err != nil { + if err := r.List(ctx, tenants); err != nil { logger.Error(errors.WithStack(err), "failed listing tenants for mapping requests, unable to send requests") return } @@ -223,19 +299,16 @@ func (r *RouteReconciler) getSubscriptionsForTenant(ctx context.Context, tenant logger := log.FromContext(ctx) namespaces, err := r.getNamespacesForSelectorSlice(ctx, tenant.Spec.SubscriptionNamespaceSelectors) - if err != nil { return nil, nil, err } var selectedSubscriptions []v1alpha1.Subscription - for _, ns := range namespaces { var subscriptionsForNS v1alpha1.SubscriptionList listOpts := &client.ListOptions{ Namespace: ns.Name, } - if err := r.List(ctx, &subscriptionsForNS, listOpts); client.IgnoreNotFound(err) != nil { return nil, nil, err } @@ -259,23 +332,19 @@ func (r *RouteReconciler) getSubscriptionsForTenant(ctx context.Context, tenant return } -func (r *RouteReconciler) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { +func (r *RouteReconciler) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { var namespaces []apiv1.Namespace - for _, ls := range labelSelectors { - var namespacesForSelector apiv1.NamespaceList - selector, err := metav1.LabelSelectorAsSelector(&ls) - if err != nil { return nil, err } + var namespacesForSelector apiv1.NamespaceList listOpts := &client.ListOptions{ LabelSelector: selector, } - if err := r.List(ctx, &namespacesForSelector, listOpts); client.IgnoreNotFound(err) != nil { return nil, err } @@ -292,10 +361,11 @@ func (r *RouteReconciler) getNamespacesForSelectorSlice(ctx context.Context, lab // this is by design so that we don't fail the whole reconciliation when a single subscription update fails func (r *RouteReconciler) disownSubscriptions(ctx context.Context, subscriptionsToDisown []v1alpha1.Subscription) { logger := log.FromContext(ctx) + for _, subscription := range subscriptionsToDisown { subscription.Status.Tenant = "" - err := r.Client.Status().Update(ctx, &subscription) - if err != nil { + + if err := r.Client.Status().Update(ctx, &subscription); err != nil { logger.Error(err, fmt.Sprintf("failed to detach subscription %s/%s from collector", subscription.Namespace, subscription.Name)) } else { logger.Info("disowning subscription", "subscription", fmt.Sprintf("%s/%s", subscription.Namespace, subscription.Name)) @@ -307,48 +377,44 @@ func (r *RouteReconciler) disownSubscriptions(ctx context.Context, subscriptions // this is by design in order to avoid blocking the whole reconciliation in case we cannot update a single subscription func (r *RouteReconciler) updateSubscriptionsForTenant(ctx context.Context, tenantName string, subscriptions []v1alpha1.Subscription) (updatedSubscriptions []v1alpha1.Subscription) { logger := log.FromContext(ctx, "tenant", tenantName) + for _, subscription := range subscriptions { subscription.Status.Tenant = tenantName - logger.Info("updating subscription status for tenant ownership") - err := r.Status().Update(ctx, &subscription) - if err != nil { + + if err := r.Status().Update(ctx, &subscription); err != nil { logger.Error(err, fmt.Sprintf("failed to set subscription (%s/%s) -> tenant (%s) reference", subscription.Namespace, subscription.Name, tenantName)) } else { updatedSubscriptions = append(updatedSubscriptions, subscription) } } + return } func (r *RouteReconciler) getSubscriptionsReferencingTenantButNotSelected(ctx context.Context, tenant *v1alpha1.Tenant, selectedSubscriptions []v1alpha1.Subscription) []v1alpha1.Subscription { logger := log.FromContext(ctx) + var subscriptionsReferencing v1alpha1.SubscriptionList listOpts := &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(tenantReferenceField, tenant.Name), + FieldSelector: fields.OneTermEqualSelector(subscriptionTenantReferenceField, tenant.Name), } - if err := r.Client.List(ctx, &subscriptionsReferencing, listOpts); client.IgnoreNotFound(err) != nil { logger.Error(err, "failed to list subscriptions that need to be detached from tenant") return nil } var subscriptionsToDisown []v1alpha1.Subscription - for _, subscriptionReferencing := range subscriptionsReferencing.Items { - idx := slices.IndexFunc(selectedSubscriptions, func(selected v1alpha1.Subscription) bool { return reflect.DeepEqual(subscriptionReferencing.NamespacedName(), selected.NamespacedName()) }) - if idx == -1 { subscriptionsToDisown = append(subscriptionsToDisown, subscriptionReferencing) } - } return subscriptionsToDisown - } func (r *RouteReconciler) getLogsourceNamespaceNamesForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]string, error) { @@ -358,13 +424,82 @@ func (r *RouteReconciler) getLogsourceNamespaceNamesForTenant(ctx context.Contex } namespaceNames := make([]string, len(namespaces)) - for i, namespace := range namespaces { namespaceNames[i] = namespace.Name } return namespaceNames, nil +} + +func (r *RouteReconciler) getBridges(ctx context.Context, listOpts *client.ListOptions) ([]v1alpha1.Bridge, error) { + var bridges v1alpha1.BridgeList + if err := r.Client.List(ctx, &bridges, listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + return bridges.Items, nil +} + +func (r *RouteReconciler) getBridgesForTenant(ctx context.Context, tenantName string) (bridgesOwned []v1alpha1.Bridge, err error) { + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(bridgeSourceTenantReferenceField, tenantName), + } + sourceBridge, err := r.getBridges(ctx, listOpts) + if err != nil { + return nil, err + } + + listOpts = &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(bridgeTargetTenantReferenceField, tenantName), + } + targetBridge, err := r.getBridges(ctx, listOpts) + if err != nil { + return nil, err + } + + bridges := append(sourceBridge, targetBridge...) + for _, bridge := range bridges { + if bridge.Spec.SourceTenant == tenantName || bridge.Spec.TargetTenant == tenantName { + bridgesOwned = append(bridgesOwned, bridge) + } + } + + return +} + +func (r *RouteReconciler) getTenants(ctx context.Context, listOpts *client.ListOptions) ([]v1alpha1.Tenant, error) { + var tenants v1alpha1.TenantList + if err := r.Client.List(ctx, &tenants, listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + return tenants.Items, nil +} + +func (r *RouteReconciler) checkBridgeConnections(ctx context.Context, bridge *v1alpha1.Bridge) error { + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(tenantNameField, bridge.Spec.SourceTenant), + } + sourceTenant, err := r.getTenants(ctx, listOpts) + if err != nil { + return err + } + if len(sourceTenant) != 1 && sourceTenant[0].Name != bridge.Spec.SourceTenant { + return errors.Errorf("bridge (%s) has invalid source tenant", bridge.Name) + } + listOpts = &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(tenantNameField, bridge.Spec.TargetTenant), + } + targetTenant, err := r.getTenants(ctx, listOpts) + if err != nil { + return err + } + if len(targetTenant) != 1 && targetTenant[0].Name != bridge.Spec.TargetTenant { + return errors.Errorf("bridge (%s) has invalid target tenant", bridge.Name) + } + + return nil } func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { @@ -380,8 +515,8 @@ func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { cmp := func(a, b apiv1.Namespace) int { return strings.Compare(a.Name, b.Name) } - slices.SortFunc(uniqueList, cmp) + return uniqueList } @@ -393,3 +528,12 @@ func getSubscriptionNamesFromSubscription(subscriptions []v1alpha1.Subscription) return subscriptionNames } + +func getBridgeNamesFromBridges(bridges []v1alpha1.Bridge) []string { + bridgeNames := make([]string, len(bridges)) + for i, bridge := range bridges { + bridgeNames[i] = bridge.Name + } + + return bridgeNames +}