Skip to content

Commit

Permalink
Deals Persistence
Browse files Browse the repository at this point in the history
We persist deals to the disk to allow the continuation of deals after
nodes are restarted.

Additionally, we add:
- A default address option to init
- The ability to start, stop, and restart a test daemon

Co-authored-by: rkowalick <[email protected]>
  • Loading branch information
rosalinekarr and rkowalick committed Jan 4, 2019
1 parent 1d7cd11 commit 45c8593
Show file tree
Hide file tree
Showing 17 changed files with 628 additions and 190 deletions.
8 changes: 8 additions & 0 deletions api/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type DaemonInitConfig struct {
ClusterNightly bool
// AutoSealIntervalSeconds, when set, configures the daemon to check for and seal any staged sectors on an interval
AutoSealIntervalSeconds uint
DefaultAddress address.Address
}

// DaemonInitOpt is the signature a daemon init option has to fulfill.
Expand Down Expand Up @@ -85,3 +86,10 @@ func AutoSealIntervalSeconds(autoSealIntervalSeconds uint) DaemonInitOpt {
dc.AutoSealIntervalSeconds = autoSealIntervalSeconds
}
}

// DefaultAddress sets the daemons's default address to the provided address.
func DefaultAddress(address address.Address) DaemonInitOpt {
return func(dc *DaemonInitConfig) {
dc.DefaultAddress = address
}
}
8 changes: 8 additions & 0 deletions api/impl/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (nd *nodeDaemon) Init(ctx context.Context, opts ...api.DaemonInitOpt) error
}
}

if cfg.DefaultAddress != (address.Address{}) {
newConfig := rep.Config()
newConfig.Wallet.DefaultAddress = cfg.DefaultAddress
if err := rep.ReplaceConfig(newConfig); err != nil {
return err
}
}

if cfg.ClusterTest && cfg.ClusterNightly {
return fmt.Errorf(`cannot use both "--cluster-test" and "--cluster-nightly" options`)
}
Expand Down
79 changes: 62 additions & 17 deletions commands/client_daemon_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,80 @@
package commands

import (
"fmt"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"time"

"github.com/filecoin-project/go-filecoin/fixtures"
th "github.com/filecoin-project/go-filecoin/testhelpers"
"github.com/stretchr/testify/assert"
)

func TestListAsks(t *testing.T) {
t.Parallel()
assert := assert.New(t)

peer := th.NewDaemon(t, th.WithMiner(fixtures.TestMiners[0]), th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
defer peer.ShutdownSuccess()
d := th.NewDaemon(t, th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
defer d.ShutdownSuccess()
peer.ConnectSuccess(d)
minerDaemon := th.NewDaemon(t,
th.WithMiner(fixtures.TestMiners[0]),
th.KeyFile(fixtures.KeyFilePaths()[0]),
th.DefaultAddress(fixtures.TestAddresses[0]),
).Start()
defer minerDaemon.ShutdownSuccess()

minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")

listAsksOutput := minerDaemon.RunSuccess("client", "list-asks").ReadStdoutTrimNewlines()
assert.Equal(fixtures.TestMiners[0]+" 000 20 11", listAsksOutput)
}

func TestStorageDealsAfterRestart(t *testing.T) {
t.Parallel()

minerDaemon := th.NewDaemon(t,
th.WithMiner(fixtures.TestMiners[0]),
th.KeyFile(fixtures.KeyFilePaths()[0]),
th.DefaultAddress(fixtures.TestAddresses[0]),
).Start()
defer minerDaemon.ShutdownSuccess()

clientDaemon := th.NewDaemon(t,
th.KeyFile(fixtures.KeyFilePaths()[1]),
th.DefaultAddress(fixtures.TestAddresses[1]),
).Start()
defer clientDaemon.ShutdownSuccess()

minerDaemon.UpdatePeerID()
minerDaemon.RunSuccess("mining", "start")

minerDaemon.ConnectSuccess(clientDaemon)

minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")
dataCid := clientDaemon.RunWithStdin(strings.NewReader("HODLHODLHODL"), "client", "import").ReadStdoutTrimNewlines()

proposeDealOutput := clientDaemon.RunSuccess("client", "propose-storage-deal", fixtures.TestMiners[0], dataCid, "0", "5").ReadStdoutTrimNewlines()

splitOnSpace := strings.Split(proposeDealOutput, " ")

dealCid := splitOnSpace[len(splitOnSpace)-1]

minerDaemon.Restart()
minerDaemon.RunSuccess("mining", "start")

// create a miner with one ask
minerAddr := d.CreateMinerAddr(peer, fixtures.TestAddresses[2])
d.CreateAsk(peer, minerAddr.String(), fixtures.TestAddresses[2], "20", "10")
clientDaemon.Restart()

// check ls
expectedBaseHeight := 2
expectedExpiry := expectedBaseHeight + 10
ls := d.RunSuccess("client", "list-asks")
expectedResult := fmt.Sprintf("%s 000 20 %d", minerAddr.String(), expectedExpiry)
assert.Equal(expectedResult, strings.Trim(ls.ReadStdout(), "\n"))
minerDaemon.ConnectSuccess(clientDaemon)

var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
queryDealOutput := clientDaemon.RunSuccess("client", "query-storage-deal", dealCid).ReadStdout()
if strings.Contains(queryDealOutput, "posted") {
wg.Done()
break
}
}
}()
th.WaitTimeout(&wg, 120*time.Second)
}
11 changes: 11 additions & 0 deletions commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var initCmd = &cmds.Command{
cmdkit.StringOption(GenesisFile, "path of file or HTTP(S) URL containing archive of genesis block DAG data"),
cmdkit.StringOption(PeerKeyFile, "path of file containing key to use for new node's libp2p identity"),
cmdkit.StringOption(WithMiner, "when set, creates a custom genesis block with a pre generated miner account, requires running the daemon using dev mode (--dev)"),
cmdkit.StringOption(DefaultAddress, "when set, sets the daemons's default address to the provided address"),
cmdkit.UintOption(AutoSealIntervalSeconds, "when set to a number > 0, configures the daemon to check for and seal any staged sectors on an interval.").WithDefault(uint(120)),
cmdkit.BoolOption(ClusterTest, "when set, populates config bootstrap addrs with the dns multiaddrs of the test cluster and other test cluster specific bootstrap parameters."),
cmdkit.BoolOption(ClusterNightly, "when set, populates config bootstrap addrs with the dns multiaddrs of the nightly cluster and other nightly cluster specific bootstrap parameters"),
Expand All @@ -45,6 +46,15 @@ var initCmd = &cmds.Command{
}
}

