Skip to content

Commit

Permalink
Merge pull request #843 from bbockelm/client_refactor_v2
Browse files Browse the repository at this point in the history
Refactor client to have an asynchronous core
  • Loading branch information
joereuss12 authored Feb 29, 2024
2 parents 08bf74c + 9659f93 commit 756fda6
Show file tree
Hide file tree
Showing 32 changed files with 2,108 additions and 1,262 deletions.
2 changes: 2 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ builds:
binary: pelican
tags:
- forceposix
ldflags:
- -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} -X main.builtBy=goreleaser -X config.version={{.Version}}
ignore:
- goos: windows
goarch: arm64
Expand Down
2 changes: 1 addition & 1 deletion broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func doRetrieveRequest(t *testing.T, ctx context.Context, dur time.Duration) (*h

req.Header.Set("X-Pelican-Timeout", dur.String())
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "pelican-origin/"+config.PelicanVersion)
req.Header.Set("User-Agent", "pelican-origin/"+config.GetVersion())

req.Header.Set("Authorization", "Bearer "+token)

Expand Down
6 changes: 3 additions & 3 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func ConnectToOrigin(ctx context.Context, brokerUrl, prefix, originName string)
// Send a request to the broker for a connection reversal
req, err := http.NewRequestWithContext(ctx, "POST", brokerUrl, reqReader)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "pelican-cache/"+config.PelicanVersion)
req.Header.Set("User-Agent", "pelican-cache/"+config.GetVersion())

