Skip to content

Commit

Permalink
Merge pull request containerd#9409 from ruiwen-zhao/progress-fix-1.7
Browse files Browse the repository at this point in the history
[release/1.7] fix: ImagePull should close http connection if there is no available data to read.
  • Loading branch information
samuelkarp authored Nov 22, 2023
2 parents b1c6f01 + 2068061 commit 21b85e9
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 17 deletions.
69 changes: 68 additions & 1 deletion integration/build_local_containerd_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log/logtest"
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
ctrdsrv "github.com/containerd/containerd/services/server"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/opencontainers/go-digest"

_ "github.com/containerd/containerd/diff/walking/plugin"
"github.com/containerd/containerd/events/exchange"
Expand Down Expand Up @@ -59,9 +62,11 @@ var (
loadedPluginsErr error
)

type tweakPluginInitFunc func(t *testing.T, p *plugin.Registration) *plugin.Registration

// buildLocalContainerdClient is to return containerd client with initialized
// core plugins in local.
func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client {
func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPluginInitFunc) *containerd.Client {
ctx := logtest.WithT(context.Background(), t)

// load plugins
Expand Down Expand Up @@ -107,6 +112,10 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client
initContext.Config = pc
}

if tweakInitFn != nil {
p = tweakInitFn(t, p)
}

result := p.Init(initContext)
assert.NoError(t, initialized.Add(result))

Expand All @@ -126,3 +135,61 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client

return client
}

func tweakContentInitFnWithDelayer(commitDelayDuration time.Duration) tweakPluginInitFunc {
return func(t *testing.T, p *plugin.Registration) *plugin.Registration {
if p.URI() != "io.containerd.content.v1.content" {
return p
}

oldInitFn := p.InitFn
p.InitFn = func(ic *plugin.InitContext) (interface{}, error) {
instance, err := oldInitFn(ic)
if err != nil {
return nil, err
}

return &contentStoreDelayer{
t: t,

Store: instance.(content.Store),
commitDelayDuration: commitDelayDuration,
}, nil
}
return p
}
}

type contentStoreDelayer struct {
t *testing.T

content.Store
commitDelayDuration time.Duration
}

func (cs *contentStoreDelayer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
w, err := cs.Store.Writer(ctx, opts...)
if err != nil {
return nil, err
}

return &contentWriterDelayer{
t: cs.t,

Writer: w,
commitDelayDuration: cs.commitDelayDuration,
}, nil
}

type contentWriterDelayer struct {
t *testing.T

content.Writer
commitDelayDuration time.Duration
}

func (w *contentWriterDelayer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
w.t.Logf("[testcase: %s] Commit %v blob after %v", w.t.Name(), expected, w.commitDelayDuration)
time.Sleep(w.commitDelayDuration)
return w.Writer.Commit(ctx, size, expected, opts...)
}
39 changes: 37 additions & 2 deletions integration/image_pull_timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,41 @@ func TestCRIImagePullTimeout(t *testing.T) {

t.Run("HoldingContentOpenWriter", testCRIImagePullTimeoutByHoldingContentOpenWriter)
t.Run("NoDataTransferred", testCRIImagePullTimeoutByNoDataTransferred)
t.Run("SlowCommitWriter", testCRIImagePullTimeoutBySlowCommitWriter)
}

// testCRIImagePullTimeoutBySlowCommitWriter tests that
//
// It should not cancel if the content.Commit takes long time.
//
// After copying all the data from registry, the request should be inactive
// before content.Commit. If the blob is large, for instance, 2 GiB, the fsync
// during content.Commit maybe take long time during IO pressure. The
// content.Commit holds the bolt's writable mutex and blocks other goroutines
// which are going to commit blob as well. If the progress tracker still
// considers these requests active, it maybe file false alert and cancel the
// ImagePull.
//
// It's reproducer for #9347.
func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) {
t.Parallel()

tmpDir := t.TempDir()

delayDuration := 2 * defaultImagePullProgressTimeout
cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration))

criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
assert.NoError(t, err)

ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)

_, err = criService.PullImage(ctx, &runtimeapi.PullImageRequest{
Image: &runtimeapi.ImageSpec{
Image: pullProgressTestImageName,
},
})
assert.NoError(t, err)
}

