Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Snap Deals #52

Merged
merged 37 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
19305f6
Move tarutil here, Add fin constraints
magik6k Jun 11, 2024
950e50f
Start adding snap tasks
magik6k Jun 11, 2024
e677c06
Plumb tasks
magik6k Jun 11, 2024
fa9874d
Move Deal Data getter from TreeD to a shared pkg
magik6k Jun 11, 2024
f2e1e6c
Get deal data to EncodeInto
magik6k Jun 11, 2024
2ab2619
Get build somewhat passing
magik6k Jun 11, 2024
253cf47
make gen
magik6k Jun 11, 2024
85a3f14
backport fillec calc fix to dealdata
magik6k Jun 12, 2024
ffc00a4
prep for snap piece ingester
magik6k Jun 12, 2024
6cd5008
start impl for snap deal ingest
magik6k Jun 13, 2024
0ff9aa2
snap piece ingest: Convert f05 deals to snap
magik6k Jun 13, 2024
2b6333b
wire up snap
magik6k Jun 14, 2024
9dba44b
fix snap schema
magik6k Jun 17, 2024
adf0dd3
fix types in snap ingest
magik6k Jun 17, 2024
d0167be
sector metadata crawl task
magik6k Jun 17, 2024
77f2067
webui for open sectors
magik6k Jun 17, 2024
0c49cb5
Make the Seal Now button on the deals page work
magik6k Jun 17, 2024
29b63c0
webui: Simple upgrades page
magik6k Jun 17, 2024
0567c07
webui: Button to retry snap pipeline processing
magik6k Jun 17, 2024
4d01c33
mostly working EncodeInto
magik6k Jun 18, 2024
a075a49
use ffiselect for EncodeUpdate
magik6k Jun 18, 2024
24086ee
Mostly implemented prove, cleanup in Encode
magik6k Jun 18, 2024
cdfac6d
Upgrade Submit
magik6k Jun 18, 2024
4be9869
Create unsealed copy in encode when requested
magik6k Jun 19, 2024
8ceacae
MoveStorageSnap
magik6k Jun 19, 2024
671208f
snap pipeline GC
magik6k Jun 19, 2024
f3e1d51
track snap message success
magik6k Jun 19, 2024
871f0ed
lmrpc: Fix sector status rpc
magik6k Jun 20, 2024
f946b27
fix gen
magik6k Jun 21, 2024
e5f12a0
wait for params in snap prove
magik6k Jun 21, 2024
03b1208
post-rebase fixes
magik6k Jul 15, 2024
aff115d
address review
magik6k Jul 15, 2024
c16c1e4
Constructor for UrlPieceReader
magik6k Jul 17, 2024
5f074cc
Merge remote-tracking branch 'origin/main' into feat/snap
magik6k Jul 17, 2024
9fd9cc3
spid getters for snap
magik6k Jul 17, 2024
5dbcacc
tx safety
magik6k Jul 17, 2024
6dd4b6e
fix bored with no commit add, don't panic in prove task
magik6k Jul 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 3 additions & 79 deletions cmd/curio/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@ import (
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/lib/reqcontext"
"github.com/filecoin-project/curio/market"
"github.com/filecoin-project/curio/market/lmrpc"

"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
)

