Skip to content

Commit

Permalink
Sort the caches based on responsiveness
Browse files Browse the repository at this point in the history
If we cannot get a HEAD request through a cache within a second, it's
a strong signal that it won't be responsive later on when we try to
download.  Do simultaneous HEAD queries and sort those without a response
after 1 second to the end.

The intent is to reduce the time cost of a totally unresponsive cache.
  • Loading branch information
bbockelm committed Mar 11, 2024
1 parent cd3d906 commit 316cc2e
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 11 deletions.
111 changes: 104 additions & 7 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,98 @@ func runTransferWorker(ctx context.Context, workChan <-chan *clientTransferFile,
}
}

// If there are multiple potential attempts, try to see if we can quickly eliminate some of them
//
// Attempts a HEAD against all the endpoints simultaneously. Put any that don't respond within
// a second behind those that do respond.
func sortAttempts(ctx context.Context, path string, attempts []transferAttemptDetails) (size int64, results []transferAttemptDetails) {
size = -1
if len(attempts) < 2 {
results = attempts
return
}
transport := config.GetTransport()
headChan := make(chan struct {
idx int
size uint64
err error
})
defer close(headChan)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
for idx, transferEndpoint := range attempts {
tUrl := *transferEndpoint.Url
tUrl.Path = path

go func(idx int, tUrl string) {
headClient := &http.Client{Transport: transport}
headRequest, _ := http.NewRequestWithContext(ctx, "HEAD", tUrl, nil)
var headResponse *http.Response
headResponse, err := headClient.Do(headRequest)
if err != nil {
headChan <- struct {
idx int
size uint64
err error
}{idx, 0, err}
return
}
headResponse.Body.Close()
contentLengthStr := headResponse.Header.Get("Content-Length")
size := int64(0)
if contentLengthStr != "" {
size, err = strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
log.Errorln("problem converting content-length to an int:", err)
}
}
headChan <- struct {
idx int
size uint64
err error
}{idx, uint64(size), nil}

This comment has been minimized.

Copy link
@haoming29

haoming29 Mar 11, 2024

Contributor

Do we want to return error or nil here if there is an error converting Content-Length header?

}(idx, tUrl.String())
}
finished := make(map[int]bool)
for ctr := 0; ctr != len(attempts); ctr++ {
result := <-headChan
if result.err != nil {
if result.err != context.Canceled {
log.Debugf("Failure when doing a HEAD request against %s: %s", attempts[result.idx].Url.String(), result.err.Error())
}
} else {
finished[result.idx] = true
if result.idx == 0 {
cancel()

This comment has been minimized.

Copy link
@haoming29

haoming29 Mar 11, 2024

Contributor

Trying to understand the logic here. Are we cancelling the reminding request if we get back from the first attempt? Wouldn't that cause the remaining attempts (no matter they succeeded or not) to have finished[idx] = false? Not sure if this will cause unwanted side effect

}
if size <= int64(result.size) {
size = int64(result.size)
}
}
}
// Sort all the successful attempts first; use stable sort so the original ordering
// is preserved if the two entries are both successful or both unsuccessful.
type sorter struct {
good bool
attempt transferAttemptDetails
}
tmpResults := make([]sorter, len(attempts))
for idx, attempt := range attempts {
tmpResults[idx] = sorter{finished[idx], attempt}
}
results = make([]transferAttemptDetails, len(attempts))
slices.SortStableFunc(tmpResults, func(left sorter, right sorter) int {
if left.good && !right.good {
return -1
}
return 0
})
for idx, val := range tmpResults {
results[idx] = val.attempt
}
return
}