var defaultAddress address.Address
if m, ok := req.Options[DefaultAddress].(string); ok {
var err error
defaultAddress, err = address.NewFromString(m)
if err != nil {
return err
}
}

return GetAPI(env).Daemon().Init(
req.Context,
api.RepoDir(repoDir),
Expand All @@ -54,6 +64,7 @@ var initCmd = &cmds.Command{
api.ClusterTest(clusterTest),
api.ClusterNightly(clusterNightly),
api.AutoSealIntervalSeconds(autoSealIntervalSeconds),
api.DefaultAddress(defaultAddress),
)
},
Encoders: cmds.EncoderMap{
Expand Down
3 changes: 3 additions & 0 deletions commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
// WithMiner when set, creates a custom genesis block with a pre generated miner account, requires to run the daemon using dev mode (--dev)
WithMiner = "with-miner"

// DefaultAddress when set, sets the daemons's default address to the provided address
DefaultAddress = "default-address"

// GenesisFile is the path of file containing archive of genesis block DAG data
GenesisFile = "genesisfile"

Expand Down
2 changes: 1 addition & 1 deletion node/block_propagate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func makeNodes(ctx context.Context, t *testing.T, assertions *assert.Assertions)
minerNode := MakeNodeWithChainSeed(t, seed, PeerKeyOpt(PeerKeys[0]), AutoSealIntervalSecondsOpt(1))
seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.PlumbingAPI)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), minerNode.PlumbingAPI)
assertions.NoError(err)
clientNode := MakeNodeWithChainSeed(t, seed)
nodes := []*Node{minerNode, clientNode}
Expand Down
8 changes: 6 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,11 @@ func (node *Node) Start(ctx context.Context) error {
node.Host(),
node.Lookup(),
node.CallQueryMethod)
node.StorageMinerClient = storage.NewClient(cni)
var err error
node.StorageMinerClient, err = storage.NewClient(cni, node.Repo.ClientDealsDatastore())
if err != nil {
return errors.Wrap(err, "Could not make new storage client")
}

node.RetrievalClient = retrieval.NewClient(node)
node.RetrievalMiner = retrieval.NewMiner(node)
Expand Down Expand Up @@ -830,7 +834,7 @@ func initStorageMinerForNode(ctx context.Context, node *Node) (*storage.Miner, e
return nil, errors.Wrap(err, "no mining owner available, skipping storage miner setup")
}

miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.PlumbingAPI)
miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.Repo.MinerDealsDatastore(), node.Repo.DealsAwaitingSealDatastore(), node.PlumbingAPI)
if err != nil {
return nil, errors.Wrap(err, "failed to instantiate storage miner")
}
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestNodeStartMining(t *testing.T) {

seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, plumbingAPI)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), plumbingAPI)
assert.NoError(err)

assert.NoError(minerNode.Start(ctx))
Expand Down
80 changes: 64 additions & 16 deletions protocol/storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ import (
"math/big"
"sync"

cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor"
"gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors"
"gx/ipfs/QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8/go-multistream"
"gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query"

"github.com/filecoin-project/go-filecoin/abi"
"github.com/filecoin-project/go-filecoin/actor/builtin/miner"
"github.com/filecoin-project/go-filecoin/address"
cbu "github.com/filecoin-project/go-filecoin/cborutil"
"github.com/filecoin-project/go-filecoin/lookup"
"github.com/filecoin-project/go-filecoin/repo"
"github.com/filecoin-project/go-filecoin/types"
)

Expand All @@ -29,26 +32,36 @@ type clientNode interface {
GetAskPrice(ctx context.Context, miner address.Address, askid uint64) (*types.AttoFIL, error)
}

type clientDealState struct {
Miner address.Address
Proposal *DealProposal
State *DealResponse
}

// Client is used to make deals directly with storage miners.
type Client struct {
deals map[cid.Cid]*clientDealState
dealsDs repo.Datastore
dealsLk sync.Mutex

node clientNode
}

type clientDealState struct {
miner address.Address
proposal *DealProposal
lastState *DealResponse
func init() {
cbor.RegisterCborType(clientDealState{})
}

// NewClient creaters a new storage miner client.
func NewClient(nd clientNode) *Client {
return &Client{
deals: make(map[cid.Cid]*clientDealState),
node: nd,
// NewClient creates a new storage client.
func NewClient(nd clientNode, dealsDs repo.Datastore) (*Client, error) {
smc := &Client{
deals: make(map[cid.Cid]*clientDealState),
node: nd,
dealsDs: dealsDs,
}
if err := smc.loadDeals(); err != nil {
return nil, errors.Wrap(err, "failed to load client deals")
}
return smc, nil
}

// ProposeDeal is
Expand Down Expand Up @@ -120,12 +133,11 @@ func (smc *Client) recordResponse(resp *DealResponse, miner address.Address, p *
}

smc.deals[resp.Proposal] = &clientDealState{
lastState: resp,
miner: miner,
proposal: p,
Miner: miner,
Proposal: p,
State: resp,
}

return nil
return smc.saveDeal(resp.Proposal)
}

func (smc *Client) checkDealResponse(ctx context.Context, resp *DealResponse) error {
Expand All @@ -149,7 +161,7 @@ func (smc *Client) minerForProposal(c cid.Cid) (address.Address, error) {
return address.Address{}, fmt.Errorf("no such proposal by cid: %s", c)
}

return st.miner, nil
return st.Miner, nil
}

// QueryDeal queries an in-progress proposal.
Expand Down Expand Up @@ -237,3 +249,39 @@ func (cni *ClientNodeImpl) GetAskPrice(ctx context.Context, maddr address.Addres

return ask.Price, nil
}

func (smc *Client) loadDeals() error {
res, err := smc.dealsDs.Query(query.Query{})
if err != nil {
return errors.Wrap(err, "failed to query deals from datastore")
}

entries, err := res.Rest()
if err != nil {
return errors.Wrap(err, "failed to load deals from datastore")
}

smc.deals = make(map[cid.Cid]*clientDealState)

for _, entry := range entries {
var deal clientDealState
if err := cbor.DecodeInto(entry.Value, &deal); err != nil {
return errors.Wrap(err, "failed to unmarshal deals from datastore")
}
smc.deals[deal.State.Proposal] = &deal
}

return nil
}

func (smc *Client) saveDeal(cid cid.Cid) error {
datum, err := cbor.DumpObject(smc.deals[cid])
if err != nil {
return errors.Wrap(err, "could not marshal storageDealState")
}
err = smc.dealsDs.Put(datastore.NewKey(cid.String()), datum)
if err != nil {
return errors.Wrap(err, "could not save client storage deal")
}
return nil
}
Loading

0 comments on commit 45c8593

Please sign in to comment.