Skip to content

Commit

Permalink
Deals Persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
rosalinekarr authored and rkowalick committed Jan 2, 2019
1 parent dcd4398 commit 81d3361
Show file tree
Hide file tree
Showing 17 changed files with 471 additions and 148 deletions.
8 changes: 8 additions & 0 deletions api/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type DaemonInitConfig struct {
ClusterNightly bool
// AutoSealIntervalSeconds, when set, configures the daemon to check for and seal any staged sectors on an interval
AutoSealIntervalSeconds uint
DefaultAddress address.Address
}

// DaemonInitOpt is the signature a daemon init option has to fulfill.
Expand Down Expand Up @@ -85,3 +86,10 @@ func AutoSealIntervalSeconds(autoSealIntervalSeconds uint) DaemonInitOpt {
dc.AutoSealIntervalSeconds = autoSealIntervalSeconds
}
}

// DefaultAddress sets the daemons's default address to the provided address.
func DefaultAddress(address address.Address) DaemonInitOpt {
return func(dc *DaemonInitConfig) {
dc.DefaultAddress = address
}
}
8 changes: 8 additions & 0 deletions api/impl/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (nd *nodeDaemon) Init(ctx context.Context, opts ...api.DaemonInitOpt) error
}
}

if cfg.DefaultAddress != (address.Address{}) {
newConfig := rep.Config()
newConfig.Wallet.DefaultAddress = cfg.DefaultAddress
if err := rep.ReplaceConfig(newConfig); err != nil {
return err
}
}

if cfg.ClusterTest && cfg.ClusterNightly {
return fmt.Errorf(`cannot use both "--cluster-test" and "--cluster-nightly" options`)
}
Expand Down
2 changes: 1 addition & 1 deletion api2/porcelain/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/filecoin-project/go-filecoin/address"
"github.com/filecoin-project/go-filecoin/types"
vmErrors "github.com/filecoin-project/go-filecoin/vm/errors"
"github.com/pkg/errors"
"gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors"
)

// plumbing is the subset of the PlumbingAPI that MessageSendWithRetry uses.
Expand Down
67 changes: 52 additions & 15 deletions commands/client_daemon_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package commands

