Skip to content

Commit

Permalink
Merge pull request #698 from overmindtech/streaming
Browse files Browse the repository at this point in the history
Streaming
  • Loading branch information
dylanratcliffe authored Dec 8, 2024
2 parents 811e31a + f6d277f commit 500a150
Show file tree
Hide file tree
Showing 28 changed files with 2,036 additions and 682 deletions.
183 changes: 93 additions & 90 deletions adapterhelpers/always_get_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/overmindtech/discovery"
"github.com/overmindtech/sdp-go"
"github.com/overmindtech/sdpcache"
"github.com/sourcegraph/conc/pool"
Expand Down Expand Up @@ -196,158 +197,154 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru

// List Lists all available items. This is done by running the ListFunc, then
// passing these results to GetFunc in order to get the details
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) List(ctx context.Context, scope string, ignoreCache bool) ([]*sdp.Item, error) {
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) ListStream(ctx context.Context, scope string, ignoreCache bool, stream *discovery.QueryResultStream) {
if scope != s.Scopes()[0] {
return nil, &sdp.QueryError{
stream.SendError(&sdp.QueryError{
ErrorType: sdp.QueryError_NOSCOPE,
ErrorString: fmt.Sprintf("requested scope %v does not match adapter scope %v", scope, s.Scopes()[0]),
}
})
return
}

if err := s.Validate(); err != nil {
stream.SendError(WrapAWSError(err))
return
}

// Check to see if we have supplied the required functions
if s.DisableList {
// In this case we can't run list, so just return empty
return []*sdp.Item{}, nil
return
}

s.ensureCache()
cacheHit, ck, cachedItems, qErr := s.cache.Lookup(ctx, s.Name(), sdp.QueryMethod_LIST, scope, s.ItemType, "", ignoreCache)
if qErr != nil {
return nil, qErr
stream.SendError(qErr)
return
}
if cacheHit {
return cachedItems, nil
}

items, err := s.listInternal(ctx, scope, s.ListInput)
if err != nil {
err := WrapAWSError(err)
if !CanRetry(err) {
s.cache.StoreError(err, s.cacheDuration(), ck)
for _, item := range cachedItems {
stream.SendItem(item)
}
return nil, err
return
}

for _, item := range items {
s.cache.StoreItem(item, s.cacheDuration(), ck)
}

return items, nil
s.listInternal(ctx, scope, s.ListInput, ck, stream)
}

// listInternal Accepts a ListInput and runs the List logic against it
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) listInternal(ctx context.Context, scope string, input ListInput) ([]*sdp.Item, error) {
var output ListOutput
var err error

if err = s.Validate(); err != nil {
return nil, WrapAWSError(err)
}

p := pool.NewWithResults[*sdp.Item]().WithErrors().WithContext(ctx).WithMaxGoroutines(s.MaxParallel.Value())

func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) listInternal(ctx context.Context, scope string, input ListInput, ck sdpcache.CacheKey, stream *discovery.QueryResultStream) {
paginator := s.ListFuncPaginatorBuilder(s.Client, input)
var newGetInputs []GetInput

for paginator.HasMorePages() {
output, err = paginator.NextPage(ctx)
p := pool.New().WithContext(ctx).WithMaxGoroutines(s.MaxParallel.Value())

output, err := paginator.NextPage(ctx)

if err != nil {
return nil, err
err := WrapAWSError(err)
if !CanRetry(err) {
s.cache.StoreError(err, s.cacheDuration(), ck)
}
stream.SendError(err)
return
}

newGetInputs, err = s.ListFuncOutputMapper(output, input)

if err != nil {
return nil, err
err := WrapAWSError(err)
if !CanRetry(err) {
s.cache.StoreError(err, s.cacheDuration(), ck)
}
stream.SendError(err)
return
}

for _, input := range newGetInputs {
p.Go(func(ctx context.Context) (*sdp.Item, error) {
return s.GetFunc(ctx, s.Client, scope, input)
p.Go(func(ctx context.Context) error {
// Ignore the error here as we don't want to stop the whole process
item, _ := s.GetFunc(ctx, s.Client, scope, input)

if item != nil {
s.cache.StoreItem(item, s.cacheDuration(), ck)
stream.SendItem(item)
}

return nil
})
}
}

// We are deciding to throw the errors away from the Get requests, this
// probably isn't the best idea, but we don't want to fail the whole list
// because a Get failed. We might want to revisit this logic in the future
items, _ := p.Wait()

return items, nil
// Wait for this page to be processed before moving on to the next one
_ = p.Wait()
}
}

// Search Searches for AWS resources by ARN
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) Search(ctx context.Context, scope string, query string, ignoreCache bool) ([]*sdp.Item, error) {
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) SearchStream(ctx context.Context, scope string, query string, ignoreCache bool, stream *discovery.QueryResultStream) {
if scope != s.Scopes()[0] {
return nil, &sdp.QueryError{
stream.SendError(&sdp.QueryError{
ErrorType: sdp.QueryError_NOSCOPE,
ErrorString: fmt.Sprintf("requested scope %v does not match adapter scope %v", scope, s.Scopes()[0]),
}
})
return
}

var items []*sdp.Item
var err error
if err := s.Validate(); err != nil {
stream.SendError(WrapAWSError(err))
return
}

if s.SearchInputMapper == nil && s.SearchGetInputMapper == nil {
items, err = s.SearchARN(ctx, scope, query, ignoreCache)
s.SearchARN(ctx, scope, query, ignoreCache, stream)
} else {
// If we should always look for ARNs first, do that
if s.AlwaysSearchARNs {
if _, err = ParseARN(query); err == nil {
items, err = s.SearchARN(ctx, scope, query, ignoreCache)
if _, err := ParseARN(query); err == nil {
s.SearchARN(ctx, scope, query, ignoreCache, stream)
} else {
items, err = s.SearchCustom(ctx, scope, query, ignoreCache)
s.SearchCustom(ctx, scope, query, ignoreCache, stream)
}
} else {
items, err = s.SearchCustom(ctx, scope, query, ignoreCache)
s.SearchCustom(ctx, scope, query, ignoreCache, stream)
}
}

if err != nil {
return nil, err
}

return items, nil
}

// SearchCustom Searches using custom mapping logic. The SearchInputMapper is
// used to create an input for ListFunc, at which point the usual logic is used
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) SearchCustom(ctx context.Context, scope string, query string, ignoreCache bool) ([]*sdp.Item, error) {
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) SearchCustom(ctx context.Context, scope string, query string, ignoreCache bool, stream *discovery.QueryResultStream) {
s.ensureCache()
cacheHit, ck, cachedItems, qErr := s.cache.Lookup(ctx, s.Name(), sdp.QueryMethod_SEARCH, scope, s.ItemType, query, ignoreCache)
if qErr != nil {
return nil, qErr
stream.SendError(qErr)
return
}
if cacheHit {
return cachedItems, nil
for _, item := range cachedItems {
stream.SendItem(item)
}
return
}

var items []*sdp.Item

if s.SearchInputMapper != nil {
input, err := s.SearchInputMapper(scope, query)

if err != nil {
// Don't bother caching this error since it costs nearly nothing
return nil, WrapAWSError(err)
stream.SendError(WrapAWSError(err))
return
}

items, err = s.listInternal(ctx, scope, input)

if err != nil {
err := WrapAWSError(err)
if !CanRetry(err) {
s.cache.StoreError(err, s.cacheDuration(), ck)
}
return nil, err
}
s.listInternal(ctx, scope, input, ck, stream)
} else if s.SearchGetInputMapper != nil {
input, err := s.SearchGetInputMapper(scope, query)

if err != nil {
// Don't cache this as it costs nearly nothing
return nil, WrapAWSError(err)
stream.SendError(WrapAWSError(err))
return
}

item, err := s.GetFunc(ctx, s.Client, scope, input)
Expand All @@ -357,51 +354,57 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
if !CanRetry(err) {
s.cache.StoreError(err, s.cacheDuration(), ck)
}
return nil, err
stream.SendError(err)
return
}

items = []*sdp.Item{item}
if item != nil {
s.cache.StoreItem(item, s.cacheDuration(), ck)
stream.SendItem(item)
}
} else {
return nil, errors.New("SearchCustom called without SearchInputMapper or SearchGetInputMapper")
}

