Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: context #299

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions api/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package api

import (
"context"
"errors"
"fmt"

v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/duty"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

type ScrapeContext interface {
duty.DBContext

IsTrace() bool

WithContext(ctx context.Context) ScrapeContext

WithScrapeConfig(scraper *v1.ScrapeConfig) ScrapeContext
ScrapeConfig() *v1.ScrapeConfig

Namespace() string
Kubernetes() kubernetes.Interface
KubernetesRestConfig() *rest.Config

GetEnvVarValue(input types.EnvVar) (string, error)
GetEnvValueFromCache(env types.EnvVar) (string, error)

HydrateConnection(connectionIdentifier string) (*models.Connection, error)
}

type scrapeContext struct {
context.Context

db *gorm.DB
pool *pgxpool.Pool

namespace string
kubernetes *kubernetes.Clientset
kubernetesRestConfig *rest.Config

scrapeConfig *v1.ScrapeConfig
}

func NewScrapeContext(ctx context.Context, db *gorm.DB, pool *pgxpool.Pool) ScrapeContext {
return &scrapeContext{
Context: ctx,
namespace: Namespace,
kubernetes: KubernetesClient,
kubernetesRestConfig: KubernetesRestConfig,
db: db,
pool: pool,
}
}

func (ctx scrapeContext) WithContext(from context.Context) ScrapeContext {
ctx.Context = from
return &ctx
}

func (ctx scrapeContext) WithScrapeConfig(scraper *v1.ScrapeConfig) ScrapeContext {
ctx.scrapeConfig = scraper
return &ctx
}

func (ctx scrapeContext) DB() *gorm.DB {
return ctx.db
}

func (ctx scrapeContext) Pool() *pgxpool.Pool {
return ctx.pool
}

func (ctx scrapeContext) ScrapeConfig() *v1.ScrapeConfig {
return ctx.scrapeConfig
}

func (ctx scrapeContext) Namespace() string {
return ctx.namespace
}

func (c scrapeContext) Kubernetes() kubernetes.Interface {
return c.kubernetes
}

func (c scrapeContext) KubernetesRestConfig() *rest.Config {
return c.kubernetesRestConfig
}

func (ctx scrapeContext) IsTrace() bool {
return ctx.scrapeConfig.Spec.IsTrace()
}

func (ctx *scrapeContext) HydrateConnection(connectionName string) (*models.Connection, error) {
if connectionName == "" {
return nil, nil
}

if ctx.db == nil {
return nil, errors.New("db has not been initialized")
}

if ctx.kubernetes == nil {
return nil, errors.New("kubernetes clientset has not been initialized")
}

connection, err := duty.HydratedConnectionByURL(ctx, ctx.db, ctx.kubernetes, ctx.namespace, connectionName)
if err != nil {
return nil, err
}

// Connection name was explicitly provided but was not found.
// That's an error.
if connection == nil {
return nil, fmt.Errorf("connection %s not found", connectionName)
}

return connection, nil
}

func (c *scrapeContext) GetEnvVarValue(input types.EnvVar) (string, error) {
return duty.GetEnvValueFromCache(c.kubernetes, input, c.namespace)
}

func (ctx *scrapeContext) GetEnvValueFromCache(env types.EnvVar) (string, error) {
return duty.GetEnvValueFromCache(ctx.kubernetes, env, ctx.namespace)
}
24 changes: 9 additions & 15 deletions api/global.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
package api

import (
"context"

v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

var KubernetesClient *kubernetes.Clientset
var KubernetesRestConfig *rest.Config
var Namespace string
var (
KubernetesClient *kubernetes.Clientset
KubernetesRestConfig *rest.Config
Namespace string
DefaultContext ScrapeContext
)

func NewScrapeContext(ctx context.Context, scraper v1.ScrapeConfig) *v1.ScrapeContext {
return &v1.ScrapeContext{
Context: ctx,
ScrapeConfig: scraper,
Namespace: Namespace,
Kubernetes: KubernetesClient,
KubernetesRestConfig: KubernetesRestConfig,
DB: db.DefaultDB(),
}
type Scraper interface {
Scrape(ctx ScrapeContext) v1.ScrapeResults
CanScrape(config v1.ScraperSpec) bool
}
34 changes: 0 additions & 34 deletions api/v1/azure.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package v1

import (
"fmt"

"github.com/flanksource/duty"
"github.com/flanksource/duty/types"
)

Expand All @@ -25,34 +22,3 @@ type Azure struct {
ClientSecret types.EnvVar `yaml:"clientSecret,omitempty" json:"clientSecret,omitempty"`
TenantID string `yaml:"tenantID" json:"tenantID"`
}

// HydrateConnection populates the credentials in Azure from the connection name (if available)
// else it'll try to fetch the credentials from kubernetes secrets.
func (t *Azure) HydrateConnection(ctx *ScrapeContext) error {
if t.ConnectionName != "" {
connection, err := ctx.HydrateConnectionByURL(t.ConnectionName)
if err != nil {
return fmt.Errorf("could not hydrate connection: %w", err)
} else if connection == nil {
return fmt.Errorf("connection %s not found", t.ConnectionName)
}

t.ClientID.ValueStatic = connection.Username
t.ClientSecret.ValueStatic = connection.Password
t.TenantID = connection.Properties["tenant"]
return nil
}

var err error
t.ClientID.ValueStatic, err = duty.GetEnvValueFromCache(ctx.Kubernetes, t.ClientID, ctx.Namespace)
if err != nil {
return fmt.Errorf("failed to get client id: %w", err)
}

t.ClientSecret.ValueStatic, err = duty.GetEnvValueFromCache(ctx.Kubernetes, t.ClientSecret, ctx.Namespace)
if err != nil {
return fmt.Errorf("failed to get client secret: %w", err)
}

return nil
}
79 changes: 0 additions & 79 deletions api/v1/interface.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,15 @@
package v1

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/models"
"gorm.io/gorm"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// Scraper ...
// +kubebuilder:object:generate=false
type Scraper interface {
Scrape(ctx *ScrapeContext) ScrapeResults
CanScrape(config ScraperSpec) bool
}

// Analyzer ...
// +kubebuilder:object:generate=false
type Analyzer func(configs []ScrapeResult) AnalysisResult
Expand Down Expand Up @@ -341,67 +326,3 @@ type RunNowResponse struct {
Failed int `json:"failed"`
Errors []string `json:"errors,omitempty"`
}

// ScrapeContext ...
// +kubebuilder:object:generate=false
type ScrapeContext struct {
context.Context
DB *gorm.DB
Namespace string
Kubernetes *kubernetes.Clientset
KubernetesRestConfig *rest.Config
ScrapeConfig ScrapeConfig
}

func (ctx ScrapeContext) Find(path string) ([]string, error) {
return filepath.Glob(path)
}

// Read returns the contents of a file, the base filename and an error
func (ctx ScrapeContext) Read(path string) ([]byte, string, error) {
content, err := os.ReadFile(path)
filename := filepath.Base(path)
return content, filename, err
}

// GetNamespace ...
func (ctx ScrapeContext) GetNamespace() string {
return ctx.Namespace
}

// IsTrace ...
func (ctx ScrapeContext) IsTrace() bool {
return ctx.ScrapeConfig.Spec.IsTrace()
}

// HydrateConnectionByURL ...
func (ctx *ScrapeContext) HydrateConnectionByURL(connectionName string) (*models.Connection, error) {
if connectionName == "" {
return nil, nil
}

if !strings.HasPrefix(connectionName, "connection://") {
return nil, fmt.Errorf("invalid connection name: [%s]", connectionName)
}

if ctx.DB == nil {
return nil, errors.New("db has not been initialized")
}

if ctx.Kubernetes == nil {
return nil, errors.New("kubernetes clientset has not been initialized")
}

connection, err := duty.HydratedConnectionByURL(ctx, ctx.DB, ctx.Kubernetes, ctx.Namespace, connectionName)
if err != nil {
return nil, err
}

// Connection name was explicitly provided but was not found.
// That's an error.
if connection == nil {
return nil, fmt.Errorf("connection %s not found", connectionName)
}

return connection, nil
}
4 changes: 4 additions & 0 deletions cmd/operator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"os"

Expand All @@ -13,6 +14,7 @@ import (
ctrlzap "sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
configsv1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/controllers"
"github.com/flanksource/config-db/db"
Expand All @@ -38,6 +40,8 @@ func init() {

func run(cmd *cobra.Command, args []string) {
db.MustInit()
api.DefaultContext = api.NewScrapeContext(context.Background(), db.DefaultDB(), db.Pool)

zapLogger := logger.GetZapLogger()
if zapLogger == nil {
logger.Fatalf("failed to get zap logger")
Expand Down
9 changes: 5 additions & 4 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,23 @@ var Run = &cobra.Command{

if db.ConnectionString != "" {
db.MustInit()
api.DefaultContext = api.NewScrapeContext(context.Background(), db.DefaultDB(), db.Pool)
}

if db.ConnectionString == "" && outputDir == "" {
logger.Fatalf("skipping export: neither --output-dir nor --db is specified")
}

for _, scraperConfig := range scraperConfigs {
ctx := api.NewScrapeContext(context.Background(), scraperConfig)
for i := range scraperConfigs {
ctx := api.DefaultContext.WithScrapeConfig(&scraperConfigs[i])
if err := scrapeAndStore(ctx); err != nil {
logger.Errorf("error scraping config: (name=%s) %v", scraperConfig.Name, err)
logger.Errorf("error scraping config: (name=%s) %v", scraperConfigs[i].Name, err)
}
}
},
}

func scrapeAndStore(ctx *v1.ScrapeContext) error {
func scrapeAndStore(ctx api.ScrapeContext) error {
results, err := scrapers.Run(ctx)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var Serve = &cobra.Command{

func serve(configFiles []string) {
db.MustInit()
api.DefaultContext = api.NewScrapeContext(context.Background(), db.DefaultDB(), db.Pool)

e := echo.New()
// PostgREST needs to know how it is exposed to create the correct links
db.HTTPEndpoint = publicEndpoint + "/db"
Expand Down Expand Up @@ -101,7 +103,7 @@ func startScraperCron(configFiles []string) {
scrapers.AddToCron(_scraper)

fn := func() {
ctx := api.NewScrapeContext(context.Background(), _scraper)
ctx := api.DefaultContext.WithScrapeConfig(&_scraper)
if _, err := scrapers.RunScraper(ctx); err != nil {
logger.Errorf("Error running scraper(id=%s): %v", scraper.ID, err)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/scrapeconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *ScrapeConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request

// Sync jobs if new scrape config is created
if changed || scrapeConfig.Generation == 1 {
ctx := api.NewScrapeContext(ctx, *scrapeConfig)
ctx := api.DefaultContext.WithScrapeConfig(scrapeConfig)
if _, err := scrapers.RunScraper(ctx); err != nil {
logger.Error(err, "failed to run scraper")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
Expand Down
Loading
Loading