Skip to content

Commit

Permalink
nydusify: fix invalid blob digests for copy
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Song <[email protected]>
  • Loading branch information
imeoer committed Oct 31, 2024
1 parent 375f55f commit 4f6edda
Show file tree
Hide file tree
Showing 3 changed files with 425 additions and 31 deletions.
158 changes: 127 additions & 31 deletions contrib/nydusify/pkg/copier/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"

"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
Expand Down Expand Up @@ -99,7 +100,84 @@ func getPushWriter(ctx context.Context, pvd *provider.Provider, desc ocispec.Des
return writer, nil
}

func pushBlob(
ctx context.Context, pvd *provider.Provider, blobReader io.Reader, desc ocispec.Descriptor, opt Opt,
) (*digest.Digest, error) {
writer, err := getPushWriter(ctx, pvd, desc, opt)
if err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
writer, err = getPushWriter(ctx, pvd, desc, opt)
}
if err != nil {
return nil, errors.Wrap(err, "get push writer")
}
}

if writer != nil {
defer writer.Close()

digester := digest.SHA256.Digester()
teeRc := io.TeeReader(blobReader, digester.Hash())

if err = content.Copy(ctx, writer, teeRc, desc.Size, desc.Digest); err != nil {
actualDigest := digester.Digest()
if actualDigest != desc.Digest {
return &actualDigest, errors.Wrap(err, "copy blob: maybe invalid source blob digest")
}
return nil, errors.Wrap(err, "copy blob")
}
}

return nil, nil
}

func pushBlobFromBackend(
ctx context.Context, pvd *provider.Provider, backend backend.Backend, blobDigest digest.Digest, actualBlobDigest *digest.Digest, opt Opt,
) (*ocispec.Descriptor, error) {
blobSize, err := backend.Size(blobDigest.Hex())
if err != nil {
return nil, errors.Wrap(err, "get blob size")
}
blobSizeStr := humanize.Bytes(uint64(blobSize))

logger := logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr)
if actualBlobDigest == nil {
actualBlobDigest = &blobDigest
} else {
logger = logger.WithField("actualDigest", *actualBlobDigest)
}

logger.Infof("pushing blob from backend")
blobDesc := ocispec.Descriptor{
Digest: *actualBlobDigest,
Size: blobSize,
MediaType: converter.MediaTypeNydusBlob,
Annotations: map[string]string{
converter.LayerAnnotationNydusBlob: "true",
},
}

rc, err := backend.Reader(blobDigest.Hex())
if err != nil {
return nil, errors.Wrap(err, "get blob reader")
}
defer rc.Close()

if actualBlobDigest, err := pushBlob(ctx, pvd, rc, blobDesc, opt); err != nil {
if blobDesc.Digest != *actualBlobDigest {
blobDesc.Digest = *actualBlobDigest
return &blobDesc, errors.Wrap(err, "copy blob: maybe invalid source blob digest")
}
return nil, errors.Wrap(err, "copy blob")
}

logger.Infof("pushed blob from backend")

return &blobDesc, nil
}