// testCRIImagePullTimeoutByHoldingContentOpenWriter tests that
Expand All @@ -76,7 +111,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) {

tmpDir := t.TempDir()

cli := buildLocalContainerdClient(t, tmpDir)
cli := buildLocalContainerdClient(t, tmpDir, nil)

criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{})
assert.NoError(t, err)
Expand Down Expand Up @@ -214,7 +249,7 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) {

tmpDir := t.TempDir()

cli := buildLocalContainerdClient(t, tmpDir)
cli := buildLocalContainerdClient(t, tmpDir, nil)

mirrorSrv := newMirrorRegistryServer(mirrorRegistryServerConfig{
limitedBytesPerConn: 1024 * 1024 * 3, // 3MB
Expand Down
24 changes: 24 additions & 0 deletions pkg/cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ import (
"github.com/containerd/containerd/plugin"
)

const (
// defaultImagePullProgressTimeoutDuration is the default value of imagePullProgressTimeout.
//
// NOTE:
//
// This ImagePullProgressTimeout feature is ported from kubelet/dockershim's
// --image-pull-progress-deadline. The original value is 1m0. Unlike docker
// daemon, the containerd doesn't have global concurrent download limitation
// before migrating to Transfer Service. If kubelet runs with concurrent
// image pull, the node will run under IO pressure. The ImagePull process
// could be impacted by self, if the target image is large one with a
// lot of layers. And also both container's writable layers and image's storage
// share one disk. The ImagePull process commits blob to content store
// with fsync, which might bring the unrelated files' dirty pages into
// disk in one transaction [1]. The 1m0 value isn't good enough. Based
// on #9347 case and kubernetes community's usage [2], the default value
// is updated to 5m0. If end-user still runs into unexpected cancel,
// they need to config it based on their environment.
//
// [1]: Fast commits for ext4 - https://lwn.net/Articles/842385/
// [2]: https://github.com/kubernetes/kubernetes/blob/1635c380b26a1d8cc25d36e9feace9797f4bae3c/cluster/gce/util.sh#L882
defaultImagePullProgressTimeoutDuration = 5 * time.Minute
)

type SandboxControllerMode string

const (
Expand Down
4 changes: 1 addition & 3 deletions pkg/cri/config/config_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package config

import (
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/pkg/cri/streaming"
"github.com/pelletier/go-toml"
Expand Down Expand Up @@ -109,7 +107,7 @@ func DefaultConfig() PluginConfig {
},
EnableCDI: false,
CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"},
ImagePullProgressTimeout: time.Minute.String(),
ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(),
DrainExecSyncIOTimeout: "0s",
}
}
3 changes: 1 addition & 2 deletions pkg/cri/config/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package config
import (
"os"
"path/filepath"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/pkg/cri/streaming"
Expand Down Expand Up @@ -85,7 +84,7 @@ func DefaultConfig() PluginConfig {
ImageDecryption: ImageDecryption{
KeyModel: KeyModelNode,
},
ImagePullProgressTimeout: time.Minute.String(),
ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(),
DrainExecSyncIOTimeout: "0s",
}
}
11 changes: 2 additions & 9 deletions pkg/cri/server/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,6 @@ func (c *criService) encryptedImagesPullOpts() []containerd.RemoteOpt {
}

const (
// minPullProgressReportInternal is used to prevent the reporter from
// eating more CPU resources
minPullProgressReportInternal = 5 * time.Second
// defaultPullProgressReportInterval represents that how often the
// reporter checks that pull progress.
defaultPullProgressReportInterval = 10 * time.Second
Expand Down Expand Up @@ -626,10 +623,6 @@ func (reporter *pullProgressReporter) start(ctx context.Context) {
// check progress more frequently if timeout < default internal
if reporter.timeout < reportInterval {
reportInterval = reporter.timeout / 2

if reportInterval < minPullProgressReportInternal {
reportInterval = minPullProgressReportInternal
}
}

var ticker = time.NewTicker(reportInterval)
Expand All @@ -644,9 +637,9 @@ func (reporter *pullProgressReporter) start(ctx context.Context) {
WithField("activeReqs", activeReqs).
WithField("totalBytesRead", bytesRead).
WithField("lastSeenBytesRead", lastSeenBytesRead).
WithField("lastSeenTimestamp", lastSeenTimestamp).
WithField("lastSeenTimestamp", lastSeenTimestamp.Format(time.RFC3339)).
WithField("reportInterval", reportInterval).
Tracef("progress for image pull")
Debugf("progress for image pull")

if activeReqs == 0 || bytesRead > lastSeenBytesRead {
lastSeenBytesRead = bytesRead
Expand Down
10 changes: 10 additions & 0 deletions remotes/docker/httpreadseeker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
if _, err2 := hrs.reader(); err2 == nil {
return n, nil
}
} else if err == io.EOF {
// The CRI's imagePullProgressTimeout relies on responseBody.Close to
// update the process monitor's status. If the err is io.EOF, close
// the connection since there is no more available data.
if hrs.rc != nil {
if clsErr := hrs.rc.Close(); clsErr != nil {
log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser after io.EOF")
}
hrs.rc = nil
}
}
return
}
Expand Down

0 comments on commit 21b85e9

Please sign in to comment.