Skip to content

Commit

Permalink
DEVPROD-5384 Retry cloning modules and source individually and more s…
Browse files Browse the repository at this point in the history
…electively (#7684)
  • Loading branch information
ZackarySantana authored Apr 1, 2024
1 parent 4e32fb0 commit 81efff6
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 143 deletions.
249 changes: 130 additions & 119 deletions agent/command/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ var (
cloneRepoAttribute = fmt.Sprintf("%s.clone_repo", gitGetProjectAttribute)
cloneBranchAttribute = fmt.Sprintf("%s.clone_branch", gitGetProjectAttribute)
cloneModuleAttribute = fmt.Sprintf("%s.clone_module", gitGetProjectAttribute)
cloneRetriesAttribute = fmt.Sprintf("%s.clone_retries", gitGetProjectAttribute)
cloneMethodAttribute = fmt.Sprintf("%s.clone_method", gitGetProjectAttribute)
cloneAttemptAttribute = fmt.Sprintf("%s.attempt", gitGetProjectAttribute)
)

// gitFetchProject is a command that fetches source code from git for the project
Expand Down Expand Up @@ -313,7 +313,7 @@ func (c *gitFetchProject) ParseParams(params map[string]interface{}) error {
return nil
}

func (c *gitFetchProject) buildCloneCommand(ctx context.Context, comm client.Communicator, logger client.LoggerProducer, conf *internal.TaskConfig, opts cloneOpts) ([]string, error) {
func (c *gitFetchProject) buildSourceCloneCommand(ctx context.Context, comm client.Communicator, logger client.LoggerProducer, conf *internal.TaskConfig, opts cloneOpts) ([]string, error) {
gitCommands := []string{
"set -o xtrace",
fmt.Sprintf("chmod -R 755 %s", c.Directory),
Expand Down Expand Up @@ -505,11 +505,6 @@ func (c *gitFetchProject) opts(projectMethod, projectToken string, logger client
// Execute gets the source code required by the project
// Retries some number of times before failing
func (c *gitFetchProject) Execute(ctx context.Context, comm client.Communicator, logger client.LoggerProducer, conf *internal.TaskConfig) error {
const (
fetchRetryMinDelay = time.Second
fetchRetryMaxDelay = 10 * time.Second
)

err := c.manifestLoad(ctx, comm, logger, conf)
if err != nil {
return errors.Wrap(err, "loading manifest")
Expand All @@ -532,26 +527,107 @@ func (c *gitFetchProject) Execute(ctx context.Context, comm client.Communicator,
return err
}

err = c.fetch(ctx, comm, logger, conf, td, opts)
if err != nil {
logger.Task().Error(message.WrapError(err, message.Fields{
"operation": "git.get_project",
"message": "cloning failed",
"num_attempts": gitFetchProjectRetries,
"owner": conf.ProjectRef.Owner,
"repo": conf.ProjectRef.Repo,
"branch": conf.ProjectRef.Branch,
"clone_method": opts.method,
}))
}

return err
}

func (c *gitFetchProject) fetchSource(ctx context.Context,
comm client.Communicator,
logger client.LoggerProducer,
conf *internal.TaskConfig,
jpm jasper.Manager,
opts cloneOpts) error {
attempt := 0
return c.retryFetch(ctx, logger, true, opts, func(opts cloneOpts) error {
attempt++
gitCommands, err := c.buildSourceCloneCommand(ctx, comm, logger, conf, opts)
if err != nil {
return err
}
fetchScript := strings.Join(gitCommands, "\n")

// This needs to use a thread-safe buffer just in case the context errors
// (e.g. due to a timeout) while the command is running. A non-thread-safe
// buffer is only safe to read once the command exits, guaranteeing that all
// output is finished writing. However, if the context errors, Run will
// return early and will stop waiting for the command to exit. In the
// context error case, this thread and the still-running command may race to
// read/write the buffer, so the buffer has to be thread-safe.
stdErr := utility.MakeSafeBuffer(bytes.Buffer{})
fetchSourceCmd := jpm.CreateCommand(ctx).Add([]string{"bash", "-c", fetchScript}).Directory(conf.WorkDir).
SetOutputSender(level.Info, logger.Task().GetSender()).SetErrorWriter(stdErr)

logger.Execution().Info("Fetching source from git...")
redactedCmds := fetchScript
if opts.token != "" {
redactedCmds = strings.Replace(redactedCmds, opts.token, "[redacted oauth token]", -1)
}
logger.Execution().Debugf("Commands are: %s", redactedCmds)

ctx, span := getTracer().Start(ctx, "clone_source", trace.WithAttributes(
attribute.String(cloneOwnerAttribute, opts.owner),
attribute.String(cloneRepoAttribute, opts.repo),
attribute.String(cloneBranchAttribute, opts.branch),
attribute.String(cloneMethodAttribute, opts.method),
attribute.Int(cloneAttemptAttribute, attempt),
))
defer span.End()

err = fetchSourceCmd.Run(ctx)
out := stdErr.String()
if out != "" {
if opts.token != "" {
out = strings.Replace(out, opts.token, "[redacted oauth token]", -1)
}
logger.Execution().Error(out)
}
return err
})
}

func (c *gitFetchProject) retryFetch(ctx context.Context, logger client.LoggerProducer, isSource bool, opts cloneOpts, fetch func(cloneOpts) error) error {
const (
fetchRetryMinDelay = time.Second
fetchRetryMaxDelay = 10 * time.Second
)

fetchType := "module"
if isSource {
fetchType = "source"
}

var attemptNum int
err = utility.Retry(
return utility.Retry(
ctx,
func() (bool, error) {
if attemptNum > 2 {
opts.useVerbose = true // use verbose for the last 2 attempts
logger.Task().Error(message.Fields{
"message": "running git clone with verbose output",
"message": fmt.Sprintf("running git '%s' clone with verbose output", fetchType),
"num_attempts": gitFetchProjectRetries,
"attempt": attemptNum,
})
}
if attemptNum > 0 {
// If clone failed once with the cached merge SHA, do not use it again
if isSource && attemptNum > 0 {
// If clone failed once with the cached merge SHA, do not use it again for the source repo.
opts.usePatchMergeCommitSha = false
}
if err := c.fetch(ctx, comm, logger, conf, td, opts); err != nil {
if err := fetch(opts); err != nil {
attemptNum++
if attemptNum == 1 {
logger.Execution().Warning("git clone failed with cached merge SHA; re-requesting merge SHA from GitHub")
if isSource && attemptNum == 1 {
logger.Execution().Warning("git source clone failed with cached merge SHA; re-requesting merge SHA from GitHub")
}
return true, errors.Wrapf(err, "attempt %d", attemptNum)
}
Expand All @@ -561,73 +637,6 @@ func (c *gitFetchProject) Execute(ctx context.Context, comm client.Communicator,
MinDelay: fetchRetryMinDelay,
MaxDelay: fetchRetryMaxDelay,
})
if err != nil {
logger.Task().Error(message.WrapError(err, message.Fields{
"operation": "git.get_project",
"message": "cloning failed",
"num_attempts": attemptNum,
"num_attempts_allowed": gitFetchProjectRetries,
"owner": conf.ProjectRef.Owner,
"repo": conf.ProjectRef.Repo,
"branch": conf.ProjectRef.Branch,
"clone_method": opts.method,
}))
}

span := trace.SpanFromContext(ctx)
span.SetAttributes(attribute.Int(cloneRetriesAttribute, attemptNum))

return err
}

func (c *gitFetchProject) fetchSource(ctx context.Context,
comm client.Communicator,
logger client.LoggerProducer,
conf *internal.TaskConfig,
jpm jasper.Manager,
opts cloneOpts) error {

gitCommands, err := c.buildCloneCommand(ctx, comm, logger, conf, opts)
if err != nil {
return err
}
fetchScript := strings.Join(gitCommands, "\n")

// This needs to use a thread-safe buffer just in case the context errors
// (e.g. due to a timeout) while the command is running. A non-thread-safe
// buffer is only safe to read once the command exits, guaranteeing that all
// output is finished writing. However, if the context errors, Run will
// return early and will stop waiting for the command to exit. In the
// context error case, this thread and the still-running command may race to
// read/write the buffer, so the buffer has to be thread-safe.
stdErr := utility.MakeSafeBuffer(bytes.Buffer{})
fetchSourceCmd := jpm.CreateCommand(ctx).Add([]string{"bash", "-c", fetchScript}).Directory(conf.WorkDir).
SetOutputSender(level.Info, logger.Task().GetSender()).SetErrorWriter(stdErr)

logger.Execution().Info("Fetching source from git...")
redactedCmds := fetchScript
if opts.token != "" {
redactedCmds = strings.Replace(redactedCmds, opts.token, "[redacted oauth token]", -1)
}
logger.Execution().Debugf("Commands are: %s", redactedCmds)

ctx, span := getTracer().Start(ctx, "clone_source", trace.WithAttributes(
attribute.String(cloneOwnerAttribute, opts.owner),
attribute.String(cloneRepoAttribute, opts.repo),
attribute.String(cloneBranchAttribute, opts.branch),
attribute.String(cloneMethodAttribute, opts.method),
))
defer span.End()

err = fetchSourceCmd.Run(ctx)
out := stdErr.String()
if out != "" {
if opts.token != "" {
out = strings.Replace(out, opts.token, "[redacted oauth token]", -1)
}
logger.Execution().Error(out)
}
return err
}

func (c *gitFetchProject) fetchAdditionalPatches(ctx context.Context,
Expand Down Expand Up @@ -771,35 +780,40 @@ func (c *gitFetchProject) fetchModuleSource(ctx context.Context,
return err
}

ctx, span := getTracer().Start(ctx, "clone_module", trace.WithAttributes(
attribute.String(cloneModuleAttribute, module.Name),
attribute.String(cloneOwnerAttribute, opts.owner),
attribute.String(cloneRepoAttribute, opts.repo),
attribute.String(cloneBranchAttribute, opts.branch),
attribute.String(cloneMethodAttribute, opts.method),
))
defer span.End()
attempt := 0
return c.retryFetch(ctx, logger, false, opts, func(opts cloneOpts) error {
attempt++
ctx, span := getTracer().Start(ctx, "clone_module", trace.WithAttributes(
attribute.String(cloneModuleAttribute, module.Name),
attribute.String(cloneOwnerAttribute, opts.owner),
attribute.String(cloneRepoAttribute, opts.repo),
attribute.String(cloneBranchAttribute, opts.branch),
attribute.String(cloneMethodAttribute, opts.method),
attribute.Int(cloneAttemptAttribute, attempt),
))
defer span.End()

// This needs to use a thread-safe buffer just in case the context errors
// (e.g. due to a timeout) while the command is running. A non-thread-safe
// buffer is only safe to read once the command exits, guaranteeing that all
// output is finished writing. However, if the context errors, Run will
// return early and will stop waiting for the command to exit. In the
// context error case, this thread and the still-running command may race to
// read/write the buffer, so the buffer has to be thread-safe.
stdErr := utility.MakeSafeBuffer(bytes.Buffer{})
err = jpm.CreateCommand(ctx).Add([]string{"bash", "-c", strings.Join(moduleCmds, "\n")}).
Directory(filepath.ToSlash(GetWorkingDirectory(conf, c.Directory))).
SetOutputSender(level.Info, logger.Task().GetSender()).SetErrorWriter(stdErr).Run(ctx)

// This needs to use a thread-safe buffer just in case the context errors
// (e.g. due to a timeout) while the command is running. A non-thread-safe
// buffer is only safe to read once the command exits, guaranteeing that all
// output is finished writing. However, if the context errors, Run will
// return early and will stop waiting for the command to exit. In the
// context error case, this thread and the still-running command may race to
// read/write the buffer, so the buffer has to be thread-safe.
stdErr := utility.MakeSafeBuffer(bytes.Buffer{})
err = jpm.CreateCommand(ctx).Add([]string{"bash", "-c", strings.Join(moduleCmds, "\n")}).
Directory(filepath.ToSlash(GetWorkingDirectory(conf, c.Directory))).
SetOutputSender(level.Info, logger.Task().GetSender()).SetErrorWriter(stdErr).Run(ctx)

errOutput := stdErr.String()
if errOutput != "" {
if opts.token != "" {
errOutput = strings.Replace(errOutput, opts.token, "[redacted oauth token]", -1)
errOutput := stdErr.String()
if errOutput != "" {
if opts.token != "" {
errOutput = strings.Replace(errOutput, opts.token, "[redacted oauth token]", -1)
}
logger.Execution().Info(errOutput)
}
logger.Execution().Info(errOutput)
}
return err
return err
})
}

func (c *gitFetchProject) applyAdditionalPatch(ctx context.Context,
Expand All @@ -808,7 +822,7 @@ func (c *gitFetchProject) applyAdditionalPatch(ctx context.Context,
conf *internal.TaskConfig,
td client.TaskData,
patchId string,
useVerbose bool) error {
) error {
logger.Task().Infof("Applying changes from previous commit queue patch '%s'.", patchId)

ctx, span := getTracer().Start(ctx, "apply_commit_queue_patches")
Expand All @@ -824,7 +838,7 @@ func (c *gitFetchProject) applyAdditionalPatch(ctx context.Context,
if err = c.getPatchContents(ctx, comm, logger, conf, newPatch); err != nil {
return errors.Wrap(err, "getting patch contents")
}
if err = c.applyPatch(ctx, logger, conf, reorderPatches(newPatch.Patches), useVerbose); err != nil {
if err = c.applyPatch(ctx, logger, conf, reorderPatches(newPatch.Patches)); err != nil {
logger.Task().Warning("Failed to patch the changes from the previous commit queue item. The patching may have failed to apply due to the current repository having newer changes that conflict with the patch. Try rebasing onto HEAD.")
return errors.Wrapf(err, "applying patch '%s'", newPatch.Id.Hex())
}
Expand Down Expand Up @@ -905,7 +919,7 @@ func (c *gitFetchProject) fetch(ctx context.Context,
// Apply additional patches for commit queue batch execution.
if conf.Task.Requester == evergreen.MergeTestRequester && !conf.Task.CommitQueueMerge {
for _, patchId := range additionalPatches {
err := c.applyAdditionalPatch(ctx, comm, logger, conf, td, patchId, opts.useVerbose)
err := c.applyAdditionalPatch(ctx, comm, logger, conf, td, patchId)
if err != nil {
return err
}
Expand All @@ -923,7 +937,7 @@ func (c *gitFetchProject) fetch(ctx context.Context,
// in order for the main commit's manifest to include module changes commit queue
// commits need to be in the correct order, first modules and then the main patch
// reorder patches so the main patch gets applied last
if err = c.applyPatch(ctx, logger, conf, reorderPatches(p.Patches), opts.useVerbose); err != nil {
if err = c.applyPatch(ctx, logger, conf, reorderPatches(p.Patches)); err != nil {
err = errors.Wrap(err, "applying patch")
logger.Execution().Error(err.Error())
return err
Expand Down Expand Up @@ -992,7 +1006,7 @@ func (c *gitFetchProject) getPatchContents(ctx context.Context, comm client.Comm
// getApplyCommand determines the patch type. If the patch is a mailbox-style
// patch, it uses git-am (see https://git-scm.com/docs/git-am), otherwise
// it uses git apply
func (c *gitFetchProject) getApplyCommand(patchFile string, conf *internal.TaskConfig, useVerbose bool) (string, error) {
func (c *gitFetchProject) getApplyCommand(patchFile string, conf *internal.TaskConfig) (string, error) {
useGitAm, err := isMailboxPatch(patchFile, conf)
if err != nil {
return "", err
Expand All @@ -1009,10 +1023,7 @@ func (c *gitFetchProject) getApplyCommand(patchFile string, conf *internal.TaskC
}
return fmt.Sprintf(`GIT_COMMITTER_NAME="%s" GIT_COMMITTER_EMAIL="%s" git am --keep-cr --keep < "%s"`, committerName, committerEmail, patchFile), nil
}
apply := fmt.Sprintf("git apply --binary --index < '%s'", patchFile)
if useVerbose {
apply = fmt.Sprintf("GIT_TRACE=1 %s", apply)
}
apply := fmt.Sprintf("GIT_TRACE=1 git apply --binary --index < '%s'", patchFile)
return apply, nil
}

Expand Down Expand Up @@ -1047,7 +1058,7 @@ func getPatchCommands(modulePatch patch.ModulePatch, conf *internal.TaskConfig,
// applyPatch is used by the agent to copy patch data onto disk
// and then call the necessary git commands to apply the patch file
func (c *gitFetchProject) applyPatch(ctx context.Context, logger client.LoggerProducer,
conf *internal.TaskConfig, patches []patch.ModulePatch, useVerbose bool) error {
conf *internal.TaskConfig, patches []patch.ModulePatch) error {

ctx, span := getTracer().Start(ctx, "apply_patches")
defer span.End()
Expand Down Expand Up @@ -1111,7 +1122,7 @@ func (c *gitFetchProject) applyPatch(ctx context.Context, logger client.LoggerPr

// this applies the patch using the patch files in the temp directory
patchCommandStrings := getPatchCommands(patchPart, conf, moduleDir, tempAbsPath)
applyCommand, err := c.getApplyCommand(tempAbsPath, conf, useVerbose)
applyCommand, err := c.getApplyCommand(tempAbsPath, conf)
if err != nil {
return errors.Wrap(err, "getting git apply command")
}
Expand Down
Loading

0 comments on commit 81efff6

Please sign in to comment.