Skip to content

Commit

Permalink
rhp/v4: RPC read sector write to writer
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Sep 17, 2024
1 parent 2bad9a0 commit cce2d6b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
go.etcd.io/bbolt v1.3.11
go.sia.tech/core v0.4.7-0.20240917165851-057286af3cdc
go.sia.tech/core v0.4.7-0.20240917173008-41a66391e272
go.sia.tech/mux v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.27.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.sia.tech/core v0.4.7-0.20240917165851-057286af3cdc h1:nndMsekNg6lm4IytkILa53NYia3zqeIEdRqAgk7XM/E=
go.sia.tech/core v0.4.7-0.20240917165851-057286af3cdc/go.mod h1:j2Ke8ihV8or7d2VDrFZWcCkwSVHO0DNMQJAGs9Qop2M=
go.sia.tech/core v0.4.7-0.20240917173008-41a66391e272 h1:IGlUTj7jhvum5oQ0mUoZiaDmB5ErPODWW5Wc/P1MAJQ=
go.sia.tech/core v0.4.7-0.20240917173008-41a66391e272/go.mod h1:j2Ke8ihV8or7d2VDrFZWcCkwSVHO0DNMQJAGs9Qop2M=
go.sia.tech/mux v1.3.0 h1:hgR34IEkqvfBKUJkAzGi31OADeW2y7D6Bmy/Jcbop9c=
go.sia.tech/mux v1.3.0/go.mod h1:I46++RD4beqA3cW9Xm9SwXbezwPqLvHhVs9HLpDtt58=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
24 changes: 16 additions & 8 deletions rhp/v4/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net"

"go.sia.tech/core/consensus"
Expand Down Expand Up @@ -89,7 +90,7 @@ func RPCSettings(ctx context.Context, t TransportClient) (rhp4.HostSettings, err
}

// RPCReadSector reads a sector from the host
func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, root types.Hash256, offset, length uint64) ([]byte, error) {
func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, root types.Hash256, offset, length uint64, w io.Writer) error {
req := &rhp4.RPCReadSectorRequest{
Prices: prices,
Token: token,
Expand All @@ -98,31 +99,38 @@ func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrice
Length: length,
}
if err := req.Validate(); err != nil {
return nil, fmt.Errorf("invalid request: %w", err)
return fmt.Errorf("invalid request: %w", err)
}

s := t.DialStream(ctx)
defer s.Close()

if err := rhp4.WriteRequest(s, rhp4.RPCReadSectorID, req); err != nil {
return nil, fmt.Errorf("failed to write request: %w", err)
return fmt.Errorf("failed to write request: %w", err)
}

var resp rhp4.RPCReadSectorResponse
var resp rhp4.RPCReadSectorStreamedResponse
if err := rhp4.ReadResponse(s, &resp); err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
return fmt.Errorf("failed to read response: %w", err)
}
// TODO: verify proof + stream to writer
return resp.Sector, nil

// TODO: verify proof
n, err := io.Copy(w, io.LimitReader(s, int64(resp.DataLength)))
if err != nil {
return fmt.Errorf("failed to read data: %w", err)
} else if n != int64(resp.DataLength) {
return io.ErrUnexpectedEOF
}
return nil
}

// RPCWriteSector writes a sector to the host
func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, data []byte, duration uint64) (types.Hash256, error) {
req := &rhp4.RPCWriteSectorRequest{
Prices: prices,
Token: token,
Sector: data,
Duration: duration,
Sector: data,
}

if err := req.Validate(); err != nil {
Expand Down
11 changes: 7 additions & 4 deletions rhp/v4/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,10 +785,11 @@ func TestReadWriteSector(t *testing.T) {
}

// read the sector back
buf, err := rhp4.RPCReadSector(context.Background(), transport, settings.Prices, token, root, 0, 64)
buf := bytes.NewBuffer(nil)
err = rhp4.RPCReadSector(context.Background(), transport, settings.Prices, token, root, 0, 64, buf)
if err != nil {
t.Fatal(err)
} else if !bytes.Equal(buf, data[:64]) {
} else if !bytes.Equal(buf.Bytes(), data[:64]) {
t.Fatal("data mismatch")
}
}
Expand Down Expand Up @@ -1233,12 +1234,14 @@ func BenchmarkRead(b *testing.B) {
b.ReportAllocs()
b.SetBytes(proto4.SectorSize)

buf := bytes.NewBuffer(make([]byte, 0, proto4.SectorSize))
for i := 0; i < b.N; i++ {
buf.Reset()
// store the sector
buf, err := rhp4.RPCReadSector(context.Background(), transport, settings.Prices, token, roots[i], 0, proto4.SectorSize)
err := rhp4.RPCReadSector(context.Background(), transport, settings.Prices, token, roots[i], 0, proto4.SectorSize, buf)
if err != nil {
b.Fatal(err)
} else if !bytes.Equal(buf, sectors[i][:]) {
} else if !bytes.Equal(buf.Bytes(), sectors[i][:]) {
b.Fatal("data mismatch")
}
}
Expand Down

0 comments on commit cce2d6b

Please sign in to comment.