Skip to content

Commit

Permalink
Mostly implemented prove, cleanup in Encode
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jun 18, 2024
1 parent 1915191 commit 647cc99
Show file tree
Hide file tree
Showing 10 changed files with 633 additions and 294 deletions.
143 changes: 112 additions & 31 deletions lib/ffi/snap_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package ffi
import (
"bytes"
"context"
"encoding/json"
"github.com/detailyang/go-fallocate"
"github.com/filecoin-project/curio/lib/asyncwrite"
"github.com/filecoin-project/curio/lib/ffiselect"
"io"
"os"

paths2 "github.com/filecoin-project/curio/lib/paths"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/ipfs/go-cid"
pool "github.com/libp2p/go-buffer-pool"
"golang.org/x/xerrors"
"io"
"os"
"path/filepath"

"github.com/filecoin-project/go-state-types/abi"

Expand All @@ -24,6 +27,7 @@ import (

func (sb *SealCalls) EncodeUpdate(
ctx context.Context,
sectorKeyCid cid.Cid,
taskID harmonytask.TaskID,
proofType abi.RegisteredUpdateProof,
sector storiface.SectorRef,
Expand All @@ -40,44 +44,81 @@ func (sb *SealCalls) EncodeUpdate(
return cid.Undef, cid.Undef, xerrors.Errorf("update paths not set")
}

keyPath := "" // can this be a named pipe - no, mmap in proofs
keyCachePath := "" // some temp copy
stagedDataPath := "" // can this be a named pipe - no, mmap in proofs
// remove old Update/UpdateCache files if they exist
if err := os.Remove(paths.Update); err != nil && !os.IsNotExist(err) {
return cid.Cid{}, cid.Cid{}, xerrors.Errorf("removing old update file: %w", err)
}
if err := os.RemoveAll(paths.UpdateCache); err != nil {
return cid.Cid{}, cid.Cid{}, xerrors.Errorf("removing old update cache: %w", err)
}

// ensure update cache dir exists
if err := os.MkdirAll(paths.UpdateCache, 0755); err != nil {
return cid.Cid{}, cid.Cid{}, xerrors.Errorf("mkdir update cache: %w", err)
}

keyPath := filepath.Join(paths.UpdateCache, "cu-sector-key.dat") // can this be a named pipe - no, mmap in proofs
keyCachePath := filepath.Join(paths.UpdateCache, "cu-sector-key-fincache") // some temp copy (finalized cache directory)
stagedDataPath := filepath.Join(paths.UpdateCache, "cu-staged.dat") // can this be a named pipe - no, mmap in proofs

var cleanupStagedFiles func() error
{
// hack until we do snap encode ourselves and just call into proofs for CommR

// todo use storage subsystem for temp files
keyFile, err := os.CreateTemp("", "cutmp-key-")
keyFile, err := os.Create(keyPath)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("creating temp file: %w", err)
return cid.Undef, cid.Undef, xerrors.Errorf("creating key file: %w", err)
}

keyCachePath, err = os.MkdirTemp("", "cutmp-keycache-")
err = os.Mkdir(keyCachePath, 0755)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("creating temp file: %w", err)
return cid.Undef, cid.Undef, xerrors.Errorf("creating key cache dir: %w", err)
}

stagedFile, err := os.CreateTemp("", "cutmp-staged-")
stagedFile, err := os.Create(stagedDataPath)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("creating temp file: %w", err)
}

keyPath = keyFile.Name()
stagedDataPath = stagedFile.Name()

defer func() {
var cleanupDone bool
cleanupStagedFiles = func() error {
if cleanupDone {
return nil
}
cleanupDone = true

if keyFile != nil {
_ = keyFile.Close()
if err := keyFile.Close(); err != nil {
return xerrors.Errorf("closing key file: %w", err)
}
}
if stagedFile != nil {
_ = stagedFile.Close()
if err := stagedFile.Close(); err != nil {
return xerrors.Errorf("closing staged file: %w", err)
}
}

_ = os.Remove(keyPath)
_ = os.Remove(keyCachePath)
if err := os.Remove(keyPath); err != nil {
return xerrors.Errorf("removing key file: %w", err)
}
if err := os.RemoveAll(keyCachePath); err != nil {
return xerrors.Errorf("removing key cache: %w", err)
}
if err := os.Remove(stagedDataPath); err != nil {
return xerrors.Errorf("removing staged file: %w", err)
}

_ = os.Remove(stagedDataPath)
return nil
}

defer func() {
clerr := cleanupStagedFiles()
if clerr != nil {
log.Errorf("cleanup error: %+v", clerr)
}
}()

log.Debugw("get key data", "keyPath", keyPath, "keyCachePath", keyCachePath, "sectorID", sector.ID, "taskID", taskID)
Expand Down Expand Up @@ -137,19 +178,6 @@ func (sb *SealCalls) EncodeUpdate(
}
}

// remove old Update/UpdateCache files if they exist
if err := os.Remove(paths.Update); err != nil && !os.IsNotExist(err) {
return cid.Cid{}, cid.Cid{}, xerrors.Errorf("removing old update file: %w", err)
}
if err := os.RemoveAll(paths.UpdateCache); err != nil {
return cid.Cid{}, cid.Cid{}, xerrors.Errorf("removing old update cache: %w", err)
}

// ensure update cache dir exists
if err := os.MkdirAll(paths.UpdateCache, 0755); err != nil {
return cid.Cid{}, cid.Cid{}, xerrors.Errorf("mkdir update cache: %w", err)
}

// allocate update file
{
s, err := os.Stat(keyPath)
Expand All @@ -176,9 +204,62 @@ func (sb *SealCalls) EncodeUpdate(
return cid.Undef, cid.Undef, xerrors.Errorf("ffi update encode: %w", err)
}

vps, err := ffi.SectorUpdate.GenerateUpdateVanillaProofs(proofType, sectorKeyCid, out.Sealed, out.Unsealed, paths.Update, paths.UpdateCache, keyPath, keyCachePath)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("generate vanilla update proofs: %w", err)
}

ok, err := ffi.SectorUpdate.VerifyVanillaProofs(proofType, sectorKeyCid, out.Sealed, out.Unsealed, vps)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("verify vanilla update proofs: %w", err)
}
if !ok {
return cid.Undef, cid.Undef, xerrors.Errorf("vanilla update proofs invalid")
}

