Commit 7a1d01b

add est-download-speed to make data fetching timeout adjustable
Signed-off-by: Merlin Ran <[email protected]>
merlinran committed Aug 13, 2021
1 parent 5b1cff5 commit 7a1d01b
Showing 4 changed files with 28 additions and 10 deletions.
11 changes: 10 additions & 1 deletion main.go
@@ -125,6 +125,12 @@ func init() {
DefValue: 24 * time.Hour,
Description: "The timeout before discarding deal with no progress",
},
{
Name: "est-download-speed",
DefValue: "5MB",
Description: `The estimated download speed (bytes per second), used to derive the timeout when downloading CAR files.
Be conservative to leave enough room for network instability.`,
},
{
Name: "running-bytes-limit",
DefValue: "",
@@ -312,8 +318,10 @@ var daemonCmd = &cobra.Command{
dealDataDirectory = filepath.Join(defaultConfigPath, "deal_data")
}

var bytesLimiter limiter.Limiter = limiter.NopeLimiter{}
estDownloadSpeed, err := humanize.ParseBytes(v.GetString("est-download-speed"))
common.CheckErrf("parsing est-download-speed: %v", err)

var bytesLimiter limiter.Limiter = limiter.NopeLimiter{}
if limit := v.GetString("running-bytes-limit"); limit != "" {
lim, err := parseRunningBytesLimit(limit)
common.CheckErrf(fmt.Sprintf("parsing '%s': %%w", limit), err)
@@ -344,6 +352,7 @@ var daemonCmd = &cobra.Command{
},
},
BytesLimiter: bytesLimiter,
EstDownloadSpeed: estDownloadSpeed,
SealingSectorsLimit: v.GetInt("sealing-sectors-limit"),
}
serv, err := service.New(config, store, lc, fc)
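
For reference, the new flag value goes through humanize.ParseBytes (presumably github.com/dustin/go-humanize), which accepts both SI and IEC size suffixes. Below is a minimal standalone sketch, not part of this commit, of how the default "5MB" would be interpreted.

// Sketch only: assumes the humanize package imported by main.go is
// github.com/dustin/go-humanize.
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// SI suffix: the default "5MB" parses to 5,000,000 bytes per second.
	si, err := humanize.ParseBytes("5MB")
	if err != nil {
		panic(err)
	}
	// IEC suffix: "5MiB" parses to 5,242,880 bytes per second.
	iec, err := humanize.ParseBytes("5MiB")
	if err != nil {
		panic(err)
	}
	fmt.Println(si, iec) // 5000000 5242880
}
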
4 changes: 3 additions & 1 deletion service/service.go
@@ -46,6 +46,7 @@ type Config struct {
BidParams BidParams
AuctionFilters AuctionFilters
BytesLimiter limiter.Limiter
EstDownloadSpeed uint64
SealingSectorsLimit int
}

@@ -182,6 +183,7 @@ func New(
conf.BidParams.DealDataFetchAttempts,
conf.BidParams.DiscardOrphanDealsAfter,
conf.BytesLimiter,
conf.EstDownloadSpeed,
)
if err != nil {
return nil, fin.Cleanupf("creating bid store: %v", err)
@@ -297,7 +299,7 @@ func (s *Service) GetBid(id auction.BidID) (*bidstore.Bid, error) {

// WriteDataURI writes a data uri resource to the configured deal data directory.
func (s *Service) WriteDataURI(payloadCid, uri string) (string, error) {
return s.store.WriteDataURI("", payloadCid, uri)
return s.store.WriteDataURI("", payloadCid, uri, 0)
}

func (s *Service) eventHandler(from core.ID, topic string, msg []byte) {
19 changes: 14 additions & 5 deletions service/store/store.go
@@ -45,8 +45,8 @@ var (
// DataURIFetchStartDelay is the time delay before the store will process queued data uri fetches on start.
DataURIFetchStartDelay = time.Second * 10

// DataURIFetchTimeout is the timeout used when fetching data uris.
DataURIFetchTimeout = time.Hour * 3
// DefaultDataURIFetchTimeout is the timeout used when fetching data uris of unknown size.
DefaultDataURIFetchTimeout = time.Hour * 10

// MaxDataURIFetchConcurrency is the maximum number of data uri fetches that will be handled concurrently.
MaxDataURIFetchConcurrency = 10
@@ -157,6 +157,7 @@ type Store struct {

dealDataDirectory string
dealDataFetchAttempts uint32
estDownloadSpeed uint64

ctx context.Context
cancel context.CancelFunc
@@ -174,6 +175,7 @@ func NewStore(
dealDataFetchAttempts uint32,
discardOrphanDealsAfter time.Duration,
bytesLimiter limiter.Limiter,
estDownloadSpeed uint64,
) (*Store, error) {
ctx, cancel := context.WithCancel(context.Background())
s := &Store{
@@ -186,6 +188,7 @@
tickCh: make(chan struct{}, MaxDataURIFetchConcurrency),
dealDataDirectory: dealDataDirectory,
dealDataFetchAttempts: dealDataFetchAttempts,
estDownloadSpeed: estDownloadSpeed,
ctx: ctx,
cancel: cancel,
}
@@ -467,13 +470,13 @@ func (s *Store) ListBids(query Query) ([]*Bid, error) {
// WriteDealData writes the deal data to the configured deal data directory.
func (s *Store) WriteDealData(b *Bid) (string, error) {
if b.Sources.CARURL != nil {
return s.WriteDataURI(b.ID, b.PayloadCid.String(), b.Sources.CARURL.URL.String())
return s.WriteDataURI(b.ID, b.PayloadCid.String(), b.Sources.CARURL.URL.String(), b.DealSize)
}
return "", errors.New("not implemented")
}

// WriteDataURI writes the uri resource to the configured deal data directory.
func (s *Store) WriteDataURI(bidID auction.BidID, payloadCid, uri string) (string, error) {
func (s *Store) WriteDataURI(bidID auction.BidID, payloadCid, uri string, size uint64) (string, error) {
duri, err := datauri.NewURI(payloadCid, uri)
if err != nil {
return "", fmt.Errorf("parsing data uri: %w", err)
@@ -491,7 +494,13 @@ func (s *Store) WriteDataURI(bidID auction.BidID, payloadCid, uri string) (string, error) {
if _, err := f.Seek(0, 0); err != nil {
return "", fmt.Errorf("seeking file to the beginning: %v", err)
}
ctx, cancel := context.WithTimeout(s.ctx, DataURIFetchTimeout)
fetchTimeout := DefaultDataURIFetchTimeout
if size != 0 {
// add baseline timeout for very small files
fetchTimeout = time.Minute + time.Duration(size/s.estDownloadSpeed)*time.Second
}
log.Debugf("fetching %s with timeout of %v", uri, fetchTimeout)
ctx, cancel := context.WithTimeout(s.ctx, fetchTimeout)
defer cancel()
if err := duri.Write(ctx, f); err != nil {
return "", fmt.Errorf("writing data uri %s: %w", uri, err)
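
To make the new timeout behavior concrete, here is a standalone sketch, an approximation rather than code from this repository: a deal of unknown size keeps the 10-hour default, while a sized deal gets a one-minute baseline plus one second for every estDownloadSpeed bytes, so a 32 GiB CAR at the default 5MB/s estimate is given roughly a two-hour window.

package main

import (
	"fmt"
	"time"
)

// fetchTimeout approximates the calculation added to Store.WriteDataURI.
func fetchTimeout(size, estDownloadSpeed uint64) time.Duration {
	const defaultFetchTimeout = 10 * time.Hour // mirrors DefaultDataURIFetchTimeout
	if size == 0 {
		// Unknown size: fall back to the fixed default.
		return defaultFetchTimeout
	}
	// One-minute baseline plus one second per estDownloadSpeed bytes.
	return time.Minute + time.Duration(size/estDownloadSpeed)*time.Second
}

func main() {
	const speed = 5_000_000 // the "5MB" default, in bytes per second
	fmt.Println(fetchTimeout(0, speed))      // 10h0m0s (size unknown)
	fmt.Println(fetchTimeout(16<<20, speed)) // 1m3s (16 MiB)
	fmt.Println(fetchTimeout(32<<30, speed)) // 1h55m31s (32 GiB)
}
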
4 changes: 1 addition & 3 deletions service/store/store_test.go
@@ -34,8 +34,6 @@ func init() {
}); err != nil {
panic(err)
}

DataURIFetchTimeout = time.Second * 5
}

func TestStore_ListBids(t *testing.T) {
@@ -260,7 +258,7 @@ func newStore(t *testing.T) (*Store, format.DAGService, blockstore.Blockstore) {
PrivKey: sk,
})
require.NoError(t, err)
s, err := NewStore(ds, p.Host(), p.DAGService(), newLotusClientMock(), t.TempDir(), 2, 0, limiter.NopeLimiter{})
s, err := NewStore(ds, p.Host(), p.DAGService(), newLotusClientMock(), t.TempDir(), 2, 0, limiter.NopeLimiter{}, 1<<30)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, s.Close())
