Skip to content

Commit

Permalink
Deals Persistence
Browse files Browse the repository at this point in the history
We persist deals to the disk to allow the continuation of deals after
nodes are restarted.

Additionally, we add:
- A default address option to init
- The ability to start, stop, and restart a test daemon

Co-authored-by: rkowalick <[email protected]>
  • Loading branch information
rosalinekarr and rkowalick committed Jan 16, 2019
1 parent 29325c3 commit 0c7dd90
Show file tree
Hide file tree
Showing 20 changed files with 676 additions and 218 deletions.
8 changes: 8 additions & 0 deletions api/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type DaemonInitConfig struct {
ClusterNightly bool
// AutoSealIntervalSeconds, when set, configures the daemon to check for and seal any staged sectors on an interval
AutoSealIntervalSeconds uint
DefaultAddress address.Address
}

// DaemonInitOpt is the signature a daemon init option has to fulfill.
Expand Down Expand Up @@ -85,3 +86,10 @@ func AutoSealIntervalSeconds(autoSealIntervalSeconds uint) DaemonInitOpt {
dc.AutoSealIntervalSeconds = autoSealIntervalSeconds
}
}

// DefaultAddress sets the daemons's default address to the provided address.
func DefaultAddress(address address.Address) DaemonInitOpt {
return func(dc *DaemonInitConfig) {
dc.DefaultAddress = address
}
}
8 changes: 8 additions & 0 deletions api/impl/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (nd *nodeDaemon) Init(ctx context.Context, opts ...api.DaemonInitOpt) error
}
}

if cfg.DefaultAddress != (address.Address{}) {
newConfig := rep.Config()
newConfig.Wallet.DefaultAddress = cfg.DefaultAddress
if err := rep.ReplaceConfig(newConfig); err != nil {
return err
}
}

if cfg.ClusterTest && cfg.ClusterNightly {
return fmt.Errorf(`cannot use both "--cluster-test" and "--cluster-nightly" options`)
}
Expand Down
6 changes: 3 additions & 3 deletions commands/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ be 2, 1 hour would be 120, and 1 day would be 2880.
Type: storage.DealResponse{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, resp *storage.DealResponse) error {
fmt.Fprintf(w, "State: %s\n", resp.State.String()) // nolint: errcheck
fmt.Fprintf(w, "Message: %s\n", resp.Message) // nolint: errcheck
fmt.Fprintf(w, "DealID: %s\n", resp.Proposal.String()) // nolint: errcheck
fmt.Fprintf(w, "State: %s\n", resp.State.String()) // nolint: errcheck
fmt.Fprintf(w, "Message: %s\n", resp.Message) // nolint: errcheck
fmt.Fprintf(w, "DealID: %s\n", resp.ProposalCid.String()) // nolint: errcheck
return nil
}),
},
Expand Down
81 changes: 64 additions & 17 deletions commands/client_daemon_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,82 @@
package commands

import (
"fmt"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"time"

"github.com/filecoin-project/go-filecoin/fixtures"
th "github.com/filecoin-project/go-filecoin/testhelpers"
"github.com/stretchr/testify/assert"
)

