Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rhp/v4: Update for core changes #114

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.23.2

require (
go.etcd.io/bbolt v1.3.11
go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268
go.sia.tech/core v0.5.1-0.20241028200253-79ba0d10d81a
go.sia.tech/mux v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.28.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268 h1:Afh3x9rg6pI183LQVYIGQ3quhrRgHWez4987JSKmTpk=
go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268/go.mod h1:P3C1BWa/7J4XgdzWuaYHBvLo2RzZ0UBaJM4TG1GWB2g=
go.sia.tech/core v0.5.1-0.20241028200253-79ba0d10d81a h1:XvbHRJczUk4ZqKcivdxxf4UoJNSi42Ee0uXTNaEROgQ=
go.sia.tech/core v0.5.1-0.20241028200253-79ba0d10d81a/go.mod h1:P3C1BWa/7J4XgdzWuaYHBvLo2RzZ0UBaJM4TG1GWB2g=
go.sia.tech/mux v1.3.0 h1:hgR34IEkqvfBKUJkAzGi31OADeW2y7D6Bmy/Jcbop9c=
go.sia.tech/mux v1.3.0/go.mod h1:I46++RD4beqA3cW9Xm9SwXbezwPqLvHhVs9HLpDtt58=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
18 changes: 8 additions & 10 deletions rhp/v4/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrice
return RPCReadSectorResult{}, fmt.Errorf("failed to write request: %w", err)
}

var resp rhp4.RPCReadSectorStreamedResponse
var resp rhp4.RPCReadSectorResponse
if err := rhp4.ReadResponse(s, &resp); err != nil {
return RPCReadSectorResult{}, fmt.Errorf("failed to read response: %w", err)
}
Expand All @@ -237,19 +237,17 @@ func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrice
}