import (
"fmt"
"github.com/stretchr/testify/require"
"strings"
"testing"

Expand All @@ -15,21 +15,58 @@ func TestListAsks(t *testing.T) {
t.Parallel()
assert := assert.New(t)

peer := th.NewDaemon(t, th.WithMiner(fixtures.TestMiners[0]), th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
defer peer.ShutdownSuccess()
d := th.NewDaemon(t, th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
defer d.ShutdownSuccess()
peer.ConnectSuccess(d)
minerDaemon := th.NewDaemon(t,
th.WithMiner(fixtures.TestMiners[0]),
th.KeyFile(fixtures.KeyFilePaths()[0]),
th.DefaultAddress(fixtures.TestAddresses[0]),
).Start()
defer minerDaemon.ShutdownSuccess()

// create a miner with one ask
minerAddr := d.CreateMinerAddr(peer, fixtures.TestAddresses[2])
d.CreateAsk(peer, minerAddr.String(), fixtures.TestAddresses[2], "20", "10")
minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")

// check ls
expectedBaseHeight := 2
expectedExpiry := expectedBaseHeight + 10
ls := d.RunSuccess("client", "list-asks")
expectedResult := fmt.Sprintf("%s 000 20 %d", minerAddr.String(), expectedExpiry)
assert.Equal(expectedResult, strings.Trim(ls.ReadStdout(), "\n"))
listAsksOutput := minerDaemon.RunSuccess("client", "list-asks").ReadStdoutTrimNewlines()
assert.Equal(fixtures.TestMiners[0]+" 000 20 11", listAsksOutput)
}

func TestQueryStorageDeal(t *testing.T) {
t.Parallel()

minerDaemon := th.NewDaemon(t,
th.WithMiner(fixtures.TestMiners[0]),
th.KeyFile(fixtures.KeyFilePaths()[0]),
th.DefaultAddress(fixtures.TestAddresses[0]),
).Start()
defer minerDaemon.ShutdownSuccess()

clientDaemon := th.NewDaemon(t,
th.KeyFile(fixtures.KeyFilePaths()[1]),
th.DefaultAddress(fixtures.TestAddresses[1]),
).Start()
defer clientDaemon.ShutdownSuccess()

require.Equal(t, minerDaemon.GetDefaultAddress(), fixtures.TestAddresses[0])
require.Equal(t, clientDaemon.GetDefaultAddress(), fixtures.TestAddresses[1])

minerDaemon.UpdatePeerID()
minerDaemon.RunSuccess("mining", "start")

minerDaemon.ConnectSuccess(clientDaemon)

minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")
dataCid := clientDaemon.RunWithStdin(strings.NewReader("HODLHODLHODL"), "client", "import").ReadStdoutTrimNewlines()

proposeDealOutput := clientDaemon.RunSuccess("client", "propose-storage-deal", fixtures.TestMiners[0], dataCid, "0", "5").ReadStdoutTrimNewlines()

splitOnSpace := strings.Split(proposeDealOutput, " ")

dealCid := splitOnSpace[len(splitOnSpace)-1]

minerDaemon.Restart()
minerDaemon.RunSuccess("mining", "start")

clientDaemon.Restart()

minerDaemon.ConnectSuccess(clientDaemon)

clientDaemon.RunSuccess("client", "query-storage-deal", dealCid).ReadStdout()
}
11 changes: 11 additions & 0 deletions commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var initCmd = &cmds.Command{
cmdkit.StringOption(GenesisFile, "path of file or HTTP(S) URL containing archive of genesis block DAG data"),
cmdkit.StringOption(PeerKeyFile, "path of file containing key to use for new node's libp2p identity"),
cmdkit.StringOption(WithMiner, "when set, creates a custom genesis block with a pre generated miner account, requires running the daemon using dev mode (--dev)"),
cmdkit.StringOption(DefaultAddress, "when set, sets the daemons's default address to the provided address"),
cmdkit.UintOption(AutoSealIntervalSeconds, "when set to a number > 0, configures the daemon to check for and seal any staged sectors on an interval.").WithDefault(uint(120)),
cmdkit.BoolOption(ClusterTest, "when set, populates config bootstrap addrs with the dns multiaddrs of the test cluster and other test cluster specific bootstrap parameters."),
cmdkit.BoolOption(ClusterNightly, "when set, populates config bootstrap addrs with the dns multiaddrs of the nightly cluster and other nightly cluster specific bootstrap parameters"),
Expand All @@ -45,6 +46,15 @@ var initCmd = &cmds.Command{
}
}

var defaultAddress address.Address
if m, ok := req.Options[DefaultAddress].(string); ok {
var err error
defaultAddress, err = address.NewFromString(m)
if err != nil {
return err
}
}

return GetAPI(env).Daemon().Init(
req.Context,
api.RepoDir(repoDir),
Expand All @@ -54,6 +64,7 @@ var initCmd = &cmds.Command{
api.ClusterTest(clusterTest),
api.ClusterNightly(clusterNightly),
api.AutoSealIntervalSeconds(autoSealIntervalSeconds),
api.DefaultAddress(defaultAddress),
)
},
Encoders: cmds.EncoderMap{
Expand Down
3 changes: 3 additions & 0 deletions commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
// WithMiner when set, creates a custom genesis block with a pre generated miner account, requires to run the daemon using dev mode (--dev)
WithMiner = "with-miner"

// DefaultAddress when set, sets the daemons's default address to the provided address
DefaultAddress = "default-address"

// GenesisFile is the path of file containing archive of genesis block DAG data
GenesisFile = "genesisfile"

Expand Down
2 changes: 1 addition & 1 deletion functional-tests/retrieval
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ else
dd if=/dev/urandom of="${PIECE_1_PATH}" bs=1024 count=500
fi

trap finish EXIT
# trap finish EXIT

MN_REPO_DIR=$(mktemp -d)
MN_CMDAPI_PORT=$(free_port)
Expand Down
8 changes: 6 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,11 @@ func (node *Node) Start(ctx context.Context) error {
node.Host(),
node.Lookup(),
node.CallQueryMethod)
node.StorageMinerClient = storage.NewClient(cni)
var err error
node.StorageMinerClient, err = storage.NewClient(cni, node.Repo.ClientDealsDatastore())
if err != nil {
return errors.Wrap(err, "Could not make new storage client")
}

node.RetrievalClient = retrieval.NewClient(node)
node.RetrievalMiner = retrieval.NewMiner(node)
Expand Down Expand Up @@ -816,7 +820,7 @@ func initStorageMinerForNode(ctx context.Context, node *Node) (*storage.Miner, e
return nil, errors.Wrap(err, "no mining owner available, skipping storage miner setup")
}

miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.PlumbingAPI)
miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.Repo.MinerDealsDatastore(), node.PlumbingAPI)
if err != nil {
return nil, errors.Wrap(err, "failed to instantiate storage miner")
}
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestNodeStartMining(t *testing.T) {

seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, plumbingAPI)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), plumbingAPI)
assert.NoError(err)

assert.NoError(minerNode.Start(ctx))
Expand Down
94 changes: 74 additions & 20 deletions protocol/storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ package storage
import (
"context"
"fmt"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query"
"math/big"
"sync"

cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor"
"gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors"
"gx/ipfs/QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8/go-multistream"
"gx/ipfs/QmahxMNoNuSsgQefo9rkpcfRFmQrMN6Q99aztKXf63K7YJ/go-libp2p-host"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore"

"github.com/filecoin-project/go-filecoin/abi"
"github.com/filecoin-project/go-filecoin/actor/builtin/miner"
"github.com/filecoin-project/go-filecoin/address"
cbu "github.com/filecoin-project/go-filecoin/cborutil"
"github.com/filecoin-project/go-filecoin/lookup"
"github.com/filecoin-project/go-filecoin/repo"
"github.com/filecoin-project/go-filecoin/types"
)

