Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AV-73526] Add support for rate-limiting retries #157

Merged
merged 5 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"time"

"github.com/couchbasecloud/terraform-provider-couchbase-capella/internal/errors"
Expand Down Expand Up @@ -118,27 +119,27 @@ func (c *Client) ExecuteWithRetry(
headers map[string]string,
) (response *Response, err error) {
var requestBody []byte
var dur time.Duration
ajsqr marked this conversation as resolved.
Show resolved Hide resolved
if payload != nil {
requestBody, err = json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("%s: %w", errors.ErrMarshallingPayload, err)
}
}

req, err := http.NewRequest(endpointCfg.Method, endpointCfg.Url, bytes.NewReader(requestBody))
if err != nil {
return nil, fmt.Errorf("%s: %w", errors.ErrConstructingRequest, err)
}

req.Header.Set("Authorization", "Bearer "+authToken)
for header, value := range headers {
req.Header.Set(header, value)
}
var fn = func() (response *Response, backoff time.Duration, err error) {
req, err := http.NewRequest(endpointCfg.Method, endpointCfg.Url, bytes.NewReader(requestBody))
if err != nil {
return nil, dur, fmt.Errorf("%s: %w", errors.ErrConstructingRequest, err)
}

var fn = func() (response *Response, err error) {
req.Header.Set("Authorization", "Bearer "+authToken)
for header, value := range headers {
req.Header.Set(header, value)
}
apiRes, err := c.Do(req)
if err != nil {
return nil, fmt.Errorf("%s: %w", errors.ErrExecutingRequest, err)
return nil, dur, fmt.Errorf("%s: %w", errors.ErrExecutingRequest, err)
}
defer apiRes.Body.Close()

Expand All @@ -150,38 +151,47 @@ func (c *Client) ExecuteWithRetry(
switch apiRes.StatusCode {
case endpointCfg.SuccessStatus:
// success case
case http.StatusTooManyRequests:
header := apiRes.Header.Get("Retry-After")
retryAfter, err := strconv.Atoi(header)
if err != nil {
return nil, dur, fmt.Errorf("error parsing Retry-After value from response header")
}
dur = time.Second * time.Duration(retryAfter)
ajsqr marked this conversation as resolved.
Show resolved Hide resolved
return nil, dur, errors.ErrRatelimit
case http.StatusGatewayTimeout:
return nil, errors.ErrGatewayTimeout
return nil, dur, errors.ErrGatewayTimeout
default:
var apiError Error
if err := json.Unmarshal(responseBody, &apiError); err != nil {
return nil, fmt.Errorf(
return nil, dur, fmt.Errorf(
"unexpected code: %d, expected: %d, body: %s",
apiRes.StatusCode, endpointCfg.SuccessStatus, responseBody)
}
if apiError.Code == 0 {
return nil, fmt.Errorf(
return nil, dur, fmt.Errorf(
"unexpected code: %d, expected: %d, body: %s",
apiRes.StatusCode, endpointCfg.SuccessStatus, responseBody)

}
return nil, &apiError
return nil, dur, &apiError
}

return &Response{
Response: apiRes,
Body: responseBody,
}, nil
}, dur, nil
}

return exec(ctx, fn, defaultWaitAttempt)
ajsqr marked this conversation as resolved.
Show resolved Hide resolved
}

func exec(ctx context.Context, fn func() (response *Response, err error), waitOnReattempt time.Duration) (*Response, error) {
func exec(ctx context.Context, fn func() (response *Response, dur time.Duration, err error), waitOnReattempt time.Duration) (*Response, error) {
ajsqr marked this conversation as resolved.
Show resolved Hide resolved
timer := time.NewTimer(time.Millisecond)

var (
err error
backOff time.Duration
response *Response
)

Expand All @@ -196,14 +206,20 @@ func exec(ctx context.Context, fn func() (response *Response, err error), waitOn
case <-ctx.Done():
return nil, fmt.Errorf("timed out executing request against api: %w", err)
case <-timer.C:
response, err = fn()
response, backOff, err = fn()
switch {
case err == nil:
return response, nil
case goer.Is(err, errors.ErrRatelimit):
case !goer.Is(err, errors.ErrGatewayTimeout):
return response, err
}
timer.Reset(waitOnReattempt)

if backOff > 0 {
timer.Reset(backOff)
} else {
timer.Reset(waitOnReattempt)
}
ajsqr marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
3 changes: 3 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ var (
// ErrIfMatchCannotBeSetWhileCreate is returned when if_match is set during create operation.
ErrIfMatchCannotBeSetWhileCreate = errors.New("if_match attribute cannot be set during create operation")

// ErrRatelimit is returned when the Capella API reaches a rate limit for the same API key.
ErrRatelimit = errors.New("api key reached the ratelimit")

// ErrScopeNameMissing is returned when an expected ScopeName was not found after an import.
ErrScopeNameMissing = errors.New("scope Name is missing or was passed incorrectly, please check provider documentation for syntax")

ajsqr marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading