diff --git a/api/daemon.go b/api/daemon.go
index d94120d7c0..2383ede841 100644
--- a/api/daemon.go
+++ b/api/daemon.go
@@ -32,6 +32,7 @@ type DaemonInitConfig struct {
 	ClusterNightly bool
 	// AutoSealIntervalSeconds, when set, configures the daemon to check for and seal any staged sectors on an interval
 	AutoSealIntervalSeconds uint
+	DefaultAddress address.Address
 }
 
 // DaemonInitOpt is the signature a daemon init option has to fulfill.
@@ -85,3 +86,10 @@ func AutoSealIntervalSeconds(autoSealIntervalSeconds uint) DaemonInitOpt {
 		dc.AutoSealIntervalSeconds = autoSealIntervalSeconds
 	}
 }
+
+// DefaultAddress sets the daemon's default address to the provided address.
+func DefaultAddress(address address.Address) DaemonInitOpt {
+	return func(dc *DaemonInitConfig) {
+		dc.DefaultAddress = address
+	}
+}
diff --git a/api/impl/daemon.go b/api/impl/daemon.go
index 960657fa34..75ad9b7814 100644
--- a/api/impl/daemon.go
+++ b/api/impl/daemon.go
@@ -99,6 +99,14 @@ func (nd *nodeDaemon) Init(ctx context.Context, opts ...api.DaemonInitOpt) error
 		}
 	}
 
+	if cfg.DefaultAddress != (address.Address{}) {
+		newConfig := rep.Config()
+		newConfig.Wallet.DefaultAddress = cfg.DefaultAddress
+		if err := rep.ReplaceConfig(newConfig); err != nil {
+			return err
+		}
+	}
+
 	if cfg.ClusterTest && cfg.ClusterNightly {
 		return fmt.Errorf(`cannot use both "--cluster-test" and "--cluster-nightly" options`)
 	}
diff --git a/commands/client_daemon_test.go b/commands/client_daemon_test.go
index b6850cf569..2264325afd 100644
--- a/commands/client_daemon_test.go
+++ b/commands/client_daemon_test.go
@@ -1,35 +1,80 @@
 package commands
 
 import (
-	"fmt"
 	"strings"
+	"sync"
 	"testing"
-
-	"github.com/stretchr/testify/assert"
+	"time"
 
 	"github.com/filecoin-project/go-filecoin/fixtures"
 	th "github.com/filecoin-project/go-filecoin/testhelpers"
+	"github.com/stretchr/testify/assert"
 )
 
 func TestListAsks(t *testing.T) {
 	t.Parallel()
 	assert := assert.New(t)
 
-	peer := th.NewDaemon(t, th.WithMiner(fixtures.TestMiners[0]), th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
-	defer peer.ShutdownSuccess()
-	d := th.NewDaemon(t, th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
-	defer d.ShutdownSuccess()
-	peer.ConnectSuccess(d)
+	minerDaemon := th.NewDaemon(t,
+		th.WithMiner(fixtures.TestMiners[0]),
+		th.KeyFile(fixtures.KeyFilePaths()[0]),
+		th.DefaultAddress(fixtures.TestAddresses[0]),
+	).Start()
+	defer minerDaemon.ShutdownSuccess()
+
+	minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")
+
+	listAsksOutput := minerDaemon.RunSuccess("client", "list-asks").ReadStdoutTrimNewlines()
+	assert.Equal(fixtures.TestMiners[0]+" 000 20 11", listAsksOutput)
+}
+
+func TestStorageDealsAfterRestart(t *testing.T) {
+	t.Parallel()
+
+	minerDaemon := th.NewDaemon(t,
+		th.WithMiner(fixtures.TestMiners[0]),
+		th.KeyFile(fixtures.KeyFilePaths()[0]),
+		th.DefaultAddress(fixtures.TestAddresses[0]),
+	).Start()
+	defer minerDaemon.ShutdownSuccess()
+
+	clientDaemon := th.NewDaemon(t,
+		th.KeyFile(fixtures.KeyFilePaths()[1]),
+		th.DefaultAddress(fixtures.TestAddresses[1]),
+	).Start()
+	defer clientDaemon.ShutdownSuccess()
+
+	minerDaemon.UpdatePeerID()
+	minerDaemon.RunSuccess("mining", "start")
+
+	minerDaemon.ConnectSuccess(clientDaemon)
+
+	minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")
+	dataCid := clientDaemon.RunWithStdin(strings.NewReader("HODLHODLHODL"), "client", "import").ReadStdoutTrimNewlines()
+
+	proposeDealOutput := clientDaemon.RunSuccess("client", "propose-storage-deal", fixtures.TestMiners[0], dataCid, "0", "5").ReadStdoutTrimNewlines()
+
+	splitOnSpace := strings.Split(proposeDealOutput, " ")
+
+	dealCid := splitOnSpace[len(splitOnSpace)-1]
+
+	minerDaemon.Restart()
+	minerDaemon.RunSuccess("mining", "start")
 
-	// create a miner with one ask
-	minerAddr := d.CreateMinerAddr(peer, fixtures.TestAddresses[2])
-	d.CreateAsk(peer, minerAddr.String(), fixtures.TestAddresses[2], "20", "10")
+	clientDaemon.Restart()
 
-	// check ls
-	expectedBaseHeight := 2
-	expectedExpiry := expectedBaseHeight + 10
-	ls := d.RunSuccess("client", "list-asks")
-	expectedResult := fmt.Sprintf("%s 000 20 %d", minerAddr.String(), expectedExpiry)
-	assert.Equal(expectedResult, strings.Trim(ls.ReadStdout(), "\n"))
+	minerDaemon.ConnectSuccess(clientDaemon)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		for {
+			queryDealOutput := clientDaemon.RunSuccess("client", "query-storage-deal", dealCid).ReadStdout()
+			if strings.Contains(queryDealOutput, "posted") {
+				wg.Done()
+				break
+			}
+		}
+	}()
+	th.WaitTimeout(&wg, 120*time.Second)
 }
diff --git a/commands/init.go b/commands/init.go
index 096c187fde..10629cc46f 100644
--- a/commands/init.go
+++ b/commands/init.go
@@ -20,6 +20,7 @@ var initCmd = &cmds.Command{
 		cmdkit.StringOption(GenesisFile, "path of file or HTTP(S) URL containing archive of genesis block DAG data"),
 		cmdkit.StringOption(PeerKeyFile, "path of file containing key to use for new node's libp2p identity"),
 		cmdkit.StringOption(WithMiner, "when set, creates a custom genesis block with a pre generated miner account, requires running the daemon using dev mode (--dev)"),
+		cmdkit.StringOption(DefaultAddress, "when set, sets the daemon's default address to the provided address"),
 		cmdkit.UintOption(AutoSealIntervalSeconds, "when set to a number > 0, configures the daemon to check for and seal any staged sectors on an interval.").WithDefault(uint(120)),
 		cmdkit.BoolOption(ClusterTest, "when set, populates config bootstrap addrs with the dns multiaddrs of the test cluster and other test cluster specific bootstrap parameters."),
 		cmdkit.BoolOption(ClusterNightly, "when set, populates config bootstrap addrs with the dns multiaddrs of the nightly cluster and other nightly cluster specific bootstrap parameters"),
@@ -45,6 +46,15 @@
 			}
 		}
 
+		var defaultAddress address.Address
+		if m, ok := req.Options[DefaultAddress].(string); ok {
+			var err error
+			defaultAddress, err = address.NewFromString(m)
+			if err != nil {
+				return err
+			}
+		}
+
 		return GetAPI(env).Daemon().Init(
 			req.Context,
 			api.RepoDir(repoDir),
@@ -54,6 +64,7 @@
 			api.ClusterTest(clusterTest),
 			api.ClusterNightly(clusterNightly),
 			api.AutoSealIntervalSeconds(autoSealIntervalSeconds),
+			api.DefaultAddress(defaultAddress),
 		)
 	},
 	Encoders: cmds.EncoderMap{
diff --git a/commands/main.go b/commands/main.go
index 371a36306a..aeec3bb1b2 100644
--- a/commands/main.go
+++ b/commands/main.go
@@ -55,6 +55,9 @@ const (
 	// WithMiner when set, creates a custom genesis block with a pre generated miner account, requires to run the daemon using dev mode (--dev)
 	WithMiner = "with-miner"
 
+	// DefaultAddress when set, sets the daemon's default address to the provided address
+	DefaultAddress = "default-address"
+
 	// GenesisFile is the path of file containing archive of genesis block DAG data
 	GenesisFile = "genesisfile"
diff --git a/node/block_propagate_test.go b/node/block_propagate_test.go
index 51201f8b73..e2a5a19763 100644
--- a/node/block_propagate_test.go
+++ b/node/block_propagate_test.go
@@ -128,7 +128,7 @@ func makeNodes(ctx context.Context, t *testing.T, assertions *assert.Assertions)
 	minerNode := MakeNodeWithChainSeed(t, seed, PeerKeyOpt(PeerKeys[0]), AutoSealIntervalSecondsOpt(1))
 	seed.GiveKey(t, minerNode, 0)
 	mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
-	_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.PlumbingAPI)
+	_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), minerNode.PlumbingAPI)
 	assertions.NoError(err)
 	clientNode := MakeNodeWithChainSeed(t, seed)
 	nodes := []*Node{minerNode, clientNode}
diff --git a/node/node.go b/node/node.go
index c9ec5804a0..e8931c0b4d 100644
--- a/node/node.go
+++ b/node/node.go
@@ -407,7 +407,11 @@ func (node *Node) Start(ctx context.Context) error {
 		node.Host(),
 		node.Lookup(),
 		node.CallQueryMethod)
-	node.StorageMinerClient = storage.NewClient(cni)
+	var err error
+	node.StorageMinerClient, err = storage.NewClient(cni, node.Repo.ClientDealsDatastore())
+	if err != nil {
+		return errors.Wrap(err, "could not make new storage client")
+	}
 
 	node.RetrievalClient = retrieval.NewClient(node)
 	node.RetrievalMiner = retrieval.NewMiner(node)
@@ -830,7 +834,7 @@ func initStorageMinerForNode(ctx context.Context, node *Node) (*storage.Miner, e
 		return nil, errors.Wrap(err, "no mining owner available, skipping storage miner setup")
 	}
 
-	miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.PlumbingAPI)
+	miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.Repo.MinerDealsDatastore(), node.Repo.DealsAwaitingSealDatastore(), node.PlumbingAPI)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to instantiate storage miner")
 	}
diff --git a/node/node_test.go b/node/node_test.go
index 12f9e73191..a9cd53227b 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -161,7 +161,7 @@ func TestNodeStartMining(t *testing.T) {
 	seed.GiveKey(t, minerNode, 0)
 	mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
 
-	_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, plumbingAPI)
+	_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), plumbingAPI)
 	assert.NoError(err)
 
 	assert.NoError(minerNode.Start(ctx))
diff --git a/protocol/storage/client.go b/protocol/storage/client.go
index 1ca61e89d9..a866f8a9e9 100644
--- a/protocol/storage/client.go
+++ b/protocol/storage/client.go
@@ -6,18 +6,21 @@ import (
 	"math/big"
 	"sync"
 
-	cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
+	"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
 	cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor"
 	"gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors"
 	"gx/ipfs/QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8/go-multistream"
 	"gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host"
 	ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
+	"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore"
+	"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query"
 
 	"github.com/filecoin-project/go-filecoin/abi"
 	"github.com/filecoin-project/go-filecoin/actor/builtin/miner"
 	"github.com/filecoin-project/go-filecoin/address"
 	cbu "github.com/filecoin-project/go-filecoin/cborutil"
 	"github.com/filecoin-project/go-filecoin/lookup"
+	"github.com/filecoin-project/go-filecoin/repo"
 	"github.com/filecoin-project/go-filecoin/types"
 )
@@ -29,26 +32,36 @@ type clientNode interface {
 	GetAskPrice(ctx context.Context, miner address.Address, askid uint64) (*types.AttoFIL, error)
 }
 
+type clientDealState struct {
+	Miner    address.Address
+	Proposal *DealProposal
+	State    *DealResponse
+}
+
 // Client is used to make deals directly with storage miners.
 type Client struct {
 	deals   map[cid.Cid]*clientDealState
+	dealsDs repo.Datastore
 	dealsLk sync.Mutex
 	node    clientNode
 }
 
-type clientDealState struct {
-	miner     address.Address
-	proposal  *DealProposal
-	lastState *DealResponse
+func init() {
+	cbor.RegisterCborType(clientDealState{})
 }
 
-// NewClient creaters a new storage miner client.
-func NewClient(nd clientNode) *Client {
-	return &Client{
-		deals: make(map[cid.Cid]*clientDealState),
-		node:  nd,
+// NewClient creates a new storage client.
+func NewClient(nd clientNode, dealsDs repo.Datastore) (*Client, error) {
+	smc := &Client{
+		deals:   make(map[cid.Cid]*clientDealState),
+		node:    nd,
+		dealsDs: dealsDs,
 	}
+	if err := smc.loadDeals(); err != nil {
+		return nil, errors.Wrap(err, "failed to load client deals")
+	}
+	return smc, nil
 }
 
 // ProposeDeal is
@@ -120,12 +133,11 @@ func (smc *Client) recordResponse(resp *DealResponse, miner address.Address, p *
 	}
 
 	smc.deals[resp.Proposal] = &clientDealState{
-		lastState: resp,
-		miner:     miner,
-		proposal:  p,
+		Miner:    miner,
+		Proposal: p,
+		State:    resp,
 	}
-
-	return nil
+	return smc.saveDeal(resp.Proposal)
 }
 
 func (smc *Client) checkDealResponse(ctx context.Context, resp *DealResponse) error {
@@ -149,7 +161,7 @@ func (smc *Client) minerForProposal(c cid.Cid) (address.Address, error) {
 		return address.Address{}, fmt.Errorf("no such proposal by cid: %s", c)
 	}
 
-	return st.miner, nil
+	return st.Miner, nil
 }
 
 // QueryDeal queries an in-progress proposal.
@@ -237,3 +249,39 @@ func (cni *ClientNodeImpl) GetAskPrice(ctx context.Context, maddr address.Addres
 
 	return ask.Price, nil
 }
+
+func (smc *Client) loadDeals() error {
+	res, err := smc.dealsDs.Query(query.Query{})
+	if err != nil {
+		return errors.Wrap(err, "failed to query deals from datastore")
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		return errors.Wrap(err, "failed to load deals from datastore")
+	}
+
+	smc.deals = make(map[cid.Cid]*clientDealState)
+
+	for _, entry := range entries {
+		var deal clientDealState
+		if err := cbor.DecodeInto(entry.Value, &deal); err != nil {
+			return errors.Wrap(err, "failed to unmarshal deals from datastore")
+		}
+		smc.deals[deal.State.Proposal] = &deal
+	}
+
+	return nil
+}
+
+func (smc *Client) saveDeal(proposalCid cid.Cid) error {
+	datum, err := cbor.DumpObject(smc.deals[proposalCid])
+	if err != nil {
+		return errors.Wrap(err, "could not marshal storageDealState")
+	}
+	err = smc.dealsDs.Put(datastore.NewKey(proposalCid.String()), datum)
+	if err != nil {
+		return errors.Wrap(err, "could not save client storage deal")
+	}
+	return nil
+}
diff --git a/protocol/storage/miner.go b/protocol/storage/miner.go
index 81f9914c25..8502df4f7f 100644
--- a/protocol/storage/miner.go
+++ b/protocol/storage/miner.go
@@ -2,7 +2,9 @@ package storage
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"github.com/filecoin-project/go-filecoin/repo"
 	"math/big"
 	"math/rand"
 	"sync"
@@ -11,14 +13,17 @@ import (
 	inet "gx/ipfs/QmNgLg1NTw37iWbYPKcyK85YJ9Whs1MkPtJwhfqbNYAyKg/go-libp2p-net"
 	"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
 	hamt "gx/ipfs/QmRXf2uUSdGSunRJsM9wXSUNVwLUGCY3So5fAs7h2CBJVf/go-hamt-ipld"
+	cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor"
 	dag "gx/ipfs/QmVYm5u7aHGrxA67Jxgo23bQKxbWFYvYAb76kZMnSB37TG/go-merkledag"
 	"gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors"
 	bserv "gx/ipfs/QmZ9PMwfBmywNgpxG7zRHKsAno76gMCBbKGBTVXbma44H7/go-blockservice"
 	"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
-	host "gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host"
+	"gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host"
 	ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
 	logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log"
-	unixfs "gx/ipfs/QmeeZKidkDAKwyvXictWdfjMkyJv1Jh4FQCHrYX6dapC2G/go-unixfs"
+	"gx/ipfs/QmeeZKidkDAKwyvXictWdfjMkyJv1Jh4FQCHrYX6dapC2G/go-unixfs"
+	"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore"
+	"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query"
 
 	"github.com/filecoin-project/go-filecoin/actor/builtin/miner"
 	"github.com/filecoin-project/go-filecoin/address"
@@ -46,21 +51,22 @@ type Miner struct {
 	// deals is a list of deals we made. It is indexed by the CID of the proposal.
 	deals   map[cid.Cid]*storageDealState
+	dealsDs repo.Datastore
 	dealsLk sync.Mutex
 
 	postInProcessLk sync.Mutex
 	postInProcess   *types.BlockHeight
 
-	dealsAwaitingSeal *dealsAwaitingSealStruct
+	dealsAwaitingSealDs repo.Datastore
+	dealsAwaitingSeal   *dealsAwaitingSealStruct
 
 	plumbingAPI plumbing
 	node        node
 }
 
 type storageDealState struct {
-	proposal *DealProposal
-
-	state *DealResponse
+	Proposal *DealProposal
+	State    *DealResponse
 }
 
 // plumbing is the subset of the plumbing API that storage.Miner needs.
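The renames in clientDealState and storageDealState (miner → Miner, proposal → Proposal, lastState/state → State) are load-bearing rather than cosmetic: Go marshalers, CBOR and JSON alike, only see exported fields, so the old lowercase fields would serialize as empty and every deal would come back blank after a restart. A small stand-alone demonstration with the standard library's encoding/json (hidden and visible are hypothetical types for illustration):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type hidden struct{ miner string }  // unexported field: invisible to marshalers
    type visible struct{ Miner string } // exported field: round-trips correctly

    func main() {
        a, _ := json.Marshal(hidden{miner: "fcqsomeminer"})
        b, _ := json.Marshal(visible{Miner: "fcqsomeminer"})
        fmt.Println(string(a)) // {} — the unexported field is silently dropped
        fmt.Println(string(b)) // {"Miner":"fcqsomeminer"}
    }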
@@ -82,19 +88,33 @@ type node interface {
 	CborStore() *hamt.CborIpldStore
 }
 
+func init() {
+	cbor.RegisterCborType(storageDealState{})
+	cbor.RegisterCborType(dealsAwaitingSealStruct{})
+}
+
 // NewMiner is
-func NewMiner(ctx context.Context, minerAddr, minerOwnerAddr address.Address, nd node, plumbingAPI plumbing) (*Miner, error) {
+func NewMiner(ctx context.Context, minerAddr, minerOwnerAddr address.Address, nd node, dealsDs repo.Datastore, dealsAwaitingSealDs repo.Datastore, plumbingAPI plumbing) (*Miner, error) {
 	sm := &Miner{
-		minerAddr:      minerAddr,
-		minerOwnerAddr: minerOwnerAddr,
-		deals:          make(map[cid.Cid]*storageDealState),
-		plumbingAPI:    plumbingAPI,
-		node:           nd,
+		minerAddr:           minerAddr,
+		minerOwnerAddr:      minerOwnerAddr,
+		deals:               make(map[cid.Cid]*storageDealState),
+		plumbingAPI:         plumbingAPI,
+		dealsDs:             dealsDs,
+		dealsAwaitingSealDs: dealsAwaitingSealDs,
+		node:                nd,
+	}
+
+	if err := sm.loadDealsAwaitingSeal(); err != nil {
+		return nil, errors.Wrap(err, "failed to make miner")
 	}
-	sm.dealsAwaitingSeal = newDealsAwaitingSeal()
 	sm.dealsAwaitingSeal.onSuccess = sm.onCommitSuccess
 	sm.dealsAwaitingSeal.onFail = sm.onCommitFail
 
+	if err := sm.loadDeals(); err != nil {
+		return nil, errors.Wrap(err, "failed to make miner")
+	}
+
 	nd.Host().SetStreamHandler(makeDealProtocol, sm.handleMakeDeal)
 	nd.Host().SetStreamHandler(queryDealProtocol, sm.handleQueryDeal)
@@ -143,28 +163,30 @@ func (sm *Miner) acceptProposal(ctx context.Context, p *DealProposal) (*DealResp
 
 	// TODO: we don't really actually want to put this in our general storage
 	// but we just want to get its cid, as a way to uniquely track it
-	propcid, err := sm.node.CborStore().Put(ctx, p)
+	proposalCid, err := sm.node.CborStore().Put(ctx, p)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to get cid of proposal")
 	}
 
 	resp := &DealResponse{
 		State:     Accepted,
-		Proposal:  propcid,
+		Proposal:  proposalCid,
 		Signature: types.Signature("signaturrreee"),
 	}
 
 	sm.dealsLk.Lock()
 	defer sm.dealsLk.Unlock()
-	// TODO: clear out deals when appropriate.
-	sm.deals[propcid] = &storageDealState{
-		proposal: p,
-		state:    resp,
+	sm.deals[proposalCid] = &storageDealState{
+		Proposal: p,
+		State:    resp,
+	}
+	if err := sm.saveDeal(proposalCid); err != nil {
+		return nil, errors.Wrap(err, "failed to save miner deal")
 	}
 
 	// TODO: use some sort of nicer scheduler
-	go sm.processStorageDeal(propcid)
+	go sm.processStorageDeal(proposalCid)
 
 	return resp, nil
 }
@@ -175,11 +197,16 @@ func (sm *Miner) getStorageDeal(c cid.Cid) *storageDealState {
 	return sm.deals[c]
 }
 
-func (sm *Miner) updateDealState(c cid.Cid, f func(*DealResponse)) {
+func (sm *Miner) updateDealState(proposalCid cid.Cid, f func(*DealResponse)) {
 	sm.dealsLk.Lock()
 	defer sm.dealsLk.Unlock()
-	f(sm.deals[c].state)
-	log.Debugf("Miner.updateDealState(%s) - %d", c.String(), sm.deals[c].state)
+	f(sm.deals[proposalCid].State)
+	err := sm.saveDeal(proposalCid)
+	if err != nil {
+		log.Errorf("failed to store deal in datastore: %s", err)
+	}
+
+	log.Debugf("Miner.updateDealState(%s) - %d", proposalCid.String(), sm.deals[proposalCid].State)
 }
 
 func (sm *Miner) processStorageDeal(c cid.Cid) {
@@ -188,7 +215,7 @@ func (sm *Miner) processStorageDeal(c cid.Cid) {
 	defer cancel()
 
 	d := sm.getStorageDeal(c)
-	if d.state.State != Accepted {
+	if d.State.State != Accepted {
 		// TODO: handle resumption of deal processing across miner restarts
 		log.Error("attempted to process an already started deal")
 		return
@@ -198,7 +225,7 @@ func (sm *Miner) processStorageDeal(c cid.Cid) {
 	// TODO: this is not a great way to do this. At least use a session
 	// Also, this needs to be fetched into a staging area for miners to prepare and seal in data
 	log.Debug("Miner.processStorageDeal - FetchGraph")
-	if err := dag.FetchGraph(ctx, d.proposal.PieceRef, dag.NewDAGService(sm.node.BlockService())); err != nil {
+	if err := dag.FetchGraph(ctx, d.Proposal.PieceRef, dag.NewDAGService(sm.node.BlockService())); err != nil {
 		log.Errorf("failed to fetch data: %s", err)
 		sm.updateDealState(c, func(resp *DealResponse) {
 			resp.Message = "Transfer failed"
@@ -217,8 +244,8 @@ func (sm *Miner) processStorageDeal(c cid.Cid) {
 	}
 
 	pi := &sectorbuilder.PieceInfo{
-		Ref:  d.proposal.PieceRef,
-		Size: d.proposal.Size.Uint64(),
+		Ref:  d.Proposal.PieceRef,
+		Size: d.Proposal.Size.Uint64(),
 	}
 
 	// There is a race here that requires us to use dealsAwaitingSeal below. If the
@@ -243,6 +270,9 @@ func (sm *Miner) processStorageDeal(c cid.Cid) {
 	// Careful: this might update state to success or failure so it should go after
 	// updating state to Staged.
 	sm.dealsAwaitingSeal.add(sectorID, c)
+	if err := sm.saveDealsAwaitingSeal(); err != nil {
+		log.Errorf("could not save deal awaiting seal: %s", err)
+	}
 }
 
 // dealsAwaitingSealStruct is a container for keeping track of which sectors have
@@ -253,29 +283,51 @@ func (sm *Miner) processStorageDeal(c cid.Cid) {
 type dealsAwaitingSealStruct struct {
 	l sync.Mutex
 	// Maps from sector id to the deal cids with pieces in the sector.
-	sectorsToDeals map[uint64][]cid.Cid
+	SectorsToDeals map[uint64][]cid.Cid
 	// Maps from sector id to sector.
-	successfulSectors map[uint64]*sectorbuilder.SealedSectorMetadata
+	SuccessfulSectors map[uint64]*sectorbuilder.SealedSectorMetadata
 	// Maps from sector id to seal failure error string.
-	failedSectors map[uint64]string
+	FailedSectors map[uint64]string
 
 	onSuccess func(dealCid cid.Cid, sector *sectorbuilder.SealedSectorMetadata)
 	onFail    func(dealCid cid.Cid, message string)
 }
 
-func newDealsAwaitingSeal() *dealsAwaitingSealStruct {
-	return &dealsAwaitingSealStruct{
-		sectorsToDeals:    make(map[uint64][]cid.Cid),
-		successfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata),
-		failedSectors:     make(map[uint64]string),
+func (sm *Miner) loadDealsAwaitingSeal() error {
+	sm.dealsAwaitingSeal = &dealsAwaitingSealStruct{
+		SectorsToDeals:    make(map[uint64][]cid.Cid),
+		SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata),
+		FailedSectors:     make(map[uint64]string),
 	}
+
+	result, err := sm.dealsAwaitingSealDs.Get(datastore.NewKey("dealsAwaitingSeal"))
+	if err == nil {
+		if err := json.Unmarshal(result, &sm.dealsAwaitingSeal); err != nil {
+			return errors.Wrap(err, "failed to unmarshal deals awaiting seal from datastore")
+		}
+	}
+
+	return nil
+}
+
+func (sm *Miner) saveDealsAwaitingSeal() error {
+	marshalledDealsAwaitingSeal, err := json.Marshal(sm.dealsAwaitingSeal)
+	if err != nil {
+		return errors.Wrap(err, "could not marshal dealsAwaitingSeal")
+	}
+	err = sm.dealsAwaitingSealDs.Put(datastore.NewKey("dealsAwaitingSeal"), marshalledDealsAwaitingSeal)
+	if err != nil {
+		return errors.Wrap(err, "could not save deals awaiting seal")
+	}
+
+	return nil
 }
 
 func (dealsAwaitingSeal *dealsAwaitingSealStruct) add(sectorID uint64, dealCid cid.Cid) {
 	dealsAwaitingSeal.l.Lock()
 	defer dealsAwaitingSeal.l.Unlock()
 
-	if sector, ok := dealsAwaitingSeal.successfulSectors[sectorID]; ok {
+	if sector, ok := dealsAwaitingSeal.SuccessfulSectors[sectorID]; ok {
 		dealsAwaitingSeal.onSuccess(dealCid, sector)
 		// Don't keep references to sectors around forever. Assume that at most
 		// one success-before-add call will happen (eg, in a test). Sector sealing
@@ -284,17 +336,17 @@ func (dealsAwaitingSeal *dealsAwaitingSealStruct) add(sectorID uint64, dealCid c
 		// the state around for longer for some reason we need to limit how many
 		// sectors we hang onto, eg keep a fixed-length slice of successes
 		// and failures and shift the oldest off and the newest on.
-		delete(dealsAwaitingSeal.successfulSectors, sectorID)
-	} else if message, ok := dealsAwaitingSeal.failedSectors[sectorID]; ok {
+		delete(dealsAwaitingSeal.SuccessfulSectors, sectorID)
+	} else if message, ok := dealsAwaitingSeal.FailedSectors[sectorID]; ok {
 		dealsAwaitingSeal.onFail(dealCid, message)
 		// Same as above.
-		delete(dealsAwaitingSeal.failedSectors, sectorID)
+		delete(dealsAwaitingSeal.FailedSectors, sectorID)
 	} else {
-		deals, ok := dealsAwaitingSeal.sectorsToDeals[sectorID]
+		deals, ok := dealsAwaitingSeal.SectorsToDeals[sectorID]
 		if ok {
-			dealsAwaitingSeal.sectorsToDeals[sectorID] = append(deals, dealCid)
+			dealsAwaitingSeal.SectorsToDeals[sectorID] = append(deals, dealCid)
 		} else {
-			dealsAwaitingSeal.sectorsToDeals[sectorID] = []cid.Cid{dealCid}
+			dealsAwaitingSeal.SectorsToDeals[sectorID] = []cid.Cid{dealCid}
 		}
 	}
 }
@@ -303,24 +355,24 @@ func (dealsAwaitingSeal *dealsAwaitingSealStruct) success(sector *sectorbuilder.
 	dealsAwaitingSeal.l.Lock()
 	defer dealsAwaitingSeal.l.Unlock()
 
-	dealsAwaitingSeal.successfulSectors[sector.SectorID] = sector
+	dealsAwaitingSeal.SuccessfulSectors[sector.SectorID] = sector
 
-	for _, dealCid := range dealsAwaitingSeal.sectorsToDeals[sector.SectorID] {
+	for _, dealCid := range dealsAwaitingSeal.SectorsToDeals[sector.SectorID] {
 		dealsAwaitingSeal.onSuccess(dealCid, sector)
 	}
-	delete(dealsAwaitingSeal.sectorsToDeals, sector.SectorID)
+	delete(dealsAwaitingSeal.SectorsToDeals, sector.SectorID)
 }
 
 func (dealsAwaitingSeal *dealsAwaitingSealStruct) fail(sectorID uint64, message string) {
 	dealsAwaitingSeal.l.Lock()
 	defer dealsAwaitingSeal.l.Unlock()
 
-	dealsAwaitingSeal.failedSectors[sectorID] = message
+	dealsAwaitingSeal.FailedSectors[sectorID] = message
 
-	for _, dealCid := range dealsAwaitingSeal.sectorsToDeals[sectorID] {
+	for _, dealCid := range dealsAwaitingSeal.SectorsToDeals[sectorID] {
 		dealsAwaitingSeal.onFail(dealCid, message)
 	}
-	delete(dealsAwaitingSeal.sectorsToDeals, sectorID)
+	delete(dealsAwaitingSeal.SectorsToDeals, sectorID)
 }
 
 // OnCommitmentAddedToChain is a callback, called when a sector seal message was posted to the chain.
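Unlike the per-deal records, the dealsAwaitingSeal bookkeeping is persisted as one JSON blob under the fixed key "dealsAwaitingSeal": saveDealsAwaitingSeal marshals the whole struct after every mutation, and loadDealsAwaitingSeal starts from fresh maps and only overwrites them when the key exists. The unexported onSuccess/onFail callbacks never serialize, which is why NewMiner re-attaches them immediately after loading. A sketch of that round trip, assuming the same gx-vendored datastore as above; toyAwaiting is a hypothetical stand-in for the exported shape of dealsAwaitingSealStruct:

    package main

    import (
        "encoding/json"
        "fmt"

        "gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore"
    )

    // toyAwaiting mirrors the serializable part of dealsAwaitingSealStruct;
    // function-valued fields like onSuccess/onFail cannot be marshaled and
    // must be re-wired by the caller after every load.
    type toyAwaiting struct {
        SectorsToDeals map[uint64][]string
        FailedSectors  map[uint64]string
    }

    func main() {
        ds := datastore.NewMapDatastore()
        key := datastore.NewKey("dealsAwaitingSeal")

        out := toyAwaiting{
            SectorsToDeals: map[uint64][]string{42: {"deal-cid-0"}},
            FailedSectors:  map[uint64]string{},
        }
        blob, err := json.Marshal(out)
        if err != nil {
            panic(err)
        }
        if err := ds.Put(key, blob); err != nil {
            panic(err)
        }

        // On restart: begin with fresh maps and overwrite them only if the
        // fixed key is present, exactly as loadDealsAwaitingSeal does.
        in := toyAwaiting{
            SectorsToDeals: map[uint64][]string{},
            FailedSectors:  map[uint64]string{},
        }
        if raw, err := ds.Get(key); err == nil {
            if err := json.Unmarshal(raw, &in); err != nil {
                panic(err)
            }
        }
        fmt.Printf("%+v\n", in)
    }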
@@ -331,12 +383,14 @@ func (sm *Miner) OnCommitmentAddedToChain(sector *sectorbuilder.SealedSectorMeta
 	if err != nil {
 		// we failed to seal this sector, cancel all the deals
 		errMsg := fmt.Sprintf("failed sealing sector: %v: %s; canceling all outstanding deals", sectorID, err)
-		log.Errorf(errMsg)
+		log.Error(errMsg)
 		sm.dealsAwaitingSeal.fail(sector.SectorID, errMsg)
-		return
+	} else {
+		sm.dealsAwaitingSeal.success(sector)
+	}
+	if err = sm.saveDealsAwaitingSeal(); err != nil {
+		log.Errorf("failed to save deals awaiting seal: %s", err)
 	}
-
-	sm.dealsAwaitingSeal.success(sector)
 }
 
 func (sm *Miner) onCommitSuccess(dealCid cid.Cid, sector *sectorbuilder.SealedSectorMetadata) {
@@ -504,7 +558,7 @@ func (sm *Miner) Query(ctx context.Context, c cid.Cid) *DealResponse {
 		}
 	}
 
-	return d.state
+	return d.State
 }
 
 func (sm *Miner) handleQueryDeal(s inet.Stream) {
@@ -538,3 +592,39 @@ func getFileSize(ctx context.Context, c cid.Cid, dserv ipld.DAGService) (uint64,
 		return 0, fmt.Errorf("unrecognized node type: %T", fnode)
 	}
 }
+
+func (sm *Miner) loadDeals() error {
+	res, err := sm.dealsDs.Query(query.Query{})
+	if err != nil {
+		return errors.Wrap(err, "failed to query deals from datastore")
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		return errors.Wrap(err, "failed to load deals from datastore")
+	}
+
+	sm.deals = make(map[cid.Cid]*storageDealState)
+
+	for _, entry := range entries {
+		var deal storageDealState
+		if err := cbor.DecodeInto(entry.Value, &deal); err != nil {
+			return errors.Wrap(err, "failed to unmarshal deals from datastore")
+		}
+		sm.deals[deal.State.Proposal] = &deal
+	}
+
+	return nil
+}
+
+func (sm *Miner) saveDeal(proposalCid cid.Cid) error {
+	marshalledDeal, err := cbor.DumpObject(sm.deals[proposalCid])
+	if err != nil {
+		return errors.Wrap(err, "could not marshal storageDealState")
+	}
+	err = sm.dealsDs.Put(datastore.NewKey(proposalCid.String()), marshalledDeal)
+	if err != nil {
+		return errors.Wrap(err, "could not save miner storage deal")
+	}
+	return nil
+}
diff --git a/protocol/storage/miner_test.go b/protocol/storage/miner_test.go
index 64d6d71f5d..ab9fca840c 100644
--- a/protocol/storage/miner_test.go
+++ b/protocol/storage/miner_test.go
@@ -1,15 +1,17 @@
 package storage
 
 import (
-	cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
 	"testing"
 
 	"github.com/filecoin-project/go-filecoin/proofs/sectorbuilder"
+	"github.com/filecoin-project/go-filecoin/repo"
 	"github.com/filecoin-project/go-filecoin/types"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
 )
 
-func TestNewDealsAwaitingSeal(t *testing.T) {
+func TestDealsAwaitingSeal(t *testing.T) {
 	newCid := types.NewCidForTestGetter()
 	cid0 := newCid()
 	cid1 := newCid()
@@ -21,11 +23,38 @@ func TestNewDealsAwaitingSeal(t *testing.T) {
 
 	wantMessage := "boom"
 
+	t.Run("saveDealsAwaitingSeal saves, loadDealsAwaitingSeal loads", func(t *testing.T) {
+		t.Parallel()
+		assert := assert.New(t)
+		require := require.New(t)
+
+		miner := &Miner{
+			dealsAwaitingSeal: &dealsAwaitingSealStruct{
+				SectorsToDeals:    make(map[uint64][]cid.Cid),
+				SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata),
+				FailedSectors:     make(map[uint64]string),
+			},
+			dealsAwaitingSealDs: repo.NewInMemoryRepo().DealsAwaitingSealDatastore(),
+		}
+
+		miner.dealsAwaitingSeal.add(wantSectorID, cid0)
+
+		require.NoError(miner.saveDealsAwaitingSeal())
+		miner.dealsAwaitingSeal = &dealsAwaitingSealStruct{}
+		require.NoError(miner.loadDealsAwaitingSeal())
+
+		assert.Equal(cid0, miner.dealsAwaitingSeal.SectorsToDeals[42][0])
+	})
+
 	t.Run("add before success", func(t *testing.T) {
 		t.Parallel()
 		assert := assert.New(t)
 
-		dealsAwaitingSeal := newDealsAwaitingSeal()
+		dealsAwaitingSeal := &dealsAwaitingSealStruct{
+			SectorsToDeals:    make(map[uint64][]cid.Cid),
+			SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata),
+			FailedSectors:     make(map[uint64]string),
+		}
 		gotCids := []cid.Cid{}
 		dealsAwaitingSeal.onSuccess = func(dealCid cid.Cid, sector *sectorbuilder.SealedSectorMetadata) {
 			assert.Equal(sector, wantSector)
@@ -44,7 +73,11 @@ func TestNewDealsAwaitingSeal(t *testing.T) {
 		t.Parallel()
 		assert := assert.New(t)
 
-		dealsAwaitingSeal := newDealsAwaitingSeal()
+		dealsAwaitingSeal := &dealsAwaitingSealStruct{
+			SectorsToDeals:    make(map[uint64][]cid.Cid),
+			SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata),
+			FailedSectors:     make(map[uint64]string),
+		}
 		gotCids := []cid.Cid{}
 		dealsAwaitingSeal.onSuccess = func(dealCid cid.Cid, sector *sectorbuilder.SealedSectorMetadata) {
 			assert.Equal(sector, wantSector)
@@ -63,7 +96,11 @@ func TestNewDealsAwaitingSeal(t *testing.T) {
 		t.Parallel()
 		assert := assert.New(t)
 
-		dealsAwaitingSeal := newDealsAwaitingSeal()
+		dealsAwaitingSeal := &dealsAwaitingSealStruct{
+			SectorsToDeals:    make(map[uint64][]cid.Cid),
+			SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata),
+			FailedSectors:     make(map[uint64]string),
+		}
 		gotCids := []cid.Cid{}
 		dealsAwaitingSeal.onFail = func(dealCid cid.Cid, message string) {
 			assert.Equal(message, wantMessage)
@@ -82,7 +119,11 @@ func TestNewDealsAwaitingSeal(t *testing.T) {
 		t.Parallel()
 		assert := assert.New(t)
 
-		dealsAwaitingSeal := newDealsAwaitingSeal()
+		dealsAwaitingSeal := &dealsAwaitingSealStruct{
+			SectorsToDeals:    make(map[uint64][]cid.Cid),
+			SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata),
+			FailedSectors:     make(map[uint64]string),
+		}
 		gotCids := []cid.Cid{}
 		dealsAwaitingSeal.onFail = func(dealCid cid.Cid, message string) {
 			assert.Equal(message, wantMessage)
diff --git a/protocol/storage/storage_protocol_test.go b/protocol/storage/storage_protocol_test.go
index 3b227b674a..ecfa3219f9 100644
--- a/protocol/storage/storage_protocol_test.go
+++ b/protocol/storage/storage_protocol_test.go
@@ -9,7 +9,7 @@ import (
 	"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
 	cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor"
 	dag "gx/ipfs/QmVYm5u7aHGrxA67Jxgo23bQKxbWFYvYAb76kZMnSB37TG/go-merkledag"
-	unixfs "gx/ipfs/QmeeZKidkDAKwyvXictWdfjMkyJv1Jh4FQCHrYX6dapC2G/go-unixfs"
+	"gx/ipfs/QmeeZKidkDAKwyvXictWdfjMkyJv1Jh4FQCHrYX6dapC2G/go-unixfs"
 
 	mactor "github.com/filecoin-project/go-filecoin/actor/builtin/miner"
 	"github.com/filecoin-project/go-filecoin/address"
@@ -19,6 +19,7 @@ import (
 	"github.com/filecoin-project/go-filecoin/api2/impl/mthdsigapi"
 	"github.com/filecoin-project/go-filecoin/node"
 	. "github.com/filecoin-project/go-filecoin/protocol/storage"
+	th "github.com/filecoin-project/go-filecoin/testhelpers"
 	"github.com/filecoin-project/go-filecoin/types"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -86,8 +87,8 @@ func TestStorageProtocolBasic(t *testing.T) {
 			return [][]byte{enc}, 0, nil
 		},
 	)
-	c := NewClient(cni)
-	m, err := NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, plumbingAPI)
+	c, _ := NewClient(cni, clientNode.Repo.ClientDealsDatastore())
+	m, err := NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), plumbingAPI)
 	assert.NoError(err)
 	_ = m
@@ -173,7 +174,7 @@
 	}
 	require.True(done)
 
-	if waitTimeout(&wg, 120*time.Second) {
+	if th.WaitTimeout(&wg, 120*time.Second) {
 		state, message := requireQueryDeal()
 		require.NotEqual(Failed, state, message)
 		require.Failf("TestStorageProtocolBasic failed", "waiting for submission timed out. Saw %d blocks with %d messages while waiting", bCount, mCount)
@@ -197,19 +198,3 @@
 
 	assert.True(done, "failed to finish transfer")
 }
-
-// waitTimeout waits for the waitgroup for the specified max timeout.
-// Returns true if waiting timed out.
-func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
-	c := make(chan struct{})
-	go func() {
-		defer close(c)
-		wg.Wait()
-	}()
-	select {
-	case <-c:
-		return false // completed normally
-	case <-time.After(timeout):
-		return true // timed out
-	}
-}
diff --git a/repo/fsrepo.go b/repo/fsrepo.go
index be7be029e0..938a6d4296 100644
--- a/repo/fsrepo.go
+++ b/repo/fsrepo.go
@@ -24,15 +24,18 @@ import (
 const (
 	// APIFile is the filename containing the filecoin node's api address.
-	APIFile                = "api"
-	configFilename         = "config.json"
-	tempConfigFilename     = ".config.json.temp"
-	lockFile               = "repo.lock"
-	versionFilename        = "version"
-	walletDatastorePrefix  = "wallet"
-	chainDatastorePrefix   = "chain"
-	snapshotStorePrefix    = "snapshots"
-	snapshotFilenamePrefix = "snapshot"
+	APIFile                          = "api"
+	configFilename                   = "config.json"
+	tempConfigFilename               = ".config.json.temp"
+	lockFile                         = "repo.lock"
+	versionFilename                  = "version"
+	walletDatastorePrefix            = "wallet"
+	chainDatastorePrefix             = "chain"
+	minerDealsDatastorePrefix        = "miner_deals"
+	clientDealsDatastorePrefix       = "client_deals"
+	dealsAwaitingSealDatastorePrefix = "deals_awaiting_seal"
+	snapshotStorePrefix              = "snapshots"
+	snapshotFilenamePrefix           = "snapshot"
 )
 
 // NoRepoError is returned when trying to open a repo where one does not exist
@@ -52,12 +55,15 @@ type FSRepo struct {
 	version uint
 
 	// lk protects the config file
-	lk       sync.RWMutex
-	cfg      *config.Config
-	ds       Datastore
-	keystore keystore.Keystore
-	walletDs Datastore
-	chainDs  Datastore
+	lk                  sync.RWMutex
+	cfg                 *config.Config
+	ds                  Datastore
+	keystore            keystore.Keystore
+	walletDs            Datastore
+	chainDs             Datastore
+	minerDealsDs        Datastore
+	clientDealsDs       Datastore
+	dealsAwaitingSealDs Datastore
 
 	// lockfile is the file system lock to prevent others from opening the same repo.
 	lockfile io.Closer
@@ -127,6 +133,18 @@ func (r *FSRepo) loadFromDisk() error {
 	if err := r.openChainDatastore(); err != nil {
 		return errors.Wrap(err, "failed to open chain datastore")
 	}
+
+	if err := r.openMinerDealsDatastore(); err != nil {
+		return errors.Wrap(err, "failed to open miner deals datastore")
+	}
+
+	if err := r.openClientDealsDatastore(); err != nil {
+		return errors.Wrap(err, "failed to open client deals datastore")
+	}
+
+	if err := r.openDealsAwaitingSealDatastore(); err != nil {
+		return errors.Wrap(err, "failed to open deals awaiting seal datastore")
+	}
 
 	return nil
 }
@@ -216,6 +234,21 @@ func (r *FSRepo) ChainDatastore() Datastore {
 	return r.chainDs
 }
 
+// MinerDealsDatastore returns the deals datastore for a miner.
+func (r *FSRepo) MinerDealsDatastore() Datastore {
+	return r.minerDealsDs
+}
+
+// ClientDealsDatastore returns the deals datastore for a client.
+func (r *FSRepo) ClientDealsDatastore() Datastore {
+	return r.clientDealsDs
+}
+
+// DealsAwaitingSealDatastore returns the datastore for deals awaiting seal.
+func (r *FSRepo) DealsAwaitingSealDatastore() Datastore {
+	return r.dealsAwaitingSealDs
+}
+
 // Version returns the version of the repo
 func (r *FSRepo) Version() uint {
 	return r.version
@@ -233,11 +266,23 @@ func (r *FSRepo) Close() error {
 	}
 
 	if err := r.walletDs.Close(); err != nil {
-		return errors.Wrap(err, "failed to close datastore")
+		return errors.Wrap(err, "failed to close wallet datastore")
 	}
 
 	if err := r.chainDs.Close(); err != nil {
-		return errors.Wrap(err, "failed to close datastore")
+		return errors.Wrap(err, "failed to close chain datastore")
+	}
+
+	if err := r.minerDealsDs.Close(); err != nil {
+		return errors.Wrap(err, "failed to close miner deals datastore")
+	}
+
+	if err := r.clientDealsDs.Close(); err != nil {
+		return errors.Wrap(err, "failed to close client deals datastore")
+	}
+
+	if err := r.dealsAwaitingSealDs.Close(); err != nil {
+		return errors.Wrap(err, "failed to close deals awaiting seal datastore")
 	}
 
 	if err := r.removeAPIFile(); err != nil {
@@ -351,6 +396,40 @@ func (r *FSRepo) openWalletDatastore() error {
 	return nil
 }
 
+// TODO: Undup these methods, they are all the same
+func (r *FSRepo) openMinerDealsDatastore() error {
+	ds, err := badgerds.NewDatastore(filepath.Join(r.path, minerDealsDatastorePrefix), nil)
+	if err != nil {
+		return err
+	}
+
+	r.minerDealsDs = ds
+
+	return nil
+}
+
+func (r *FSRepo) openClientDealsDatastore() error {
+	ds, err := badgerds.NewDatastore(filepath.Join(r.path, clientDealsDatastorePrefix), nil)
+	if err != nil {
+		return err
+	}
+
+	r.clientDealsDs = ds
+
+	return nil
+}
+
+func (r *FSRepo) openDealsAwaitingSealDatastore() error {
+	ds, err := badgerds.NewDatastore(filepath.Join(r.path, dealsAwaitingSealDatastorePrefix), nil)
+	if err != nil {
+		return err
+	}
+
+	r.dealsAwaitingSealDs = ds
+
+	return nil
+}
+
 func initVersion(p string, version uint) error {
 	return ioutil.WriteFile(filepath.Join(p, versionFilename), []byte(strconv.Itoa(int(version))), 0644)
 }
diff --git a/repo/mem.go b/repo/mem.go
index 833b7c8f15..ebd4ad7654 100644
--- a/repo/mem.go
+++ b/repo/mem.go
@@ -16,16 +16,19 @@ import (
 // implementation of the Repo interface.
 type MemRepo struct {
 	// lk guards the config
-	lk         sync.RWMutex
-	C          *config.Config
-	D          Datastore
-	Ks         keystore.Keystore
-	W          Datastore
-	Chain      Datastore
-	version    uint
-	apiAddress string
-	stagingDir string
-	sealedDir  string
+	lk                  sync.RWMutex
+	C                   *config.Config
+	D                   Datastore
+	Ks                  keystore.Keystore
+	W                   Datastore
+	Chain               Datastore
+	MinerDealsDs        Datastore
+	ClientDealsDs       Datastore
+	DealsAwaitingSealDs Datastore
+	version             uint
+	apiAddress          string
+	stagingDir          string
+	sealedDir           string
 }
 
 var _ Repo = (*MemRepo)(nil)
@@ -50,14 +53,17 @@
 // sector-storage.
 func NewInMemoryRepoWithSectorDirectories(staging, sealedDir string) *MemRepo {
 	return &MemRepo{
-		C:          config.NewDefaultConfig(),
-		D:          dss.MutexWrap(datastore.NewMapDatastore()),
-		Ks:         keystore.MutexWrap(keystore.NewMemKeystore()),
-		W:          dss.MutexWrap(datastore.NewMapDatastore()),
-		Chain:      dss.MutexWrap(datastore.NewMapDatastore()),
-		version:    Version,
-		stagingDir: staging,
-		sealedDir:  sealedDir,
+		C:                   config.NewDefaultConfig(),
+		D:                   dss.MutexWrap(datastore.NewMapDatastore()),
+		Ks:                  keystore.MutexWrap(keystore.NewMemKeystore()),
+		W:                   dss.MutexWrap(datastore.NewMapDatastore()),
+		Chain:               dss.MutexWrap(datastore.NewMapDatastore()),
+		MinerDealsDs:        dss.MutexWrap(datastore.NewMapDatastore()),
+		ClientDealsDs:       dss.MutexWrap(datastore.NewMapDatastore()),
+		DealsAwaitingSealDs: dss.MutexWrap(datastore.NewMapDatastore()),
+		version:             Version,
+		stagingDir:          staging,
+		sealedDir:           sealedDir,
 	}
 }
@@ -99,6 +105,21 @@ func (mr *MemRepo) ChainDatastore() Datastore {
 	return mr.Chain
 }
 
+// MinerDealsDatastore returns the deals datastore for miners.
+func (mr *MemRepo) MinerDealsDatastore() Datastore {
+	return mr.MinerDealsDs
+}
+
+// ClientDealsDatastore returns the deals datastore for clients.
+func (mr *MemRepo) ClientDealsDatastore() Datastore {
+	return mr.ClientDealsDs
+}
+
+// DealsAwaitingSealDatastore returns the deals awaiting seal datastore.
+func (mr *MemRepo) DealsAwaitingSealDatastore() Datastore {
+	return mr.DealsAwaitingSealDs
+}
+
 // Version returns the version of the repo.
 func (mr *MemRepo) Version() uint {
 	return mr.version
diff --git a/repo/repo.go b/repo/repo.go
index 39238c85f4..4db0f2600b 100644
--- a/repo/repo.go
+++ b/repo/repo.go
@@ -35,6 +35,15 @@ type Repo interface {
 	// ChainDatastore is a specific storage solution, only used to store already validated chain data.
 	ChainDatastore() Datastore
 
+	// MinerDealsDatastore holds the miner's deal data.
+	MinerDealsDatastore() Datastore
+
+	// ClientDealsDatastore holds the client's deal data.
+	ClientDealsDatastore() Datastore
+
+	// DealsAwaitingSealDatastore holds data about deals awaiting seal.
+	DealsAwaitingSealDatastore() Datastore
+
 	// SetAPIAddr sets the address of the running API.
 	SetAPIAddr(string) error
diff --git a/testhelpers/commands.go b/testhelpers/commands.go
index c73cc4043e..f6eaa45a08 100644
--- a/testhelpers/commands.go
+++ b/testhelpers/commands.go
@@ -107,16 +107,15 @@ type TestDaemon struct {
 	init bool
 
 	// The filecoin daemon process
-	process *exec.Cmd
-
-	lk     sync.Mutex
-	Stdin  io.Writer
-	Stdout io.Reader
-	Stderr io.Reader
-
-	test *testing.T
-
-	cmdTimeout time.Duration
+	process        *exec.Cmd
+	lk             sync.Mutex
+	Stdin          io.Writer
+	Stdout         io.Reader
+	Stderr         io.Reader
+	test           *testing.T
+	cmdTimeout     time.Duration
+	defaultAddress string
+	daemonArgs     []string
 }
 
 // RepoDir returns the repo directory of the test daemon.
@@ -355,6 +354,8 @@ func (td *TestDaemon) ReadStderr() string {
 
 // Start starts up the daemon.
 func (td *TestDaemon) Start() *TestDaemon {
+	td.createNewProcess()
+
 	require.NoError(td.test, td.process.Start())
 
 	err := td.WaitForAPI()
@@ -376,7 +377,23 @@ func (td *TestDaemon) Start() *TestDaemon {
 	return td
 }
 
-// Shutdown stops the daemon.
+// Stop stops the daemon
+func (td *TestDaemon) Stop() *TestDaemon {
+	if err := td.process.Process.Signal(syscall.SIGINT); err != nil {
+		panic(err)
+	}
+	if _, err := td.process.Process.Wait(); err != nil {
+		panic(err)
+	}
+	return td
+}
+
+// Restart restarts the daemon
+func (td *TestDaemon) Restart() *TestDaemon {
+	return td.Stop().Start()
+}
+
+// Shutdown stops the daemon and deletes the repository.
 func (td *TestDaemon) Shutdown() {
 	if err := td.process.Process.Signal(syscall.SIGTERM); err != nil {
 		td.test.Errorf("Daemon Stderr:\n%s", td.ReadStderr())
@@ -480,6 +497,44 @@ func (td *TestDaemon) CreateAsk(peer *TestDaemon, minerAddr string, fromAddr str
 	return &askID
 }
 
+// UpdatePeerID updates a miner's peer ID
+func (td *TestDaemon) UpdatePeerID() {
+	require := require.New(td.test)
+	assert := assert.New(td.test)
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		var idOutput map[string]interface{}
+		peerIDJSON := td.RunSuccess("id").ReadStdout()
+		err := json.Unmarshal([]byte(peerIDJSON), &idOutput)
+		require.NoError(err)
+		updateCidStr := td.RunSuccess("miner", "update-peerid", "--price=0", "--limit=999999", td.GetMinerAddress().String(), idOutput["ID"].(string)).ReadStdoutTrimNewlines()
+		updateCid, err := cid.Parse(updateCidStr)
+		require.NoError(err)
+		assert.NotNil(updateCid)
+		wg.Done()
+	}()
+	td.MineAndPropagate(time.Second, td)
+	wg.Wait()
+}
+
+// CreateStorageProposal proposes a storage deal to the given miner while mining in the background.
+func (td *TestDaemon) CreateStorageProposal(minerAddr string, dataCid string, AskID string, duration string) *big.Int {
+	var askID big.Int
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		td.RunSuccess("client", "propose-storage-deal", minerAddr, dataCid, AskID, duration)
+		wg.Done()
+	}()
+	td.MineAndPropagate(time.Second, td)
+	wg.Wait()
+
+	return &askID
+}
+
 // WaitForMessageRequireSuccess accepts a message cid and blocks until a message with matching cid is included in a
 // block. The receipt is then inspected to ensure that the corresponding message receipt had a 0 exit code.
@@ -674,6 +729,13 @@ func KeyFile(kf string) func(*TestDaemon) {
 	}
 }
 
+// DefaultAddress sets the daemon's default address during init
+func DefaultAddress(defaultAddr string) func(*TestDaemon) {
+	return func(td *TestDaemon) {
+		td.defaultAddress = defaultAddr
+	}
+}
+
 // GenesisFile allows setting the `genesisFile` config option on the daemon.
 func GenesisFile(a string) func(*TestDaemon) {
 	return func(td *TestDaemon) {
@@ -700,11 +762,10 @@ func NewDaemon(t *testing.T, options ...func(*TestDaemon)) *TestDaemon {
 	}
 
 	td := &TestDaemon{
-		test:     t,
-		repoDir:  dir,
-		init:     true, // we want to init unless told otherwise
-		firstRun: true,
-
+		test:        t,
+		repoDir:     dir,
+		init:        true, // we want to init unless told otherwise
+		firstRun:    true,
 		cmdTimeout:  DefaultDaemonCmdTimeout,
 		genesisFile: GenesisFilePath(), // default file includes all test addresses,
 	}
@@ -720,6 +781,7 @@
 	// build command options
 	initopts := []string{
 		repoDirFlag,
+		"--auto-seal-interval-seconds=1",
 	}
 
 	if td.genesisFile != "" {
@@ -730,6 +792,10 @@
 		initopts = append(initopts, fmt.Sprintf("--with-miner=%s", td.withMiner))
 	}
 
+	if td.defaultAddress != "" {
+		initopts = append(initopts, fmt.Sprintf("--default-address=%s", td.defaultAddress))
+	}
+
 	if td.init {
 		t.Logf("run: go-filecoin init %s", initopts)
 		out, err := RunInit(td, initopts...)
@@ -755,29 +821,7 @@
 	swarmListenFlag := fmt.Sprintf("--swarmlisten=%s", td.swarmAddr)
 	cmdAPIAddrFlag := fmt.Sprintf("--cmdapiaddr=%s", td.cmdAddr)
 
-	finalArgs := []string{"daemon", repoDirFlag, cmdAPIAddrFlag, swarmListenFlag, blockTimeFlag}
-	td.test.Logf("(%s) run: %q\n", td.swarmAddr, strings.Join(finalArgs, " "))
-
-	// define filecoin daemon process
-	td.process = exec.Command(filecoinBin, finalArgs...)
-	// disable REUSEPORT, it creates problems in tests
-	td.process.Env = append(os.Environ(), "IPFS_REUSEPORT=false")
-
-	// setup process pipes
-	td.Stdout, err = td.process.StdoutPipe()
-	if err != nil {
-		t.Fatal(err)
-	}
-	// uncomment this and comment out the following 4 lines to output daemon stderr to os stderr
-	//td.process.Stderr = os.Stderr
-	td.Stderr, err = td.process.StderrPipe()
-	if err != nil {
-		t.Fatal(err)
-	}
-	td.Stdin, err = td.process.StdinPipe()
-	if err != nil {
-		t.Fatal(err)
-	}
+	td.daemonArgs = []string{filecoinBin, "daemon", repoDirFlag, cmdAPIAddrFlag, swarmListenFlag, blockTimeFlag}
 
 	return td
 }
@@ -809,3 +853,28 @@ func ProjectRoot(paths ...string) string {
 
 	return filepath.Join(allPaths...)
 }
+
+func (td *TestDaemon) createNewProcess() {
+	td.test.Logf("(%s) run: %q\n", td.swarmAddr, strings.Join(td.daemonArgs, " "))
+
+	td.process = exec.Command(td.daemonArgs[0], td.daemonArgs[1:]...)
+	// disable REUSEPORT, it creates problems in tests
+	td.process.Env = append(os.Environ(), "IPFS_REUSEPORT=false")
+
+	// setup process pipes
+	var err error
+	td.Stdout, err = td.process.StdoutPipe()
+	if err != nil {
+		td.test.Fatal(err)
+	}
+	// uncomment this and comment out the following 4 lines to output daemon stderr to os stderr
+	//td.process.Stderr = os.Stderr
+	td.Stderr, err = td.process.StderrPipe()
+	if err != nil {
+		td.test.Fatal(err)
+	}
+	td.Stdin, err = td.process.StdinPipe()
+	if err != nil {
+		td.test.Fatal(err)
+	}
+}
diff --git a/testhelpers/util.go b/testhelpers/util.go
index 2d9c2232f7..a1b7255b4b 100644
--- a/testhelpers/util.go
+++ b/testhelpers/util.go
@@ -5,6 +5,7 @@ import (
 	"net"
 	"os"
 	"path/filepath"
+	"sync"
 	"time"
 
 	"gx/ipfs/QmdcULN1WCzgoQmcCaUAmEhwcxHYsDrbZ2LvRJKCL8dMrK/go-homedir"
@@ -82,3 +83,19 @@ func WaitForIt(count int, delay time.Duration, cb func() (bool, error)) error {
 
 	return nil
 }
+
+// WaitTimeout waits for the waitgroup for the specified max timeout.
+// Returns true if waiting timed out.
+func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+	c := make(chan struct{})
+	go func() {
+		defer close(c)
+		wg.Wait()
+	}()
+	select {
+	case <-c:
+		return false // completed normally
+	case <-time.After(timeout):
+		return true // timed out
+	}
+}
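For reference, the now-shared WaitTimeout helper composes a WaitGroup with a deadline by racing wg.Wait against time.After. On timeout the watcher goroutine stays blocked in wg.Wait — a leak that is acceptable in tests. A hedged usage sketch (the sleep is a hypothetical stand-in for real work, such as the deal-status polling loop in TestStorageDealsAfterRestart above):

    package main

    import (
        "fmt"
        "sync"
        "time"

        th "github.com/filecoin-project/go-filecoin/testhelpers"
    )

    func main() {
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(50 * time.Millisecond) // simulated work
        }()

        if th.WaitTimeout(&wg, 2*time.Second) {
            fmt.Println("timed out; the worker goroutine may still be running")
        } else {
            fmt.Println("completed normally")
        }
    }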