// RPCWriteSector writes a sector to the host.
func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, rl ReaderLen, duration uint64) (RPCWriteSectorResult, error) {
length, err := rl.Len()
if err != nil {
return RPCWriteSectorResult{}, fmt.Errorf("failed to get length: %w", err)
} else if length == 0 {
func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, data io.Reader, length uint64, duration uint64) (RPCWriteSectorResult, error) {
if length == 0 {
return RPCWriteSectorResult{}, errors.New("cannot write zero-length sector")
} else if length > rhp4.SectorSize {
return RPCWriteSectorResult{}, fmt.Errorf("sector length %d exceeds maximum %d", length, rhp4.SectorSize)
}

req := rhp4.RPCWriteSectorStreamingRequest{
req := rhp4.RPCWriteSectorRequest{
Prices: prices,
Token: token,
Duration: duration,
DataLength: uint64(length),
DataLength: length,
}

if err := req.Validate(t.PeerKey(), req.Duration); err != nil {
Expand All @@ -265,7 +263,7 @@ func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPric
return RPCWriteSectorResult{}, fmt.Errorf("failed to write request: %w", err)
}

sr := io.LimitReader(rl, int64(req.DataLength))
sr := io.LimitReader(data, int64(req.DataLength))
tr := io.TeeReader(sr, bw)
if req.DataLength < rhp4.SectorSize {
// if the data is less than a full sector, the reader needs to be padded
Expand Down
70 changes: 20 additions & 50 deletions rhp/v4/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rhp_test
import (
"bytes"
"context"
"io"
"net"
"reflect"
"strings"
Expand All @@ -21,27 +20,9 @@ import (
"go.sia.tech/coreutils/testutil"
"go.sia.tech/coreutils/wallet"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"lukechampine.com/frand"
)

type readerLen struct {
r io.Reader
length int
}

func NewReaderLen(buf []byte) rhp4.ReaderLen {
return &readerLen{r: bytes.NewReader(buf), length: len(buf)}
}

func (r *readerLen) Len() (int, error) {
return r.length, nil
}

func (r *readerLen) Read(p []byte) (int, error) {
return r.r.Read(p)
}

type fundAndSign struct {
w *wallet.SingleAddressWallet
pk types.PrivateKey
Expand Down Expand Up @@ -144,6 +125,7 @@ func mineAndSync(tb testing.TB, cm *chain.Manager, addr types.Address, n int, ti
testutil.MineBlocks(tb, cm, addr, n)

for {
time.Sleep(time.Millisecond)
equals := true
for _, tipper := range tippers {
if tipper.Tip() != cm.Tip() {
Expand All @@ -155,12 +137,10 @@ func mineAndSync(tb testing.TB, cm *chain.Manager, addr types.Address, n int, ti
if equals {
return
}
time.Sleep(time.Millisecond)
}
}

func TestSettings(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey := types.GeneratePrivateKey()

Expand Down Expand Up @@ -191,7 +171,7 @@ func TestSettings(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -221,7 +201,6 @@ func TestSettings(t *testing.T) {
}

func TestFormContract(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()

Expand Down Expand Up @@ -252,7 +231,7 @@ func TestFormContract(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -281,7 +260,6 @@ func TestFormContract(t *testing.T) {
}

func TestFormContractBasis(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()

Expand Down Expand Up @@ -312,7 +290,7 @@ func TestFormContractBasis(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -341,7 +319,6 @@ func TestFormContractBasis(t *testing.T) {
}

func TestRPCRefresh(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()
cm, s, w := startTestNode(t, n, genesis)
Expand Down Expand Up @@ -371,7 +348,7 @@ func TestRPCRefresh(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -455,7 +432,6 @@ func TestRPCRefresh(t *testing.T) {
}

func TestRPCRenew(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()
cm, s, w := startTestNode(t, n, genesis)
Expand Down Expand Up @@ -485,7 +461,7 @@ func TestRPCRenew(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -616,7 +592,6 @@ func TestRPCRenew(t *testing.T) {
}

func TestAccounts(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()

Expand Down Expand Up @@ -647,7 +622,7 @@ func TestAccounts(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -718,7 +693,6 @@ func TestAccounts(t *testing.T) {
}

func TestReadWriteSector(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()

Expand Down Expand Up @@ -749,7 +723,7 @@ func TestReadWriteSector(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -792,7 +766,7 @@ func TestReadWriteSector(t *testing.T) {
data := frand.Bytes(1024)

// store the sector
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(data), 5)
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(data), uint64(len(data)), 5)
if err != nil {
t.Fatal(err)
}
Expand All @@ -815,7 +789,6 @@ func TestReadWriteSector(t *testing.T) {
}

func TestAppendSectors(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()

Expand Down Expand Up @@ -846,7 +819,7 @@ func TestAppendSectors(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -893,7 +866,7 @@ func TestAppendSectors(t *testing.T) {
frand.Read(sector[:])
root := proto4.SectorRoot(&sector)

writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(sector[:]), 5)
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(sector[:]), proto4.SectorSize, 5)
if err != nil {
t.Fatal(err)
} else if writeResult.Root != root {
Expand Down Expand Up @@ -933,7 +906,6 @@ func TestAppendSectors(t *testing.T) {
}

func TestVerifySector(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()

Expand Down Expand Up @@ -964,7 +936,7 @@ func TestVerifySector(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -1007,7 +979,7 @@ func TestVerifySector(t *testing.T) {
data := frand.Bytes(1024)

// store the sector
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(data), 5)
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(data), uint64(len(data)), 5)
if err != nil {
t.Fatal(err)
}
Expand All @@ -1027,7 +999,6 @@ func TestVerifySector(t *testing.T) {
}

func TestRPCFreeSectors(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()

Expand Down Expand Up @@ -1058,7 +1029,7 @@ func TestRPCFreeSectors(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -1104,7 +1075,7 @@ func TestRPCFreeSectors(t *testing.T) {
data := frand.Bytes(1024)

// store the sector
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(data), 5)
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(data), uint64(len(data)), 5)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1151,7 +1122,6 @@ func TestRPCFreeSectors(t *testing.T) {
}

func TestRPCSectorRoots(t *testing.T) {
log := zaptest.NewLogger(t)
n, genesis := testutil.V2Network()
hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey()

Expand Down Expand Up @@ -1182,7 +1152,7 @@ func TestRPCSectorRoots(t *testing.T) {
ss := testutil.NewEphemeralSectorStore()
c := testutil.NewEphemeralContractor(cm)

transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log)
transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop())

settings, err := rhp4.RPCSettings(context.Background(), transport)
if err != nil {
Expand Down Expand Up @@ -1246,7 +1216,7 @@ func TestRPCSectorRoots(t *testing.T) {
data := frand.Bytes(1024)

// store the sector
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(data), 5)
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(data), uint64(len(data)), 5)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1344,7 +1314,7 @@ func BenchmarkWrite(b *testing.B) {

for i := 0; i < b.N; i++ {
// store the sector
_, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(sectors[i][:]), 5)
_, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(sectors[i][:]), proto4.SectorSize, 5)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1429,7 +1399,7 @@ func BenchmarkRead(b *testing.B) {
sectors = append(sectors, sector)

// store the sector
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(sectors[i][:]), 5)
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(sectors[i][:]), proto4.SectorSize, 5)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1541,7 +1511,7 @@ func BenchmarkContractUpload(b *testing.B) {
wg.Add(1)
go func(i int) {
defer wg.Done()
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(sectors[i][:]), 5)
writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(sectors[i][:]), proto4.SectorSize, 5)
if err != nil {
b.Error(err)
} else if writeResult.Root != roots[i] {
Expand Down
15 changes: 10 additions & 5 deletions rhp/v4/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,19 @@ func (s *Server) handleRPCReadSector(stream net.Conn, log *zap.Logger) error {
proof := rhp4.BuildSectorProof(sector, start, end)
lap("build proof")

return rhp4.WriteResponse(stream, &rhp4.RPCReadSectorResponse{
Sector: segment,
Proof: proof,
})
if err := rhp4.WriteResponse(stream, &rhp4.RPCReadSectorResponse{
Proof: proof,
DataLength: uint64(len(segment)),
}); err != nil {
return fmt.Errorf("failed to write response: %w", err)
} else if _, err := stream.Write(segment); err != nil {
return fmt.Errorf("failed to write sector data: %w", err)
}
return nil
}

func (s *Server) handleRPCWriteSector(stream net.Conn) error {
var req rhp4.RPCWriteSectorStreamingRequest
var req rhp4.RPCWriteSectorRequest
if err := rhp4.ReadRequest(stream, &req); err != nil {
return errorDecodingError("failed to read request: %v", err)
}
Expand Down
Loading