Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added heartbeats to started sources #580

Merged
merged 10 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/actions/go_init/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ runs:
mv -v aws-source ..
mv -v k8s-source ..

- name: Install Docgen
shell: bash
run: go install github.com/overmindtech/docgen@latest

- name: Go Generate for sources
shell: bash
run: |
cd ../aws-source
go generate ./...
cd -
cd ../k8s-source
go generate ./...
cd -

- name: Go Generate
shell: bash
run: |
Expand Down
16 changes: 8 additions & 8 deletions cmd/auth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,62 +13,62 @@ import (

// AuthenticatedApiKeyClient Returns an apikey client that uses the auth
// embedded in the context and otel instrumentation
func AuthenticatedApiKeyClient(ctx context.Context, oi OvermindInstance) sdpconnect.ApiKeyServiceClient {
func AuthenticatedApiKeyClient(ctx context.Context, oi sdp.OvermindInstance) sdpconnect.ApiKeyServiceClient {
httpClient := NewAuthenticatedClient(ctx, otelhttp.DefaultClient)
log.WithContext(ctx).WithField("apiUrl", oi.ApiUrl).Debug("Connecting to overmind apikeys API (pre-authenticated)")
return sdpconnect.NewApiKeyServiceClient(httpClient, oi.ApiUrl.String())
}

// UnauthenticatedApiKeyClient Returns an apikey client with otel instrumentation
// but no authentication. Can only be used for ExchangeKeyForToken
func UnauthenticatedApiKeyClient(ctx context.Context, oi OvermindInstance) sdpconnect.ApiKeyServiceClient {
func UnauthenticatedApiKeyClient(ctx context.Context, oi sdp.OvermindInstance) sdpconnect.ApiKeyServiceClient {
log.WithContext(ctx).WithField("apiUrl", oi.ApiUrl).Debug("Connecting to overmind apikeys API")
return sdpconnect.NewApiKeyServiceClient(otelhttp.DefaultClient, oi.ApiUrl.String())
}

// AuthenticatedBookmarkClient Returns a bookmark client that uses the auth
// embedded in the context and otel instrumentation
func AuthenticatedBookmarkClient(ctx context.Context, oi OvermindInstance) sdpconnect.BookmarksServiceClient {
func AuthenticatedBookmarkClient(ctx context.Context, oi sdp.OvermindInstance) sdpconnect.BookmarksServiceClient {
httpClient := NewAuthenticatedClient(ctx, otelhttp.DefaultClient)
log.WithContext(ctx).WithField("apiUrl", oi.ApiUrl).Debug("Connecting to overmind bookmark API")
return sdpconnect.NewBookmarksServiceClient(httpClient, oi.ApiUrl.String())
}

// AuthenticatedChangesClient Returns a changes client that uses the auth
// embedded in the context and otel instrumentation
func AuthenticatedChangesClient(ctx context.Context, oi OvermindInstance) sdpconnect.ChangesServiceClient {
func AuthenticatedChangesClient(ctx context.Context, oi sdp.OvermindInstance) sdpconnect.ChangesServiceClient {
httpClient := NewAuthenticatedClient(ctx, otelhttp.DefaultClient)
log.WithContext(ctx).WithField("apiUrl", oi.ApiUrl).Debug("Connecting to overmind changes API")
return sdpconnect.NewChangesServiceClient(httpClient, oi.ApiUrl.String())
}

// AuthenticatedConfigurationClient Returns a config client that uses the auth
// embedded in the context and otel instrumentation
func AuthenticatedConfigurationClient(ctx context.Context, oi OvermindInstance) sdpconnect.ConfigurationServiceClient {
func AuthenticatedConfigurationClient(ctx context.Context, oi sdp.OvermindInstance) sdpconnect.ConfigurationServiceClient {
httpClient := NewAuthenticatedClient(ctx, otelhttp.DefaultClient)
log.WithContext(ctx).WithField("apiUrl", oi.ApiUrl).Debug("Connecting to overmind configuration API")
return sdpconnect.NewConfigurationServiceClient(httpClient, oi.ApiUrl.String())
}

// AuthenticatedManagementClient Returns a management client that uses the auth
// embedded in the context and otel instrumentation
func AuthenticatedManagementClient(ctx context.Context, oi OvermindInstance) sdpconnect.ManagementServiceClient {
func AuthenticatedManagementClient(ctx context.Context, oi sdp.OvermindInstance) sdpconnect.ManagementServiceClient {
httpClient := NewAuthenticatedClient(ctx, otelhttp.DefaultClient)
log.WithContext(ctx).WithField("apiUrl", oi.ApiUrl).Debug("Connecting to overmind management API")
return sdpconnect.NewManagementServiceClient(httpClient, oi.ApiUrl.String())
}

// AuthenticatedSnapshotsClient Returns a Snapshots client that uses the auth
// embedded in the context and otel instrumentation
func AuthenticatedSnapshotsClient(ctx context.Context, oi OvermindInstance) sdpconnect.SnapshotsServiceClient {
func AuthenticatedSnapshotsClient(ctx context.Context, oi sdp.OvermindInstance) sdpconnect.SnapshotsServiceClient {
httpClient := NewAuthenticatedClient(ctx, otelhttp.DefaultClient)
log.WithContext(ctx).WithField("apiUrl", oi.ApiUrl).Debug("Connecting to overmind snapshot API")
return sdpconnect.NewSnapshotsServiceClient(httpClient, oi.ApiUrl.String())
}

// AuthenticatedInviteClient Returns a Invite client that uses the auth
// embedded in the context and otel instrumentation
func AuthenticatedInviteClient(ctx context.Context, oi OvermindInstance) sdpconnect.InviteServiceClient {
func AuthenticatedInviteClient(ctx context.Context, oi sdp.OvermindInstance) sdpconnect.InviteServiceClient {
httpClient := NewAuthenticatedClient(ctx, otelhttp.DefaultClient)
log.WithContext(ctx).WithField("apiUrl", oi.ApiUrl).Debug("Connecting to overmind invite API")
return sdpconnect.NewInviteServiceClient(httpClient, oi.ApiUrl.String())
Expand Down
40 changes: 35 additions & 5 deletions cmd/explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"atomicgo.dev/keyboard"
"atomicgo.dev/keyboard/keys"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/google/uuid"
"github.com/overmindtech/aws-source/proc"
"github.com/overmindtech/cli/tfutils"
"github.com/overmindtech/discovery"
"github.com/overmindtech/pterm"
"github.com/overmindtech/sdp-go"
stdlibsource "github.com/overmindtech/stdlib-source/sources"
log "github.com/sirupsen/logrus"
"github.com/sourcegraph/conc/pool"
Expand All @@ -34,18 +36,32 @@ var exploreCmd = &cobra.Command{
// any query or request during the runtime of the CLI. for proper cleanup,
// execute the returned function. The method returns once the sources are
// started. Progress is reported into the provided multi printer.
func StartLocalSources(ctx context.Context, oi OvermindInstance, token *oauth2.Token, tfArgs []string, multi *pterm.MultiPrinter) (func(), error) {
func StartLocalSources(ctx context.Context, oi sdp.OvermindInstance, token *oauth2.Token, tfArgs []string, multi *pterm.MultiPrinter) (func(), error) {
var err error

stdlibSpinner, _ := pterm.DefaultSpinner.WithWriter(multi.NewWriter()).Start("Starting stdlib source engine")
awsSpinner, _ := pterm.DefaultSpinner.WithWriter(multi.NewWriter()).Start("Starting AWS source engine")

natsOptions := natsOptions(ctx, oi, token)
heartbeatOptions := heartbeatOptions(oi, token)

hostname, err := os.Hostname()
if err != nil {
return func() {}, fmt.Errorf("failed to get hostname: %w", err)
}

p := pool.NewWithResults[*discovery.Engine]().WithErrors()

p.Go(func() (*discovery.Engine, error) {
stdlibEngine, err := stdlibsource.InitializeEngine(natsOptions, 2_000, true)
stdlibEngine, err := stdlibsource.InitializeEngine(
natsOptions,
fmt.Sprintf("stdlib-source-%v", hostname),
fmt.Sprintf("cli-%v", cliVersion),
uuid.New(),
heartbeatOptions,
2_000,
true,
)
if err != nil {
stdlibSpinner.Fail("Failed to initialize stdlib source engine")
return nil, fmt.Errorf("failed to initialize stdlib source engine: %w", err)
Expand Down Expand Up @@ -90,7 +106,17 @@ func StartLocalSources(ctx context.Context, oi OvermindInstance, token *oauth2.T
configs = append(configs, c)
}

awsEngine, err := proc.InitializeAwsSourceEngine(ctx, natsOptions, 2_000, configs...)
awsEngine, err := proc.InitializeAwsSourceEngine(
ctx,
fmt.Sprintf("aws-source-%v", hostname),
fmt.Sprintf("cli-%v", cliVersion),
uuid.New(),
natsOptions,
heartbeatOptions,
2_000,
1, // Don't retry as we want the user to get notified immediately
configs...,
)
if err != nil {
awsSpinner.Fail("Failed to initialize AWS source engine")
return nil, fmt.Errorf("failed to initialize AWS source engine: %w", err)
Expand Down Expand Up @@ -147,9 +173,13 @@ func Explore(cmd *cobra.Command, args []string) error {
pterm.Println()
pterm.Println(fmt.Sprintf("Explore your infrastructure graph at %v/explore", oi.FrontendUrl))
pterm.Println()
pterm.Success.Println("Press any key to stop the sources")
pterm.Success.Println("Press Ctrl+C to stop the locally running sources")
err = keyboard.Listen(func(keyInfo keys.Key) (stop bool, err error) {
return true, nil
if keyInfo.Code == keys.CtrlC {
return true, nil
}

return false, nil
})
if err != nil {
return fmt.Errorf("error reading keyboard input: %w", err)
Expand Down
34 changes: 29 additions & 5 deletions cmd/pterm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"os/exec"
"strings"
Expand All @@ -14,13 +15,16 @@ import (

"connectrpc.com/connect"
"github.com/overmindtech/cli/tracing"
"github.com/overmindtech/discovery"
"github.com/overmindtech/pterm"
"github.com/overmindtech/sdp-go"
"github.com/overmindtech/sdp-go/auth"
"github.com/overmindtech/sdp-go/sdpconnect"
log "github.com/sirupsen/logrus"
"github.com/sourcegraph/conc/pool"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"golang.org/x/oauth2"
)

Expand Down Expand Up @@ -58,7 +62,7 @@ func PTermSetup() {
}
}

func StartSources(ctx context.Context, cmd *cobra.Command, args []string) (context.Context, OvermindInstance, *oauth2.Token, func(), error) {
func StartSources(ctx context.Context, cmd *cobra.Command, args []string) (context.Context, sdp.OvermindInstance, *oauth2.Token, func(), error) {
multi := pterm.DefaultMultiPrinter
_, _ = multi.Start()
defer func() {
Expand All @@ -67,19 +71,19 @@ func StartSources(ctx context.Context, cmd *cobra.Command, args []string) (conte

ctx, oi, token, err := login(ctx, cmd, []string{"explore:read", "changes:write", "config:write", "request:receive"}, multi.NewWriter())
if err != nil {
return ctx, OvermindInstance{}, nil, nil, err
return ctx, sdp.OvermindInstance{}, nil, nil, err
}

cleanup, err := StartLocalSources(ctx, oi, token, args, &multi)
if err != nil {
return ctx, OvermindInstance{}, nil, nil, err
return ctx, sdp.OvermindInstance{}, nil, nil, err
}

return ctx, oi, token, cleanup, nil
}

// start revlink warmup in the background
func RunRevlinkWarmup(ctx context.Context, oi OvermindInstance, postPlanPrinter *atomic.Pointer[pterm.MultiPrinter], args []string) *pool.ErrorPool {
func RunRevlinkWarmup(ctx context.Context, oi sdp.OvermindInstance, postPlanPrinter *atomic.Pointer[pterm.MultiPrinter], args []string) *pool.ErrorPool {
p := pool.New().WithErrors()
p.Go(func() error {
ctx, span := tracing.Tracer().Start(ctx, "revlink warmup")
Expand Down Expand Up @@ -221,7 +225,7 @@ func snapshotDetail(state string, items, edges uint32) string {
return detailStr
}

func natsOptions(ctx context.Context, oi OvermindInstance, token *oauth2.Token) auth.NATSOptions {
func natsOptions(ctx context.Context, oi sdp.OvermindInstance, token *oauth2.Token) auth.NATSOptions {
hostname, err := os.Hostname()
if err != nil {
hostname = "localhost"
Expand Down Expand Up @@ -251,6 +255,26 @@ func natsOptions(ctx context.Context, oi OvermindInstance, token *oauth2.Token)
}
}

func heartbeatOptions(oi sdp.OvermindInstance, token *oauth2.Token) *discovery.HeartbeatOptions {
tokenSource := oauth2.StaticTokenSource(token)

transport := oauth2.Transport{
Source: tokenSource,
Base: http.DefaultTransport,
}
authenticatedClient := http.Client{
Transport: otelhttp.NewTransport(&transport),
}

return &discovery.HeartbeatOptions{
ManagementClient: sdpconnect.NewManagementServiceClient(
&authenticatedClient,
oi.ApiUrl.String(),
),
Frequency: time.Second * 30,
}
}

func HasScopesFlexible(token *oauth2.Token, requiredScopes []string) (bool, string, error) {
if token == nil {
return false, "", errors.New("HasScopesFlexible: token is nil")
Expand Down
Loading
Loading