diff --git a/cmd/root.go b/cmd/root.go index 430e8cb2..5e5b229a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -12,7 +12,6 @@ import ( "time" "github.com/getsentry/sentry-go" - "github.com/google/uuid" "github.com/nats-io/jwt/v2" "github.com/nats-io/nkeys" "github.com/overmindtech/aws-source/proc" @@ -48,20 +47,14 @@ var rootCmd = &cobra.Command{ panic(err) } }() - - var err error - // Get srcman supplied config natsServers := viper.GetStringSlice("nats-servers") natsJWT := viper.GetString("nats-jwt") natsNKeySeed := viper.GetString("nats-nkey-seed") - maxParallel := viper.GetInt("max-parallel") apiKey := viper.GetString("api-key") app := viper.GetString("app") healthCheckPort := viper.GetInt("health-check-port") natsConnectionName := viper.GetString("nats-connection-name") - sourceName := viper.GetString("source-name") - sourceUUIDString := viper.GetString("source-uuid") awsAuthConfig := proc.AwsAuthConfig{ Strategy: viper.GetString("aws-access-strategy"), @@ -73,7 +66,7 @@ var rootCmd = &cobra.Command{ AutoConfig: viper.GetBool("auto-config"), } - err = viper.UnmarshalKey("aws-regions", &awsAuthConfig.Regions) + err := viper.UnmarshalKey("aws-regions", &awsAuthConfig.Regions) if err != nil { log.WithError(err).Fatal("Could not parse aws-regions") } @@ -83,12 +76,17 @@ var rootCmd = &cobra.Command{ natsNKeySeedLog = "[REDACTED]" } + engineConfig, err := discovery.EngineConfigFromViper() + if err != nil { + log.WithError(err).Fatal("Could not create engine config") + } + log.WithFields(log.Fields{ "nats-servers": natsServers, "nats-jwt": natsJWT, "nats-nkey-seed": natsNKeySeedLog, "nats-connection-name": natsConnectionName, - "max-parallel": maxParallel, + "max-parallel": engineConfig.MaxParallelExecutions, "aws-regions": awsAuthConfig.Regions, "aws-access-strategy": awsAuthConfig.Strategy, "aws-external-id": awsAuthConfig.ExternalID, @@ -97,21 +95,10 @@ var rootCmd = &cobra.Command{ "auto-config": awsAuthConfig.AutoConfig, "health-check-port": healthCheckPort, "app": app, - "source-name": sourceName, - "source-uuid": sourceUUIDString, + "source-name": engineConfig.SourceName, + "source-uuid": engineConfig.SourceUUID, }).Info("Got config") - var sourceUUID uuid.UUID - if sourceUUIDString == "" { - sourceUUID = uuid.New() - } else { - sourceUUID, err = uuid.Parse(sourceUUIDString) - - if err != nil { - log.WithError(err).Fatal("Could not parse source UUID") - } - } - // Determine the required Overmind URLs ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -179,14 +166,14 @@ var rootCmd = &cobra.Command{ log.WithError(err).Fatal("Could not create AWS configs") } + // Initialize the engine + e, err := proc.InitializeAwsSourceEngine( rateLimitContext, - sourceName, + engineConfig, tracing.ServiceVersion, - sourceUUID, natsOptions, heartbeatOptions, - maxParallel, 999_999, // Very high max retries as it'll time out after 15min anyway configs..., ) @@ -284,6 +271,9 @@ func init() { log.WithError(err).Fatal("Could not determine hostname for use in NATS connection name and source name") } + // add engine flags + discovery.AddEngineFlags(rootCmd) + // General config options rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "/etc/srcman/config/source.yaml", "config file path") rootCmd.PersistentFlags().StringVar(&logLevel, "log", "info", "Set the log level. Valid values: panic, fatal, error, warn, info, debug, trace") @@ -295,16 +285,6 @@ func init() { rootCmd.PersistentFlags().String("nats-nkey-seed", "", "The NKey seed which corresponds to the NATS JWT e.g. SUAFK6QUC...") rootCmd.PersistentFlags().String("nats-connection-name", hostname, "The name that the source should use to connect to NATS") - rootCmd.PersistentFlags().String("api-key", "", "The API key to use to authenticate to the Overmind API") - // Support API Keys in the environment - err = viper.BindEnv("api-key", "OVM_API_KEY", "API_KEY") - if err != nil { - log.WithError(err).Fatal("could not bind api key to env") - } - - rootCmd.PersistentFlags().String("app", "https://app.overmind.tech", "The URL of the Overmind app to use") - rootCmd.PersistentFlags().Int("max-parallel", 2_000, "Max number of requests to run in parallel") - // Custom flags for this source rootCmd.PersistentFlags().String("aws-access-strategy", "defaults", "The strategy to use to access this customer's AWS account. Valid values: 'access-key', 'external-id', 'sso-profile', 'defaults'. Default: 'defaults'.") rootCmd.PersistentFlags().String("aws-access-key-id", "", "The ID of the access key to use") @@ -315,8 +295,6 @@ func init() { rootCmd.PersistentFlags().String("aws-regions", "", "Comma-separated list of AWS regions that this source should operate in") rootCmd.PersistentFlags().BoolP("auto-config", "a", false, "Use the local AWS config, the same as the AWS CLI could use. This can be set up with \"aws configure\"") rootCmd.PersistentFlags().IntP("health-check-port", "", 8080, "The port that the health check should run on") - rootCmd.PersistentFlags().String("source-name", fmt.Sprintf("aws-source-%v", hostname), "The name of the source") - rootCmd.PersistentFlags().String("source-uuid", "", "The UUID of the source, is this is blank it will be auto-generated. This is used in heartbeats and shouldn't be supplied usually") // tracing rootCmd.PersistentFlags().String("honeycomb-api-key", "", "If specified, configures opentelemetry libraries to submit traces to honeycomb") diff --git a/proc/proc.go b/proc/proc.go index a26b4555..272eb436 100644 --- a/proc/proc.go +++ b/proc/proc.go @@ -32,7 +32,6 @@ import ( awssns "github.com/aws/aws-sdk-go-v2/service/sns" awssqs "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/cenkalti/backoff/v4" - "github.com/google/uuid" "github.com/sourcegraph/conc/pool" "github.com/aws/aws-sdk-go-v2/aws" @@ -198,8 +197,8 @@ func CreateAWSConfigs(awsAuthConfig AwsAuthConfig) ([]aws.Config, error) { // engine, and an error if any. The context provided will be used for the rate // limit buckets and should not be cancelled until the source is shut down. AWS // configs should be provided for each region that is enabled -func InitializeAwsSourceEngine(ctx context.Context, name string, version string, engineUUID uuid.UUID, natsOptions auth.NATSOptions, heartbeatOptions *discovery.HeartbeatOptions, maxParallel int, maxRetries uint64, configs ...aws.Config) (*discovery.Engine, error) { - e, err := discovery.NewEngine() +func InitializeAwsSourceEngine(ctx context.Context, ec *discovery.EngineConfig, version string, natsOptions auth.NATSOptions, heartbeatOptions *discovery.HeartbeatOptions, maxRetries uint64, configs ...aws.Config) (*discovery.Engine, error) { + e, err := discovery.NewEngine(ec) if err != nil { return nil, fmt.Errorf("error initializing Engine: %w", err) } @@ -215,12 +214,8 @@ func InitializeAwsSourceEngine(ctx context.Context, name string, version string, e.HeartbeatOptions = heartbeatOptions } - e.Name = "aws-source" e.NATSOptions = &natsOptions - e.MaxParallelExecutions = maxParallel e.Version = version - e.Name = name - e.UUID = engineUUID e.Type = "aws" e.StartSendingHeartbeats(ctx)