// persist in UpdateCache/snap-vproof.json
jb, err := json.Marshal(vps)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("marshal vanilla proofs: %w", err)
}

vpPath := filepath.Join(paths.UpdateCache, paths2.SnapVproofFile)
if err := os.WriteFile(vpPath, jb, 0644); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("write vanilla proofs: %w", err)
}

ssize, err := sector.ProofType.SectorSize()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err)
}

// cleanup
if err := cleanupStagedFiles(); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("cleanup staged files: %w", err)
}

if err := ffi.ClearCache(uint64(ssize), paths.UpdateCache); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("clear cache: %w", err)
}

if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("ensure one copy: %w", err)
}

return out.Sealed, out.Unsealed, nil
}

func (sb *SealCalls) ProveUpdate(ctx context.Context, proofType abi.RegisteredUpdateProof, sector storiface.SectorRef, key, sealed, unsealed cid.Cid) ([]byte, error) {
jsonb, err := sb.sectors.storage.ReadSnapVanillaProof(ctx, sector)
if err != nil {
return nil, xerrors.Errorf("read snap vanilla proof: %w", err)
}

var vproofs [][]byte
if err := json.Unmarshal(jsonb, &vproofs); err != nil {
return nil, xerrors.Errorf("unmarshal snap vanilla proof: %w", err)
}

ctx = ffiselect.WithLogCtx(ctx, "sector", sector.ID, "key", key, "sealed", sealed, "unsealed", unsealed)
return ffiselect.FFISelect.GenerateUpdateProofWithVanilla(ctx, proofType, key, sealed, unsealed, vproofs)
}
8 changes: 8 additions & 0 deletions lib/ffiselect/ffidirect/ffi-direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ func (FFI) EncodeInto(
}, nil
}

