Skip to content

Commit

Permalink
bus: rework uploading sectors cache
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Aug 11, 2023
1 parent da7590b commit 92ae050
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 204 deletions.
19 changes: 17 additions & 2 deletions api/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ var (
// ErrSettingNotFound is returned if a requested setting is not present in the
// database.
ErrSettingNotFound = errors.New("setting not found")

// ErrUploadAlreadyExists is returned when starting an upload with an id
// that's already in use.
ErrUploadAlreadyExists = errors.New("upload already exists")

// ErrUnknownUpload is returned when adding sectors for an upload id that's
// not known.
ErrUnknownUpload = errors.New("unknown upload")
)

// ArchiveContractsRequest is the request type for the /contracts/archive endpoint.
Expand Down Expand Up @@ -90,8 +98,8 @@ type ContractsIDAddRequest struct {
TotalCost types.Currency `json:"totalCost"`
}

// UploadsAddSectorRequest is the request type for the /uploads/:id endpoint.
type UploadsAddSectorRequest struct {
// UploadSectorRequest is the request type for the /upload/:id/sector endpoint.
type UploadSectorRequest struct {
ContractID types.FileContractID `json:"contractID"`
Root types.Hash256 `json:"root"`
}
Expand All @@ -105,6 +113,13 @@ type ContractsIDRenewedRequest struct {
TotalCost types.Currency `json:"totalCost"`
}

// ContractRootsResponse is the responser type for the /contract/:id/roots
// endpoint.
type ContractRootsResponse struct {
Roots []types.Hash256 `json:"roots"`
Uploading []types.Hash256 `json:"uploading"`
}

// ContractAcquireRequest is the request type for the /contract/acquire
// endpoint.
type ContractAcquireRequest struct {
Expand Down
33 changes: 7 additions & 26 deletions api/params.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package api

import (
"encoding/hex"
"errors"
"fmt"
"net/url"
"strconv"
"time"

"go.sia.tech/core/types"
"lukechampine.com/frand"
)

type (
Expand All @@ -35,9 +34,14 @@ type (
SlabID uint

// UploadID identifies an ongoing upload.
UploadID [8]byte
UploadID types.Specifier
)

func NewUploadID() (uID UploadID) {
frand.Read(uID[:])
return
}

// String implements fmt.Stringer.
func (c ParamCurrency) String() string { return types.Currency(c).ExactString() }

Expand Down Expand Up @@ -140,26 +144,3 @@ func (sid *SlabID) LoadString(s string) (err error) {
func (sid SlabID) String() string {
return fmt.Sprint(uint8(sid))
}

// String implements fmt.Stringer.
func (uID UploadID) String() string {
return hex.EncodeToString(uID[:])
}

// MarshalText implements encoding.TextMarshaler.
func (uID UploadID) MarshalText() ([]byte, error) {
return []byte(uID.String()), nil
}

// UnmarshalText implements encoding.TextUnmarshaler.
func (uID *UploadID) UnmarshalText(b []byte) error {
b, err := hex.DecodeString(string(b))
if err != nil {
return err
} else if len(b) != 8 {
return errors.New("invalid length")
}

copy(uID[:], b)
return nil
}
59 changes: 35 additions & 24 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ type bus struct {

eas EphemeralAccountStore

logger *zap.SugaredLogger
accounts *accounts
contractLocks *contractLocks
sectorsCache *uploadedSectorsCache
logger *zap.SugaredLogger
accounts *accounts
contractLocks *contractLocks
uploadingSectors *uploadingSectorsCache
}

func (b *bus) consensusAcceptBlock(jc jape.Context) {
Expand Down Expand Up @@ -771,7 +771,10 @@ func (b *bus) contractIDRootsHandlerGET(jc jape.Context) {

roots, err := b.ms.ContractRoots(jc.Request.Context(), id)
if jc.Check("couldn't fetch contract sectors", err) == nil {
jc.Encode(append(roots, b.sectorsCache.cachedSectors(id)...))
jc.Encode(api.ContractRootsResponse{
Roots: roots,
Uploading: b.uploadingSectors.sectors(id),
})
}
}

Expand Down Expand Up @@ -1347,41 +1350,48 @@ func (b *bus) contractTaxHandlerGET(jc jape.Context) {
jc.Encode(cs.FileContractTax(types.FileContract{Payout: payout}))
}

func (b *bus) uploadHandlerPOST(jc jape.Context) {
func (b *bus) uploadTrackHandlerPOST(jc jape.Context) {
var id api.UploadID
if jc.DecodeParam("id", &id) == nil {
jc.Check("failed to track upload", b.uploadingSectors.trackUpload(id))
}
}

func (b *bus) uploadAddSectorHandlerPOST(jc jape.Context) {
var id api.UploadID
if jc.DecodeParam("id", &id) != nil {
return
}
var req api.UploadsAddSectorRequest
var req api.UploadSectorRequest
if jc.Decode(&req) != nil {
return
}
b.sectorsCache.addUploadedSector(id, req.ContractID, req.Root)
b.uploadingSectors.addUploadingSector(id, req.ContractID, req.Root)
}

func (b *bus) uploadFinishedHandlerDELETE(jc jape.Context) {
var id api.UploadID
if jc.DecodeParam("id", &id) == nil {
b.sectorsCache.finishUpload(id)
b.uploadingSectors.finishUpload(id)
}
}

// New returns a new Bus.
func New(s Syncer, cm ChainManager, tp TransactionPool, w Wallet, hdb HostDB, as AutopilotStore, ms MetadataStore, ss SettingStore, eas EphemeralAccountStore, l *zap.Logger) (*bus, error) {
b := &bus{
alerts: alerts.NewManager(),
s: s,
cm: cm,
tp: tp,
w: w,
hdb: hdb,
as: as,
ms: ms,
ss: ss,
eas: eas,
contractLocks: newContractLocks(),
sectorsCache: newUploadedSectorsCache(),
logger: l.Sugar().Named("bus"),
alerts: alerts.NewManager(),
s: s,
cm: cm,
tp: tp,
w: w,
hdb: hdb,
as: as,
ms: ms,
ss: ss,
eas: eas,
contractLocks: newContractLocks(),
uploadingSectors: newUploadingSectorsCache(),
logger: l.Sugar().Named("bus"),
}
ctx, span := tracing.Tracer.Start(context.Background(), "bus.New")
defer span.End()
Expand Down Expand Up @@ -1561,8 +1571,9 @@ func (b *bus) Handler() http.Handler {
"PUT /setting/:key": b.settingKeyHandlerPUT,
"DELETE /setting/:key": b.settingKeyHandlerDELETE,

"POST /upload/:id": b.uploadHandlerPOST,
"POST /upload/:id/finished": b.uploadFinishedHandlerDELETE,
"POST /upload/:id": b.uploadTrackHandlerPOST,
"POST /upload/:id/sector": b.uploadAddSectorHandlerPOST,
"DELETE /upload/:id": b.uploadFinishedHandlerDELETE,
}))
}

Expand Down
31 changes: 20 additions & 11 deletions bus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,14 @@ func (c *Client) Contract(ctx context.Context, id types.FileContractID) (contrac
return
}

// ContractRoots returns the roots of the sectors for the contract with given
// id.
func (c *Client) ContractRoots(ctx context.Context, fcid types.FileContractID) (roots []types.Hash256, err error) {
err = c.c.WithContext(ctx).GET(fmt.Sprintf("/contract/%s/roots", fcid), &roots)
return
// ContractRoots returns the sector roots, as well as the ones that are still
// uploading, for the contract with given id.
func (c *Client) ContractRoots(ctx context.Context, fcid types.FileContractID) (roots, uploading []types.Hash256, err error) {
var resp api.ContractRootsResponse
if err = c.c.WithContext(ctx).GET(fmt.Sprintf("/contract/%s/roots", fcid), &resp); err != nil {
return
}
return resp.Roots, resp.Uploading, nil
}

// ContractSets returns the contract sets of the bus.
Expand Down Expand Up @@ -727,18 +730,24 @@ func (c *Client) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.Upload
return
}

// AddUploadedSector signals the bus a sector was uploaded under a given contract ID.
func (c *Client) AddUploadedSector(ctx context.Context, uID api.UploadID, id types.FileContractID, root types.Hash256) (err error) {
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/uploads/%s", uID), api.UploadsAddSectorRequest{
// TrackUpload tracks the upload with given id in the bus.
func (c *Client) TrackUpload(ctx context.Context, uID api.UploadID) (err error) {
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/upload/%s", uID), nil, nil)
return
}

// AddUploadingSector adds the given sector to the upload with given id.
func (c *Client) AddUploadingSector(ctx context.Context, uID api.UploadID, id types.FileContractID, root types.Hash256) (err error) {
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/upload/%s/sector", uID), api.UploadSectorRequest{
ContractID: id,
Root: root,
}, nil)
return
}

// MarkUploadFinished marks the given upload as finished.
func (c *Client) MarkUploadFinished(ctx context.Context, uID api.UploadID) (err error) {
err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/uploads/%s", uID))
// FinishUpload marks the given upload as finished.
func (c *Client) FinishUpload(ctx context.Context, uID api.UploadID) (err error) {
err = c.c.WithContext(ctx).DELETE(fmt.Sprintf("/upload/%s", uID))
return
}

Expand Down
60 changes: 0 additions & 60 deletions bus/uploadedsectors.go

This file was deleted.

56 changes: 0 additions & 56 deletions bus/uploadedsectors_test.go

This file was deleted.

Loading

0 comments on commit 92ae050

Please sign in to comment.