Update replay_action to allow overriding the command (#6011)
bduffany authored Feb 28, 2024
1 parent 52b2f33 commit b2f1181
Showing 1 changed file with 46 additions and 17 deletions.
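For illustration only, a replay that swaps in a different command might be invoked roughly as sketched below. The bazel target comes from the changed file's path; the --source_executor, --target_executor, and --action_digest flag names and the digest format are assumptions inferred from the surrounding code rather than shown in this commit; only --override_command is introduced here.

    bazel run //enterprise/tools/replay_action -- \
      --source_executor=grpcs://<source-cache-address> \
      --target_executor=grpcs://<target-cache-address> \
      --action_digest=/blobs/<action-hash>/<size-bytes> \
      --override_command='env && ls -la'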
63 changes: 46 additions & 17 deletions enterprise/tools/replay_action/replay_action.go
@@ -37,9 +37,10 @@ var (
targetRemoteInstanceName = flag.String("target_remote_instance_name", "", "The remote instance name used in the source action")

// Less common options below.
- targetHeaders = flag.Slice("target_headers", []string{}, "A list of headers to set (format: 'key=val'")
- n = flag.Int("n", 1, "Number of times to replay the action. By default they'll be replayed in serial. Set --jobs to 2 or higher to run concurrently.")
- jobs = flag.Int("jobs", 1, "Max number of concurrent jobs that can execute actions at once.")
+ overrideCommand = flag.String("override_command", "", "If set, run this script (with 'sh -c') instead of the original action command line. All other properties such as environment variables and platform properties will be preserved from the original command.")
+ targetHeaders = flag.Slice("target_headers", []string{}, "A list of headers to set (format: 'key=val'")
+ n = flag.Int("n", 1, "Number of times to replay the action. By default they'll be replayed in serial. Set --jobs to 2 or higher to run concurrently.")
+ jobs = flag.Int("jobs", 1, "Max number of concurrent jobs that can execute actions at once.")
)

// Example usage:
@@ -179,14 +180,10 @@ func main() {
targetCtx = metadata.AppendToOutgoingContext(targetCtx, headersToSet...)
}

log.Infof("Connecting to %q", *sourceExecutor)
var sourceBSClient, destBSClient bspb.ByteStreamClient
var sourceCASClient, destCASClient repb.ContentAddressableStorageClient
var execClient repb.ExecutionClient
sourceBSClient, execClient, sourceCASClient = getClients(*sourceExecutor)
if inCopyMode() {
destBSClient, execClient, destCASClient = getClients(*targetExecutor)
}
log.Infof("Connecting to source %q", *sourceExecutor)
sourceBSClient, _, sourceCASClient := getClients(*sourceExecutor)
log.Infof("Connecting to target %q", *targetExecutor)
destBSClient, execClient, destCASClient := getClients(*targetExecutor)

// For backwards compatibility, attempt to fixup old style digest
// strings that don't start with a '/blobs/' prefix.
@@ -202,7 +199,6 @@

// Fetch the action to ensure it exists.
action := &repb.Action{}
- d := actionInstanceDigest.GetDigest()
if err := cachetools.GetBlobAsProto(srcCtx, sourceBSClient, actionInstanceDigest, action); err != nil {
log.Fatalf("Error fetching action: %s", err.Error())
}
@@ -211,7 +207,7 @@
fmb := NewFindMissingBatcher(targetCtx, *targetRemoteInstanceName, destCASClient, FindMissingBatcherOpts{})
eg, targetCtx := errgroup.WithContext(targetCtx)
eg.Go(func() error {
- if err := copyFile(srcCtx, targetCtx, fmb, destBSClient, sourceBSClient, d, actionInstanceDigest.GetDigestFunction()); err != nil {
+ if err := copyFile(srcCtx, targetCtx, fmb, destBSClient, sourceBSClient, actionInstanceDigest.GetDigest(), actionInstanceDigest.GetDigestFunction()); err != nil {
return status.WrapError(err, "copy action")
}
return nil
@@ -238,10 +234,39 @@ func main() {
}
log.Infof("Finished copying files.")
}

+ // If we're overriding the command, do that now.
+ if *overrideCommand != "" {
+ // Download the command and update arguments.
+ sourceCRN := digest.NewResourceName(action.GetCommandDigest(), *sourceRemoteInstanceName, rspb.CacheType_CAS, actionInstanceDigest.GetDigestFunction())
+ cmd := &repb.Command{}
+ if err := cachetools.GetBlobAsProto(srcCtx, sourceBSClient, sourceCRN, cmd); err != nil {
+ log.Fatalf("Failed to get command: %s", err)
+ }
+ cmd.Arguments = []string{"sh", "-c", *overrideCommand}
+
+ // Upload the new command and action.
+ cd, err := cachetools.UploadProto(targetCtx, destBSClient, *targetRemoteInstanceName, actionInstanceDigest.GetDigestFunction(), cmd)
+ if err != nil {
+ log.Fatalf("Failed to upload new command: %s", err)
+ }
+ action = action.CloneVT()
+ action.CommandDigest = cd
+ ad, err := cachetools.UploadProto(targetCtx, destBSClient, *targetRemoteInstanceName, actionInstanceDigest.GetDigestFunction(), action)
+ if err != nil {
+ log.Fatalf("Failed to upload new action: %s", err)
+ }
+
+ actionInstanceDigest = digest.NewResourceName(ad, *targetRemoteInstanceName, rspb.CacheType_CAS, actionInstanceDigest.GetDigestFunction())
+ }
+
+ if str, err := actionInstanceDigest.DownloadString(); err == nil {
+ log.Infof("Action resource name: %s", str)
+ }
execReq := &repb.ExecuteRequest{
InstanceName: *targetRemoteInstanceName,
SkipCacheLookup: true,
- ActionDigest: d,
+ ActionDigest: actionInstanceDigest.GetDigest(),
+ DigestFunction: actionInstanceDigest.GetDigestFunction(),
}
eg := &errgroup.Group{}
@@ -273,7 +298,7 @@ func execute(ctx context.Context, execClient repb.ExecutionClient, bsClient bspb
for {
op, err := stream.Recv()
if err != nil {
log.Fatalf("Error on stream: %s", err.Error())
log.Fatalf("Execute stream recv failed: %s", err.Error())
}
if !printedExecutionID {
log.Infof("Started task %q", op.GetName())
@@ -306,8 +331,12 @@ func execute(ctx context.Context, execClient repb.ExecutionClient, bsClient bspb
}
// Print stdout and stderr but only when running a single action.
if *n == 1 {
- printOutputFile(ctx, bsClient, result.GetStdoutDigest(), rn.GetDigestFunction(), "stdout")
- printOutputFile(ctx, bsClient, result.GetStderrDigest(), rn.GetDigestFunction(), "stderr")
+ if err := printOutputFile(ctx, bsClient, result.GetStdoutDigest(), rn.GetDigestFunction(), "stdout"); err != nil {
+ log.Warningf("Failed to get stdout: %s", err)
+ }
+ if err := printOutputFile(ctx, bsClient, result.GetStderrDigest(), rn.GetDigestFunction(), "stderr"); err != nil {
+ log.Warningf("Failed to get stderr: %s", err)
+ }
}
logExecutionMetadata(i, response.GetResult().GetExecutionMetadata())
break
