Skip to content

Commit

Permalink
move (copy|transform)-object interface
Browse files Browse the repository at this point in the history
* remove data-mover typecast
* simplify and refactor

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 13, 2024
1 parent d9a141c commit 7fa3cb3
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 97 deletions.
9 changes: 5 additions & 4 deletions ais/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func initDaemon(version, buildTime string) cos.Runner {
// fork (proxy | target)
co := newConfigOwner(config)
if daemon.cli.role == apc.Proxy {
xs.Xreg(true /* x-ele only */)
xs.Preg()
p := newProxy(co)
p.init(config)
title := _loghdr2(p.si, loghdr)
Expand All @@ -250,12 +250,13 @@ func initDaemon(version, buildTime string) cos.Runner {
return p
}

t := newTarget(co)
t.init(config)

// reg xaction factories
xs.Xreg(false /* x-ele only */)
xs.Treg(t)
space.Xreg()

t := newTarget(co)
t.init(config)
title := _loghdr2(t.si, loghdr)
nlog.Infoln(title)

Expand Down
6 changes: 3 additions & 3 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,7 +1363,7 @@ func (t *target) objMv(lom *core.LOM, msg *apc.ActMsg) (err error) {
}

buf, slab := t.gmm.Alloc()
coiParams := core.AllocCOI()
coiParams := xs.AllocCOI()
{
coiParams.BckTo = lom.Bck()
coiParams.ObjnameTo = msg.Name /* new object name */
Expand All @@ -1372,9 +1372,9 @@ func (t *target) objMv(lom *core.LOM, msg *apc.ActMsg) (err error) {
coiParams.OWT = cmn.OwtCopy
coiParams.Finalize = true
}
coi := (*copyOI)(coiParams)
coi := (*coi)(coiParams)
_, err = coi.do(t, nil /*DM*/, lom)
core.FreeCOI(coiParams)
xs.FreeCOI(coiParams)
slab.Free(buf)
if err != nil {
return err
Expand Down
28 changes: 10 additions & 18 deletions ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/NVIDIA/aistore/stats"
"github.com/NVIDIA/aistore/transport/bundle"
"github.com/NVIDIA/aistore/xact/xreg"
"github.com/NVIDIA/aistore/xact/xs"
)

func (*target) DataClient() *http.Client { return g.client.data }
Expand Down Expand Up @@ -134,19 +135,9 @@ func (t *target) HeadObjT2T(lom *core.LOM, si *meta.Snode) bool {
// the AIS cluster (by performing a cold GET if need be).
// - if the dst is cloud, we perform a regular PUT logic thus also making sure that the new
// replica gets created in the cloud bucket of _this_ AIS cluster.
func (t *target) CopyObject(lom *core.LOM, dm core.DM, params *core.CopyParams) (size int64, err error) {
coi := (*copyOI)(params)
// defaults
coi.OWT = cmn.OwtCopy
coi.Finalize = false
if coi.ObjnameTo == "" {
coi.ObjnameTo = lom.ObjName
}
realDM, ok := dm.(*bundle.DataMover) // TODO -- FIXME: eliminate typecast
debug.Assert(ok)

size, err = coi.do(t, realDM, lom)

func (t *target) CopyObject(lom *core.LOM, dm *bundle.DataMover, params *xs.CoiParams) (size int64, err error) {
coi := (*coi)(params)
size, err = coi.do(t, dm, lom)
coi.stats(size, err)
return size, err
}
Expand Down Expand Up @@ -347,8 +338,9 @@ func (t *target) _promLocal(params *core.PromoteParams, lom *core.LOM) (fileSize
return
}

// TODO: use DM streams
// TODO: Xact.InObjsAdd on the receive side
// [TODO]
// - use DM streams
// - Xact.InObjsAdd on the receive side
func (t *target) _promRemote(params *core.PromoteParams, lom *core.LOM, tsi *meta.Snode, smap *smapX) (int64, error) {
lom.FQN = params.SrcFQN

Expand All @@ -357,16 +349,16 @@ func (t *target) _promRemote(params *core.PromoteParams, lom *core.LOM, tsi *met
return -1, nil
}

coiParams := core.AllocCOI()
coiParams := xs.AllocCOI()
{
coiParams.BckTo = lom.Bck()
coiParams.OWT = cmn.OwtPromote
coiParams.Xact = params.Xact
coiParams.Config = params.Config
}
coi := (*copyOI)(coiParams)
coi := (*coi)(coiParams)
size, err := coi.send(t, nil /*DM*/, lom, lom.ObjName, tsi)
core.FreeCOI(coiParams)
xs.FreeCOI(coiParams)

return size, err
}
Expand Down
21 changes: 11 additions & 10 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/NVIDIA/aistore/transport"
"github.com/NVIDIA/aistore/transport/bundle"
"github.com/NVIDIA/aistore/xact/xreg"
"github.com/NVIDIA/aistore/xact/xs"
)

//
Expand Down Expand Up @@ -115,7 +116,7 @@ type (
size int64 // Content-Length
}

copyOI core.CopyParams
coi xs.CoiParams

sendArgs struct {
reader cos.ReadOpenCloser
Expand Down Expand Up @@ -1399,7 +1400,7 @@ func (a *apndOI) pack(workFQN string) string {
//

// main method
func (coi *copyOI) do(t *target, dm *bundle.DataMover, lom *core.LOM) (size int64, err error) {
func (coi *coi) do(t *target, dm *bundle.DataMover, lom *core.LOM) (size int64, err error) {
if coi.DryRun {
return coi._dryRun(lom, coi.ObjnameTo)
}
Expand Down Expand Up @@ -1438,7 +1439,7 @@ func (coi *copyOI) do(t *target, dm *bundle.DataMover, lom *core.LOM) (size int6
return size, err
}

func (coi *copyOI) _dryRun(lom *core.LOM, objnameTo string) (size int64, err error) {
func (coi *coi) _dryRun(lom *core.LOM, objnameTo string) (size int64, err error) {
if coi.DP == nil {
uname := coi.BckTo.MakeUname(objnameTo)
if lom.Uname() != cos.UnsafeS(uname) {
Expand Down Expand Up @@ -1470,7 +1471,7 @@ func (coi *copyOI) _dryRun(lom *core.LOM, objnameTo string) (size int64, err err
//
// An option for _not_ storing the object _in_ the cluster would be a _feature_ that can be
// further debated.
func (coi *copyOI) _reader(t *target, dm *bundle.DataMover, lom, dst *core.LOM) (size int64, _ int, _ error) {
func (coi *coi) _reader(t *target, dm *bundle.DataMover, lom, dst *core.LOM) (size int64, _ int, _ error) {
reader, oah, errN := coi.DP.Reader(lom, coi.LatestVer, coi.Sync)
if errN != nil {
return 0, 0, errN
Expand Down Expand Up @@ -1511,7 +1512,7 @@ func (coi *copyOI) _reader(t *target, dm *bundle.DataMover, lom, dst *core.LOM)
return size, ecode, err
}

func (coi *copyOI) _regular(t *target, lom, dst *core.LOM) (size int64, _ error) {
func (coi *coi) _regular(t *target, lom, dst *core.LOM) (size int64, _ error) {
if lom.FQN == dst.FQN { // resilvering with a single mountpath?
return
}
Expand Down Expand Up @@ -1554,7 +1555,7 @@ func (coi *copyOI) _regular(t *target, lom, dst *core.LOM) (size int64, _ error)
// send object => designated target
// * source is a LOM or a reader (that may be reading from remote)
// * one of the two equivalent transmission mechanisms: PUT or transport Send
func (coi *copyOI) send(t *target, dm *bundle.DataMover, lom *core.LOM, objNameTo string, tsi *meta.Snode) (size int64, err error) {
func (coi *coi) send(t *target, dm *bundle.DataMover, lom *core.LOM, objNameTo string, tsi *meta.Snode) (size int64, err error) {
debug.Assert(coi.OWT > 0)
sargs := allocSnda()
{
Expand All @@ -1572,7 +1573,7 @@ func (coi *copyOI) send(t *target, dm *bundle.DataMover, lom *core.LOM, objNameT
return
}

func (coi *copyOI) _send(t *target, lom *core.LOM, sargs *sendArgs) (size int64, _ error) {
func (coi *coi) _send(t *target, lom *core.LOM, sargs *sendArgs) (size int64, _ error) {
debug.Assert(!coi.DryRun)
if sargs.dm != nil {
// clone the `lom` to use it in the async operation (free it via `_sendObjDM` callback)
Expand Down Expand Up @@ -1638,7 +1639,7 @@ func (coi *copyOI) _send(t *target, lom *core.LOM, sargs *sendArgs) (size int64,

// use data mover to transmit objects to other targets
// (compare with coi.put())
func (coi *copyOI) _dm(lom *core.LOM, sargs *sendArgs) error {
func (coi *coi) _dm(lom *core.LOM, sargs *sendArgs) error {
debug.Assert(sargs.dm.OWT() == sargs.owt)
debug.Assert(sargs.dm.GetXact() == coi.Xact || sargs.dm.GetXact().ID() == coi.Xact.ID())
o := transport.AllocSend()
Expand All @@ -1656,7 +1657,7 @@ func (coi *copyOI) _dm(lom *core.LOM, sargs *sendArgs) error {

// PUT(lom) => destination target (compare with coi.dm())
// always closes params.Reader, either explicitly or via Do()
func (coi *copyOI) put(t *target, sargs *sendArgs) error {
func (coi *coi) put(t *target, sargs *sendArgs) error {
var (
hdr = make(http.Header, 8)
query = sargs.bckTo.NewQuery()
Expand Down Expand Up @@ -1690,7 +1691,7 @@ func (coi *copyOI) put(t *target, sargs *sendArgs) error {
return nil
}

func (coi *copyOI) stats(size int64, err error) {
func (coi *coi) stats(size int64, err error) {
if err == nil && coi.Xact != nil {
coi.Xact.ObjsAdd(1, size)
}
Expand Down
7 changes: 4 additions & 3 deletions ais/tgts3.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/ec"
"github.com/NVIDIA/aistore/fs"
"github.com/NVIDIA/aistore/xact/xs"
)

const fmtErrBckObj = "invalid %s request: expecting bucket and object (names) in the URL, have %v"
Expand Down Expand Up @@ -143,16 +144,16 @@ func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, config *cmn.C
return
}

coiParams := core.AllocCOI()
coiParams := xs.AllocCOI()
{
coiParams.Config = config
coiParams.BckTo = bckTo
coiParams.ObjnameTo = s3.ObjName(items)
coiParams.OWT = cmn.OwtCopy
}
coi := (*copyOI)(coiParams)
coi := (*coi)(coiParams)
_, err = coi.do(t, nil /*DM*/, lom)
core.FreeCOI(coiParams)
xs.FreeCOI(coiParams)

if err != nil {
if err == cmn.ErrSkip {
Expand Down
18 changes: 11 additions & 7 deletions cmd/cli/cli/bucket_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@ Usage examples:
`

// ais cp
const copyBucketUsage = "copy entire bucket or selected objects (to select, use '--list', '--template', or '--prefix'), e.g.:\n" +
const copyBucketUsage = "copy entire bucket or selected objects (to select, use '--list', '--template', or '--prefix'),\n" +
indent1 + "\te.g.:\n" +
indent1 + "\t- 'ais cp gs://webdaset-coco ais://dst'\t- copy entire Cloud bucket;\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --all'\t- copy entire Cloud bucket that may not be _present_ in the cluster;\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --all'\t- copy entire Cloud bucket that may _not_ be in-cluster;\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --all --num-workers 16'\t- same as above employing 16 concurrent workers;\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --all --num-workers 16 --prefix dir/subdir/'\t- same as above, but limit copying to a given virtual subdirectory;\n" +
indent1 + "\t- 'ais cp s3://abc gs://xyz --all'\t- copy Cloud bucket to another Cloud;\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --latest'\t- copy Cloud bucket, and make sure that already present in-cluster copies are updated to the latest (remote) versions;\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --sync'\t- same as above, but in addition delete in-cluster copies that do not exist (any longer) in the remote source\n" +
indent1 + "\t\t with template, prefix, and/or progress bar;\n" +
indent1 + "\t- 'ais cp s3://abc gs://xyz --all'\t- copy Cloud bucket to another Cloud.\n" +
indent1 + "\tsimilar to prefetch:\n" +
indent1 + "\t- 'ais cp s3://data s3://data --all'\t- copy remote source (and create namesake destination in-cluster bucket if doesn't exist).\n" +
indent1 + "\tsynchronize with out-of-band updates:\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --latest'\t- copy Cloud bucket; make sure that already present in-cluster copies are updated to the latest versions;\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --sync'\t- same as above, but in addition delete in-cluster copies that do not exist (any longer) in the remote source.\n" +
indent1 + "\twith template, prefix, and progress:\n" +
indent1 + "\t- 'ais cp s3://abc ais://nnn --prepend backup/'\t- copy objects into 'backup/' virtual subdirectory in destination bucket;\n" +
indent1 + "\t- 'ais cp ais://nnn/111 ais://mmm'\t- copy all ais://nnn objects that match prefix '111';\n" +
indent1 + "\t- 'ais cp gs://webdataset-coco ais:/dst --template d-tokens/shard-{000000..000999}.tar.lz4'\t- copy up to 1000 objects that share the specified prefix;\n" +
indent1 + "\t- 'ais cp gs://webdataset-coco ais:/dst --prefix d-tokens/ --progress --all'\t- show progress while copying virtual subdirectory 'd-tokens';\n" +
indent1 + "\t- 'ais cp gs://webdataset-coco/d-tokens/ ais:/dst --progress --all'\t- same as above"
indent1 + "\t- 'ais cp gs://webdataset-coco/d-tokens/ ais:/dst --progress --all'\t- same as above."

// ais ls
var listAnyUsage = "list buckets, objects in buckets, and files in " + archExts + "-formatted objects,\n" +
Expand Down
4 changes: 0 additions & 4 deletions core/mock/target_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func (*TargetMock) OOS(*fs.CapStatus, *cmn.Config, *fs.Tcdf) fs.CapStatus {
return fs.CapStatus{}
}

func (*TargetMock) CopyObject(*core.LOM, core.DM, *core.CopyParams) (int64, error) {
return 0, nil
}

func (*TargetMock) GetCold(context.Context, *core.LOM, cmn.OWT) (int, error) {
return http.StatusOK, nil
}
Expand Down
20 changes: 0 additions & 20 deletions core/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ var (

putObjPool sync.Pool
putObj0 PutParams

coiPool sync.Pool
coi0 CopyParams
)

/////////////
Expand Down Expand Up @@ -58,20 +55,3 @@ func FreePutParams(a *PutParams) {
*a = putObj0
putObjPool.Put(a)
}

//
// CopyParams pool
//

func AllocCOI() (a *CopyParams) {
if v := coiPool.Get(); v != nil {
a = v.(*CopyParams)
return
}
return &CopyParams{}
}

func FreeCOI(a *CopyParams) {
*a = coi0
coiPool.Put(a)
}
14 changes: 0 additions & 14 deletions core/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,6 @@ type (
Xact Xact // responsible xaction
apc.PromoteArgs // all of the above
}
CopyParams struct {
DP DP // copy or transform via data provider, see impl-s: (ext/etl/dp.go, core/ldp.go)
Xact Xact
Config *cmn.Config
BckTo *meta.Bck
ObjnameTo string
Buf []byte
OWT cmn.OWT
Finalize bool // copies and EC (as in poi.finalize())
DryRun bool
LatestVer bool // can be used without changing bucket's 'versioning.validate_warm_get'; see also: QparamLatestVer
Sync bool // ditto - bucket's 'versioning.synchronize'
}

// blob
WriteSGL func(*memsys.SGL) error
Expand Down Expand Up @@ -124,7 +111,6 @@ type (

HeadCold(lom *LOM, origReq *http.Request) (objAttrs *cmn.ObjAttrs, ecode int, err error)

CopyObject(lom *LOM, dm DM, coi *CopyParams) (int64, error)
Promote(params *PromoteParams) (ecode int, err error)
HeadObjT2T(lom *LOM, si *meta.Snode) bool

Expand Down
33 changes: 33 additions & 0 deletions xact/xs/coi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Package xs is a collection of eXtended actions (xactions), including multi-object
// operations, list-objects, (cluster) rebalance and (target) resilver, ETL, and more.
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package xs

import (
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/transport/bundle"
)

type (
CoiParams struct {
DP core.DP // copy or transform via data provider, see impl-s: (ext/etl/dp.go, core/ldp.go)
Xact core.Xact
Config *cmn.Config
BckTo *meta.Bck
ObjnameTo string
Buf []byte
OWT cmn.OWT
Finalize bool // copies and EC (as in poi.finalize())
DryRun bool
LatestVer bool // can be used without changing bucket's 'versioning.validate_warm_get'; see also: QparamLatestVer
Sync bool // ditto - bucket's 'versioning.synchronize'
}

COI interface {
CopyObject(lom *core.LOM, dm *bundle.DataMover, coi *CoiParams) (int64, error)
}
)
Loading

0 comments on commit 7fa3cb3

Please sign in to comment.