diff --git a/enterprise/server/remote_execution/execution_server/execution_server.go b/enterprise/server/remote_execution/execution_server/execution_server.go index f06f65982ba..444a1e37e5d 100644 --- a/enterprise/server/remote_execution/execution_server/execution_server.go +++ b/enterprise/server/remote_execution/execution_server/execution_server.go @@ -832,7 +832,7 @@ func (s *ExecutionServer) waitExecution(ctx context.Context, req *repb.WaitExecu } } -func loopAfterTimeout(ctx context.Context, timeout time.Duration, f func()) { +func loopAfterTimeout(ctx context.Context, timeout time.Duration, f func() bool) { ticker := time.NewTicker(timeout) defer ticker.Stop() for { @@ -843,7 +843,9 @@ func loopAfterTimeout(ctx context.Context, timeout time.Duration, f func()) { } case <-ticker.C: { - f() + if shouldContinue := f(); !shouldContinue { + return + } } } } @@ -895,16 +897,18 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio // if no pubsub listener receives our published updates, we *always* // write the execution on stage == COMPLETE or after 5 seconds have // passed with no writes. - go loopAfterTimeout(ctx, time.Second, func() { + go loopAfterTimeout(ctx, time.Second, func() bool { mu.Lock() defer mu.Unlock() if time.Since(lastWrite) > 5*time.Second && taskID != "" { if err := s.updateExecution(ctx, taskID, stage, lastOp); err != nil { log.CtxWarningf(ctx, "PublishOperation: FlushWrite: error updating execution: %q: %s", taskID, err.Error()) - return + return false } lastWrite = time.Now() + return false } + return true }) for {