From 2628ef12f21e712c8b52bb03fedbfd990d736d0d Mon Sep 17 00:00:00 2001 From: Brandon Duffany Date: Wed, 31 Jul 2024 14:45:37 -0400 Subject: [PATCH] Add execution_id to logging context in PublishOperation (#7134) --- .../execution_server/execution_server.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/enterprise/server/remote_execution/execution_server/execution_server.go b/enterprise/server/remote_execution/execution_server/execution_server.go index 444a1e37e5d..482f3ce8f45 100644 --- a/enterprise/server/remote_execution/execution_server/execution_server.go +++ b/enterprise/server/remote_execution/execution_server/execution_server.go @@ -902,7 +902,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio defer mu.Unlock() if time.Since(lastWrite) > 5*time.Second && taskID != "" { if err := s.updateExecution(ctx, taskID, stage, lastOp); err != nil { - log.CtxWarningf(ctx, "PublishOperation: FlushWrite: error updating execution: %q: %s", taskID, err.Error()) + log.CtxWarningf(ctx, "PublishOperation: FlushWrite: error updating execution: %s", err) return false } lastWrite = time.Now() @@ -917,7 +917,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio return stream.SendAndClose(&repb.PublishOperationResponse{}) } if err != nil { - log.CtxErrorf(ctx, "PublishOperation: recv err: %s", err.Error()) + log.CtxErrorf(ctx, "PublishOperation: recv err: %s", err) return err } @@ -925,16 +925,19 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio lastOp = op taskID = op.GetName() stage = operation.ExtractStage(op) + if taskID != "" { + ctx = log.EnrichContext(ctx, log.ExecutionIDKey, taskID) + } mu.Unlock() - log.CtxDebugf(ctx, "PublishOperation: operation %q stage: %s", taskID, stage) + log.CtxDebugf(ctx, "PublishOperation: stage: %s", stage) if stage == repb.ExecutionStage_COMPLETED { response := operation.ExtractExecuteResponse(op) if response != nil { if err := s.markTaskComplete(ctx, taskID, response); err != nil { // Errors updating the router or recording usage are non-fatal. - log.CtxErrorf(ctx, "Could not update post-completion metadata for task %q: %s", taskID, err) + log.CtxErrorf(ctx, "Could not update post-completion metadata: %s", err) } } } @@ -944,7 +947,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio return err } if err := s.streamPubSub.Publish(ctx, s.pubSubChannelForExecutionID(taskID), base64.StdEncoding.EncodeToString(data)); err != nil { - log.CtxWarningf(ctx, "Error publishing task %q on stream pubsub: %s", taskID, err) + log.CtxWarningf(ctx, "Error publishing task on stream pubsub: %s", err) return status.InternalErrorf("Error publishing task %q on stream pubsub: %s", taskID, err) } @@ -954,7 +957,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio defer mu.Unlock() if err := s.updateExecution(ctx, taskID, stage, op); err != nil { - log.CtxErrorf(ctx, "PublishOperation: error updating execution: %q: %s", taskID, err.Error()) + log.CtxErrorf(ctx, "PublishOperation: error updating execution: %s", err) return status.WrapErrorf(err, "failed to update execution %q", taskID) } lastWrite = time.Now()