for _, item := range items {
s.cache.StoreItem(item, s.cacheDuration(), ck)
stream.SendError(errors.New("SearchCustom called without SearchInputMapper or SearchGetInputMapper"))
return
}
return items, nil
}

func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) SearchARN(ctx context.Context, scope string, query string, ignoreCache bool) ([]*sdp.Item, error) {
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) SearchARN(ctx context.Context, scope string, query string, ignoreCache bool, stream *discovery.QueryResultStream) {
// Parse the ARN
a, err := ParseARN(query)

if err != nil {
return nil, WrapAWSError(err)
stream.SendError(WrapAWSError(err))
return
}

if a.ContainsWildcard() {
// We can't handle wildcards by default so bail out
return nil, &sdp.QueryError{
stream.SendError(&sdp.QueryError{
ErrorType: sdp.QueryError_NOTFOUND,
ErrorString: fmt.Sprintf("wildcards are not supported by adapter %v", s.Name()),
Scope: scope,
}
})
return
}

if arnScope := FormatScope(a.AccountID, a.Region); arnScope != scope {
return nil, &sdp.QueryError{
stream.SendError(&sdp.QueryError{
ErrorType: sdp.QueryError_NOSCOPE,
ErrorString: fmt.Sprintf("ARN scope %v does not match request scope %v", arnScope, scope),
Scope: scope,
}
})
return
}

item, err := s.Get(ctx, scope, a.ResourceID(), ignoreCache)
if err != nil {
return nil, WrapAWSError(err)
stream.SendError(WrapAWSError(err))
return
}

return []*sdp.Item{item}, nil
if item != nil {
stream.SendItem(item)
}
}

// Weight Returns the priority weighting of items returned by this sourcs.
Expand Down
Loading

0 comments on commit 500a150

Please sign in to comment.