Skip to content

Commit

Permalink
ndndpdk-godemo: ls & fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Aug 16, 2023
1 parent 3735614 commit b1080e8
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 87 deletions.
2 changes: 1 addition & 1 deletion app/fetch/fetcher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package fetch simulates bulk file transfer traffic patterns.
// Package fetch retrieves segmented objects.
package fetch

/*
Expand Down
45 changes: 15 additions & 30 deletions app/fileserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"io"
iofs "io/fs"
"math"
"math/rand"
"os"
"path"
Expand Down Expand Up @@ -59,16 +58,6 @@ func (f *FileServerFixture) RetrieveMetadataOpts(name string, opts endpoint.Cons
return
}

func (FileServerFixture) LastSeg(t testing.TB, finalBlock ndn.NameComponent) (lastSeg tlv.NNI) {
assert, _ := makeAR(t)
lastSeg = math.MaxUint64
if assert.True(finalBlock.Valid()) {
assert.EqualValues(an.TtSegmentNameComponent, finalBlock.Type)
assert.NoError(lastSeg.UnmarshalBinary(finalBlock.Value))
}
return
}

func (FileServerFixture) ChangeVersion(name ndn.Name, f func(uint64) uint64) ndn.Name {
name = slices.Clone(name)
versionComp := &name[len(name)-1]
Expand All @@ -80,14 +69,12 @@ func (FileServerFixture) ChangeVersion(name ndn.Name, f func(uint64) uint64) ndn
return name
}

func (f *FileServerFixture) FetchPayload(name ndn.Name, lastSeg *tlv.NNI) (payload []byte, e error) {
func (f *FileServerFixture) FetchPayload(name ndn.Name, segmentEnd uint64) (payload []byte, e error) {
opts := segmented.FetchOptions{
RetxLimit: 3,
MaxCwnd: 256,
}
if lastSeg != nil {
opts.SegmentEnd = 1 + uint64(*lastSeg)
}
opts.SegmentEnd = segmentEnd
return f.FetchPayloadOpts(name, opts)
}

Expand All @@ -98,7 +85,7 @@ func (f *FileServerFixture) FetchPayloadOpts(name ndn.Name, opts segmented.Fetch
}

func (f *FileServerFixture) ListDirectory(name ndn.Name) (ls ndn6file.DirectoryListing, e error) {
payload, e := f.FetchPayload(name, nil)
payload, e := f.FetchPayload(name, 0)
if e != nil {
return ls, e
}
Expand Down Expand Up @@ -170,16 +157,16 @@ func TestServer(t *testing.T) {
m, e := f.RetrieveMetadata(tt.Name)
require.NoError(e)

lastSeg := f.LastSeg(t, m.FinalBlock)
segmentEnd := m.SegmentEnd()
assert.NotZero(segmentEnd)
assert.EqualValues(cfg.SegmentLen, m.SegmentSize)
assert.EqualValues(len(content), m.Size)
assert.False(m.Mtime.IsZero())

fetcherLastSeg := &lastSeg
if !tt.SetSegmentEnd {
fetcherLastSeg = nil
segmentEnd = 0
}
payload, e := f.FetchPayload(m.Name, fetcherLastSeg)
payload, e := f.FetchPayload(m.Name, segmentEnd)
require.NoError(e)
assert.Len(payload, len(content))
assert.Equal(digest, sha256.Sum256(payload))
Expand Down Expand Up @@ -563,9 +550,9 @@ func TestFuse(t *testing.T) {
m, e := f.RetrieveMetadata("/fs/X")
require.NoError(e)
assert.EqualValues(0, m.Size)
assert.EqualValues(0, f.LastSeg(t, m.FinalBlock))
assert.EqualValues(1, m.SegmentEnd())

payload, e := f.FetchPayload(m.Name, nil)
payload, e := f.FetchPayload(m.Name, 0)
require.NoError(e)
assert.Len(payload, 0)
})
Expand All @@ -579,11 +566,11 @@ func TestFuse(t *testing.T) {
assert.True(m.IsFile())
assert.False(m.IsDir())
assert.EqualValues(len(fs.payloadY), m.Size)
assert.EqualValues(15, f.LastSeg(t, m.FinalBlock))
assert.EqualValues(16, m.SegmentEnd())
assert.Equal(fs.ctime, m.Ctime)
assert.Equal(fs.mtime, m.Mtime)

payload, e := f.FetchPayload(m.Name, nil)
payload, e := f.FetchPayload(m.Name, 0)
require.NoError(e)
assert.Equal(fs.payloadY, payload)

Expand Down Expand Up @@ -611,8 +598,7 @@ func TestFuse(t *testing.T) {
assert, _ := makeAR(t)

name := f.ChangeVersion(m.Name, func(version uint64) uint64 { return uint64(f.p.VersionBypassHi)<<32 | uint64(rand.Uint32()) })
lastSeg := tlv.NNI(0)
_, e := f.FetchPayload(name, &lastSeg)
_, e := f.FetchPayload(name, 1)
assert.NoError(e)
})
})
Expand All @@ -624,12 +610,11 @@ func TestFuse(t *testing.T) {
m, e := f.RetrieveMetadata("/fs/Z")
require.NoError(e)
assert.EqualValues(fs.sizeZ, m.Size)
assert.EqualValues(79999, f.LastSeg(t, m.FinalBlock))
assert.EqualValues(80000, m.SegmentEnd())
assert.Equal(fs.ctime, m.Ctime)
assert.Equal(fs.mtime, m.Mtime)

lastSeg := tlv.NNI(3)
payload, e := f.FetchPayload(m.Name, &lastSeg)
payload, e := f.FetchPayload(m.Name, 4)
if assert.NoError(e) {
assert.Len(payload, cfg.SegmentLen*4)
}
Expand All @@ -638,7 +623,7 @@ func TestFuse(t *testing.T) {
fs.mtimeZ.Store(mtimeZ.UnixNano())
time.Sleep(2 * cfg.StatValidity.Duration())
var fetchOpts segmented.FetchOptions
fetchOpts.SegmentEnd = 1 + uint64(lastSeg)
fetchOpts.SegmentEnd = 4
fetchOpts.RetxLimit = 1
_, e = f.FetchPayloadOpts(m.Name, fetchOpts)
assert.Error(e)
Expand Down
14 changes: 12 additions & 2 deletions cmd/ndndpdk-godemo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,20 @@ openssl sha256 /tmp/1GB.bin
# start producer
ndndpdk-godemo --mtu 6000 put --name /segmented/1GB.bin --file /tmp/1GB.bin --chunk-size 4096

# (on another console) run consumer and compute downloaded digest
ndndpdk-godemo --mtu 6000 get --name /segmented/1GB.bin | openssl sha256
# (on another console) run consumer
ndndpdk-godemo --mtu 6000 get --name /segmented/1GB.bin >/tmp/1GB.retrieved

# compare original and retrieved files
sha256sum /tmp/1GB.bin /tmp/1GB.retrieved
```

## File Server API

[ndn6file.go](ndn6file.go) implements a client for ndn6-file-server compatible file servers.
This subcommand requires a local forwarder that connects to a file server.
This subcommand does not need sudo privilege, but you may need to manually create `/run/ndn` directory beforehand.
See [NDN-DPDK file server](../../docs/fileserver.md) for a usage example.

## NFD Management API

[nfdreg.go](nfdreg.go) implements a prefix registration tool using [NFD management API](../../ndn/mgmt/nfdmgmt).
Expand Down
118 changes: 118 additions & 0 deletions cmd/ndndpdk-godemo/ndn6file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main

import (
"context"
"fmt"
"log"
"strconv"

"github.com/kballard/go-shellquote"
"github.com/urfave/cli/v2"
"github.com/usnistgov/ndn-dpdk/ndn"
"github.com/usnistgov/ndn-dpdk/ndn/endpoint"
"github.com/usnistgov/ndn-dpdk/ndn/rdr"
"github.com/usnistgov/ndn-dpdk/ndn/rdr/ndn6file"
"github.com/usnistgov/ndn-dpdk/ndn/segmented"
)

func retrieveFileMetadata(ctx context.Context, name string, fetchOptions segmented.FetchOptions) (m ndn6file.Metadata, e error) {
e = rdr.RetrieveMetadata(ctx, &m, ndn.ParseName(name), endpoint.ConsumerOptions{
Retx: endpoint.RetxOptions{Limit: fetchOptions.RetxLimit},
})
if e != nil {
return
}
log.Printf("retrieved metadata %s", m.Name)
return
}

func init() {
var name string
var fetchOptions segmented.FetchOptions
defineCommand(&cli.Command{
Name: "ls",
Usage: "List directory on file server.",
Flags: defineFetchOptionsFlags(&fetchOptions, []cli.Flag{
&cli.StringFlag{
Name: "name",
Usage: "name",
Destination: &name,
Required: true,
},
}),
Before: openUplink,
Action: func(c *cli.Context) error {
m, e := retrieveFileMetadata(c.Context, name, fetchOptions)
if e != nil {
return e
}
if !m.IsDir() {
return fmt.Errorf("mode %o is not a directory", m.Mode)
}

fetcher := segmented.Fetch(m.Name, fetchOptions)
payload, e := fetcher.Payload(c.Context)
if e != nil {
return e
}

var ls ndn6file.DirectoryListing
if e = ls.UnmarshalBinary(payload); e != nil {
return e
}

log.Printf("retrieved %d entries in directory listing", len(ls))
fmt.Println(ls)
return nil
},
})
}

func init() {
var name, filename string
var fetchOptions segmented.FetchOptions
var useTgFetcher bool
defineCommand(&cli.Command{
Name: "fetch",
Usage: "Fetch file from file server.",
Flags: defineFetchOptionsFlags(&fetchOptions, []cli.Flag{
&cli.StringFlag{
Name: "name",
Usage: "name",
Destination: &name,
Required: true,
},
&cli.StringFlag{
Name: "filename",
Usage: "output file name",
DefaultText: "write to stdout",
Destination: &filename,
},
&cli.BoolFlag{
Name: "tg-fetcher",
Usage: "generate arguments for 'ndndpdk-ctrl start-fetch' command to fetch with NDN-DPDK service",
Destination: &useTgFetcher,
},
}),
Before: openUplink,
Action: func(c *cli.Context) error {
m, e := retrieveFileMetadata(c.Context, name, fetchOptions)
if e != nil {
return e
}
if !m.IsFile() {
return fmt.Errorf("mode %o is not a file", m.Mode)
}
log.Printf("file has %d octets in %d segments of size %d", m.Size, m.SegmentEnd(), m.SegmentSize)
if useTgFetcher {
fmt.Println(shellquote.Join(
"--name", m.Name.String(),
"--segment-end", strconv.FormatUint(m.SegmentEnd(), 10),
"--segment-len", strconv.Itoa(m.SegmentSize),
))
return nil
}
return retrieveSegmented(c.Context, m.Name, filename, m.SegmentSize, fetchOptions)
},
})
}
Loading

0 comments on commit b1080e8

Please sign in to comment.