func (FFI) GenerateUpdateProofWithVanilla(
proofType abi.RegisteredUpdateProof,
key, sealed, unsealed cid.Cid,
vproofs [][]byte,
) ([]byte, error) {
return ffi.SectorUpdate.GenerateUpdateProofWithVanilla(proofType, key, sealed, unsealed, vproofs)
}

func (FFI) SelfTest(val1 int, val2 cid.Cid) (cid.Cid, error) {
if val1 != 12345678 {
return cid.Undef, errors.New("val1 was not as expected")
Expand Down
7 changes: 7 additions & 0 deletions lib/ffiselect/ffiselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ var FFISelect struct {
pieces []abi.PieceInfo,
) (out storiface.SectorCids, err error)

GenerateUpdateProofWithVanilla func(
ctx context.Context,
proofType abi.RegisteredUpdateProof,
key, sealed, unsealed cid.Cid,
vproofs [][]byte,
) ([]byte, error)

SelfTest func(ctx context.Context, val1 int, val2 cid.Cid) (cid.Cid, error)
}

Expand Down
22 changes: 22 additions & 0 deletions lib/paths/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/remote/stat/{id}", handler.remoteStatFs).Methods("GET")
mux.HandleFunc("/remote/vanilla/single", handler.generateSingleVanillaProof).Methods("POST")
mux.HandleFunc("/remote/vanilla/porep", handler.generatePoRepVanillaProof).Methods("POST")
mux.HandleFunc("/remote/vanilla/snap", handler.readSnapVanillaProof).Methods("POST")
mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", handler.remoteGetAllocated).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
Expand Down Expand Up @@ -344,6 +345,27 @@ func (handler *FetchHandler) generatePoRepVanillaProof(w http.ResponseWriter, r
http.ServeContent(w, r, "", time.Time{}, bytes.NewReader(vanilla))
}

type SnapVanillaParams struct {
Sector storiface.SectorRef
}

func (handler *FetchHandler) readSnapVanillaProof(w http.ResponseWriter, r *http.Request) {
var params SnapVanillaParams
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
http.Error(w, err.Error(), 500)
return
}

vanilla, err := handler.Local.ReadSnapVanillaProof(r.Context(), params.Sector)
if err != nil {
http.Error(w, err.Error(), 500)
return
}

w.Header().Set("Content-Type", "application/octet-stream")
http.ServeContent(w, r, "", time.Time{}, bytes.NewReader(vanilla))
}

func FileTypeFromString(t string) (storiface.SectorFileType, error) {
return storiface.TypeFromString(t)
}
1 change: 1 addition & 0 deletions lib/paths/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ type Store interface {

GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, si storiface.PostSectorChallenge, ppt abi.RegisteredPoStProof) ([]byte, error)
GeneratePoRepVanillaProof(ctx context.Context, sr storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error)
ReadSnapVanillaProof(ctx context.Context, sr storiface.SectorRef) ([]byte, error)
}
19 changes: 19 additions & 0 deletions lib/paths/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type LocalStorage interface {
}

const MetaFile = "sectorstore.json"
const SnapVproofFile = "snap-vproof.json"

const MinFreeStoragePercentage = float64(0)

Expand Down Expand Up @@ -980,4 +981,22 @@ func (st *Local) GeneratePoRepVanillaProof(ctx context.Context, sr storiface.Sec
return ffi.SealCommitPhase1(sr.ProofType, sealed, unsealed, src.Cache, src.Sealed, sr.ID.Number, sr.ID.Miner, ticket, seed, secPiece)
}

func (st *Local) ReadSnapVanillaProof(ctx context.Context, sr storiface.SectorRef) ([]byte, error) {
src, _, err := st.AcquireSector(ctx, sr, storiface.FTUpdateCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return nil, xerrors.Errorf("acquire sector: %w", err)
}

if src.UpdateCache == "" {
return nil, errPathNotFound
}

out, err := os.ReadFile(filepath.Join(src.UpdateCache, SnapVproofFile))
if err != nil {
return nil, xerrors.Errorf("read snap vanilla proof: %w", err)
}

return out, nil
}

var _ Store = &Local{}
Loading

0 comments on commit 647cc99

Please sign in to comment.