var marketCmd = &cli.Command{
Expand Down Expand Up @@ -96,6 +93,7 @@ var marketSealCmd = &cli.Command{
Value: false, // todo implement synthetic
},
},
ArgsUsage: "<sector>",
Action: func(cctx *cli.Context) error {
act, err := address.NewFromString(cctx.String("actor"))
if err != nil {
Expand All @@ -119,80 +117,6 @@ var marketSealCmd = &cli.Command{
return err
}

mid, err := address.IDFromAddress(act)
if err != nil {
return xerrors.Errorf("getting miner id: %w", err)
}

mi, err := dep.Chain.StateMinerInfo(ctx, act, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting miner info: %w", err)
}

nv, err := dep.Chain.StateNetworkVersion(ctx, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting network version: %w", err)
}

wpt := mi.WindowPoStProofType
spt, err := miner.PreferredSealProofTypeFromWindowPoStType(nv, wpt, cctx.Bool("synthetic"))
if err != nil {
return xerrors.Errorf("getting seal proof type: %w", err)
}

comm, err := dep.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// Get current open sector pieces from DB
var pieces []struct {
Sector abi.SectorNumber `db:"sector_number"`
Size abi.PaddedPieceSize `db:"piece_size"`
Index uint64 `db:"piece_index"`
}
err = tx.Select(&pieces, `
SELECT
sector_number,
piece_size,
piece_index
FROM
open_sector_pieces
WHERE
sp_id = $1 AND sector_number = $2
ORDER BY
piece_index DESC;`, mid, sector)
if err != nil {
return false, xerrors.Errorf("getting open sectors from DB: %w", err)
}

if len(pieces) < 1 {
return false, xerrors.Errorf("sector %d is not waiting to be sealed", sector)
}

cn, err := tx.Exec(`INSERT INTO sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) VALUES ($1, $2, $3);`, mid, sector, spt)

if err != nil {
return false, xerrors.Errorf("adding sector to pipeline: %w", err)
}

if cn != 1 {
return false, xerrors.Errorf("incorrect number of rows returned")
}

_, err = tx.Exec("SELECT transfer_and_delete_open_piece($1, $2)", mid, sector)
if err != nil {
return false, xerrors.Errorf("adding sector to pipeline: %w", err)
}

return true, nil

}, harmonydb.OptionRetry())

if err != nil {
return xerrors.Errorf("start sealing sector: %w", err)
}

if !comm {
return xerrors.Errorf("start sealing sector: commit failed")
}

return nil
return market.SealNow(ctx, dep.Chain, dep.DB, act, abi.SectorNumber(sector), cctx.Bool("synthetic"))
},
}
8 changes: 4 additions & 4 deletions cmd/curio/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/lib/repo"
"github.com/filecoin-project/curio/market"
"github.com/filecoin-project/curio/web"

lapi "github.com/filecoin-project/lotus/api"
Expand Down Expand Up @@ -161,8 +160,9 @@ func (p *CurioAPI) StorageStat(ctx context.Context, id storiface.ID) (fsutil.FsS
return p.Stor.FsStat(ctx, id)
}

// this method is currently unused, might be back when we get markets into curio
func (p *CurioAPI) AllocatePieceToSector(ctx context.Context, maddr address.Address, piece piece.PieceDealInfo, rawSize int64, source url.URL, header http.Header) (lapi.SectorOffset, error) {
di, err := market.NewPieceIngester(ctx, p.Deps.DB, p.Deps.Chain, maddr, true, time.Minute)
/*di, err := market.NewPieceIngester(ctx, p.Deps.DB, p.Deps.Chain, maddr, true, time.Minute)
if err != nil {
return lapi.SectorOffset{}, xerrors.Errorf("failed to create a piece ingestor")
}
Expand All @@ -176,8 +176,8 @@ func (p *CurioAPI) AllocatePieceToSector(ctx context.Context, maddr address.Addr
if err != nil {
return lapi.SectorOffset{}, xerrors.Errorf("failed to start sealing the sector %d for actor %s", sector.Sector, maddr)
}

return sector, nil
*/
return lapi.SectorOffset{}, xerrors.Errorf("not implemented")
}