func TestListAsks(t *testing.T) {
t.Parallel()
assert := assert.New(t)

peer := th.NewDaemon(t, th.WithMiner(fixtures.TestMiners[0]), th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
defer peer.ShutdownSuccess()
d := th.NewDaemon(t, th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
defer d.ShutdownSuccess()
peer.ConnectSuccess(d)
minerDaemon := th.NewDaemon(t,
th.WithMiner(fixtures.TestMiners[0]),
th.KeyFile(fixtures.KeyFilePaths()[0]),
th.DefaultAddress(fixtures.TestAddresses[0]),
).Start()
defer minerDaemon.ShutdownSuccess()

minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")

listAsksOutput := minerDaemon.RunSuccess("client", "list-asks").ReadStdoutTrimNewlines()
assert.Equal(fixtures.TestMiners[0]+" 000 20 11", listAsksOutput)
}

func TestStorageDealsAfterRestart(t *testing.T) {
t.Skip("Temporarily skipped to be fixed in subsequent refactor work")

minerDaemon := th.NewDaemon(t,
th.WithMiner(fixtures.TestMiners[0]),
th.KeyFile(fixtures.KeyFilePaths()[0]),
th.DefaultAddress(fixtures.TestAddresses[0]),
th.AutoSealInterval("1"),
).Start()
defer minerDaemon.ShutdownSuccess()

clientDaemon := th.NewDaemon(t,
th.KeyFile(fixtures.KeyFilePaths()[1]),
th.DefaultAddress(fixtures.TestAddresses[1]),
).Start()
defer clientDaemon.ShutdownSuccess()

minerDaemon.UpdatePeerID()
minerDaemon.RunSuccess("mining", "start")

minerDaemon.ConnectSuccess(clientDaemon)

minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")
dataCid := clientDaemon.RunWithStdin(strings.NewReader("HODLHODLHODL"), "client", "import").ReadStdoutTrimNewlines()

proposeDealOutput := clientDaemon.RunSuccess("client", "propose-storage-deal", fixtures.TestMiners[0], dataCid, "0", "5").ReadStdoutTrimNewlines()

splitOnSpace := strings.Split(proposeDealOutput, " ")

dealCid := splitOnSpace[len(splitOnSpace)-1]

minerDaemon.Restart()
minerDaemon.RunSuccess("mining", "start")

// create a miner with one ask
minerAddr := d.CreateMinerAddr(peer, fixtures.TestAddresses[2])
d.CreateAsk(peer, minerAddr.String(), fixtures.TestAddresses[2], "20", "10")
clientDaemon.Restart()

// check ls
expectedBaseHeight := 2
expectedExpiry := expectedBaseHeight + 10
ls := d.RunSuccess("client", "list-asks")
expectedResult := fmt.Sprintf("%s 000 20 %d", minerAddr.String(), expectedExpiry)
assert.Equal(expectedResult, strings.Trim(ls.ReadStdout(), "\n"))
minerDaemon.ConnectSuccess(clientDaemon)

var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
queryDealOutput := clientDaemon.RunSuccess("client", "query-storage-deal", dealCid).ReadStdout()
if strings.Contains(queryDealOutput, "posted") {
wg.Done()
break
}
time.Sleep(500 * time.Millisecond)
}
}()
th.WaitTimeout(&wg, 120*time.Second)
}
11 changes: 11 additions & 0 deletions commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var initCmd = &cmds.Command{
cmdkit.StringOption(GenesisFile, "path of file or HTTP(S) URL containing archive of genesis block DAG data"),
cmdkit.StringOption(PeerKeyFile, "path of file containing key to use for new node's libp2p identity"),
cmdkit.StringOption(WithMiner, "when set, creates a custom genesis block with a pre generated miner account, requires running the daemon using dev mode (--dev)"),
cmdkit.StringOption(DefaultAddress, "when set, sets the daemons's default address to the provided address"),
cmdkit.UintOption(AutoSealIntervalSeconds, "when set to a number > 0, configures the daemon to check for and seal any staged sectors on an interval.").WithDefault(uint(120)),
cmdkit.BoolOption(ClusterTest, "when set, populates config bootstrap addrs with the dns multiaddrs of the test cluster and other test cluster specific bootstrap parameters."),
cmdkit.BoolOption(ClusterNightly, "when set, populates config bootstrap addrs with the dns multiaddrs of the nightly cluster and other nightly cluster specific bootstrap parameters"),
Expand All @@ -45,6 +46,15 @@ var initCmd = &cmds.Command{
}
}

var defaultAddress address.Address
if m, ok := req.Options[DefaultAddress].(string); ok {
var err error
defaultAddress, err = address.NewFromString(m)
if err != nil {
return err
}
}

return GetAPI(env).Daemon().Init(
req.Context,
api.RepoDir(repoDir),
Expand All @@ -54,6 +64,7 @@ var initCmd = &cmds.Command{
api.ClusterTest(clusterTest),
api.ClusterNightly(clusterNightly),
api.AutoSealIntervalSeconds(autoSealIntervalSeconds),
api.DefaultAddress(defaultAddress),
)
},
Encoders: cmds.EncoderMap{
Expand Down
3 changes: 3 additions & 0 deletions commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
// WithMiner when set, creates a custom genesis block with a pre generated miner account, requires to run the daemon using dev mode (--dev)
WithMiner = "with-miner"

// DefaultAddress when set, sets the daemons's default address to the provided address
DefaultAddress = "default-address"

// GenesisFile is the path of file containing archive of genesis block DAG data
GenesisFile = "genesisfile"

Expand Down
2 changes: 1 addition & 1 deletion node/block_propagate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func makeNodes(ctx context.Context, t *testing.T, assertions *assert.Assertions)
)
seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.PlumbingAPI)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), minerNode.PlumbingAPI)
assertions.NoError(err)
clientNode := MakeNodeWithChainSeed(t, seed, configOpts)
nodes := []*Node{minerNode, clientNode}
Expand Down
8 changes: 6 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,11 @@ func (node *Node) Start(ctx context.Context) error {
node.Host(),
node.Lookup(),
node.CallQueryMethod)
node.StorageMinerClient = storage.NewClient(cni)
var err error
node.StorageMinerClient, err = storage.NewClient(cni, node.Repo.ClientDealsDatastore())
if err != nil {
return errors.Wrap(err, "Could not make new storage client")
}

node.RetrievalClient = retrieval.NewClient(node)
node.RetrievalMiner = retrieval.NewMiner(node)
Expand Down Expand Up @@ -846,7 +850,7 @@ func initStorageMinerForNode(ctx context.Context, node *Node) (*storage.Miner, e
return nil, errors.Wrap(err, "no mining owner available, skipping storage miner setup")
}

miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.PlumbingAPI)
miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.Repo.MinerDealsDatastore(), node.Repo.DealsAwaitingSealDatastore(), node.PlumbingAPI)
if err != nil {
return nil, errors.Wrap(err, "failed to instantiate storage miner")
}
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestNodeStartMining(t *testing.T) {

seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, plumbingAPI)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), plumbingAPI)
assert.NoError(err)

assert.NoError(minerNode.Start(ctx))
Expand Down
87 changes: 67 additions & 20 deletions protocol/storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ import (
"math/big"
"sync"

cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor"
"gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors"
"gx/ipfs/QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8/go-multistream"
"gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query"

"github.com/filecoin-project/go-filecoin/abi"
"github.com/filecoin-project/go-filecoin/actor/builtin/miner"
"github.com/filecoin-project/go-filecoin/address"
cbu "github.com/filecoin-project/go-filecoin/cborutil"
"github.com/filecoin-project/go-filecoin/lookup"
"github.com/filecoin-project/go-filecoin/repo"
"github.com/filecoin-project/go-filecoin/types"
)

Expand All @@ -29,26 +32,36 @@ type clientNode interface {
GetAskPrice(ctx context.Context, miner address.Address, askid uint64) (*types.AttoFIL, error)
}

type clientDeal struct {
Miner address.Address
Proposal *DealProposal
Response *DealResponse
}

// Client is used to make deals directly with storage miners.
type Client struct {
deals map[cid.Cid]*clientDealState
deals map[cid.Cid]*clientDeal
dealsDs repo.Datastore
dealsLk sync.Mutex

node clientNode
}

type clientDealState struct {
miner address.Address
proposal *DealProposal
lastState *DealResponse
func init() {
cbor.RegisterCborType(clientDeal{})
}

// NewClient creaters a new storage miner client.
func NewClient(nd clientNode) *Client {
return &Client{
deals: make(map[cid.Cid]*clientDealState),
node: nd,
// NewClient creates a new storage client.
func NewClient(nd clientNode, dealsDs repo.Datastore) (*Client, error) {
smc := &Client{
deals: make(map[cid.Cid]*clientDeal),
node: nd,
dealsDs: dealsDs,
}
if err := smc.loadDeals(); err != nil {
return nil, errors.Wrap(err, "failed to load client deals")
}
return smc, nil
}

// ProposeDeal is
Expand Down Expand Up @@ -114,18 +127,17 @@ func (smc *Client) ProposeDeal(ctx context.Context, miner address.Address, data
func (smc *Client) recordResponse(resp *DealResponse, miner address.Address, p *DealProposal) error {
smc.dealsLk.Lock()
defer smc.dealsLk.Unlock()
_, ok := smc.deals[resp.Proposal]
_, ok := smc.deals[resp.ProposalCid]
if ok {
return fmt.Errorf("deal [%s] is already in progress", resp.Proposal.String())
return fmt.Errorf("deal [%s] is already in progress", resp.ProposalCid.String())
}

smc.deals[resp.Proposal] = &clientDealState{
lastState: resp,
miner: miner,
proposal: p,
smc.deals[resp.ProposalCid] = &clientDeal{
Miner: miner,
Proposal: p,
Response: resp,
}

return nil
return smc.saveDeal(resp.ProposalCid)
}

func (smc *Client) checkDealResponse(ctx context.Context, resp *DealResponse) error {
Expand All @@ -149,7 +161,7 @@ func (smc *Client) minerForProposal(c cid.Cid) (address.Address, error) {
return address.Address{}, fmt.Errorf("no such proposal by cid: %s", c)
}

return st.miner, nil
return st.Miner, nil
}

// QueryDeal queries an in-progress proposal.
Expand Down Expand Up @@ -237,3 +249,38 @@ func (cni *ClientNodeImpl) GetAskPrice(ctx context.Context, maddr address.Addres

return ask.Price, nil
}

func (smc *Client) loadDeals() error {
res, err := smc.dealsDs.Query(query.Query{})
if err != nil {
return errors.Wrap(err, "failed to query deals from datastore")
}

smc.deals = make(map[cid.Cid]*clientDeal)

for entry := range res.Next() {
var deal clientDeal
if err := cbor.DecodeInto(entry.Value, &deal); err != nil {
return errors.Wrap(err, "failed to unmarshal deals from datastore")
}
smc.deals[deal.Response.ProposalCid] = &deal
}

return nil
}

func (smc *Client) saveDeal(cid cid.Cid) error {
deal, ok := smc.deals[cid]
if !ok {
return errors.Errorf("Could not find client deal with cid: %s", cid.String())
}
datum, err := cbor.DumpObject(deal)
if err != nil {
return errors.Wrap(err, "could not marshal storageDeal")
}
err = smc.dealsDs.Put(datastore.NewKey(cid.String()), datum)
if err != nil {
return errors.Wrap(err, "could not save client deal to disk, in-memory deals differ from persisted deals!")
}
return nil
}
Loading

0 comments on commit 0c7dd90

Please sign in to comment.