func downloadObject(transfer *transferFile) (transferResults TransferResults, err error) {
log.Debugln("Downloading file from", transfer.remoteURL, "to", transfer.localPath)
// Remove the source from the file path
Expand All @@ -1192,9 +1284,12 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
if err = os.MkdirAll(directory, 0700); err != nil {
return
}

size, attempts := sortAttempts(transfer.job.ctx, transfer.remoteURL.Path, transfer.attempts)

transferResults = newTransferResults(transfer.job)
success := false
for idx, transferEndpoint := range transfer.attempts { // For each transfer (usually 3), populate each attempt given
for idx, transferEndpoint := range attempts { // For each transfer attempt (usually 3), try to download via HTTP
var attempt TransferResult
var timeToFirstByte float64
var serverVersion string
Expand All @@ -1206,7 +1301,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
transferEndpointUrl.Path = transfer.remoteURL.Path
transferEndpoint.Url = &transferEndpointUrl
transferStartTime := time.Now()
if downloaded, timeToFirstByte, serverVersion, err = downloadHTTP(transfer.ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, transfer.token, &transfer.accounting); err != nil {
if downloaded, timeToFirstByte, serverVersion, err = downloadHTTP(transfer.ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, transfer.token, &transfer.accounting); err != nil {
log.Debugln("Failed to download:", err)
transferEndTime := time.Now()
transferTime := transferEndTime.Unix() - transferStartTime.Unix()
Expand Down Expand Up @@ -1278,7 +1373,7 @@ func parseTransferStatus(status string) (int, string) {
// Perform the actual download of the file
//
// Returns the downloaded size, time to 1st byte downloaded, serverVersion and an error if there is one
func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCallbackFunc, transfer transferAttemptDetails, dest string, token string, payload *payloadStruct) (downloaded int64, timeToFirstByte float64, serverVersion string, err error) {
func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCallbackFunc, transfer transferAttemptDetails, dest string, totalSize int64, token string, payload *payloadStruct) (downloaded int64, timeToFirstByte float64, serverVersion string, err error) {
defer func() {
if r := recover(); r != nil {
log.Errorln("Panic occurred in downloadHTTP:", r)
Expand All @@ -1287,15 +1382,17 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
}
}()

var totalSize int64 = 0

lastUpdate := time.Now()
if callback != nil {
callback(dest, 0, 0, false)
}
defer func() {
if callback != nil {
callback(dest, downloaded, totalSize, true)
finalSize := int64(0)
if totalSize >= 0 {
finalSize = totalSize
}
callback(dest, downloaded, finalSize, true)
}
if te != nil {
te.ewmaCtr.Add(int64(time.Since(lastUpdate)))
Expand Down Expand Up @@ -1396,7 +1493,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
// Size of the download
totalSize = resp.Size()
// Do a head request for content length if resp.Size is unknown
if totalSize <= 0 {
if totalSize <= 0 && !resp.IsComplete() {
headClient := &http.Client{Transport: transport}
headRequest, _ := http.NewRequest("HEAD", transferUrl.String(), nil)
var headResponse *http.Response
Expand Down
71 changes: 67 additions & 4 deletions client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/namespaces"
Expand Down Expand Up @@ -187,7 +188,7 @@ func TestSlowTransfers(t *testing.T) {
var err error
// Do a quick timeout
go func() {
_, _, _, err = downloadHTTP(ctx, nil, nil, transfers[0], filepath.Join(t.TempDir(), "test.txt"), "", nil)
_, _, _, err = downloadHTTP(ctx, nil, nil, transfers[0], filepath.Join(t.TempDir(), "test.txt"), -1, "", nil)
finishedChannel <- true
}()

Expand Down Expand Up @@ -258,7 +259,7 @@ func TestStoppedTransfer(t *testing.T) {
var err error

go func() {
_, _, _, err = downloadHTTP(ctx, nil, nil, transfers[0], filepath.Join(t.TempDir(), "test.txt"), "", nil)
_, _, _, err = downloadHTTP(ctx, nil, nil, transfers[0], filepath.Join(t.TempDir(), "test.txt"), -1, "", nil)
finishedChannel <- true
}()

Expand Down Expand Up @@ -290,7 +291,7 @@ func TestConnectionError(t *testing.T) {
addr := l.Addr().String()
l.Close()

_, _, _, err = downloadHTTP(ctx, nil, nil, transferAttemptDetails{Url: &url.URL{Host: addr, Scheme: "http"}, Proxy: false}, filepath.Join(t.TempDir(), "test.txt"), "", nil)
_, _, _, err = downloadHTTP(ctx, nil, nil, transferAttemptDetails{Url: &url.URL{Host: addr, Scheme: "http"}, Proxy: false}, filepath.Join(t.TempDir(), "test.txt"), -1, "", nil)

assert.IsType(t, &ConnectionSetupError{}, err)

Expand Down Expand Up @@ -325,7 +326,7 @@ func TestTrailerError(t *testing.T) {
assert.Equal(t, svr.URL, transfers[0].Url.String())

// Call DownloadHTTP and check if the error is returned correctly
_, _, _, err := downloadHTTP(ctx, nil, nil, transfers[0], filepath.Join(t.TempDir(), "test.txt"), "", nil)
_, _, _, err := downloadHTTP(ctx, nil, nil, transfers[0], filepath.Join(t.TempDir(), "test.txt"), -1, "", nil)

assert.NotNil(t, err)
assert.EqualError(t, err, "transfer error: Unable to read test.txt; input/output error")
Expand Down Expand Up @@ -388,3 +389,65 @@ func TestFailedUpload(t *testing.T) {
assert.Fail(t, "Timeout while waiting for response")
}
}

// TestSortAttempts verifies that sortAttempts moves responsive endpoints to
// the front of the attempt list (preserving relative order) and reports the
// advertised object size, or -1 when no endpoint answers.
func TestSortAttempts(t *testing.T) {
	ctx, cancel, _ := test_utils.TestContext(context.Background(), t)
	defer cancel()

	// Simulates a completely unresponsive cache: block until the test
	// context is torn down (or a long timer fires as a safety valve).
	hang := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		select {
		case <-ctx.Done():
		case <-ticker.C:
		}
	})
	// Answers HEAD probes immediately with a fixed Content-Length of 42.
	respond := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != "HEAD" {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Length", "42")
		w.WriteHeader(http.StatusOK)
	})

	// newAttempt spins up a test server for the handler and wraps its URL in
	// a transferAttemptDetails; the server is shut down at test cleanup.
	newAttempt := func(handler http.Handler) (*httptest.Server, transferAttemptDetails) {
		svr := httptest.NewServer(handler)
		t.Cleanup(svr.Close)
		svrUrl, err := url.Parse(svr.URL)
		require.NoError(t, err)
		return svr, transferAttemptDetails{Url: svrUrl}
	}

	svr1, attempt1 := newAttempt(hang)
	svr2, attempt2 := newAttempt(respond)
	svr3, attempt3 := newAttempt(respond)

	cases := []struct {
		input    []transferAttemptDetails
		wantSize int64
		wantUrls []string
	}{
		// Unresponsive endpoint listed first gets sorted to the back.
		{[]transferAttemptDetails{attempt1, attempt2, attempt3}, 42, []string{svr2.URL, svr3.URL, svr1.URL}},
		// Already-sorted input keeps its ordering.
		{[]transferAttemptDetails{attempt2, attempt3, attempt1}, 42, []string{svr2.URL, svr3.URL, svr1.URL}},
		// No endpoint responds: order preserved, size unknown (-1).
		{[]transferAttemptDetails{attempt1, attempt1}, -1, []string{svr1.URL, svr1.URL}},
		// Every endpoint responds: order preserved.
		{[]transferAttemptDetails{attempt2, attempt3}, 42, []string{svr2.URL, svr3.URL}},
	}
	for _, tc := range cases {
		size, results := sortAttempts(ctx, "/path", tc.input)
		assert.Equal(t, tc.wantSize, size)
		require.Len(t, results, len(tc.wantUrls))
		for idx, wantUrl := range tc.wantUrls {
			assert.Equal(t, wantUrl, results[idx].Url.String())
		}
	}
}

0 comments on commit 316cc2e

Please sign in to comment.