brokerAud, err := url.Parse(brokerUrl)
if err != nil {
Expand Down Expand Up @@ -404,7 +404,7 @@ func doCallback(ctx context.Context, brokerResp reversalRequest) (listener net.L
dur := time.Duration(5*time.Second - time.Duration(mrand.Intn(500))*time.Millisecond)
req.Header.Set("X-Pelican-Timeout", dur.String())
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "pelican-origin/"+config.PelicanVersion)
req.Header.Set("User-Agent", "pelican-origin/"+config.GetVersion())

cacheAud, err := url.Parse(brokerResp.CallbackUrl)
if err != nil {
Expand Down Expand Up @@ -585,7 +585,7 @@ func LaunchRequestMonitor(ctx context.Context, egrp *errgroup.Group, resultChan
dur := param.Transport_ResponseHeaderTimeout.GetDuration() - time.Duration(mrand.Intn(500))*time.Millisecond
req.Header.Set("X-Pelican-Timeout", dur.String())
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "pelican-origin/"+config.PelicanVersion)
req.Header.Set("User-Agent", "pelican-origin/"+config.GetVersion())

brokerAud, err := url.Parse(param.Federation_BrokerUrl.GetString())
if err != nil {
Expand Down
36 changes: 18 additions & 18 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ func queryDirector(verb, source, directorUrl string) (resp *http.Response, err e
// Include the Client's version as a User-Agent header. The Director will decide
// if it supports the version, and provide an error message in the case that it
// cannot.
userAgent := "pelican-client/" + ObjectClientOptions.Version
req.Header.Set("User-Agent", userAgent)
req.Header.Set("User-Agent", getUserAgent(""))

// Perform the HTTP request
resp, err = client.Do(req)
Expand Down Expand Up @@ -235,8 +234,8 @@ func GetCachesFromDirectorResponse(resp *http.Response, needsToken bool) (caches
}

// NewTransferDetails creates the TransferDetails struct with the given cache
func NewTransferDetailsUsingDirector(cache namespaces.DirectorCache, opts TransferDetailsOptions) []TransferDetails {
details := make([]TransferDetails, 0)
func NewTransferDetailsUsingDirector(cache namespaces.DirectorCache, opts transferDetailsOptions) []transferAttemptDetails {
details := make([]transferAttemptDetails, 0)
cacheEndpoint := cache.EndpointUrl

// Form the URL
Expand All @@ -252,40 +251,41 @@ func NewTransferDetailsUsingDirector(cache namespaces.DirectorCache, opts Transf
cacheURL.Scheme = ""
cacheURL.Opaque = ""
}
log.Debugf("Parsed Cache: %s\n", cacheURL.String())
log.Debugf("Parsed Cache: %s", cacheURL.String())
if opts.NeedsToken {
cacheURL.Scheme = "https"
if !HasPort(cacheURL.Host) {
if !hasPort(cacheURL.Host) {
// Add port 8444 and 8443
cacheURL.Host += ":8444"
details = append(details, TransferDetails{
Url: *cacheURL,
urlCopy := *cacheURL
urlCopy.Host += ":8444"
details = append(details, transferAttemptDetails{
Url: &urlCopy,
Proxy: false,
PackOption: opts.PackOption,
})
// Strip the port off and add 8443
cacheURL.Host = cacheURL.Host[:len(cacheURL.Host)-5] + ":8443"
cacheURL.Host = cacheURL.Host + ":8443"
}
// Whether port is specified or not, add a transfer without proxy
details = append(details, TransferDetails{
Url: *cacheURL,
details = append(details, transferAttemptDetails{
Url: cacheURL,
Proxy: false,
PackOption: opts.PackOption,
})
} else {
cacheURL.Scheme = "http"
if !HasPort(cacheURL.Host) {
if !hasPort(cacheURL.Host) {
cacheURL.Host += ":8000"
}
isProxyEnabled := IsProxyEnabled()
details = append(details, TransferDetails{
Url: *cacheURL,
isProxyEnabled := isProxyEnabled()
details = append(details, transferAttemptDetails{
Url: cacheURL,
Proxy: isProxyEnabled,
PackOption: opts.PackOption,
})
if isProxyEnabled && CanDisableProxy() {
details = append(details, TransferDetails{
Url: *cacheURL,
details = append(details, transferAttemptDetails{
Url: cacheURL,
Proxy: false,
PackOption: opts.PackOption,
})
Expand Down
8 changes: 4 additions & 4 deletions client/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestNewTransferDetailsUsingDirector(t *testing.T) {

// Case 1: cache with http

transfers := NewTransferDetailsUsingDirector(nonAuthCache, TransferDetailsOptions{nonAuthCache.AuthedReq, ""})
transfers := NewTransferDetailsUsingDirector(nonAuthCache, transferDetailsOptions{nonAuthCache.AuthedReq, ""})
assert.Equal(t, 2, len(transfers))
assert.Equal(t, "my-cache-url:8000", transfers[0].Url.Host)
assert.Equal(t, "http", transfers[0].Url.Scheme)
Expand All @@ -166,15 +166,15 @@ func TestNewTransferDetailsUsingDirector(t *testing.T) {
assert.Equal(t, false, transfers[1].Proxy)

// Case 2: cache with https
transfers = NewTransferDetailsUsingDirector(authCache, TransferDetailsOptions{authCache.AuthedReq, ""})
transfers = NewTransferDetailsUsingDirector(authCache, transferDetailsOptions{authCache.AuthedReq, ""})
assert.Equal(t, 1, len(transfers))
assert.Equal(t, "my-cache-url:8443", transfers[0].Url.Host)
assert.Equal(t, "https", transfers[0].Url.Scheme)
assert.Equal(t, false, transfers[0].Proxy)

// Case 3: cache without port with http
nonAuthCache.EndpointUrl = "my-cache-url"
transfers = NewTransferDetailsUsingDirector(nonAuthCache, TransferDetailsOptions{nonAuthCache.AuthedReq, ""})
transfers = NewTransferDetailsUsingDirector(nonAuthCache, transferDetailsOptions{nonAuthCache.AuthedReq, ""})
assert.Equal(t, 2, len(transfers))
assert.Equal(t, "my-cache-url:8000", transfers[0].Url.Host)
assert.Equal(t, "http", transfers[0].Url.Scheme)
Expand All @@ -185,7 +185,7 @@ func TestNewTransferDetailsUsingDirector(t *testing.T) {

// Case 4. cache without port with https
authCache.EndpointUrl = "my-cache-url"
transfers = NewTransferDetailsUsingDirector(authCache, TransferDetailsOptions{authCache.AuthedReq, ""})
transfers = NewTransferDetailsUsingDirector(authCache, transferDetailsOptions{authCache.AuthedReq, ""})
assert.Equal(t, 2, len(transfers))
assert.Equal(t, "my-cache-url:8444", transfers[0].Url.Host)
assert.Equal(t, "https", transfers[0].Url.Scheme)
Expand Down
104 changes: 71 additions & 33 deletions client/errorAccum.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,83 @@ import (
"errors"
"fmt"
"net/http"
"sync"
"strings"
"time"

grab "github.com/opensaucerer/grab/v3"
)

type TimestampedError struct {
err error
timestamp time.Time
}
type (
TimestampedError struct {
err error
timestamp time.Time
}

var (
bunchOfErrors []TimestampedError
mu sync.Mutex
// We will generate an error string including the time since startup
startup time.Time = time.Now()
// A container object for multiple sub-errors representing transfer failures.
TransferErrors struct {
start time.Time
errors []error
}
)

// AddError will add an accumulated error to the error stack
func AddError(err error) bool {
mu.Lock()
defer mu.Unlock()
bunchOfErrors = append(bunchOfErrors, TimestampedError{err, time.Now()})
return true
func (te *TimestampedError) Error() string {
return te.err.Error()
}

func (te *TimestampedError) Unwrap() error {
return te.err
}

// Create a new transfer error object
func NewTransferErrors() *TransferErrors {
return &TransferErrors{
start: time.Now(),
errors: make([]error, 0),
}
}

func (te *TransferErrors) AddError(err error) {
if te.errors == nil {
te.errors = make([]error, 0)
}
if err != nil {
te.errors = append(te.errors, &TimestampedError{err: err, timestamp: time.Now()})
}
}

func ClearErrors() {
mu.Lock()
defer mu.Unlock()
func (te *TransferErrors) Unwrap() []error {
return te.errors
}

bunchOfErrors = make([]TimestampedError, 0)
func (te *TransferErrors) Error() string {
if te.errors == nil {
return "transfer error unknown"
}
if len(te.errors) == 1 {
return "transfer error: " + te.errors[0].Error()
}
errors := make([]string, len(te.errors))
for idx, err := range te.errors {
errors[idx] = err.Error()
}
return "transfer errors: [" + strings.Join(errors, ", ") + "]"
}

func GetErrors() string {
mu.Lock()
defer mu.Unlock()
// Return a more refined, user-friendly error string
func (te *TransferErrors) UserError() string {
first := true
lastError := startup
lastError := te.start
var errorsFormatted []string
for idx, theError := range bunchOfErrors {
for idx, err := range te.errors {
theError := err.(*TimestampedError)
errFmt := fmt.Sprintf("Attempt #%v: %s", idx+1, theError.err.Error())
timeElapsed := theError.timestamp.Sub(lastError)
timeFormat := timeElapsed.Truncate(100 * time.Millisecond).String()
errFmt += " (" + timeFormat
if first {
errFmt += " since start)"
} else {
timeSinceStart := theError.timestamp.Sub(startup)
timeSinceStart := theError.timestamp.Sub(te.start)
timeSinceStartFormat := timeSinceStart.Truncate(100 * time.Millisecond).String()
errFmt += " elapsed, " + timeSinceStartFormat + " since start)"
}
Expand Down Expand Up @@ -127,15 +156,24 @@ func IsRetryable(err error) bool {
return false
}

// ErrorsRetryable returns if the errors in the stack are retryable later
func ErrorsRetryable() bool {
mu.Lock()
defer mu.Unlock()
// Loop through the errors and see if all of them are retryable
for _, theError := range bunchOfErrors {
if !IsRetryable(theError.err) {
// Returns true if all errors are retryable.
// If no errors are present, then returns true
func (te *TransferErrors) AllErrorsRetryable() bool {
if te.errors == nil {
return true
}
for _, err := range te.errors {
if !IsRetryable(err) {
return false
}
}
return true
}

func ShouldRetry(err error) bool {
var te *TransferErrors
if errors.As(err, &te) {
return te.AllErrorsRetryable()
}
return IsRetryable(err)
}
45 changes: 19 additions & 26 deletions client/errorAccum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,45 @@ import (

// TestErrorAccum tests simple adding and removing from the accumulator
func TestErrorAccum(t *testing.T) {
bunchOfErrors = make([]TimestampedError, 0)
defer func() {
bunchOfErrors = make([]TimestampedError, 0)
}()
te := NewTransferErrors()
// Case 1: cache with http
err := errors.New("error1")
err2 := errors.New("error2")
AddError(err)
AddError(err2)
te.AddError(err)
te.AddError(err2)

errStr := GetErrors()
errStr := te.UserError()
assert.Regexp(t, `Attempt\ \#2:\ error2\ \(0s\ elapsed,\ [0-9]+m?s\ since\ start\);\ Attempt\ \#1:\ error1\ \([0-9]+m?s\ since\ start\)`, errStr)

}

// TestErrorsRetryableFalse tests that errors are not retryable
func TestErrorsRetryableFalse(t *testing.T) {
bunchOfErrors = make([]TimestampedError, 0)
defer func() {
bunchOfErrors = make([]TimestampedError, 0)
}()
te := NewTransferErrors()

// Case 2: cache with http
AddError(&SlowTransferError{})
AddError(&SlowTransferError{})
assert.True(t, ErrorsRetryable(), "ErrorsRetryable should be true")
te.AddError(&SlowTransferError{})
te.AddError(&SlowTransferError{})
assert.True(t, te.AllErrorsRetryable(), "ErrorsRetryable should be true")

AddError(&ConnectionSetupError{})
assert.True(t, ErrorsRetryable(), "ErrorsRetryable should be true")
te.AddError(&ConnectionSetupError{})
assert.True(t, te.AllErrorsRetryable(), "ErrorsRetryable should be true")

// Now add a non-retryable error
AddError(errors.New("Non retryable error"))
assert.False(t, ErrorsRetryable(), "ErrorsRetryable should be false")
te.AddError(errors.New("Non retryable error"))
assert.False(t, te.AllErrorsRetryable(), "ErrorsRetryable should be false")

}

// TestErrorsRetryableTrue tests that errors are retryable
func TestErrorsRetryableTrue(t *testing.T) {
bunchOfErrors = make([]TimestampedError, 0)
defer func() {
bunchOfErrors = make([]TimestampedError, 0)
}()
te := NewTransferErrors()

// Try with a retryable error nested error
AddError(&url.Error{Err: &SlowTransferError{}})
assert.True(t, ErrorsRetryable(), "ErrorsRetryable should be true")
te.AddError(&url.Error{Err: &SlowTransferError{}})
assert.True(t, te.AllErrorsRetryable(), "ErrorsRetryable should be true")

AddError(&ConnectionSetupError{})
assert.True(t, ErrorsRetryable(), "ErrorsRetryable should be true")
te.AddError(&ConnectionSetupError{})
assert.True(t, te.AllErrorsRetryable(), "ErrorsRetryable should be true")

}
Loading

0 comments on commit 756fda6

Please sign in to comment.