diff --git a/go.mod b/go.mod index c5f5af2..d538e6b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.23.2 require ( go.etcd.io/bbolt v1.3.11 - go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268 + go.sia.tech/core v0.5.1-0.20241028200253-79ba0d10d81a go.sia.tech/mux v1.3.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.28.0 diff --git a/go.sum b/go.sum index e6091da..c3a1430 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= -go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268 h1:Afh3x9rg6pI183LQVYIGQ3quhrRgHWez4987JSKmTpk= -go.sia.tech/core v0.5.1-0.20241028140321-8319d4147268/go.mod h1:P3C1BWa/7J4XgdzWuaYHBvLo2RzZ0UBaJM4TG1GWB2g= +go.sia.tech/core v0.5.1-0.20241028200253-79ba0d10d81a h1:XvbHRJczUk4ZqKcivdxxf4UoJNSi42Ee0uXTNaEROgQ= +go.sia.tech/core v0.5.1-0.20241028200253-79ba0d10d81a/go.mod h1:P3C1BWa/7J4XgdzWuaYHBvLo2RzZ0UBaJM4TG1GWB2g= go.sia.tech/mux v1.3.0 h1:hgR34IEkqvfBKUJkAzGi31OADeW2y7D6Bmy/Jcbop9c= go.sia.tech/mux v1.3.0/go.mod h1:I46++RD4beqA3cW9Xm9SwXbezwPqLvHhVs9HLpDtt58= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/rhp/v4/rpc.go b/rhp/v4/rpc.go index bbdb169..740d206 100644 --- a/rhp/v4/rpc.go +++ b/rhp/v4/rpc.go @@ -216,7 +216,7 @@ func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrice return RPCReadSectorResult{}, fmt.Errorf("failed to write request: %w", err) } - var resp rhp4.RPCReadSectorStreamedResponse + var resp rhp4.RPCReadSectorResponse if err := rhp4.ReadResponse(s, &resp); err != nil { return RPCReadSectorResult{}, fmt.Errorf("failed to read response: %w", err) } @@ -237,19 +237,17 @@ func RPCReadSector(ctx context.Context, t TransportClient, prices rhp4.HostPrice } // RPCWriteSector writes a sector to the host. -func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, rl ReaderLen, duration uint64) (RPCWriteSectorResult, error) { - length, err := rl.Len() - if err != nil { - return RPCWriteSectorResult{}, fmt.Errorf("failed to get length: %w", err) - } else if length == 0 { +func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPrices, token rhp4.AccountToken, data io.Reader, length uint64, duration uint64) (RPCWriteSectorResult, error) { + if length == 0 { return RPCWriteSectorResult{}, errors.New("cannot write zero-length sector") + } else if length > rhp4.SectorSize { + return RPCWriteSectorResult{}, fmt.Errorf("sector length %d exceeds maximum %d", length, rhp4.SectorSize) } - - req := rhp4.RPCWriteSectorStreamingRequest{ + req := rhp4.RPCWriteSectorRequest{ Prices: prices, Token: token, Duration: duration, - DataLength: uint64(length), + DataLength: length, } if err := req.Validate(t.PeerKey(), req.Duration); err != nil { @@ -265,7 +263,7 @@ func RPCWriteSector(ctx context.Context, t TransportClient, prices rhp4.HostPric return RPCWriteSectorResult{}, fmt.Errorf("failed to write request: %w", err) } - sr := io.LimitReader(rl, int64(req.DataLength)) + sr := io.LimitReader(data, int64(req.DataLength)) tr := io.TeeReader(sr, bw) if req.DataLength < rhp4.SectorSize { // if the data is less than a full sector, the reader needs to be padded diff --git a/rhp/v4/rpc_test.go b/rhp/v4/rpc_test.go index 969fccf..2f75957 100644 --- a/rhp/v4/rpc_test.go +++ b/rhp/v4/rpc_test.go @@ -3,7 +3,6 @@ package rhp_test import ( "bytes" "context" - "io" "net" "reflect" "strings" @@ -21,27 +20,9 @@ import ( "go.sia.tech/coreutils/testutil" "go.sia.tech/coreutils/wallet" "go.uber.org/zap" - "go.uber.org/zap/zaptest" "lukechampine.com/frand" ) -type readerLen struct { - r io.Reader - length int -} - -func NewReaderLen(buf []byte) rhp4.ReaderLen { - return &readerLen{r: bytes.NewReader(buf), length: len(buf)} -} - -func (r *readerLen) Len() (int, error) { - return r.length, nil -} - -func (r *readerLen) Read(p []byte) (int, error) { - return r.r.Read(p) -} - type fundAndSign struct { w *wallet.SingleAddressWallet pk types.PrivateKey @@ -144,6 +125,7 @@ func mineAndSync(tb testing.TB, cm *chain.Manager, addr types.Address, n int, ti testutil.MineBlocks(tb, cm, addr, n) for { + time.Sleep(time.Millisecond) equals := true for _, tipper := range tippers { if tipper.Tip() != cm.Tip() { @@ -155,12 +137,10 @@ func mineAndSync(tb testing.TB, cm *chain.Manager, addr types.Address, n int, ti if equals { return } - time.Sleep(time.Millisecond) } } func TestSettings(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey := types.GeneratePrivateKey() @@ -191,7 +171,7 @@ func TestSettings(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -221,7 +201,6 @@ func TestSettings(t *testing.T) { } func TestFormContract(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() @@ -252,7 +231,7 @@ func TestFormContract(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -281,7 +260,6 @@ func TestFormContract(t *testing.T) { } func TestFormContractBasis(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() @@ -312,7 +290,7 @@ func TestFormContractBasis(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -341,7 +319,6 @@ func TestFormContractBasis(t *testing.T) { } func TestRPCRefresh(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() cm, s, w := startTestNode(t, n, genesis) @@ -371,7 +348,7 @@ func TestRPCRefresh(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -455,7 +432,6 @@ func TestRPCRefresh(t *testing.T) { } func TestRPCRenew(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() cm, s, w := startTestNode(t, n, genesis) @@ -485,7 +461,7 @@ func TestRPCRenew(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -616,7 +592,6 @@ func TestRPCRenew(t *testing.T) { } func TestAccounts(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() @@ -647,7 +622,7 @@ func TestAccounts(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -718,7 +693,6 @@ func TestAccounts(t *testing.T) { } func TestReadWriteSector(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() @@ -749,7 +723,7 @@ func TestReadWriteSector(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -792,7 +766,7 @@ func TestReadWriteSector(t *testing.T) { data := frand.Bytes(1024) // store the sector - writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(data), 5) + writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(data), uint64(len(data)), 5) if err != nil { t.Fatal(err) } @@ -815,7 +789,6 @@ func TestReadWriteSector(t *testing.T) { } func TestAppendSectors(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() @@ -846,7 +819,7 @@ func TestAppendSectors(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -893,7 +866,7 @@ func TestAppendSectors(t *testing.T) { frand.Read(sector[:]) root := proto4.SectorRoot(§or) - writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(sector[:]), 5) + writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(sector[:]), proto4.SectorSize, 5) if err != nil { t.Fatal(err) } else if writeResult.Root != root { @@ -933,7 +906,6 @@ func TestAppendSectors(t *testing.T) { } func TestVerifySector(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() @@ -964,7 +936,7 @@ func TestVerifySector(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -1007,7 +979,7 @@ func TestVerifySector(t *testing.T) { data := frand.Bytes(1024) // store the sector - writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(data), 5) + writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(data), uint64(len(data)), 5) if err != nil { t.Fatal(err) } @@ -1027,7 +999,6 @@ func TestVerifySector(t *testing.T) { } func TestRPCFreeSectors(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() @@ -1058,7 +1029,7 @@ func TestRPCFreeSectors(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -1104,7 +1075,7 @@ func TestRPCFreeSectors(t *testing.T) { data := frand.Bytes(1024) // store the sector - writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(data), 5) + writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(data), uint64(len(data)), 5) if err != nil { t.Fatal(err) } @@ -1151,7 +1122,6 @@ func TestRPCFreeSectors(t *testing.T) { } func TestRPCSectorRoots(t *testing.T) { - log := zaptest.NewLogger(t) n, genesis := testutil.V2Network() hostKey, renterKey := types.GeneratePrivateKey(), types.GeneratePrivateKey() @@ -1182,7 +1152,7 @@ func TestRPCSectorRoots(t *testing.T) { ss := testutil.NewEphemeralSectorStore() c := testutil.NewEphemeralContractor(cm) - transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, log) + transport := testRenterHostPair(t, hostKey, cm, s, w, c, sr, ss, zap.NewNop()) settings, err := rhp4.RPCSettings(context.Background(), transport) if err != nil { @@ -1246,7 +1216,7 @@ func TestRPCSectorRoots(t *testing.T) { data := frand.Bytes(1024) // store the sector - writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(data), 5) + writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(data), uint64(len(data)), 5) if err != nil { t.Fatal(err) } @@ -1344,7 +1314,7 @@ func BenchmarkWrite(b *testing.B) { for i := 0; i < b.N; i++ { // store the sector - _, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(sectors[i][:]), 5) + _, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(sectors[i][:]), proto4.SectorSize, 5) if err != nil { b.Fatal(err) } @@ -1429,7 +1399,7 @@ func BenchmarkRead(b *testing.B) { sectors = append(sectors, sector) // store the sector - writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(sectors[i][:]), 5) + writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(sectors[i][:]), proto4.SectorSize, 5) if err != nil { b.Fatal(err) } @@ -1541,7 +1511,7 @@ func BenchmarkContractUpload(b *testing.B) { wg.Add(1) go func(i int) { defer wg.Done() - writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, NewReaderLen(sectors[i][:]), 5) + writeResult, err := rhp4.RPCWriteSector(context.Background(), transport, settings.Prices, token, bytes.NewReader(sectors[i][:]), proto4.SectorSize, 5) if err != nil { b.Error(err) } else if writeResult.Root != roots[i] { diff --git a/rhp/v4/server.go b/rhp/v4/server.go index fb41573..af45916 100644 --- a/rhp/v4/server.go +++ b/rhp/v4/server.go @@ -217,14 +217,19 @@ func (s *Server) handleRPCReadSector(stream net.Conn, log *zap.Logger) error { proof := rhp4.BuildSectorProof(sector, start, end) lap("build proof") - return rhp4.WriteResponse(stream, &rhp4.RPCReadSectorResponse{ - Sector: segment, - Proof: proof, - }) + if err := rhp4.WriteResponse(stream, &rhp4.RPCReadSectorResponse{ + Proof: proof, + DataLength: uint64(len(segment)), + }); err != nil { + return fmt.Errorf("failed to write response: %w", err) + } else if _, err := stream.Write(segment); err != nil { + return fmt.Errorf("failed to write sector data: %w", err) + } + return nil } func (s *Server) handleRPCWriteSector(stream net.Conn) error { - var req rhp4.RPCWriteSectorStreamingRequest + var req rhp4.RPCWriteSectorRequest if err := rhp4.ReadRequest(stream, &req); err != nil { return errorDecodingError("failed to read request: %v", err) }