func pushImageFromBackend(
ctx context.Context, pvd *provider.Provider, backend backend.Backend, src ocispec.Descriptor, opt Opt,
) ([]ocispec.Descriptor, *ocispec.Descriptor, error) {
if src.MediaType != ocispec.MediaTypeImageManifest && src.MediaType != images.MediaTypeDockerSchema2Manifest {
Expand All @@ -117,7 +195,7 @@ func pushBlobFromBackend(
if err != nil {
return nil, nil, errors.Wrap(err, "prepare reading bootstrap")
}
bootstrapPath := filepath.Join(opt.WorkDir, "bootstrap.tgz")
bootstrapPath := filepath.Join(opt.WorkDir, "bootstrap")
if err := nydusifyUtils.UnpackFile(io.NewSectionReader(ra, 0, ra.Size()), nydusifyUtils.BootstrapFileNameInLayer, bootstrapPath); err != nil {
return nil, nil, errors.Wrap(err, "unpack bootstrap layer")
}
Expand Down Expand Up @@ -152,6 +230,10 @@ func pushBlobFromBackend(
sem := semaphore.NewWeighted(int64(provider.LayerConcurrentLimit))
eg, ctx := errgroup.WithContext(ctx)
blobDescs := make([]ocispec.Descriptor, len(blobIDs))

fixedBlobDigests := map[string]string{}
lock := sync.Mutex{}

for idx := range blobIDs {
func(idx int) {
eg.Go(func() error {
Expand All @@ -160,42 +242,28 @@ func pushBlobFromBackend(

blobID := blobIDs[idx]
blobDigest := digest.Digest("sha256:" + blobID)
blobSize, err := backend.Size(blobID)
if err != nil {
return errors.Wrap(err, "get blob size")
}
blobSizeStr := humanize.Bytes(uint64(blobSize))

logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushing blob from backend")
rc, err := backend.Reader(blobID)
rc, err := backend.Reader(blobDigest.Hex())
if err != nil {
return errors.Wrap(err, "get blob reader")
}
defer rc.Close()
blobDescs[idx] = ocispec.Descriptor{
Digest: blobDigest,
Size: blobSize,
MediaType: converter.MediaTypeNydusBlob,
Annotations: map[string]string{
converter.LayerAnnotationNydusBlob: "true",
},
}
writer, err := getPushWriter(ctx, pvd, blobDescs[idx], opt)

blobDesc, err := pushBlobFromBackend(ctx, pvd, backend, blobDigest, nil, opt)
if err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
writer, err = getPushWriter(ctx, pvd, blobDescs[idx], opt)
}
if err != nil {
return errors.Wrap(err, "get push writer")
if blobDesc != nil {
logrus.WithError(err).Warn("retry to push blob with actual digest")
blobDesc, err = pushBlobFromBackend(ctx, pvd, backend, blobDigest, &blobDesc.Digest, opt)
if err != nil {
return errors.Wrap(err, "push blob from backend with actual digest")
}
lock.Lock()
fixedBlobDigests[blobDigest.String()] = blobDesc.Digest.String()
lock.Unlock()
}
return errors.Wrap(err, "push blob from backend")
}
if writer != nil {
defer writer.Close()
return content.Copy(ctx, writer, rc, blobSize, blobDigest)
}

logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushed blob from backend")
blobDescs[idx] = *blobDesc

return nil
})
Expand All @@ -214,7 +282,35 @@ func pushBlobFromBackend(
delete(manifest.Layers[idx].Annotations, "containerd.io/snapshot/nydus-blob-ids")
}
}
manifest.Layers = append(blobDescs, manifest.Layers...)
if len(fixedBlobDigests) > 0 {
// Fix blob digests in the bootstrap
logrus.Warnf("fixing blob digests in the bootstrap")
if err := fixBlobDigests(bootstrapPath, bootstrapPath, fixedBlobDigests); err != nil {
return nil, nil, errors.Wrap(err, "fix blob digests")
}
newBootstrapTarGzPath := filepath.Join(opt.WorkDir, "bootstrap.tar.gz")
if err := replaceFileInTarGz(
opt.WorkDir, io.NewSectionReader(ra, 0, ra.Size()), newBootstrapTarGzPath,
nydusifyUtils.BootstrapFileNameInLayer, bootstrapPath,
); err != nil {
return nil, nil, errors.Wrap(err, "replace bootstrap in tar.gz")
}
newBootstrapTarGzDigest, size, err := fileDigest(newBootstrapTarGzPath)
if err != nil {
return nil, nil, errors.Wrap(err, "get new bootstrap tar.gz digest")
}
bootstrapDesc.Digest = digest.NewDigestFromHex("sha256", newBootstrapTarGzDigest)
bootstrapDesc.Size = size
reader, err := os.Open(newBootstrapTarGzPath)
if err != nil {
return nil, nil, errors.Wrap(err, "open new bootstrap tar.gz")
}
defer reader.Close()
if _, err := pushBlob(ctx, pvd, reader, *bootstrapDesc, opt); err != nil {
return nil, nil, errors.Wrap(err, "push new bootstrap tar.gz")
}
}
manifest.Layers = append(blobDescs, *bootstrapDesc)

// Update image config
blobDigests := []digest.Digest{}
Expand Down Expand Up @@ -399,7 +495,7 @@ func Copy(ctx context.Context, opt Opt) error {
sourceDesc := sourceDescs[idx]
targetDesc := &sourceDesc
if bkd != nil {
descs, _targetDesc, err := pushBlobFromBackend(ctx, pvd, bkd, sourceDesc, opt)
descs, _targetDesc, err := pushImageFromBackend(ctx, pvd, bkd, sourceDesc, opt)
if err != nil {
return errors.Wrap(err, "get resolver")
}
Expand Down
28 changes: 28 additions & 0 deletions contrib/nydusify/pkg/copier/copier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package copier

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestFixBlobDigest(t *testing.T) {
_, err := fixBlobDigests(
"/home/imeoer/output/nydus_bootstrap", "/home/imeoer/nydus_bootstrap.fixed",
map[string]string{
"f0caa4821fb3f5cb53ca0563666b6029ab071b44caabca7073ebba880e8bda21": "f1caa4821fb3f5cb53ca0563666b6029ab071b44caabca7073ebba880e8bda21",
})
require.NoError(t, err)

_, err = fixBlobDigests("/home/imeoer/nydus_bootstrap.fixed", "/home/imeoer/nydus_bootstrap.fixed",
map[string]string{
"7d0fc001a3376d250a02a7749f09f720a8d02911b85cd7c4723ceda97c217116": "710fc001a3376d250a02a7749f09f720a8d02911b85cd7c4723ceda97c217116",
})
require.NoError(t, err)

_, err = fixBlobDigests("/home/imeoer/nydus_bootstrap.fixed", "/home/imeoer/nydus_bootstrap.fixed",
map[string]string{
"30af89d8656a224a257f0ce98f180090425263db245e4a6aee278c9fad39072c": "31af89d8656a224a257f0ce98f180090425263db245e4a6aee278c9fad39072c",
})
require.NoError(t, err)
}
Loading

0 comments on commit 4f6edda

Please sign in to comment.