Skip to content

Commit

Permalink
worker: fix pt cache (#792)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan authored Dec 6, 2023
1 parent 941b3dd commit e876aa5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 44 deletions.
60 changes: 30 additions & 30 deletions worker/rhpv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
// messages.
defaultWithdrawalExpiryBlocks = 6

// maxPriceTableSize defines the maximum size of a price table
maxPriceTableSize = 16 * 1024

// responseLeeway is the amount of leeway given to the maxLen when we read
// the response in the ReadSector RPC
responseLeeway = 1 << 12 // 4 KiB
Expand Down Expand Up @@ -953,24 +956,14 @@ func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevis
// fetchPT is a helper function that performs the RPC given a payment function
fetchPT := func(paymentFn PriceTablePaymentFunc) (hpt hostdb.HostPriceTable, err error) {
err = h.transportPool.withTransportV3(ctx, h.HostKey(), h.siamuxAddr, func(ctx context.Context, t *transportV3) (err error) {
defer func() {
InteractionRecorderFromContext(ctx).RecordPriceTableUpdate(hostdb.PriceTableUpdate{
HostKey: h.HostKey(),
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
PriceTable: hpt,
})
}()

pt, err := RPCPriceTable(ctx, t, paymentFn)
if err != nil {
return err
}
hpt = hostdb.HostPriceTable{
HostPriceTable: pt,
Expiry: time.Now().Add(pt.Validity),
}
return nil
hpt, err = RPCPriceTable(ctx, t, paymentFn)
InteractionRecorderFromContext(ctx).RecordPriceTableUpdate(hostdb.PriceTableUpdate{
HostKey: h.HostKey(),
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
PriceTable: hpt,
})
return
})
return
}
Expand All @@ -989,33 +982,40 @@ func (h *host) FetchPriceTable(ctx context.Context, rev *types.FileContractRevis
}

// RPCPriceTable calls the UpdatePriceTable RPC.
func RPCPriceTable(ctx context.Context, t *transportV3, paymentFunc PriceTablePaymentFunc) (pt rhpv3.HostPriceTable, err error) {
func RPCPriceTable(ctx context.Context, t *transportV3, paymentFunc PriceTablePaymentFunc) (_ hostdb.HostPriceTable, err error) {
defer wrapErr(&err, "PriceTable")
start := time.Now()

s, err := t.DialStream(ctx)
if err != nil {
return rhpv3.HostPriceTable{}, err
return hostdb.HostPriceTable{}, err
}
defer s.Close()

const maxPriceTableSize = 16 * 1024
var pt rhpv3.HostPriceTable
var ptr rhpv3.RPCUpdatePriceTableResponse
if err := s.WriteRequest(rhpv3.RPCUpdatePriceTableID, nil); err != nil {
return rhpv3.HostPriceTable{}, fmt.Errorf("couldn't send RPCUpdatePriceTableID: %w (%v)", err, time.Since(start))
return hostdb.HostPriceTable{}, fmt.Errorf("couldn't send RPCUpdatePriceTableID: %w", err)
} else if err := s.ReadResponse(&ptr, maxPriceTableSize); err != nil {
return rhpv3.HostPriceTable{}, fmt.Errorf("couldn't read RPCUpdatePriceTableResponse: %w (%v)", err, time.Since(start))
return hostdb.HostPriceTable{}, fmt.Errorf("couldn't read RPCUpdatePriceTableResponse: %w", err)
} else if err := json.Unmarshal(ptr.PriceTableJSON, &pt); err != nil {
return rhpv3.HostPriceTable{}, fmt.Errorf("couldn't unmarshal price table: %w (%v)", err, time.Since(start))
return hostdb.HostPriceTable{}, fmt.Errorf("couldn't unmarshal price table: %w", err)
} else if payment, err := paymentFunc(pt); err != nil {
return rhpv3.HostPriceTable{}, fmt.Errorf("couldn't create payment: %w (%v)", err, time.Since(start))
return hostdb.HostPriceTable{}, fmt.Errorf("couldn't create payment: %w", err)
} else if payment == nil {
return pt, nil // intended not to pay
return hostdb.HostPriceTable{
HostPriceTable: pt,
Expiry: time.Now().Add(pt.Validity),
}, nil // intended not to pay
} else if err := processPayment(s, payment); err != nil {
return rhpv3.HostPriceTable{}, fmt.Errorf("couldn't process payment: %w (%v)", err, time.Since(start))
return hostdb.HostPriceTable{}, fmt.Errorf("couldn't process payment: %w", err)
} else if err := s.ReadResponse(&rhpv3.RPCPriceTableResponse{}, 0); err != nil {
return rhpv3.HostPriceTable{}, fmt.Errorf("couldn't read RPCPriceTableResponse: %w (%v)", err, time.Since(start))
return hostdb.HostPriceTable{}, fmt.Errorf("couldn't read RPCPriceTableResponse: %w", err)
} else {
return hostdb.HostPriceTable{
HostPriceTable: pt,
Expiry: time.Now().Add(pt.Validity),
}, nil
}
return pt, nil
}

// RPCAccountBalance calls the AccountBalance RPC.
Expand Down
25 changes: 11 additions & 14 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,15 +475,8 @@ func (w *worker) rhpPriceTableHandler(jc jape.Context) {
}()

err = w.transportPoolV3.withTransportV3(ctx, rptr.HostKey, rptr.SiamuxAddr, func(ctx context.Context, t *transportV3) error {
if pt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil {
return err
} else {
hpt = hostdb.HostPriceTable{
HostPriceTable: pt,
Expiry: time.Now().Add(pt.Validity),
}
}
return nil
hpt, err = RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil })
return err
})

if jc.Check("could not get price table", err) != nil {
Expand Down Expand Up @@ -1567,15 +1560,15 @@ func (w *worker) acquireContractLock(ctx context.Context, fcid types.FileContrac
return newContractLock(fcid, lockID, w.contractLockingDuration, w.bus, w.logger), nil
}

func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP string) (settings rhpv2.HostSettings, hpt rhpv3.HostPriceTable, elapsed time.Duration, err error) {
func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP string) (settings rhpv2.HostSettings, pt rhpv3.HostPriceTable, elapsed time.Duration, err error) {
// record host scan
defer func() {
InteractionRecorderFromContext(ctx).RecordHostScan(hostdb.HostScan{
HostKey: hostKey,
Success: isSuccessfulInteraction(err),
Timestamp: time.Now(),
Settings: settings,
PriceTable: hpt,
PriceTable: pt,
})
}()

Expand Down Expand Up @@ -1610,9 +1603,13 @@ func (w *worker) scanHost(ctx context.Context, hostKey types.PublicKey, hostIP s

// fetch the host pricetable
if err == nil {
err = w.transportPoolV3.withTransportV3(ctx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) (err error) {
hpt, err = RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil })
return err
err = w.transportPoolV3.withTransportV3(ctx, hostKey, settings.SiamuxAddr(), func(ctx context.Context, t *transportV3) error {
if hpt, err := RPCPriceTable(ctx, t, func(pt rhpv3.HostPriceTable) (rhpv3.PaymentMethod, error) { return nil, nil }); err != nil {
return err
} else {
pt = hpt.HostPriceTable
return nil
}
})
}
return
Expand Down

0 comments on commit e876aa5

Please sign in to comment.