// Trigger shutdown
Expand Down
2 changes: 1 addition & 1 deletion cmd/curio/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ var runCmd = &cli.Command{
taskEngine, err := tasks.StartTasks(ctx, dependencies)

if err != nil {
return nil
return xerrors.Errorf("starting tasks: %w", err)
}
defer taskEngine.GracefullyTerminate()

Expand Down
31 changes: 26 additions & 5 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/tasks/gc"
"github.com/filecoin-project/curio/tasks/message"
"github.com/filecoin-project/curio/tasks/metadata"
piece2 "github.com/filecoin-project/curio/tasks/piece"
"github.com/filecoin-project/curio/tasks/seal"
"github.com/filecoin-project/curio/tasks/snap"
window2 "github.com/filecoin-project/curio/tasks/window"
"github.com/filecoin-project/curio/tasks/winning"

Expand Down Expand Up @@ -166,7 +168,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
cfg.Subsystems.EnableSendPrecommitMsg ||
cfg.Subsystems.EnablePoRepProof ||
cfg.Subsystems.EnableMoveStorage ||
cfg.Subsystems.EnableSendCommitMsg
cfg.Subsystems.EnableSendCommitMsg ||
cfg.Subsystems.EnableUpdateEncode ||
cfg.Subsystems.EnableUpdateProve ||
cfg.Subsystems.EnableUpdateSubmit
if hasAnySealingTask {
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore)
if err != nil {
Expand Down Expand Up @@ -218,7 +223,7 @@ func addSealingTasks(
asyncParams func() func() (bool, error), si paths.SectorIndex, stor *paths.Remote,
bstore curiochain.CurioBlockstore) ([]harmonytask.TaskInterface, error) {
var activeTasks []harmonytask.TaskInterface
// Sealing
// Sealing / Snap

var sp *seal.SealPoller
var slr *ffi.SealCalls
Expand Down Expand Up @@ -250,22 +255,38 @@ func addSealingTasks(
}
if cfg.Subsystems.EnableMoveStorage {
moveStorageTask := seal.NewMoveStorageTask(sp, slr, db, cfg.Subsystems.MoveStorageMaxTasks)
activeTasks = append(activeTasks, moveStorageTask)
moveStorageSnapTask := snap.NewMoveStorageTask(slr, db, cfg.Subsystems.MoveStorageMaxTasks)
activeTasks = append(activeTasks, moveStorageTask, moveStorageSnapTask)
}
if cfg.Subsystems.EnableSendCommitMsg {
commitTask := seal.NewSubmitCommitTask(sp, db, full, sender, as, cfg)
activeTasks = append(activeTasks, commitTask)
}

if cfg.Subsystems.EnableUpdateEncode {
encodeTask := snap.NewEncodeTask(slr, db, cfg.Subsystems.UpdateEncodeMaxTasks)
activeTasks = append(activeTasks, encodeTask)
}
if cfg.Subsystems.EnableUpdateProve {
proveTask := snap.NewProveTask(slr, db, asyncParams(), cfg.Subsystems.UpdateProveMaxTasks)
activeTasks = append(activeTasks, proveTask)
}
if cfg.Subsystems.EnableUpdateSubmit {
submitTask := snap.NewSubmitTask(db, full, bstore, sender, as, cfg)
activeTasks = append(activeTasks, submitTask)
}
activeTasks = lo.Reverse(activeTasks)

if hasAnySealingTask {
// Sealing nodes maintain storage index when bored
storageEndpointGcTask := gc.NewStorageEndpointGC(si, stor, db)
sdrPipelineGcTask := gc.NewSDRPipelineGC(db)
pipelineGcTask := gc.NewPipelineGC(db)
storageGcMarkTask := gc.NewStorageGCMark(si, stor, db, bstore, full)
storageGcSweepTask := gc.NewStorageGCSweep(db, stor, si)

activeTasks = append(activeTasks, storageEndpointGcTask, sdrPipelineGcTask, storageGcMarkTask, storageGcSweepTask)
sectorMetadataTask := metadata.NewSectorMetadataTask(db, bstore, full)

activeTasks = append(activeTasks, storageEndpointGcTask, pipelineGcTask, storageGcMarkTask, storageGcSweepTask, sectorMetadataTask)
}

return activeTasks, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/curio/test-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o
di := dline.NewInfo(head.Height(), cctx.Uint64("deadline"), 0, 0, 0, 10 /*challenge window*/, 0, 0)

for maddr := range deps.Maddrs {
out, err := wdPostTask.DoPartition(ctx, head, address.Address(maddr), di, cctx.Uint64("partition"))
out, err := wdPostTask.DoPartition(ctx, head, address.Address(maddr), di, cctx.Uint64("partition"), true)
if err != nil {
fmt.Println("Error computing WindowPoSt for miner", maddr, err)
continue
Expand Down
40 changes: 40 additions & 0 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,24 @@ type CurioSubsystemsConfig struct {
// uses all available network (or disk) bandwidth on the machine without causing bottlenecks.
MoveStorageMaxTasks int

// EnableUpdateEncode enables the encoding step of the SnapDeal process on this curio instance.
// This step involves encoding the data into the sector and computing updated TreeR (uses gpu).
EnableUpdateEncode bool

// EnableUpdateProve enables the proving step of the SnapDeal process on this curio instance.
// This step generates the snark proof for the updated sector.
EnableUpdateProve bool

// EnableUpdateSubmit enables the submission of SnapDeal proofs to the blockchain from this curio instance.
// This step submits the generated proofs to the chain.
EnableUpdateSubmit bool

// UpdateEncodeMaxTasks sets the maximum number of concurrent SnapDeal encoding tasks that can run on this instance.
UpdateEncodeMaxTasks int

// UpdateProveMaxTasks sets the maximum number of concurrent SnapDeal proving tasks that can run on this instance.
UpdateProveMaxTasks int

// BoostAdapters is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests.
// This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations.
// Strings should be in the format "actor:ip:port". IP cannot be 0.0.0.0. We recommend using a private IP.
Expand Down Expand Up @@ -400,6 +418,10 @@ type CurioIngestConfig struct {

// Maximum time an open deal sector should wait for more deal before it starts sealing
MaxDealWaitTime Duration

// DoSnap enables the snap deal process for deals ingested by this instance. Unlike in lotus-miner there is no
// fallback to porep when no sectors are available to snap into. When enabled all deals will be snap deals.
DoSnap bool
}

type CurioAlertingConfig struct {
Expand Down
34 changes: 34 additions & 0 deletions documentation/en/configuration/default-curio-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,34 @@ description: The default curio configuration
# type: int
#MoveStorageMaxTasks = 0

# EnableUpdateEncode enables the encoding step of the SnapDeal process on this curio instance.
# This step involves encoding the data into the sector and computing updated TreeR (uses gpu).
#
# type: bool
#EnableUpdateEncode = false

# EnableUpdateProve enables the proving step of the SnapDeal process on this curio instance.
# This step generates the snark proof for the updated sector.
#
# type: bool
#EnableUpdateProve = false

# EnableUpdateSubmit enables the submission of SnapDeal proofs to the blockchain from this curio instance.
# This step submits the generated proofs to the chain.
#
# type: bool
#EnableUpdateSubmit = false

# UpdateEncodeMaxTasks sets the maximum number of concurrent SnapDeal encoding tasks that can run on this instance.
#
# type: int
#UpdateEncodeMaxTasks = 0

# UpdateProveMaxTasks sets the maximum number of concurrent SnapDeal proving tasks that can run on this instance.
#
# type: int
#UpdateProveMaxTasks = 0

# BoostAdapters is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests.
# This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations.
# Strings should be in the format "actor:ip:port". IP cannot be 0.0.0.0. We recommend using a private IP.
Expand Down Expand Up @@ -389,6 +417,12 @@ description: The default curio configuration
# type: Duration
#MaxDealWaitTime = "1h0m0s"

# DoSnap enables the snap deal process for deals ingested by this instance. Unlike in lotus-miner there is no
# fallback to porep when no sectors are available to snap into. When enabled all deals will be snap deals.
#
# type: bool
#DoSnap = false


[Journal]
# Events of the form: "system1:event1,system1:event2[,...]"
Expand Down
2 changes: 1 addition & 1 deletion documentation/en/curio-cli/curio.md
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ NAME:
curio market seal - start sealing a deal sector early

USAGE:
curio market seal [command options] [arguments...]
curio market seal [command options] <sector>

OPTIONS:
--actor value Specify actor address to start sealing sectors for
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921
github.com/charmbracelet/lipgloss v0.10.0
github.com/codeskyblue/go-sh v0.0.0-20200712050446-30169cf553fe
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/docker/go-units v0.5.0
github.com/dustin/go-humanize v1.0.1
github.com/elastic/go-sysinfo v1.7.0
Expand Down Expand Up @@ -105,7 +106,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e // indirect
github.com/dgraph-io/badger/v2 v2.2007.4 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
Expand Down
4 changes: 4 additions & 0 deletions harmony/harmonydb/sql/20240425-sector_meta.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ CREATE TABLE sectors_meta (
seed_epoch BIGINT NOT NULL,
seed_value BYTEA NOT NULL,

-- Added in 20240611-snap-pipeline.sql
-- is_cc BOOLEAN NOT NULL DEFAULT (complex condition),
-- expiration_epoch BIGINT, (null = not crawled)

PRIMARY KEY (sp_id, sector_num)
);

Expand Down
Loading