Expand All @@ -29,26 +32,43 @@ type clientNode interface {
GetAskPrice(ctx context.Context, miner address.Address, askid uint64) (*types.AttoFIL, error)
}

type clientDealState struct {
Miner address.Address
Proposal *DealProposal
State *DealResponse
}

// Client is used to make deals directly with storage miners.
type Client struct {
deals map[cid.Cid]*clientDealState
dealsLk sync.Mutex
deals map[cid.Cid]*clientDealState
ClientDealsDs repo.Datastore
dealsLk sync.Mutex

node clientNode
}

type clientDealState struct {
miner address.Address
proposal *DealProposal
lastState *DealResponse
func init() {
cbor.RegisterCborType(clientDealState{})
}

// NewClient creaters a new storage miner client.
func NewClient(nd clientNode) *Client {
return &Client{
deals: make(map[cid.Cid]*clientDealState),
node: nd,
// NewClient creates a new storage client.
func NewClient(nd clientNode, dealsDs repo.Datastore) (*Client, error) {
if dealsDs == nil {
fmt.Println("dub tee eff")
}
smc := &Client{
deals: make(map[cid.Cid]*clientDealState),
node: nd,
ClientDealsDs: dealsDs,
}
loadedDeals, err := listClientDeals(smc.ClientDealsDs)
if err != nil {
return nil, nil
}
for _, deal := range loadedDeals {
smc.deals[deal.State.Proposal] = deal
}
return smc, nil
}

// ProposeDeal is
Expand Down Expand Up @@ -119,10 +139,19 @@ func (smc *Client) recordResponse(resp *DealResponse, miner address.Address, p *
return fmt.Errorf("deal [%s] is already in progress", resp.Proposal.String())
}

smc.deals[resp.Proposal] = &clientDealState{
lastState: resp,
miner: miner,
proposal: p,
thing := &clientDealState{
Miner: miner,
Proposal: p,
State: resp,
}
smc.deals[resp.Proposal] = thing
datum, err := cbor.DumpObject(smc.deals[resp.Proposal])
if err != nil {
return errors.Wrap(err, "could not marshal storageDealState")
}
err = smc.ClientDealsDs.Put(datastore.NewKey(resp.Proposal.String()), datum)
if err != nil {
return errors.Wrap(err, "could not save client storage deal")
}

return nil
Expand All @@ -144,15 +173,16 @@ func (smc *Client) checkDealResponse(ctx context.Context, resp *DealResponse) er
func (smc *Client) minerForProposal(c cid.Cid) (address.Address, error) {
smc.dealsLk.Lock()
defer smc.dealsLk.Unlock()
fmt.Println(smc.deals)
st, ok := smc.deals[c]
if !ok {
return address.Address{}, fmt.Errorf("no such proposal by cid: %s", c)
return address.Address{}, fmt.Errorf("no such Proposal by cid: %s", c)
}

return st.miner, nil
return st.Miner, nil
}

// QueryDeal queries an in-progress proposal.
// QueryDeal queries an in-progress Proposal.
func (smc *Client) QueryDeal(ctx context.Context, proposalCid cid.Cid) (*DealResponse, error) {
mineraddr, err := smc.minerForProposal(proposalCid)
if err != nil {
Expand Down Expand Up @@ -217,7 +247,7 @@ func (cni *ClientNodeImpl) Lookup() lookup.PeerLookupService {
return cni.lookup
}

// GetAskPrice returns the price of the ask referenced by 'askid' on miner 'maddr'
// GetAskPrice returns the price of the ask referenced by 'askid' on Miner 'maddr'
func (cni *ClientNodeImpl) GetAskPrice(ctx context.Context, maddr address.Address, askid uint64) (*types.AttoFIL, error) {
args, err := abi.ToEncodedValues(big.NewInt(0).SetUint64(askid))
if err != nil {
Expand All @@ -237,3 +267,27 @@ func (cni *ClientNodeImpl) GetAskPrice(ctx context.Context, maddr address.Addres

return ask.Price, nil
}

func listClientDeals(ds repo.Datastore) ([]*clientDealState, error) {
res, err := ds.Query(query.Query{})
if err != nil {
return nil, errors.Wrap(err, "failed to query deals from datastore")
}

entries, err := res.Rest()
if err != nil {
return nil, errors.Wrap(err, "failed to load deals from datastore")
}

values := make([]*clientDealState, len(entries))

for i, entry := range entries {
var thing clientDealState
if err := cbor.DecodeInto(entry.Value, &thing); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal deals from datastore")
}
values[i] = &thing
}

return values, nil
}
Loading

0 comments on commit 81d3361

Please sign in to comment.