Skip to content

Commit

Permalink
Add execution_id to logging context in PublishOperation (#7134)
Browse files Browse the repository at this point in the history
  • Loading branch information
bduffany authored Jul 31, 2024
1 parent 5e822cd commit 2628ef1
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
defer mu.Unlock()
if time.Since(lastWrite) > 5*time.Second && taskID != "" {
if err := s.updateExecution(ctx, taskID, stage, lastOp); err != nil {
log.CtxWarningf(ctx, "PublishOperation: FlushWrite: error updating execution: %q: %s", taskID, err.Error())
log.CtxWarningf(ctx, "PublishOperation: FlushWrite: error updating execution: %s", err)
return false
}
lastWrite = time.Now()
Expand All @@ -917,24 +917,27 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
return stream.SendAndClose(&repb.PublishOperationResponse{})
}
if err != nil {
log.CtxErrorf(ctx, "PublishOperation: recv err: %s", err.Error())
log.CtxErrorf(ctx, "PublishOperation: recv err: %s", err)
return err
}

mu.Lock()
lastOp = op
taskID = op.GetName()
stage = operation.ExtractStage(op)
if taskID != "" {
ctx = log.EnrichContext(ctx, log.ExecutionIDKey, taskID)
}
mu.Unlock()

log.CtxDebugf(ctx, "PublishOperation: operation %q stage: %s", taskID, stage)
log.CtxDebugf(ctx, "PublishOperation: stage: %s", stage)

if stage == repb.ExecutionStage_COMPLETED {
response := operation.ExtractExecuteResponse(op)
if response != nil {
if err := s.markTaskComplete(ctx, taskID, response); err != nil {
// Errors updating the router or recording usage are non-fatal.
log.CtxErrorf(ctx, "Could not update post-completion metadata for task %q: %s", taskID, err)
log.CtxErrorf(ctx, "Could not update post-completion metadata: %s", err)
}
}
}
Expand All @@ -944,7 +947,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
return err
}
if err := s.streamPubSub.Publish(ctx, s.pubSubChannelForExecutionID(taskID), base64.StdEncoding.EncodeToString(data)); err != nil {
log.CtxWarningf(ctx, "Error publishing task %q on stream pubsub: %s", taskID, err)
log.CtxWarningf(ctx, "Error publishing task on stream pubsub: %s", err)
return status.InternalErrorf("Error publishing task %q on stream pubsub: %s", taskID, err)
}

Expand All @@ -954,7 +957,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
defer mu.Unlock()

if err := s.updateExecution(ctx, taskID, stage, op); err != nil {
log.CtxErrorf(ctx, "PublishOperation: error updating execution: %q: %s", taskID, err.Error())
log.CtxErrorf(ctx, "PublishOperation: error updating execution: %s", err)
return status.WrapErrorf(err, "failed to update execution %q", taskID)
}
lastWrite = time.Now()
Expand Down

0 comments on commit 2628ef1

Please sign in to comment.